In [None]:
import pandas as pd
import numpy as np
import re

In [None]:
from Bio.SeqUtils import gc_fraction
from Bio.Seq import Seq
from Bio.SeqUtils import MeltingTemp
from scipy.stats import entropy
import lightgbm as lgb
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import cross_val_score
from sklearn.multioutput import MultiOutputRegressor
import warnings
import json
from collections import Counter
warnings.filterwarnings('ignore')

In [None]:
train = pd.read_csv('train.csv') #import your training data

In [None]:
class CRISPRFeaturePipeline:
    def __init__(self):
        self.scaler = StandardScaler()
        self.feature_columns = None
        
    class CRISPRFeatureExtractor:
        def __init__(self, sequence):
            self.sequence = sequence.upper()
            self.guide = self.sequence[:-3]  # 20nt guide sequence
            self.pam = self.sequence[-3:]    # 3bp PAM
            
            # Define all possible dinucleotides
            self.all_dinucs = ['AA', 'AC', 'AG', 'AT', 'CA', 'CC', 'CG', 'CT', 
                              'GA', 'GC', 'GG', 'GT', 'TA', 'TC', 'TG', 'TT']
            
            # Cut site is typically between positions 17 and 18 of guide
            self.cut_site_index = len(self.guide) - 3
            
        def get_position_features(self):
            """One-hot encode guide sequence and PAM"""
            nucleotides = ['A', 'T', 'C', 'G']
            features = {}
            
            # One-hot encode guide sequence
            for i, nt in enumerate(self.guide):
                for n in nucleotides:
                    features[f'guide_pos_{i}_{n}'] = 1 if nt == n else 0
                    
            # One-hot encode PAM
            for i, nt in enumerate(self.pam):
                for n in nucleotides:
                    features[f'pam_pos_{i}_{n}'] = 1 if nt == n else 0
                    
            return features
        
        def get_cut_site_context(self):
            """Get features around cut site"""
            features = {}
            cut_start = max(0, self.cut_site_index - 3)
            cut_end = min(len(self.guide), self.cut_site_index + 4)
            cut_context = self.guide[cut_start:cut_end]
            
            nucleotides = ['A', 'T', 'C', 'G']
            for i, nt in enumerate(cut_context):
                pos = i - 3  # Position relative to cut site
                for n in nucleotides:
                    features[f'cut_site_{pos}_{n}'] = 1 if nt == n else 0
                    
            return features
        
        
        def get_sequence_features(self):
            """Get sequence features"""
            features = {}
            
            features['gc_content'] = gc_fraction(self.guide)
            
            for dinuc in self.all_dinucs:
                features[f'dinuc_{dinuc}'] = 0
                
            for i in range(len(self.guide)-1):
                dinuc = self.guide[i:i+2]
                features[f'dinuc_{dinuc}'] += 1
            
            return features
        
        def get_all_features(self):
            """Combine all features"""
            features = {}
            features.update(self.get_sequence_features())
            features.update(self.get_position_features())
            features.update(self.get_cut_site_context())
            return features
    
    def extract_features(self, df, fit_scaler=True):
        """
        Extract features from DataFrame and optionally scale them
        
        Args:
            df: DataFrame with 'Id' and 'GuideSeq' columns
            fit_scaler: Whether to fit the scaler (True for training, False for test)
        
        Returns:
            DataFrame with extracted features and original Id
        """
        # Extract features
        features_list = []
        ids = []
        
        for idx, row in df.iterrows():
            extractor = self.CRISPRFeatureExtractor(row['GuideSeq'])
            features = extractor.get_all_features()
            features_list.append(features)
            ids.append(row['Id'])
        
        # Convert to DataFrame
        feature_df = pd.DataFrame(features_list)
        feature_df['Id'] = ids
        
        # Store feature columns if not already stored
        if self.feature_columns is None:
            self.feature_columns = [col for col in feature_df.columns if col != 'Id']
        
        # Scale features
        if fit_scaler:
            scaled_features = self.scaler.fit_transform(feature_df[self.feature_columns])
        else:
            scaled_features = self.scaler.transform(feature_df[self.feature_columns])
            
        scaled_df = pd.DataFrame(scaled_features, columns=self.feature_columns)
        scaled_df['Id'] = ids
        
        return scaled_df
    
    def process_data(self, df, is_training=True):
        """
        Process data and merge with original DataFrame if training
        
        Args:
            df: Input DataFrame
            is_training: Whether this is training data (True) or test data (False)
        
        Returns:
            Processed DataFrame
        """
        # Extract and scale features
        features_df = self.extract_features(df, fit_scaler=is_training)
        
        if is_training:
            # For training data, merge with original targets
            targets = ['Fraction_Insertions', 'Avg_Deletion_Length',
                      'Indel_Diversity', 'Fraction_Frameshifts']
            result_df = pd.merge(features_df, 
                               df[['Id'] + targets],
                               on='Id')
        else:
            # For test data, just return features
            result_df = features_df
            
        print(f"Processed {len(result_df)} sequences")
        print(f"Total features: {len(self.feature_columns)}")
        print(f"Missing values: {result_df.isnull().sum().sum()}")
        
        return result_df

In [None]:
# Initialize pipeline
pipeline = CRISPRFeaturePipeline()

# Process training data
train_processed = pipeline.process_data(train_df, is_training=True)

In [None]:
train_processed.head()

In [None]:
# train_processed is the DataFrame
train_processed = train_processed.set_index('Id').reset_index()

# OR alternatively:
cols = ['Id'] + [col for col in train_processed.columns if col != 'Id']
train_processed = train_processed[cols]

In [None]:
train_processed.head()

In [None]:
# Define targets
targets = ['Fraction_Insertions', 'Avg_Deletion_Length', 
           'Indel_Diversity', 'Fraction_Frameshifts']

# Get feature columns
feature_cols = [col for col in train_processed.columns 
               if col not in targets + ['Id']]

# Split data
X = train_processed[feature_cols]
y = train_processed[targets]

In [None]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Model parameters
params = {
    'objective': 'regression',
    'n_estimators': 1000,
    'learning_rate': 0.01,
    'max_depth': 6,
    'num_leaves': 31,
    'min_child_samples': 20,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'random_state': 42,
    'verbose': -1,
    'metric': 'r2'
}

# Train models for each target
models = {}
results = {}

for target in targets:
   print(f"\nTraining model for {target}")
   
   model = lgb.LGBMRegressor(**params)
   
   # Cross validation
   scores = cross_val_score(model, X_train, y_train[target], cv=5, scoring='r2')
   print(f"CV R² scores: {np.mean(scores):.4f} ± {np.std(scores):.4f}")
   
   # Train final model
   model.fit(
       X_train, 
       y_train[target],
       eval_set=[(X_train, y_train[target]), (X_val, y_val[target])],
       eval_metric='r2'
   )
   
   # Get R2 scores
   train_r2 = model.score(X_train, y_train[target])
   val_r2 = model.score(X_val, y_val[target])
   
   # Store results
   results[target] = {
       'train_r2': train_r2,
       'val_r2': val_r2,
       'cv_mean': np.mean(scores),
       'cv_std': np.std(scores)
   }
   
   models[target] = model

# Print summary of all results
print("\nSummary of model performance:")
for target, metrics in results.items():
   print(f"\n{target}:")
   print(f"Training R²: {metrics['train_r2']:.4f}")
   print(f"Validation R²: {metrics['val_r2']:.4f}")
   print(f"CV R²: {metrics['cv_mean']:.4f} ± {metrics['cv_std']:.4f}")

In [None]:
from sklearn.model_selection import GridSearchCV

# Define parameter grid
param_grid = {
    'n_estimators': [100,200,300, 500],
    'learning_rate': [0.01, 0.05, 0.1],
    'max_depth': [4, 6, 8],
    'num_leaves': [15, 31, 63],
    'min_child_samples': [10, 20, 30]
}

# Base model configuration
base_params = {
    'objective': 'regression',
    'random_state': 42,
    'verbose': -1,
    'metric': 'r2'
}

# Grid search for each target
models = {}
for target in targets:
    print(f"\nTuning model for {target}")
    
    # Initialize model
    model = lgb.LGBMRegressor(**base_params)
    
    # Grid search with cross validation
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=5,
        scoring='r2',
        n_jobs=-1,
        verbose=1
    )
    
    # Fit grid search
    grid_search.fit(X_train, y_train[target])
    
    # Print results
    print(f"Best R² score: {grid_search.best_score_:.4f}")
    print("Best parameters:", grid_search.best_params_)
    
    # Train final model with best parameters
    final_model = lgb.LGBMRegressor(**base_params, **grid_search.best_params_)
    final_model.fit(
        X_train,
        y_train[target],
        eval_set=[(X_val, y_val[target])],
        eval_metric='r2'
    )
    
    models[target] = final_model


In [None]:
from sklearn.metrics import r2_score
from sklearn.ensemble import BaggingRegressor

# Create target-specific base models based on tuning results
base_models = {
    'Fraction_Insertions': lgb.LGBMRegressor(
        n_estimators=200,
        learning_rate=0.05,
        max_depth=6,
        num_leaves=15,
        min_child_samples=20,
        subsample=0.8,
        random_state=42,
        verbose=-1
    ),
    'Avg_Deletion_Length': lgb.LGBMRegressor(
        n_estimators=100,
        learning_rate=0.05,
        max_depth=4,
        num_leaves=15,
        min_child_samples=30,
        subsample=0.8,
        random_state=42,
        verbose=-1
    ),
    'Indel_Diversity': lgb.LGBMRegressor(
        n_estimators=300,
        learning_rate=0.01,
        max_depth=6,
        num_leaves=15,
        min_child_samples=10,
        subsample=0.8,
        random_state=42,
        verbose=-1
    ),
    'Fraction_Frameshifts': lgb.LGBMRegressor(
        n_estimators=300,
        learning_rate=0.01,
        max_depth=8,
        num_leaves=15,
        min_child_samples=30,
        subsample=0.8,
        random_state=42,
        verbose=-1
    )
}

# Create and train bagged models for each target
bagged_models = {}
for target, base_model in base_models.items():
    print(f"\nTraining bagged model for {target}")
    
    # Create bagging wrapper
    bagged_model = BaggingRegressor(
        base_estimator=base_model,
        n_estimators=15,  # increased from 10 for more robust ensemble
        max_samples=0.8,  # each bag uses 80% of samples
        max_features=0.8, # each bag uses 80% of features
        random_state=42,
        n_jobs=-1,
        verbose=1
    )
    
    # Train model
    bagged_model.fit(X_train, y_train[target])
    
    # Store model
    bagged_models[target] = bagged_model
    
    # Calculate validation score
    val_pred = bagged_model.predict(X_val)
    val_score = r2_score(y_val[target], val_pred)
    print(f"Validation R² score: {val_score:.4f}")

In [None]:
# Function to make predictions with all models
def predict_all(X):
    predictions = {}
    for target, model in bagged_models.items():
        predictions[target] = model.predict(X)
    return predictions

# Get validation predictions
val_predictions = predict_all(X_val)

In [None]:
# Print overall validation performance
print("\nOverall Validation Performance:")
for target in base_models.keys():
    score = r2_score(y_val[target], val_predictions[target])
    print(f"{target}: R² = {score:.4f}")

In [None]:
# Get feature names
feature_names = X_train.columns.tolist()

# For each target and model
for target, bagged_model in bagged_models.items():
    print(f"\nFeature Importance for {target}")
    
    # Get the number of features in the original dataset
    n_features = len(feature_names)
    
    # Initialize importance array
    importance_scores = np.zeros(n_features)
    
    # Count how many times each feature is used
    feature_counts = np.zeros(n_features)
    
    # Sum importance from all base estimators
    for estimator in bagged_model.estimators_:
        # Get the selected feature indices for this estimator
        feature_indices = bagged_model.estimators_features_[
            bagged_model.estimators_.index(estimator)
        ]
        
        # Add importance scores for used features
        importance_scores[feature_indices] += estimator.feature_importances_
        feature_counts[feature_indices] += 1
    
    # Average the scores (avoiding division by zero)
    feature_counts = np.maximum(feature_counts, 1)  # Avoid division by zero
    importance_scores /= feature_counts
    
    # Create and sort importance DataFrame
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': importance_scores
    })
    importance_df = importance_df.sort_values('Importance', ascending=False).head(20)
    importance_df['Importance_Percentage'] = (importance_df['Importance'] / importance_df['Importance'].sum() * 100).round(2)
    
    # Display results
    print(importance_df.to_string(index=False))

In [None]:
# Save all importances to a single Excel file with multiple sheets
with pd.ExcelWriter('all_feature_importances.xlsx') as writer:
   for target, bagged_model in bagged_models.items():
       n_features = len(X_train.columns)
       importance_scores = np.zeros(n_features)
       feature_counts = np.zeros(n_features)
       
       for estimator in bagged_model.estimators_:
           feature_indices = bagged_model.estimators_features_[
               bagged_model.estimators_.index(estimator)
           ]
           importance_scores[feature_indices] += estimator.feature_importances_
           feature_counts[feature_indices] += 1
       
       feature_counts = np.maximum(feature_counts, 1)
       importance_scores /= feature_counts
       
       importance_df = pd.DataFrame({
           'Feature': X_train.columns,
           'Importance': importance_scores
       }).sort_values('Importance', ascending=False)
       
       importance_df.to_excel(writer, sheet_name=target, index=False)