In [None]:
import os
import numpy as np
import librosa
import pandas as pd
import warnings
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score, classification_report
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Conv2D, MaxPooling2D, Dropout, Flatten, 
                                    Dense, Reshape, Bidirectional, LSTM, 
                                    InputLayer, BatchNormalization, Activation,
                                    Add, Lambda, Input, Concatenate)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
import csv
import pickle
from datetime import datetime
warnings.filterwarnings('ignore')

In [None]:
class AttentionLayer(tf.keras.layers.Layer):
    """Attention pooling layer: weight the input element-wise and sum over axis 1.

    Scores are ``tanh(x @ W + b)``, normalised with a softmax along axis 1;
    the input is multiplied by those weights and reduced over axis 1.
    Assumes a rank-3 input shaped (batch, steps, features) -- TODO confirm
    against callers; ``compute_output_shape`` reports (batch, features).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create a square projection matrix and a bias sized by the last input axis."""
        feature_dim = input_shape[-1]
        self.W = self.add_weight(
            name='att_weight',
            shape=(feature_dim, feature_dim),
            initializer='glorot_uniform',
            trainable=True,
        )
        self.b = self.add_weight(
            name='att_bias',
            shape=(feature_dim,),
            initializer='zeros',
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        """Return the attention-weighted sum of ``x`` over axis 1."""
        # Contract the last axis of x with the first axis of W, then squash.
        scores = tf.nn.tanh(tf.tensordot(x, self.W, axes=1) + self.b)
        # Normalise independently per feature across axis 1.
        weights = tf.nn.softmax(scores, axis=1)
        return tf.reduce_sum(x * weights, axis=1)

    def compute_output_shape(self, input_shape):
        """Axis 1 is reduced away, leaving (first axis, last axis)."""
        batch_dim, feature_dim = input_shape[0], input_shape[-1]
        return (batch_dim, feature_dim)

In [None]:
class FrequencyAttention(tf.keras.layers.Layer):
    """Gate the input with a learned single-channel sigmoid mask.

    A 1x1 ``Conv2D`` with one filter and a sigmoid activation produces a mask
    that is multiplied with the input. The mask has a single channel, so it is
    broadcast over the last axis of ``x`` (assumes a 4-D channels-last input
    -- TODO confirm against callers). The output shape equals the input shape.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Create the sub-layer up front so it is tracked as soon as the layer
        # exists, rather than only after build() has run.
        self.conv1 = Conv2D(1, (1, 1), activation='sigmoid')

    def build(self, input_shape):
        """Build the inner convolution so its variables exist once this layer is built."""
        # Without this the Conv2D variables are only created lazily on the
        # first call, which can break restoring weights into a freshly
        # rebuilt layer.
        if not self.conv1.built:
            self.conv1.build(input_shape)
        super().build(input_shape)

    def call(self, x):
        """Return ``x`` scaled by the sigmoid mask computed from ``x``."""
        attention = self.conv1(x)
        return x * attention

    def compute_output_shape(self, input_shape):
        """The gating is element-wise, so the shape is unchanged."""
        return input_shape

In [None]:
def robust_csv_reader(csv_path):
    """Parse a CSV by hand, tolerating rows whose field count differs from the header.

    Args:
        csv_path: Path of the CSV file. Read as UTF-8; a leading byte-order
            mark is dropped so it cannot corrupt the first column name.

    Returns:
        pandas.DataFrame with one column per header field and every value
        kept as a string. Rows with too many fields are truncated to the
        header width, rows with too few are padded with ``None``, and blank
        lines are skipped. An empty file yields an empty DataFrame.
    """
    # newline='' is what the csv module expects so that quoted fields
    # containing line breaks are parsed correctly.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # Completely empty file: nothing to parse.
            return pd.DataFrame()

        width = len(header)
        data = []
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; they carry no data.
                continue
            if len(row) != width:
                # Re-split on commas, trim whitespace and cut to the header
                # width (same treatment as before for malformed rows) ...
                row = [field.strip() for field in ','.join(row).split(',')[:width]]
                # ... then pad short rows so every row matches the header,
                # otherwise DataFrame construction can fail outright.
                row.extend([None] * (width - len(row)))
            data.append(row)
    return pd.DataFrame(data, columns=header)

In [None]:
def extract_features(audio_path, sr=22050):
    """Compute fixed-length MFCC, log-mel and chroma matrices for one audio file.

    Args:
        audio_path: Path of the audio file handed to ``librosa.load``.
        sr: Sample rate requested from ``librosa.load``.

    Returns:
        Dict with keys ``'mfcc'``, ``'mel'`` and ``'chroma'``. Each matrix is
        padded or truncated to 128 along axis 1 and then transposed. Returns
        ``None`` (after printing the error) if anything fails.
    """
    try:
        signal, rate = librosa.load(audio_path, sr=sr)

        mfcc = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=20)
        mel_db = librosa.power_to_db(librosa.feature.melspectrogram(y=signal, sr=rate))
        chroma = librosa.feature.chroma_stft(y=signal, sr=rate)

        # Force a common length of 128 along axis 1, then transpose.
        standardized = {}
        for name, matrix in (('mfcc', mfcc), ('mel', mel_db), ('chroma', chroma)):
            standardized[name] = librosa.util.fix_length(matrix, size=128, axis=1).T
        return standardized
    except Exception as e:
        # Best-effort by design: the caller treats None as "skip this file".
        print(f"Error processing {audio_path}: {str(e)}")
        return None

In [None]:
def load_dataset(csv_path):
    """Read the metadata CSV and extract audio features for every usable row.

    The CSV is read with pandas first and with ``robust_csv_reader`` as a
    fallback. Column names are normalised (trimmed, lower-cased, spaces
    replaced by underscores) and must include ``team_name``, ``driver_name``,
    ``track_name`` and ``file_path``.

    A row is skipped (and counted as skipped) when:
      * its file path is not a string or does not exist on disk,
      * any of its three labels is missing (NaN/None/blank) -- such labels
        would otherwise reach label encoding and stratified splitting,
      * ``extract_features`` returns nothing for the file.

    Args:
        csv_path: Path of the metadata CSV.

    Returns:
        Dict with ``X_team`` (MFCC), ``X_driver`` (chroma) and ``X_track``
        (mel) as numpy arrays -- a trailing axis is appended when the stacked
        array is 3-D -- plus ``y_team``, ``y_driver`` and ``y_track`` as lists
        of labels. Returns ``None`` after printing the reason if loading fails
        or no file could be processed.
    """
    def _label_missing(value):
        # Blank strings and pandas/NumPy missing markers both count as missing.
        if isinstance(value, str):
            return not value.strip()
        return bool(pd.isna(value))

    try:
        # Try multiple CSV reading methods
        try:
            print("Attempting to read CSV with pandas...")
            df = pd.read_csv(csv_path)
        except Exception as pd_error:
            print(f"Pandas read failed ({str(pd_error)}), trying manual parser...")
            df = robust_csv_reader(csv_path)

        # Clean column names
        df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')
        label_cols = ['team_name', 'driver_name', 'track_name']
        required_cols = label_cols + ['file_path']

        # Validate columns
        missing = [col for col in required_cols if col not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        total_rows = len(df)
        print(f"\nFound {total_rows} entries in CSV. Starting processing...")

        # Process files
        results = {'X_team': [], 'X_driver': [], 'X_track': [],
                   'y_team': [], 'y_driver': [], 'y_track': []}
        processed_files = 0
        skipped_files = 0

        # Count rows positionally so messages stay correct for any index type.
        for row_num, (_, row) in enumerate(df.iterrows(), start=1):
            file_path = row['file_path']
            if not isinstance(file_path, str):
                print(f"Row {row_num}: Invalid file path (not string)")
                skipped_files += 1
                continue

            # Paths padded with whitespace in the CSV would never exist on disk.
            file_path = file_path.strip()
            if not os.path.exists(file_path):
                print(f"Row {row_num}: File not found - {file_path}")
                skipped_files += 1
                continue

            # Check labels before paying for feature extraction.
            missing_labels = [col for col in label_cols if _label_missing(row[col])]
            if missing_labels:
                print(f"Row {row_num}: Missing label value(s) in {missing_labels}")
                skipped_files += 1
                continue

            print(f"Processing {row_num}/{total_rows}: {os.path.basename(file_path)}")
            features = extract_features(file_path)

            if not features:
                skipped_files += 1
                continue

            results['X_team'].append(features['mfcc'])
            results['X_driver'].append(features['chroma'])
            results['X_track'].append(features['mel'])
            results['y_team'].append(row['team_name'])
            results['y_driver'].append(row['driver_name'])
            results['y_track'].append(row['track_name'])
            processed_files += 1

        # Conversion and validation
        for key in results:
            if key.startswith('X_'):
                results[key] = np.array(results[key])
                if results[key].ndim == 3:
                    # Append a trailing axis for the Conv2D input.
                    results[key] = np.expand_dims(results[key], -1)

        print(f"\nProcessing complete!")
        print(f"Successfully processed: {processed_files} files")
        print(f"Skipped: {skipped_files} files")

        if len(results['X_team']) == 0:
            raise ValueError("No valid audio files processed")

        return results

    except Exception as e:
        # Callers rely on None to signal "nothing to train on".
        print(f"\nDataset loading failed: {str(e)}")
        return None

In [None]:
def build_model(input_shape, num_classes):
    """Build and compile a CNN + BiLSTM classifier whose middle section depends on ``num_classes``.

    The architecture is selected purely from the class count:
      * ``num_classes <= 10``  -> "team" variant (residual conv block),
      * ``num_classes <= 30``  -> "driver" variant (1x1-conv sigmoid gate),
      * otherwise              -> "track" variant (softmax weighting over axis 1).

    NOTE(review): because selection is by class count only, a track dataset
    with 30 or fewer classes is built with the driver (or team) variant.

    Args:
        input_shape: Shape of one sample without the batch axis, e.g.
            ``X.shape[1:]``; the Conv2D stack requires (rows, cols, channels).
        num_classes: Number of output classes (size of the softmax layer).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, categorical cross-entropy,
        accuracy metric).
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Common initial layers
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = MaxPooling2D((2, 2))(x)
    x = Dropout(0.25)(x)

    # Task-specific architectures
    if num_classes <= 10:  # Team model (assuming <=10 teams)
        x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)

        # Residual block: both operands have 64 channels, so they can be added.
        residual = x
        x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.add([residual, x])

        x = MaxPooling2D((2, 2))(x)
        x = Dropout(0.3)(x)
        x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)

    elif num_classes <= 30:  # Driver model (assuming <=30 drivers)
        x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = MaxPooling2D((2, 2))(x)
        x = Dropout(0.3)(x)

        # Attention mechanism: single-channel sigmoid gate broadcast over channels.
        attention = Conv2D(1, (1, 1), activation='sigmoid')(x)
        x = tf.keras.layers.Multiply()([x, attention])

        x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)

    else:  # Track model
        x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = MaxPooling2D((2, 2))(x)
        x = Dropout(0.3)(x)

        # Attention over axis 1 of the feature map. The weights must be sized
        # from the *pooled* tensor: x has been through two 2x2 poolings, so
        # its axis-1 length is no longer input_shape[0], and a vector of
        # length input_shape[0] cannot be multiplied with it.
        pooled_rows = x.shape[1]
        freq_attention = tf.keras.layers.GlobalAveragePooling2D()(x)
        freq_attention = Dense(pooled_rows, activation='softmax')(freq_attention)
        freq_attention = Reshape((pooled_rows, 1, 1))(freq_attention)
        x = tf.keras.layers.Multiply()([x, freq_attention])

        x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)

    # Common final layers
    # NOTE(review): every branch ends with 128 channels, so reshaping to a
    # width of 64 splits each position's channels across two sequence steps.
    # Kept as-is so the architecture matches previously trained models.
    x = Reshape((-1, 128 if num_classes > 30 else 64))(x)
    x = Bidirectional(LSTM(128 if num_classes > 10 else 64, return_sequences=True))(x)

    if num_classes <= 30:  # For team and driver models
        # Self-attention over the sequence before pooling.
        x = tf.keras.layers.Attention()([x, x])
        x = tf.keras.layers.GlobalAveragePooling1D()(x)
    else:  # For track model
        x = tf.keras.layers.GlobalAveragePooling1D()(x)

    x = Dense(256 if num_classes > 30 else 128, activation='relu')(x)
    x = Dropout(0.4)(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(optimizer='adam',
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])
    return model

In [None]:
def train_model(X, y, label_type, epochs=75):
    """Encode labels, split the data 60/20/20, train a model and evaluate it on the test split.

    Args:
        X: Feature array; ``X.shape[1:]`` is used as the model input shape.
        y: Sequence of raw labels, one per sample in ``X``.
        label_type: Name used in log messages and in the checkpoint file
            name ``best_{label_type}_model.keras`` (written to the current
            working directory).
        epochs: Maximum number of training epochs (early stopping may end
            training sooner).

    Returns:
        Dict with ``accuracy``, ``f1_score`` (weighted), ``report`` (text),
        ``model``, ``test_loss``, ``test_accuracy``, ``history`` (the
        ``History.history`` dict) and ``label_encoder`` (the fitted encoder
        that maps class indices back to the original labels).

    Raises:
        ValueError: Propagated from ``train_test_split`` when a class has too
            few samples for the stratified splits.
    """
    # Encode labels
    le = LabelEncoder()
    y_enc = to_categorical(le.fit_transform(y))

    # Train/Val/Test split (60/20/20): 20% held out, then 25% of the rest.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_enc, test_size=0.2, random_state=42, stratify=y)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.25, random_state=42, stratify=np.argmax(y_train, axis=1))

    # Verify splits
    print(f"\n{label_type.upper()} Data Split:")
    print(f"Train samples: {len(X_train)}")
    print(f"Validation samples: {len(X_val)}")
    print(f"Test samples: {len(X_test)}")

    # Build model
    model = build_model(X_train.shape[1:], y_train.shape[1])

    # Callbacks
    callbacks = [
        EarlyStopping(patience=15, restore_best_weights=True, monitor='val_loss'),
        ModelCheckpoint(f'best_{label_type}_model.keras',
                       save_best_only=True,
                       monitor='val_loss',
                       mode='min'),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)
    ]

    # Training
    print(f"\nTraining {label_type} model...")
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=32,
        callbacks=callbacks,
        verbose=1
    )

    # Evaluation
    test_pred = model.predict(X_test)
    test_pred_classes = np.argmax(test_pred, axis=1)
    test_true_classes = np.argmax(y_test, axis=1)

    # Pin the report to the full class list: without explicit labels the
    # report only covers classes that occur in the test split, and then
    # target_names no longer lines up. Names are cast to str because the
    # report formats them as strings.
    class_ids = np.arange(len(le.classes_))
    class_names = [str(name) for name in le.classes_]

    # Results
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    results = {
        'accuracy': accuracy_score(test_true_classes, test_pred_classes),
        'f1_score': f1_score(test_true_classes, test_pred_classes, average='weighted'),
        'report': classification_report(test_true_classes, test_pred_classes,
                                        labels=class_ids,
                                        target_names=class_names,
                                        zero_division=0),
        'model': model,
        'test_loss': test_loss,
        'test_accuracy': test_acc,
        'history': history.history,
        'label_encoder': le
    }

    print(f"\n{label_type.upper()} Results:")
    print(f"Test Accuracy: {results['accuracy']:.4f}")
    print(f"F1 Score: {results['f1_score']:.4f}")
    print(f"Test Loss: {results['test_loss']:.4f}")
    print("\nClassification Report:")
    print(results['report'])

    return results

In [None]:
def main(csv_path=None, output_dir=None):
    """Load the dataset, train the team/driver/track models and save all artifacts.

    Args:
        csv_path: Path of the CSV containing lap-wise metadata. When omitted,
            the ``F1_CSV_PATH`` environment variable is used, falling back to
            the original hard-coded location.
        output_dir: Directory for saved models, feature arrays, training
            histories and label encoders (created if missing). When omitted,
            the ``F1_OUTPUT_DIR`` environment variable is used, falling back
            to the original hard-coded location.

    Returns:
        None. Returns early, after creating ``output_dir``, if the dataset
        could not be loaded.
    """
    # Configuration: explicit argument > environment variable > legacy default.
    default_csv_path = "/Volumes/Sandhya TB2/F1 MAIN/F1/f1_12race_dataset.csv"
    default_output_dir = "/Users/govindamadhavabs/Desktop/F1_newModels"
    csv_path = csv_path or os.environ.get("F1_CSV_PATH", default_csv_path)
    output_dir = output_dir or os.environ.get("F1_OUTPUT_DIR", default_output_dir)
    os.makedirs(output_dir, exist_ok=True)

    # Load data
    print("Loading dataset...")
    dataset = load_dataset(csv_path)
    if not dataset:
        return

    # Initialize storage for results and histories
    all_results = {}
    training_histories = {}
    label_encoders = {}

    # Train models
    for label in ['team', 'driver', 'track']:
        X = dataset[f'X_{label}']
        y = dataset[f'y_{label}']

        if len(X) == 0:
            print(f"\nNo data available for {label}")
            continue

        print(f"\n{'='*40}")
        print(f"Training {label} model")
        print(f"Sample shape: {X[0].shape}")
        print(f"Total samples: {len(X)}")
        print(f"Classes: {len(set(y))}")

        # Train and store results
        results = train_model(X, y, label, epochs=75)
        all_results[label] = results
        # Use the history dict train_model returns rather than reaching into
        # the model object for it.
        training_histories[label] = results['history']

        # Save encoder (refit on the same labels train_model encoded)
        le = LabelEncoder()
        le.fit(y)
        label_encoders[label] = le

        # Save model and features
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_path = os.path.join(output_dir, f"{label}_model_{timestamp}.keras")
        features_path = os.path.join(output_dir, f"{label}_features_{timestamp}.npy")

        results['model'].save(model_path)
        np.save(features_path, X)
        print(f"Saved {label} model to {model_path}")
        print(f"Saved {label} features to {features_path}")

    # Save training histories and encoders
    with open(os.path.join(output_dir, 'training_histories.pkl'), 'wb') as f:
        pickle.dump(training_histories, f)

    with open(os.path.join(output_dir, 'label_encoders.pkl'), 'wb') as f:
        pickle.dump(label_encoders, f)

    # Print final summary
    print("\n\n" + "="*50)
    print("FINAL MODEL SUMMARY".center(50))
    print("="*50)
    for label, results in all_results.items():
        print(f"\n{label.upper()} MODEL:")
        print(f"- Test Accuracy: {results['accuracy']:.4f}")
        print(f"- F1 Score: {results['f1_score']:.4f}")
        print(f"- Test Loss: {results['test_loss']:.4f}")
        print("- Class Distribution:")
        print(pd.Series(dataset[f'y_{label}']).value_counts())

    print("\nAll artifacts saved to:", os.path.abspath(output_dir))

# Run the full pipeline only when this cell/script is executed directly,
# not when the code is imported as a module.
if __name__ == "__main__":
    main()