# Import Libraries

In [None]:
import os
import zipfile
import gdown
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Data Preprocessing

In [None]:
# Local destinations for the archive and its extracted contents, followed by
# the Google Drive file id and the direct-download URL derived from it.
extract_dir = 'extracted_contents'
zip_file = 'dataset.zip'
file_id = '1ZEyNMEO43u3qhJAwJeBZxFBEYc_pVYZQ'
url = f'https://drive.google.com/uc?id={file_id}'

In [None]:
# Function to download and extract dataset
def download_and_extract_data(url, zip_file, extract_dir):
    """Download a zip archive with gdown and unpack it into a directory.

    Args:
        url: Download URL handed to ``gdown.download``.
        zip_file: Local path the downloaded archive is written to.
        extract_dir: Directory that receives the archive contents; it is
            created if it does not exist yet.
    """
    gdown.download(url, zip_file, quiet=False)
    os.makedirs(extract_dir, exist_ok=True)
    # The extraction call has to be nested inside the ``with`` body so the
    # archive is open while it is read and closed afterwards.
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

In [None]:
# Download and extract dataset
# Runs at cell execution time: fetches `url` into `zip_file` and unpacks it under `extract_dir`.
download_and_extract_data(url=url, zip_file=zip_file, extract_dir=extract_dir)

In [None]:
# Class sub-folder names (list position doubles as the integer label) and the
# train/validation directories inside the extracted archive.
CLASSES = ['no', 'sphere', 'vort']
BASE_DIR = os.path.join(extract_dir, 'dataset')
TRAIN_DIR, VAL_DIR = (os.path.join(BASE_DIR, split) for split in ('train', 'val'))


In [None]:
# Data loading function
def load_data(directory, classes):
    """Load every ``.npy`` image under ``directory/<class_name>`` with integer labels.

    Args:
        directory: Root folder containing one sub-folder per class.
        classes: Ordered class (sub-folder) names; a class's position in this
            sequence is the label assigned to its images.

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays. Images are channels-last
        and ``labels[i]`` is the class index of ``images[i]``. Files are read
        in sorted name order within each class, so the result is reproducible.
    """
    images = []
    labels = []

    for idx, class_name in enumerate(classes):
        class_dir = os.path.join(directory, class_name)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order stable across runs and machines.
        for file_name in sorted(os.listdir(class_dir)):
            if not file_name.endswith('.npy'):
                continue
            img = np.load(os.path.join(class_dir, file_name))
            if img.ndim == 2:
                # Plain (H, W) array: append the channel axis directly.
                img = img[..., np.newaxis]
            else:
                # Channels-first (C, H, W) -> channels-last (H, W, C) for CNN input.
                img = np.transpose(img, (1, 2, 0))
            images.append(img)
            labels.append(idx)

    return np.array(images), np.array(labels)

In [None]:
# Load training and validation data
# Labels come back as integer class indices (positions in CLASSES).
X_train, y_train = load_data(TRAIN_DIR, CLASSES)
X_val, y_val = load_data(VAL_DIR, CLASSES)

In [None]:
# Sanity check: report the array shapes of both splits before further processing.
print(f"Training data shape: {X_train.shape}")
print(f"Training labels shape: {y_train.shape}")
print(f"Validation data shape: {X_val.shape}")
print(f"Validation labels shape: {y_val.shape}")

In [None]:
# Convert labels to one-hot encoding
# One column per entry of CLASSES; this is the label format the
# 'categorical_crossentropy' loss used at compile time works with.
y_train = tf.keras.utils.to_categorical(y_train, num_classes=len(CLASSES))
y_val = tf.keras.utils.to_categorical(y_val, num_classes=len(CLASSES))

In [None]:
# Data normalization
# Both splits are divided by the maximum of the TRAINING data. Scaling the
# validation set by its own maximum would put it on a different scale from the
# inputs the model is fitted on whenever the two maxima differ.
train_max = np.max(X_train)
X_train = X_train / train_max
X_val = X_val / train_max

In [None]:
#Visualization function
def visualize_samples(images, labels, num_samples=5):
    """Show randomly chosen images in one row, titled with their class names.

    Args:
        images: Indexable collection of image arrays; each is squeezed before
            display, so a trailing single-channel axis is dropped.
        labels: NumPy array of labels aligned with ``images``. A multi-
            dimensional array is treated as one-hot rows; otherwise entries
            are used directly as indices into ``CLASSES``.
        num_samples: Number of images to draw (sampled with replacement).
    """
    # squeeze=False always returns a 2-D array of axes, so a single-panel
    # figure (num_samples=1) can be indexed the same way as a multi-panel one.
    _, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
    for ax in axes[0]:
        idx = np.random.randint(0, len(images))
        ax.imshow(images[idx].squeeze(), cmap='viridis')
        class_idx = np.argmax(labels[idx]) if len(labels.shape) > 1 else labels[idx]
        ax.set_title(f"Class: {CLASSES[class_idx]}")
        ax.axis('off')
    plt.tight_layout()
    plt.show()

# Model 

In [None]:
# Model architecture - using a CNN approach suitable for astronomical images
def create_model(input_shape=(150, 150, 1)):
    """Build the CNN classifier: four double-convolution blocks plus a dense head.

    Each block is Conv-BN-Conv-BN-MaxPool-Dropout(0.25), with 32, 64, 128 and
    256 filters respectively. The head flattens the features, applies a
    512-unit dense layer with batch normalization and dropout (0.5), and ends
    in a softmax over ``len(CLASSES)`` outputs.

    Args:
        input_shape: Shape of a single input image, passed to the first layer.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    stack = []
    for position, filters in enumerate((32, 64, 128, 256)):
        # Only the very first layer of the network declares the input shape.
        shape_kwarg = {'input_shape': input_shape} if position == 0 else {}
        stack += [
            layers.Conv2D(filters, (3, 3), activation='relu', padding='same', **shape_kwarg),
            layers.BatchNormalization(),
            layers.Conv2D(filters, (3, 3), activation='relu', padding='same'),
            layers.BatchNormalization(),
            layers.MaxPooling2D((2, 2)),
            layers.Dropout(0.25),
        ]

    # Classification head
    stack += [
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(len(CLASSES), activation='softmax'),
    ]

    return models.Sequential(stack)


In [None]:

# Create the model
# Uses create_model's default input shape and prints the layer-by-layer summary.
model = create_model()
model.summary()


In [None]:

# Configure training: Adam optimizer, categorical cross-entropy loss
# (labels are one-hot encoded), and accuracy as the reported metric.
model.compile(
    loss='categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=optimizers.Adam(learning_rate=1e-3),
)


In [None]:

# Training callbacks shared by both training functions:
#   - checkpoint the model with the best validation accuracy,
#   - stop early (restoring the best weights) when validation loss stalls,
#   - shrink the learning rate when validation loss plateaus.
callbacks = [
    ModelCheckpoint(
        filepath='best_model.h5',
        monitor='val_accuracy',
        save_best_only=True,
    ),
    EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
    ),
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=5,
        min_lr=1e-6,
    ),
]


In [None]:

# Augmentation pipeline applied to training batches: random horizontal flip,
# then random rotation (factor 0.1), then random zoom (factor 0.1).
data_augmentation = tf.keras.Sequential()
data_augmentation.add(layers.RandomFlip("horizontal"))
data_augmentation.add(layers.RandomRotation(0.1))
data_augmentation.add(layers.RandomZoom(0.1))

# Define a custom training procedure with data augmentation
def train_with_augmentation(model, X_train, y_train, X_val, y_val, batch_size=32, epochs=50):
    """Train ``model`` on augmented batches built with ``tf.data``.

    Uses the module-level ``data_augmentation`` pipeline and ``callbacks`` list.

    Args:
        model: Compiled Keras model to fit.
        X_train: Training images.
        y_train: Training labels aligned with ``X_train``.
        X_val: Validation images (never augmented).
        y_val: Validation labels aligned with ``X_val``.
        batch_size: Number of samples per batch for both datasets.
        epochs: Maximum number of epochs (callbacks may stop earlier).

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    # The arrays are loaded class by class, so a buffer smaller than the
    # dataset would yield batches dominated by a single class. A buffer that
    # spans the whole dataset gives a uniform shuffle (at the cost of the
    # buffer holding every sample).
    train_dataset = train_dataset.shuffle(buffer_size=len(X_train)).batch(batch_size)

    # Apply augmentation only to training data; run it in parallel and
    # prefetch so input preparation overlaps with the training step.
    train_dataset = train_dataset.map(
        lambda x, y: (data_augmentation(x, training=True), y),
        num_parallel_calls=tf.data.AUTOTUNE,
    ).prefetch(tf.data.AUTOTUNE)

    val_dataset = (
        tf.data.Dataset.from_tensor_slices((X_val, y_val))
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )

    # Train the model
    history = model.fit(
        train_dataset,
        epochs=epochs,
        validation_data=val_dataset,
        callbacks=callbacks,
        verbose=1
    )

    return history


In [None]:

# Alternative approach: use the standard fit method
def train_standard(model, X_train, y_train, X_val, y_val, batch_size=32, epochs=50):
    """Fit ``model`` directly on the in-memory arrays, without augmentation.

    Uses the module-level ``callbacks`` list.

    Args:
        model: Compiled Keras model to fit.
        X_train: Training images.
        y_train: Training labels aligned with ``X_train``.
        X_val: Validation images.
        y_val: Validation labels aligned with ``X_val``.
        batch_size: Number of samples per gradient update.
        epochs: Maximum number of epochs (callbacks may stop earlier).

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    fit_options = dict(
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1,
    )
    return model.fit(X_train, y_train, **fit_options)


In [None]:

# Choose one training approach
# train_with_augmentation is the active choice; swap in the commented-out
# train_standard call to fit on the raw arrays instead. Run only one of them.
history = train_with_augmentation(model, X_train, y_train, X_val, y_val)
# history = train_standard(model, X_train, y_train, X_val, y_val)


In [None]:

# Plot training history
def plot_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` mapping holds the per-epoch series
            ``'accuracy'``, ``'val_accuracy'``, ``'loss'`` and ``'val_loss'``
            (for example the value returned by ``model.fit``).
    """
    _, axes = plt.subplots(1, 2, figsize=(15, 5))

    # One row per panel: train key, validation key, title, y-label, legend position.
    panels = (
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
        ('loss', 'val_loss', 'Model Loss', 'Loss', 'upper right'),
    )
    for ax, (train_key, val_key, title, y_label, legend_loc) in zip(axes, panels):
        ax.plot(history.history[train_key])
        ax.plot(history.history[val_key])
        ax.set_title(title)
        ax.set_ylabel(y_label)
        ax.set_xlabel('Epoch')
        ax.legend(['Train', 'Validation'], loc=legend_loc)

    plt.tight_layout()
    plt.show()


In [None]:

# Evaluate the model
def evaluate_model(model, X_val, y_val):
    """Print a classification report and plot the confusion matrix.

    Args:
        model: Trained model exposing ``predict``; its output is reduced to
            class indices with ``argmax`` over axis 1.
        X_val: Evaluation images.
        y_val: One-hot encoded labels aligned with ``X_val``.
    """
    # Get predictions
    y_pred = model.predict(X_val)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true_classes = np.argmax(y_val, axis=1)

    # Pin the label set to every known class. Without it, a class missing
    # from both the truth and the predictions would make `target_names`
    # mismatch the inferred labels and shrink the confusion matrix below the
    # number of tick labels.
    label_ids = list(range(len(CLASSES)))

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(
        y_true_classes, y_pred_classes, labels=label_ids, target_names=CLASSES
    ))

    # Plot confusion matrix
    cm = confusion_matrix(y_true_classes, y_pred_classes, labels=label_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASSES, yticklabels=CLASSES)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.show()


In [None]:

# Try different model architectures

# Transfer learning approach with ResNet50
def create_transfer_model(input_shape=(150, 150, 1)):
    """Build a classifier on top of a frozen, ImageNet-pretrained ResNet50.

    The single input channel is repeated three times to form the 3-channel
    tensor ResNet50 expects, and a small dense head is added on the pooled
    features.

    Args:
        input_shape: ``(height, width, 1)`` shape of a single-channel input
            image. The ResNet50 input size is derived from its height and
            width.

    Returns:
        An uncompiled ``tf.keras.Model`` with a softmax over ``len(CLASSES)``.
    """
    # Convert grayscale to RGB by repeating channels
    inputs = layers.Input(shape=input_shape)
    x = layers.Concatenate()([inputs, inputs, inputs])  # Duplicate the channel 3 times

    # Derive the backbone input size from `input_shape` so a non-default image
    # size does not clash with a hard-coded (150, 150, 3).
    rgb_shape = tuple(input_shape[:2]) + (3,)

    # Use pre-trained ResNet50 (without top layers)
    # NOTE(review): inputs are fed as-is; confirm whether ResNet50's own
    # `preprocess_input` should be applied for the ImageNet weights.
    base_model = tf.keras.applications.ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=rgb_shape
    )

    # Freeze the base model layers
    base_model.trainable = False

    # Add custom classification head
    x = base_model(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(len(CLASSES), activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model
