In [1]:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (classification_report, confusion_matrix,roc_auc_score, roc_curve, precision_recall_curve)
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.utils import to_categorical

In [2]:
# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

In [3]:
class KeplerPreprocessor:
    """
    Preprocessing pipeline for Kepler light curve data
    """
    
    def __init__(self, sequence_length=3197):
        self.sequence_length = sequence_length
        self.scaler = StandardScaler()
    
    def handle_missing_values(self, X):
        """
        Handle missing values through interpolation
        """
        X_processed = []
        for curve in X:
            # Linear interpolation for NaN values
            mask = np.isnan(curve)
            if mask.any():
                indices = np.arange(len(curve))
                curve[mask] = np.interp(indices[mask], indices[~mask], curve[~mask])
            X_processed.append(curve)
        return np.array(X_processed)
    
    def remove_outliers(self, X, threshold=5):
        """
        Clip extreme outliers using sigma clipping
        """
        X_processed = []
        for curve in X:
            median = np.median(curve)
            std = np.std(curve)
            clipped = np.clip(curve, median - threshold*std, median + threshold*std)
            X_processed.append(clipped)
        return np.array(X_processed)
    
    def normalize_curves(self, X, fit=True):
        """
        Normalize each light curve to zero mean and unit variance
        """
        X_normalized = []
        for curve in X:
            curve_reshaped = curve.reshape(-1, 1)
            if fit:
                normalized = self.scaler.fit_transform(curve_reshaped).flatten()
            else:
                normalized = self.scaler.transform(curve_reshaped).flatten()
            X_normalized.append(normalized)
        return np.array(X_normalized)
    
    def pad_or_truncate(self, X):
        """
        Ensure all sequences have the same length
        """
        X_processed = []
        for curve in X:
            if len(curve) > self.sequence_length:
                # Truncate
                curve = curve[:self.sequence_length]
            elif len(curve) < self.sequence_length:
                # Pad with zeros
                padding = np.zeros(self.sequence_length - len(curve))
                curve = np.concatenate([curve, padding])
            X_processed.append(curve)
        return np.array(X_processed)
    
    def preprocess(self, X, fit=True):
        """
        Complete preprocessing pipeline
        """
        print("Handling missing values...")
        X = self.handle_missing_values(X)
        
        print("Removing outliers...")
        X = self.remove_outliers(X)
        
        print("Normalizing light curves...")
        X = self.normalize_curves(X, fit=fit)
        
        print("Padding/truncating sequences...")
        X = self.pad_or_truncate(X)
        
        # Add channel dimension for CNN
        X = X.reshape(X.shape[0], X.shape[1], 1)
        
        return X

In [4]:
class DataAugmentor:
    """
    Data augmentation techniques for light curves
    """
    
    @staticmethod
    def add_noise(X, noise_level=0.01):
        """Add Gaussian noise"""
        noise = np.random.normal(0, noise_level, X.shape)
        return X + noise
    
    @staticmethod
    def phase_shift(X, max_shift=100):
        """Random circular shift along time axis"""
        X_shifted = []
        for curve in X:
            shift = np.random.randint(-max_shift, max_shift)
            shifted_curve = np.roll(curve, shift, axis=0)
            X_shifted.append(shifted_curve)
        return np.array(X_shifted)
    
    @staticmethod
    def amplitude_scaling(X, scale_range=(0.95, 1.05)):
        """Random amplitude scaling"""
        scales = np.random.uniform(scale_range[0], scale_range[1], (X.shape[0], 1, 1))
        return X * scales
    
    @staticmethod
    def augment_batch(X, y, augmentation_factor=2):
        """
        Apply multiple augmentation techniques
        """
        X_aug_list = [X]
        y_aug_list = [y]
        
        for _ in range(augmentation_factor - 1):
            X_temp = X.copy()
            
            # Randomly apply augmentations
            if np.random.random() > 0.5:
                X_temp = DataAugmentor.add_noise(X_temp)
            if np.random.random() > 0.5:
                X_temp = DataAugmentor.phase_shift(X_temp)
            if np.random.random() > 0.5:
                X_temp = DataAugmentor.amplitude_scaling(X_temp)
            
            X_aug_list.append(X_temp)
            y_aug_list.append(y)
        
        return np.vstack(X_aug_list), np.hstack(y_aug_list)


def build_cnn_model(input_shape, num_classes=2):
    """
    Build 1D CNN architecture for exoplanet detection
    
    Architecture inspired by successful Kepler classification papers:
    - Multiple convolutional blocks with increasing filters
    - Batch normalization for training stability
    - Dropout for regularization
    - Global pooling to handle variable patterns
    """
    
    model = models.Sequential([
        # First Convolutional Block
        layers.Conv1D(64, kernel_size=5, activation='relu', 
                     padding='same', input_shape=input_shape),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Second Convolutional Block
        layers.Conv1D(128, kernel_size=5, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Third Convolutional Block
        layers.Conv1D(256, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.3),
        
        # Fourth Convolutional Block
        layers.Conv1D(256, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalMaxPooling1D(),  # Aggregate features
        
        # Dense Layers
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        
        # Output Layer
        layers.Dense(num_classes, activation='softmax')
    ])
    
    return model

In [5]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

class ExoplanetClassifier:
    def __init__(self, sequence_length=3197, learning_rate=0.001):
        self.sequence_length = sequence_length
        self.learning_rate = learning_rate
        self.model = self.build_cnn_model()

    def build_cnn_model(self):
        model = models.Sequential([
            layers.Conv1D(32, 5, activation='relu', input_shape=(self.sequence_length, 1)),
            layers.MaxPooling1D(pool_size=2),
            layers.Conv1D(64, 5, activation='relu'),
            layers.MaxPooling1D(pool_size=2),
            layers.Flatten(),
            layers.Dense(64, activation='relu'),
            layers.Dropout(0.3),
            layers.Dense(2, activation='softmax')
        ])

        model.compile(
            optimizer=optimizers.Adam(learning_rate=self.learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def train(self, X_train, y_train, X_val, y_val, epochs=15, batch_size=32, patience=3):
        if len(X_train.shape) == 2:
            X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
            X_val = X_val.reshape((X_val.shape[0], X_val.shape[1], 1))

        print("✅ X_train shape:", X_train.shape)
        print("✅ y_train shape:", y_train.shape)
        print("✅ X_val shape:", X_val.shape)
        print("✅ y_val shape:", y_val.shape)

        early_stop = callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)

        history = self.model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, y_val),
            callbacks=[early_stop],
            verbose=1
        )

        return history

    def evaluate(self, X_test, y_test):
        if len(X_test.shape) == 2:
            X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

        loss, acc = self.model.evaluate(X_test, y_test)
        print(f"\n📊 Test Accuracy: {acc:.4f}")
        y_pred = np.argmax(self.model.predict(X_test), axis=1)
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred, digits=4))

    def save_model(self, path="best_exoplanet_model.h5"):
        self.model.save(path)
        print(f"✅ Model saved at {path}")


def main():
    print("="*60)
    print("KEPLER EXOPLANET DETECTION PIPELINE")
    print("="*60)

    # Try loading dataset; fallback to synthetic data if not found
    try:
        data = np.loadtxt("your_dataset.csv", delimiter=',', skiprows=1)
        X = data[:, :-1]
        y = data[:, -1].astype(int)
    except Exception as e:
        print(f"\n❌ Dataset not found or failed to load: {e}")
        print("\n🔧 Generating synthetic data for demonstration...")
        n_samples = 1000
        n_timesteps = 3197
        X = np.random.randn(n_samples, n_timesteps)
        y = np.random.randint(0, 2, n_samples)

    print(f"\nDataset size: {len(X)}")
    print(f"Class distribution: {np.bincount(y)}")
    print(f"Sequence length: {X.shape[1]}")

    n_timesteps = X.shape[1]
    classifier = ExoplanetClassifier(sequence_length=n_timesteps)

    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    print("\n" + "="*60)
    print("TRAINING")
    print("="*60)
    classifier.train(X_train, y_train, X_val, y_val, epochs=10, batch_size=32, patience=3)

    print("\n" + "="*60)
    print("TESTING")
    print("="*60)
    classifier.evaluate(X_test, y_test)
    classifier.save_model()

    print("\n" + "="*60)
    print("PIPELINE COMPLETE")
    print("="*60)


if __name__ == "__main__":
    main()


KEPLER EXOPLANET DETECTION PIPELINE

❌ Dataset not found or failed to load: your_dataset.csv not found.

🔧 Generating synthetic data for demonstration...

Dataset size: 1000
Class distribution: [487 513]
Sequence length: 3197

TRAINING
✅ X_train shape: (700, 3197, 1)
✅ y_train shape: (700,)
✅ X_val shape: (150, 3197, 1)
✅ y_val shape: (150,)
Epoch 1/10


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 96ms/step - accuracy: 0.5371 - loss: 1.0605 - val_accuracy: 0.5067 - val_loss: 0.6949
Epoch 2/10
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 95ms/step - accuracy: 0.5486 - loss: 0.6832 - val_accuracy: 0.5067 - val_loss: 0.6977
Epoch 3/10
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 59ms/step - accuracy: 0.6429 - loss: 0.6466 - val_accuracy: 0.5067 - val_loss: 0.7166
Epoch 4/10
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 42ms/step - accuracy: 0.7529 - loss: 0.5348 - val_accuracy: 0.4533 - val_loss: 0.7370

TESTING
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - accuracy: 0.4800 - loss: 0.6967

📊 Test Accuracy: 0.4800
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step

Classification Report:


  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])


              precision    recall  f1-score   support

           0     0.0000    0.0000    0.0000        78
           1     0.4800    1.0000    0.6486        72

    accuracy                         0.4800       150
   macro avg     0.2400    0.5000    0.3243       150
weighted avg     0.2304    0.4800    0.3114       150

✅ Model saved at best_exoplanet_model.h5

PIPELINE COMPLETE


In [6]:
import pandas as pd
import numpy as np

def main():
    """
    Complete pipeline execution example
    """
    
    print("="*60)
    print("KEPLER EXOPLANET DETECTION PIPELINE")
    print("="*60)

    # ✅ Keep file path as a string, not a DataFrame
    csv_file_path = 'keras_dataset_sorted copy.csv'
    label_col = 'LABEL'
    
    try:
        # ✅ Properly load data
        X, y = load_kepler_data("your_dataset.csv", label_column='koi_disposition')
    except FileNotFoundError:
        print(f"\n❌ ERROR: File '{csv_file_path}' not found!")
        print("\n📝 Please update the 'csv_file_path' variable with your actual file path.")
        print("   Example: csv_file_path = '/path/to/your/kepler_data.csv'")
        print("\n   Or use synthetic data for testing:")
        
        # Generate synthetic data for testing
        print("\n🔧 Generating synthetic data for demonstration...")
        n_samples = 5000
        n_timesteps = 3197
        X = np.random.randn(n_samples, n_timesteps)
        y = np.random.randint(0, 2, n_samples)
        
        # Make class imbalance (typical for exoplanet data)
        positive_indices = np.where(y == 1)[0]
        keep_positive = np.random.choice(positive_indices, 
                                         size=int(len(positive_indices) * 0.1), 
                                         replace=False)
        keep_negative = np.where(y == 0)[0]
        keep_indices = np.concatenate([keep_positive, keep_negative])
        X = X[keep_indices]
        y = y[keep_indices]
    
    print(f"\nDataset size: {len(X)}")
    print(f"Class distribution: {np.bincount(y)}")
    print(f"Sequence length: {X.shape[1]}")
    
    # ✅ Dynamically set sequence length
    n_timesteps = X.shape[1]
    
    # Initialize classifier
    classifier = ExoplanetClassifier(sequence_length=n_timesteps)
    
    # Prepare data
    X_train, X_val, X_test, y_train, y_val, y_test = classifier.prepare_data(
        X, y, 
        test_size=0.15,
        val_size=0.15,
        use_smote=False,  # Set to True for SMOTE oversampling
        augment_train=True  # Apply data augmentation
    )
    
    # Build model
    classifier.build_and_compile(learning_rate=0.001)
    
    # Train model
    print("\n" + "="*60)
    print("TRAINING")
    print("="*60)
    classifier.train(
        X_train, y_train,
        X_val, y_val,
        epochs=50,
        batch_size=32,
        patience=10
    )
    
    # Plot training history
    classifier.plot_training_history()
    
    # Evaluate on test set
    print("\n" + "="*60)
    print("TESTING")
    print("="*60)
    y_pred, y_pred_proba = classifier.evaluate(X_test, y_test)
    
    print("\n" + "="*60)
    print("PIPELINE COMPLETE")
    print("="*60)
    print("\nModel saved as: best_exoplanet_model.h5")
    print("Training history saved as: training_history.png")
    print("Evaluation results saved as: evaluation_results.png")


if __name__ == "__main__":
    main()


KEPLER EXOPLANET DETECTION PIPELINE


NameError: name 'load_kepler_data' is not defined