In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from tensorflow.keras.applications import EfficientNetB0
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import pyarrow.parquet as pq
import glob


In [None]:
# Seed NumPy first, then TensorFlow, with the same value so repeated runs are comparable
for _seed_fn in (np.random.seed, tf.random.set_seed):
    _seed_fn(42)

In [None]:
# Report how many GPU devices TensorFlow can see
_visible_gpus = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available:", len(_visible_gpus))


Num GPUs Available: 1


In [None]:
# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # The memory-growth setting has to be identical on every visible GPU,
        # so configure all of them rather than only the first one.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
            print("Memory growth enabled for GPU:", gpu)
    except RuntimeError as err:
        # Raised when the GPUs were already initialised, e.g. when this cell
        # is re-run after a model has been built in the same kernel.
        print("Could not change GPU memory growth:", err)
else:
    print("No GPUs available. Running on CPU instead.")

Memory growth enabled for GPU: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


In [None]:
# !pip install --upgrade "dask[complete]==2024.12.1"

In [None]:
import cudf
import dask_cudf

In [None]:
# Mount Google Drive at /content/drive; force_remount=True is passed so the
# mount is re-done even when a previous mount exists.
from google.colab import drive
drive.mount("/content/drive", force_remount=True)


Mounted at /content/drive


In [None]:
# import dask.dataframe as dd

# # Correct file path
# file_path = "/content/drive/MyDrive/Colab Notebooks/top_gun_opendata_3.parquet"

# # Load dataset
# df = dd.read_parquet(file_path, engine="pyarrow")

# # Display dataset info
# print(df.head())


In [None]:
import dask.dataframe as dd
import dask

# Dask configuration: shuffle through disk and set the worker memory thresholds.
# NOTE(review): the distributed.* keys are read by dask.distributed workers; no
# cluster/Client is created in the visible cells, so confirm they are needed.
dask.config.set({"dataframe.shuffle.method": "disk"})
dask.config.set({"distributed.worker.memory.target": 0.7})  # target threshold: 70% of worker memory
dask.config.set({"distributed.worker.memory.spill": 0.8})   # spill threshold: 80% of worker memory

# Load the dataset lazily.
# split_row_groups="adaptive" lets Dask group row groups until a partition
# reaches `blocksize`. With split_row_groups=True, `blocksize` is ignored and
# every row group becomes its own partition (150448 partitions for this file),
# which makes every later graph operation extremely slow.
file_path = "/content/drive/MyDrive/Colab Notebooks/top_gun_opendata_3.parquet"
df = dd.read_parquet(
    file_path,
    engine="pyarrow",
    blocksize="64MB",
    split_row_groups="adaptive"
)

# Display dataset info WITHOUT computing (important!)
print("DataFrame partitions:", df.npartitions)
print("DataFrame columns:", df.columns)
print("DataFrame dtypes:", df.dtypes)


# Materialise only the first partition to peek at real rows
sample = df.partitions[0].compute()
print("Sample from first partition:")
print(sample.head())


DataFrame partitions: 150448
DataFrame columns: Index(['X_jet', 'm', 'iphi', 'pt', 'ieta'], dtype='object')
DataFrame dtypes: X_jet     object
m        float64
iphi     float64
pt       float64
ieta     float64
dtype: object
Sample from first partition:
                                               X_jet           m  iphi  \
0  [[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0...  232.846863  66.0   

           pt  ieta  
0  865.297302  26.0  


In [None]:
# Materialise partition 0 of the lazy frame and display it as the cell output
df.partitions[0].compute()

Unnamed: 0,X_jet,m,iphi,pt,ieta
0,"[[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0...",232.846863,66.0,865.297302,26.0


In [None]:
# Materialise partition 5 to compare its rows with partition 0
df.partitions[5].compute()

Unnamed: 0,X_jet,m,iphi,pt,ieta
0,"[[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0...",465.120544,11.0,616.891174,17.0


In [None]:
# Column labels of the loaded frame
df.columns

Index(['X_jet', 'm', 'iphi', 'pt', 'ieta'], dtype='object')

In [None]:
def preprocess_data(df):
    """Build the image tensor ``X`` and the mass target ``y`` from a jet DataFrame.

    Args:
        df: pandas DataFrame with a target column ``m`` and an ``X_jet`` column
            whose cells each hold one sample image as an ndarray, a nested
            list, or a nested object array. A non-pandas object exposing
            ``.compute()`` (e.g. a Dask DataFrame) is materialised first,
            because stacking the images needs the rows in memory.

    Returns:
        tuple ``(X, y)``. ``X`` is a floating-point array of shape
        ``(n_samples, d1, d2, d3)`` standardised independently for every index
        of the last axis. ``y`` is ``df['m'].values``.

    Raises:
        KeyError: if the ``m`` column is missing.
        ValueError: if ``df`` has no rows, has no ``X_jet`` column, or the
            images cannot be stacked into a 4-D numeric tensor. Earlier
            versions substituted random numbers in these situations, which
            silently trained the model on noise.
    """
    # NumPy stacking cannot operate on a lazy collection, so bring it into memory.
    if not isinstance(df, pd.DataFrame) and callable(getattr(df, "compute", None)):
        df = df.compute()

    # Extract target variable (particle mass)
    y = df['m'].values

    if 'X_jet' not in df.columns:
        raise ValueError(
            f"Expected an 'X_jet' image column; found columns: {list(df.columns)}"
        )
    if len(df) == 0:
        raise ValueError("Cannot preprocess an empty DataFrame.")

    try:
        X = _to_dense_array(df['X_jet'].values)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Could not stack 'X_jet' into a dense tensor: {exc}") from exc

    if X.ndim != 4:
        raise ValueError(
            f"Expected X_jet to stack into a 4-D tensor (samples + 3 image axes), got shape {X.shape}"
        )
    print(f"Loaded X_jet as a tensor with shape: {X.shape}")

    # Integer pixels would be truncated by the in-place standardisation below.
    if not np.issubdtype(X.dtype, np.floating):
        X = X.astype(np.float64)

    # Standardise each slice of the last axis separately, one slice at a time
    # to keep the temporary memory small.
    # NOTE(review): this treats the last axis as the channel axis; confirm the
    # stored X_jet layout is channels-last.
    for i in range(X.shape[3]):
        channel_data = X[:, :, :, i]
        mean = np.mean(channel_data)
        std = np.std(channel_data)
        X[:, :, :, i] = (channel_data - mean) / (std + 1e-10)  # epsilon avoids division by zero

    print(f"Processed data shape: X={X.shape}, y={y.shape}")
    return X, y


def _to_dense_array(nested):
    """Recursively turn nested lists / object arrays into one dense numeric ndarray."""
    arr = np.asarray(nested)
    if arr.dtype != object:
        return arr
    if arr.ndim == 0:
        raise ValueError(f"Cannot convert {type(nested).__name__} value to a numeric array")
    # Object arrays hold one array-like per element; densify each element and stack.
    return np.stack([_to_dense_array(item) for item in arr])


In [None]:
# preprocess_data(df)

In [None]:
def build_cnn_model(input_shape, use_efficient_net=True):
    """Build a single-output regression CNN for channels-last images.

    Args:
        input_shape: ``(height, width, channels)`` of one input image.
        use_efficient_net: if True, use an ImageNet-initialised EfficientNetB0
            backbone (fully trainable); otherwise use a small custom
            Conv/BatchNorm/MaxPool stack.

    Returns:
        An uncompiled ``tf.keras.Model`` mapping an image to one linear output.
    """
    inputs = tf.keras.Input(shape=input_shape)

    if use_efficient_net:
        # ImageNet weights only exist for 3-channel inputs, so the backbone is
        # always declared as (H, W, 3). Building it with the raw input_shape
        # raised for any other channel count.
        base_model = EfficientNetB0(
            weights='imagenet',
            include_top=False,
            input_shape=(input_shape[0], input_shape[1], 3),
        )

        # Fine-tune the whole backbone
        base_model.trainable = True

        if input_shape[2] != 3:
            # A learned 1x1 convolution projects the detector channels onto the
            # 3 channels the backbone expects.
            x = layers.Conv2D(3, (1, 1), padding='same')(inputs)
        else:
            x = inputs
        # NOTE(review): Keras EfficientNet is documented to expect raw pixel
        # values in [0, 255] and rescales internally; confirm that standardised
        # inputs are intended here.
        x = base_model(x)

        x = layers.GlobalAveragePooling2D()(x)

    else:
        # Custom CNN: three Conv-BN-Pool stages, then a final Conv-BN stage
        x = inputs
        for filters in (32, 64, 128):
            x = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
            x = layers.BatchNormalization()(x)
            x = layers.MaxPooling2D((2, 2))(x)

        x = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.GlobalAveragePooling2D()(x)

    # Regression head: Dense + Dropout blocks of decreasing width
    for units in (256, 128, 64):
        x = layers.Dense(units, activation='relu')(x)
        x = layers.Dropout(0.3)(x)

    # Output layer (no activation for regression)
    outputs = layers.Dense(1)(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs)


In [None]:
def compile_and_train(model, X_train, y_train, X_val, y_val, batch_size=32, epochs=15):
    """Compile ``model`` for MSE regression and fit it with validation-driven callbacks.

    The model is compiled with Adam (learning rate 0.001), MSE loss and an MAE
    metric. Two callbacks monitor ``val_loss``: EarlyStopping (patience 10,
    min_delta 0.0001, best weights restored) and ReduceLROnPlateau (factor 0.2,
    patience 5, floor 1e-6).

    Args:
        model: Keras model to train.
        X_train, y_train: training inputs and targets.
        X_val, y_val: validation inputs and targets.
        batch_size: mini-batch size passed to ``fit``.
        epochs: maximum number of epochs passed to ``fit``.

    Returns:
        tuple ``(model, history)`` where ``history`` is the object returned by ``fit``.
    """
    model.compile(
        optimizer=optimizers.Adam(learning_rate=0.001),
        loss='mse',
        metrics=['mae'],
    )

    # Both callbacks watch the validation loss
    training_callbacks = [
        callbacks.EarlyStopping(
            monitor='val_loss', patience=10, restore_best_weights=True, min_delta=0.0001
        ),
        callbacks.ReduceLROnPlateau(
            monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6
        ),
    ]

    fit_options = dict(
        validation_data=(X_val, y_val),
        batch_size=batch_size,
        epochs=epochs,
        callbacks=training_callbacks,
        verbose=1,
    )
    history = model.fit(X_train, y_train, **fit_options)

    return model, history

In [None]:
def evaluate_model(model, X_test, y_test):
    """Score ``model`` on the test split and save a predicted-vs-actual scatter plot.

    Args:
        model: object whose ``predict(X_test)`` result can be flattened to 1-D.
        X_test: test inputs.
        y_test: true target values.

    Returns:
        tuple ``(mae, r2, y_pred)``. The scatter plot is written to
        ``pred_vs_actual.png``.
    """
    y_pred = model.predict(X_test).flatten()

    mae, r2 = mean_absolute_error(y_test, y_pred), r2_score(y_test, y_pred)
    print(f"Test Mean Absolute Error: {mae:.4f}")
    print(f"Test R² Score: {r2:.4f}")

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.scatter(y_test, y_pred, alpha=0.5)
    # Dashed identity line spanning the observed target range
    target_range = [min(y_test), max(y_test)]
    ax.plot(target_range, target_range, 'r--')
    ax.set_xlabel('Actual Mass')
    ax.set_ylabel('Predicted Mass')
    ax.set_title('Predicted vs Actual Particle Mass')
    fig.savefig('pred_vs_actual.png')

    return mae, r2, y_pred

In [None]:
# !pip install dask_ml

In [None]:
from dask_ml.model_selection import train_test_split

In [None]:

# Report the visible GPUs and switch each one to on-demand memory allocation
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available:", len(gpus))
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Probe for Dask-cuDF; `use_gpu` records whether the import succeeded
try:
    import cudf
    import dask_cudf
except ImportError:
    use_gpu = False
    print("Using CPU-based Dask")
else:
    use_gpu = True
    print("Using Dask-cuDF for GPU acceleration")

def load_data(file_path, blocksize="256MB"):
    """Open a parquet file as a lazy Dask DataFrame with size-bounded partitions.

    Args:
        file_path: path of the parquet file to read.
        blocksize: target uncompressed size of each partition.

    Returns:
        A lazy Dask DataFrame; nothing is read until it is computed.
    """
    # split_row_groups="adaptive" groups row groups until `blocksize` is
    # reached. With split_row_groups=True, `blocksize` is ignored and each row
    # group becomes its own partition, which produces an enormous task graph
    # for files with many small row groups.
    return dd.read_parquet(
        file_path,
        engine="pyarrow",
        blocksize=blocksize,
        split_row_groups="adaptive"
    )

def main(max_partitions=None):
    """Run the mass-regression pipeline: load, preprocess, split, train, evaluate, save.

    Args:
        max_partitions: optional cap on how many leading Dask partitions are
            read into memory. ``None`` (default) reads the whole file, which
            must then fit in RAM because preprocessing stacks every image into
            one NumPy array.

    Returns:
        dict with keys ``'mae'``, ``'r2'`` and ``'model'``, or ``None`` when the
        parquet file does not exist.
    """
    # Imported locally under its own name: an earlier cell rebinds the global
    # `train_test_split` to dask_ml's version, while the arrays split below are
    # in-memory NumPy arrays.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    file_path = "/content/drive/MyDrive/Colab Notebooks/top_gun_opendata_3.parquet"

    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    print(f"Loading dataset from {file_path}...")
    lazy_df = load_data(file_path)
    if max_partitions is not None:
        lazy_df = lazy_df.partitions[:max_partitions]

    # preprocess_data works on concrete rows (np.stack, .values), so materialise
    # the frame once here. Row counts and null checks then cost nothing extra.
    df = lazy_df.compute()
    print("Dataset loaded successfully!")

    num_samples = len(df)
    print(f"Number of samples: {num_samples}")
    print(f"Columns: {df.columns.tolist()[:10]} ...")

    # Exact count on the in-memory frame (previously extrapolated from a 10% sample)
    missing_values = int(df.isnull().sum().sum())
    print(f"Missing Values: {missing_values}")

    # Extract features and target
    X, y = preprocess_data(df)

    # Hold out 20% for testing, then 20% of the remainder for validation
    # (64% / 16% / 20% of all samples overall).
    X_train, X_test, y_train, y_test = sk_train_test_split(
        X, y, test_size=0.2, random_state=42, shuffle=True
    )
    X_train, X_val, y_train, y_val = sk_train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, shuffle=True
    )

    train_samples, val_samples, test_samples = X_train.shape[0], X_val.shape[0], X_test.shape[0]
    total_samples = train_samples + val_samples + test_samples

    print(f"Training: {train_samples} samples ({train_samples / total_samples:.1%})")
    print(f"Validation: {val_samples} samples ({val_samples / total_samples:.1%})")
    print(f"Test: {test_samples} samples ({test_samples / total_samples:.1%})")

    # Build model
    input_shape = X_train.shape[1:]
    print(f"\nInput shape: {input_shape}")

    use_efficient_net = True  # Set to False to use custom CNN
    model = build_cnn_model(input_shape, use_efficient_net=use_efficient_net)
    model.summary()

    # Train on the first GPU when one is visible, otherwise on the CPU
    device_name = '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'
    with tf.device(device_name):
        model, history = compile_and_train(model, X_train, y_train, X_val, y_val)

    # Plot training history: loss on the left, MAE on the right
    plt.figure(figsize=(12, 4))
    panels = (
        ('loss', 'Model Loss', 'Loss (MSE)'),
        ('mae', 'Model MAE', 'MAE'),
    )
    for position, (metric, title, ylabel) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric])
        plt.plot(history.history[f'val_{metric}'])
        plt.title(title)
        plt.ylabel(ylabel)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper right')

    plt.tight_layout()
    plt.savefig('training_history.png')

    # Evaluate model
    mae, r2, y_pred = evaluate_model(model, X_test, y_test)

    # Save the model
    model.save('particle_mass_regression_model.h5')
    print("Model saved as 'particle_mass_regression_model.h5'")

    return {
        'mae': mae,
        'r2': r2,
        'model': model
    }

if __name__ == "__main__":
    main()


Num GPUs Available: 1
Using Dask-cuDF for GPU acceleration
Loading dataset from /content/drive/MyDrive/Colab Notebooks/top_gun_opendata_3.parquet...
Dataset loaded successfully!
