In [2]:
import numpy as np
import matplotlib.pyplot as plt

from astropy.io import fits
from astropy.stats import sigma_clipped_stats

from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler
import cv2

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, precision_recall_curve, auc
from sklearn.utils import class_weight

from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50

print("All libraries imported successfully.")

All libraries imported successfully.


In [3]:
def load_fits_image(filepath):
    try:
        with fits.open(filepath) as hdul:
            image_data = hdul[0].data
            header = hdul[0].header
            
            if image_data is not None:
                image_data = image_data.astype(np.float32)
            return image_data, header
    except Exception as e:
        print(f"Error loading FITS file {filepath}: {e}")
        return None, None

def subtract_background(image_data):
    if image_data is None:
        return None
    mean, median_bkg, stddev = sigma_clipped_stats(image_data, sigma=3.0)
    return image_data - median_bkg

def robust_scale_image(image_data):
    if image_data is None:
        return None
    scaler = RobustScaler()
    
    pixels = image_data.flatten().reshape(-1, 1)
    
    scaled_pixels = scaler.fit_transform(pixels)
    
    return scaled_pixels.reshape(image_data.shape)

def resize_image(image_data, target_size=(64, 64)):
    if image_data is None:
        return None
    return cv2.resize(image_data, target_size, interpolation=cv2.INTER_AREA)

print("Preprocessing functions defined.")

Preprocessing functions defined.


In [4]:
# Define a sequential model for data augmentation
data_augmentation_pipeline = tf.keras.Sequential([
    layers.RandomFlip("horizontal_and_vertical"),  # Randomly flip images horizontally and vertically
    layers.RandomRotation(0.5),  # Corresponds to 180 degrees
    # Small translations and gaussian noise can also be added here
], name="data_augmentation")

print("Data augmentation pipeline defined.")


Data augmentation pipeline defined.


In [5]:
import pandas as pd # Make sure pandas is imported for reading CSV files

# --- 1. Load the Real Data ---
# Paths are set for your "FLARE/dataset/" folder structure
ATLAS_IMG_PATH = 'dataset/ATLAS_images.npy'
ATLAS_LBL_PATH = 'dataset/ATLAS_labels.csv'
MEER_IMG_PATH  = 'dataset/MeerLICHT_images.npy'
MEER_LBL_PATH  = 'dataset/MeerLICHT_labels.csv'
PAN_IMG_PATH   = 'dataset/PANSTARRS_images.npy'
PAN_LBL_PATH   = 'dataset/PANSTARRS_labels.csv'

print("Loading real dataset files...")
try:
    # Load image arrays
    atlas_images = np.load(ATLAS_IMG_PATH)
    meer_images = np.load(MEER_IMG_PATH)
    pan_images = np.load(PAN_IMG_PATH)
    
    # Load label CSVs
    atlas_labels_df = pd.read_csv(ATLAS_LBL_PATH)
    meer_labels_df = pd.read_csv(MEER_LBL_PATH)
    pan_labels_df = pd.read_csv(PAN_LBL_PATH)

    # Combine all data into single arrays
    X_raw = np.concatenate([atlas_images, meer_images, pan_images], axis=0)
    labels_df = pd.concat([atlas_labels_df, meer_labels_df, pan_labels_df], ignore_index=True)
    
    print(f"Loaded {len(X_raw)} total images.")
    
    # --- Process Labels ---
    LABEL_COLUMN = 'label' 
    
    # Convert text labels "Real" / "Bogus" to 1 / 0
    # Your project guide identifies "Real" as the positive class
    y_raw = labels_df[LABEL_COLUMN].map({'Real': 1, 'Bogus': 0}).values
    
    # Check if all labels were mapped (i.e., no NaNs)
    nan_count = np.isnan(y_raw).sum()
    if nan_count > 0:
        print(f"Warning: {nan_count} labels were not 'Real' or 'Bogus' and are now NaN.")
        # For simplicity, we'll drop these rows with missing labels.
        print("Dropping rows with missing labels...")
        X_raw = X_raw[~np.isnan(y_raw)]
        y_raw = y_raw[~np.isnan(y_raw)]
        
except FileNotFoundError as e:
    print(f"--- ERROR: File not found. ---")
    print(f"Could not find: {e.filename}")
    print("Please make sure your 'dataset' folder is in the same directory as your 'FLARE.ipynb' notebook.")
    # Create tiny placeholder data so the notebook can still run for testing
    X_raw = np.random.rand(10, 100, 100, 3) 
    y_raw = np.array([0,1,0,0,0,0,0,0,1,0])
except KeyError as e:
    print(f"--- ERROR: Label column not found. ---")
    print(f"Could not find column {e} in your CSV files.")
    print(f"Please update the 'LABEL_COLUMN' variable in this cell if 'label' is incorrect.")
    # Create tiny placeholder data so the notebook can still run for testing
    X_raw = np.random.rand(10, 100, 100, 3) 
    y_raw = np.array([0,1,0,0,0,0,0,0,1,0])
except Exception as e:
    print(f"An unexpected error occurred: {e}")


# --- 2. Process the data using your pipeline ---
# The loaded .npy files are (N, 100, 100, 3)
# The preprocessing functions (subtract_background, robust_scale_image)
# are designed for 2D images. We must process each of the 3 channels individually.
print("Processing real image data... (This may take a minute)")
TARGET_SIZE = (64, 64) # Target size from your project guide
processed_triplets = []

for i in range(len(X_raw)):
    triplet = X_raw[i] # This is one (100, 100, 3) image
    
    # Separate the three channels (science, reference, difference)
    sci_raw = triplet[:, :, 0]
    ref_raw = triplet[:, :, 1]
    diff_raw = triplet[:, :, 2]
    
    # Apply the full preprocessing pipeline (from Cell 2) to each 2D channel
    sci_proc = resize_image(robust_scale_image(subtract_background(sci_raw)), TARGET_SIZE)
    ref_proc = resize_image(robust_scale_image(subtract_background(ref_raw)), TARGET_SIZE)
    diff_proc = resize_image(robust_scale_image(subtract_background(diff_raw)), TARGET_SIZE)
    
    # Re-stack the processed 2D channels back into a (64, 64, 3) triplet
    processed_triplet = np.stack([sci_proc, ref_proc, diff_proc], axis=-1)
    processed_triplets.append(processed_triplet)

# Convert the list of processed images into a single NumPy array
X = np.array(processed_triplets)
y = np.array(y_raw).astype(int) # Ensure labels are integers

print(f"Data shape (X): {X.shape}")
print(f"Labels shape (y): {y.shape}")
print(f"Class distribution (0=Bogus, 1=Real): {np.bincount(y)}")


# --- 3. Create Stratified Splits ---
# Use stratified sampling to maintain the same percentage of "real"/"bogus" in all splits
# This is critical for imbalanced datasets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=0.2,       # 20% of data will be for the test set
    random_state=42,     # For reproducible results
    stratify=y           # Use labels for stratification
)

# Split the training data again to create a validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, 
    test_size=0.2,       # 20% of the *remaining* 80% (i.e., 16% of total)
    random_state=42,     
    stratify=y_train     # Stratify again
)

print(f"Train shapes: X={X_train.shape}, y={y_train.shape}")
print(f"Val shapes:   X={X_val.shape}, y={y_val.shape}")
print(f"Test shapes:  X={X_test.shape}, y={y_test.shape}")

Loading real dataset files...
Loaded 7215 total images.
Processing real image data... (This may take a minute)
Data shape (X): (7215, 64, 64, 3)
Labels shape (y): (7215,)
Class distribution (0=Bogus, 1=Real): [3571 3644]
Train shapes: X=(4617, 64, 64, 3), y=(4617,)
Val shapes:   X=(1155, 64, 64, 3), y=(1155,)
Test shapes:  X=(1443, 64, 64, 3), y=(1443,)


In [6]:
print("--- Method 1: Classical Baselines ---")

# 1. Flatten the images [cite: 232, 238]
# Note: The guide [cite: 241-242] has a typo. This is the correct way to reshape.
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_val_flat = X_val.reshape(X_val.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)
print(f"Flattened shape: {X_train_flat.shape}")

# 2. Scale data before PCA [cite: 243-244]
scaler_pca = StandardScaler()
X_train_scaled = scaler_pca.fit_transform(X_train_flat) # [cite: 245]
X_val_scaled = scaler_pca.transform(X_val_flat) # [cite: 246]
X_test_scaled = scaler_pca.transform(X_test_flat) # [cite: 246]

# 3. Apply PCA [cite: 247]
# Choose n_components to explain 95% of variance [cite: 248]
pca = PCA(n_components=0.95)
# Fit PCA ONLY on the training data [cite: 249]
pca.fit(X_train_scaled) # [cite: 249]
print(f"Number of components chosen by PCA: {pca.n_components_}") # [cite: 250]

# 4. Transform all datasets [cite: 251-252]
X_train_pca = pca.transform(X_train_scaled)
X_val_pca = pca.transform(X_val_scaled)
X_test_pca = pca.transform(X_test_scaled)

print(f"Original feature dimension: {X_train_flat.shape[1]}") # [cite: 253]
print(f"Reduced feature dimension: {X_train_pca.shape[1]}") # [cite: 254]

--- Method 1: Classical Baselines ---
Flattened shape: (4617, 12288)
Number of components chosen by PCA: 418
Original feature dimension: 12288
Reduced feature dimension: 418


In [7]:
# Initialize and train the model
# Use class_weight='balanced' to handle imbalance
lr_model = LogisticRegression(max_iter=1000, class_weight='balanced')
lr_model.fit(X_train_pca, y_train) # [cite: 264]

# Evaluate on the validation set
y_val_pred_lr = lr_model.predict(X_val_pca)
print("--- Logistic Regression Performance ---")
print(classification_report(y_val, y_val_pred_lr))

--- Logistic Regression Performance ---
              precision    recall  f1-score   support

           0       0.80      0.87      0.83       572
           1       0.86      0.78      0.82       583

    accuracy                           0.83      1155
   macro avg       0.83      0.83      0.83      1155
weighted avg       0.83      0.83      0.83      1155



In [8]:
# Initialize and train the model
# Use class_weight='balanced' to handle imbalance [cite: 213, 273]
rf_model = RandomForestClassifier(
    n_estimators=100, 
    class_weight='balanced', # [cite: 273]
    random_state=42, 
    n_jobs=-1
) # [cite: 273-274]
rf_model.fit(X_train_pca, y_train) # [cite: 274]

# Evaluate on the validation set
y_val_pred_rf = rf_model.predict(X_val_pca) # [cite: 275]
print("--- Random Forest Performance ---") # [cite: 275]
print(classification_report(y_val, y_val_pred_rf)) # [cite: 276]

--- Random Forest Performance ---
              precision    recall  f1-score   support

           0       0.82      0.90      0.86       572
           1       0.89      0.81      0.85       583

    accuracy                           0.85      1155
   macro avg       0.85      0.85      0.85      1155
weighted avg       0.85      0.85      0.85      1155



In [9]:
from tensorflow.keras import models, layers

def build_custom_cnn(input_shape=(64, 64, 3), num_classes=1):
    """
    Builds a custom CNN model.
    """
    model = models.Sequential(name="CustomCNN")
    model.add(layers.Input(shape=input_shape))
    
    # Block 1
    model.add(layers.Conv2D(32, (3, 3), activation='relu', padding='same'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Block 2
    model.add(layers.Conv2D(64, (3, 3), activation='relu', padding='same'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Block 3
    model.add(layers.Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Classifier Head
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    
    # Output layer
    if num_classes == 1:
        activation = 'sigmoid'
        loss = 'binary_crossentropy'
    else:
        activation = 'softmax'
        loss = 'categorical_crossentropy'
        
    model.add(layers.Dense(num_classes, activation=activation))
    
    # Compile the model
    model.compile(optimizer='adam',
                  loss=loss,
                  metrics=['accuracy'])
    
    return model

# Example usage for binary classification
cnn_model = build_custom_cnn(input_shape=(64, 64, 3), num_classes=1)
cnn_model.summary()


In [10]:
print("--- Method 2: Training Custom CNN ---")

# Calculate class weights to handle imbalance
weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weights_dict = dict(enumerate(weights))
print(f"Using class weights: {class_weights_dict}")

# Create an augmented data pipeline
# Apply augmentation only to the training data
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_ds = train_ds.shuffle(buffer_size=len(y_train))
train_ds = train_ds.batch(32)
train_ds = train_ds.map(lambda x, y: (data_augmentation_pipeline(x, training=True), y),
                         num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

# Create validation dataset (no augmentation)
val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(32).prefetch(buffer_size=tf.data.AUTOTUNE)

# Train the model
history_cnn = cnn_model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,  # Use a small number for this demo
    class_weight=class_weights_dict,
    verbose=1
)

print("Custom CNN training complete.")


--- Method 2: Training Custom CNN ---
Using class weights: {0: 1.010284463894967, 1: 0.9899228130360206}
Epoch 1/5
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 54ms/step - accuracy: 0.6158 - loss: 1.5449 - val_accuracy: 0.6996 - val_loss: 0.5804
Epoch 2/5
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 51ms/step - accuracy: 0.7966 - loss: 0.5199 - val_accuracy: 0.8563 - val_loss: 0.3536
Epoch 3/5
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 45ms/step - accuracy: 0.8527 - loss: 0.4036 - val_accuracy: 0.8442 - val_loss: 0.3117
Epoch 4/5
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 46ms/step - accuracy: 0.8830 - loss: 0.3113 - val_accuracy: 0.8537 - val_loss: 0.3465
Epoch 5/5
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 49ms/step - accuracy: 0.8969 - loss: 0.2757 - val_accuracy: 0.9177 - val_loss: 0.2520
Custom CNN training complete.


In [11]:
from tensorflow.keras import layers
import tensorflow as tf

def build_transfer_model(input_shape=(64, 64, 3), num_classes=1):
    """
    Builds a transfer learning model using ResNet50.
    """
    # 1. Load the pre-trained base model
    base_model = tf.keras.applications.ResNet50(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet'
    )
    
    # 2. Freeze the base model
    base_model.trainable = False
    
    # 3. Add a custom classification head
    inputs = tf.keras.Input(shape=input_shape)
    
    # Preprocess inputs for ResNet
    x = tf.keras.applications.resnet.preprocess_input(inputs)
    
    x = base_model(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.5)(x)
    
    if num_classes == 1:
        activation = 'sigmoid'
    else:
        activation = 'softmax'
        
    outputs = layers.Dense(num_classes, activation=activation)(x)
    model = tf.keras.Model(inputs, outputs, name="TransferLearningModel")
    
    return model, base_model

transfer_model, base_model = build_transfer_model(input_shape=(64, 64, 3), num_classes=1)
transfer_model.summary()


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 1us/step


In [12]:
print("--- Method 3: Training Transfer Learning Model ---")

# --- Phase 1: Train only the new head ---
print("--- Phase 1: Training Head ---")
transfer_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

history_tl = transfer_model.fit(
    train_ds,  # Using the augmented dataset from previous cells
    validation_data=val_ds,
    epochs=3,  # Train head for a few epochs
    class_weight=class_weights_dict,
    verbose=1
)

# --- Phase 2: Fine-Tuning ---
print("--- Phase 2: Fine-Tuning ---")
# Unfreeze the base model (or top layers)
base_model.trainable = True

# Re-compile with a very low learning rate
transfer_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

history_ft = transfer_model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=3,  # Continue training
    class_weight=class_weights_dict,
    verbose=1
)

print("Transfer learning training complete.")


--- Method 3: Training Transfer Learning Model ---
--- Phase 1: Training Head ---
Epoch 1/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 238ms/step - accuracy: 0.7414 - loss: 0.7707 - val_accuracy: 0.8216 - val_loss: 0.4088
Epoch 2/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 224ms/step - accuracy: 0.8183 - loss: 0.5579 - val_accuracy: 0.8580 - val_loss: 0.3743
Epoch 3/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 257ms/step - accuracy: 0.8265 - loss: 0.5234 - val_accuracy: 0.8684 - val_loss: 0.3228
--- Phase 2: Fine-Tuning ---
Epoch 1/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m154s[0m 770ms/step - accuracy: 0.6117 - loss: 0.7822 - val_accuracy: 0.5879 - val_loss: 0.6616
Epoch 2/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m107s[0m 738ms/step - accuracy: 0.7033 - loss: 0.6630 - val_accuracy: 0.7255 - val_loss: 0.5809
Epoch 3/3
[1m145/145[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

In [13]:
print("--- Method 4: Convolutional Autoencoder ---")

# --- 1. Define the CAE Architecture ---
latent_dim = 64
input_shape = (64, 64, 3)

# Encoder
encoder_inputs = tf.keras.Input(shape=input_shape)
x = layers.Conv2D(32, 3, activation='relu', padding='same')(encoder_inputs)
x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Flatten()(x)
encoder_outputs = layers.Dense(latent_dim, activation='relu')(x)
encoder = tf.keras.Model(encoder_inputs, encoder_outputs, name="encoder")

# Decoder
decoder_inputs = tf.keras.Input(shape=(latent_dim,))
x = layers.Dense(16 * 16 * 64, activation='relu')(decoder_inputs)
x = layers.Reshape((16, 16, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation='relu', padding='same')(x)
x = layers.UpSampling2D(2)(x)
x = layers.Conv2DTranspose(32, 3, activation='relu', padding='same')(x)
x = layers.UpSampling2D(2)(x)
decoder_outputs = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)
decoder = tf.keras.Model(decoder_inputs, decoder_outputs, name="decoder")

# Autoencoder
autoencoder = tf.keras.Model(encoder_inputs, decoder(encoder_outputs), name="autoencoder")
autoencoder.compile(optimizer='adam', loss='mean_squared_error')
autoencoder.summary()

# --- 2. Train the CAE ---
# Train ONLY on "bogus" (normal) images
X_train_bogus = X_train[y_train == 0]
print(f"Training autoencoder on {len(X_train_bogus)} 'bogus' images...")

# The input (x) and target (y) are the same images
autoencoder.fit(
    X_train_bogus, X_train_bogus,
    epochs=5,  # Use small number for demo
    batch_size=32,
    validation_data=(X_val[y_val == 0], X_val[y_val == 0]),  # Validate on bogus images
    verbose=1
)
print("Autoencoder training complete.")


--- Method 4: Convolutional Autoencoder ---


Training autoencoder on 2285 'bogus' images...
Epoch 1/5
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 108ms/step - loss: 7487.4370 - val_loss: 7625.7812
Epoch 2/5
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 103ms/step - loss: 7487.1221 - val_loss: 7625.6006
Epoch 3/5
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 102ms/step - loss: 7486.9854 - val_loss: 7625.5698
Epoch 4/5
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 90ms/step - loss: 7486.9131 - val_loss: 7626.0063
Epoch 5/5
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 95ms/step - loss: 7486.7930 - val_loss: 7625.5542
Autoencoder training complete.
