In [None]:
import os
import shutil

data_folder = 'tcga_data'

if not os.path.exists(data_folder):
    print(f"üìÇ '{data_folder}' not found. Downloading dataset from Google Drive...")
    
    
    file_id = '1nfeA7qWkOYu9G9N8nwQ85g-vX7yt5ENx' 

    
    url = f'https://drive.google.com/uc?id=1nfeA7qWkOYu9G9N8nwQ85g-vX7yt5ENx'
    output_zip = 'dataset.zip'
    
    # Installing gdown for reliable Drive downloads
    !pip install -U -q gdown
    import gdown
    
    # Downloading 
    gdown.download(url, output_zip, quiet=False)
    
    # Unzipping 
    print("Unzipping data...")
    !unzip -q dataset.zip
    
    # Cleanup
    os.remove(output_zip)
    print("‚úÖ Setup complete! Dataset is ready.")
else:
    print("‚úÖ Dataset already exists. Proceeding...")

In [None]:





# === CANCER GENOMIC CLASSIFICATION WITH SPARK ===
# Complete starter code for TCGA data classification

# Installing required packages
!pip install pyspark
!pip install plotly

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt

# Start (or reuse) the Spark session shared by the rest of the notebook:
# adaptive query execution enabled, 4 GB requested for the driver.
print("üöÄ Initializing Spark Session...")
spark = (
    SparkSession.builder
    .appName("CancerGenomicClassification")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

print("‚úÖ Spark session created!")

In [None]:
# Creating synthetic genomic data that mimics TCGA format
print("üß¨ Creating synthetic TCGA-like genomic data...")

def create_synthetic_tcga_data(num_samples=1000, num_genes=500):
    """Create a synthetic gene-expression table resembling a TCGA dataset.

    Samples are assigned one of five cancer types in round-robin order
    (``i % 5``). Every sample gets standard-normal baseline expression, a
    cancer-specific mean shift on a 50-gene window, and additional N(0, 0.3)
    noise on all genes.

    Args:
        num_samples: Number of samples (rows) to generate.
        num_genes: Number of gene columns per sample. Values below 250 are
            supported: a signature window reaching past the last gene is
            clipped (possibly to nothing) instead of raising a NumPy
            broadcasting error.

    Returns:
        pandas.DataFrame with columns ``sample_id``, ``cancer_type`` (int 0-4)
        and one float column per gene named ``GENE_0000``, ``GENE_0001``, ...

    Note:
        NumPy's global RNG is reseeded with 42 on every call, so the output is
        reproducible and the global random state is reset as a side effect.
    """
    np.random.seed(42)

    signature_width = 50
    # (first gene index of the signature window, mean shift) per cancer type
    signatures = (
        (0, 2.0),    # Breast cancer-like
        (50, 1.5),   # Lung cancer-like
        (100, 1.8),  # Prostate cancer-like
        (150, 2.2),  # Colon cancer-like
        (200, 1.2),  # Brain cancer-like
    )

    gene_names = [f'GENE_{i:04d}' for i in range(num_genes)]

    rows = []
    for i in range(num_samples):
        cancer_type = i % len(signatures)

        # Baseline expression profile
        expression = np.random.normal(0, 1, num_genes)

        # Cancer-specific signature. `window` is a view, so the in-place add
        # updates `expression`; its length is clipped to the genes that exist,
        # and is still 50 whenever num_genes >= 250 (same random stream as
        # the fixed-width version).
        start, shift = signatures[cancer_type]
        window = expression[start:start + signature_width]
        window += np.random.normal(shift, 0.5, window.shape[0])

        # Measurement-like noise on every gene
        expression = expression + np.random.normal(0, 0.3, num_genes)

        rows.append([f'TCGA-{i:04d}', cancer_type] + expression.tolist())

    return pd.DataFrame(rows, columns=['sample_id', 'cancer_type'] + gene_names)

# Generating synthetic data: 1000 samples x 500 genes.
# The frame also carries the sample_id and cancer_type columns, which is why
# 2 is subtracted from the column count when reporting the number of genes.
synthetic_data = create_synthetic_tcga_data(1000, 500)
print(f"‚úÖ Created synthetic dataset with {len(synthetic_data)} samples and {len(synthetic_data.columns)-2} genes")

# Converting the pandas frame to a Spark DataFrame and previewing it
genomic_df = spark.createDataFrame(synthetic_data)
print("üìä Data overview:")
genomic_df.show(5)
genomic_df.printSchema()

In [None]:
# Data preprocessing and feature engineering
print("üîß Setting up data preprocessing pipeline...")

# Human-readable names for the integer cancer_type codes; a label's position
# in the list is the code it stands for.
cancer_labels = [
    'Breast_Cancer',
    'Lung_Cancer',
    'Prostate_Cancer',
    'Colon_Cancer',
    'Brain_Cancer',
]
label_mapping = dict(enumerate(cancer_labels))


def map_cancer_type(cancer_idx):
    """Return the entry of ``cancer_labels`` at position ``cancer_idx``."""
    label = cancer_labels[cancer_idx]
    return label

# Registering UDF for Spark
# (`udf` and `col` are presumably provided by the wildcard import of
# pyspark.sql.functions earlier in the notebook.)
from pyspark.sql.types import StringType
map_cancer_udf = udf(map_cancer_type, StringType())

# Applying cancer type labels: adds a string column derived from the integer code
labeled_df = genomic_df.withColumn("cancer_label", map_cancer_udf(col("cancer_type")))
print("üéØ Cancer type distribution:")
labeled_df.groupBy("cancer_label").count().show()

# Preparing features (all gene columns)
# The comprehension variable is also called `col`; it is scoped to the
# comprehension, so the Spark `col` function used above is not rebound.
feature_columns = [col for col in genomic_df.columns if col.startswith('GENE_')]
print(f"üß¨ Using {len(feature_columns)} genomic features")

# Creating preprocessing pipeline stages (only defined here; nothing is fitted yet)
print("‚öôÔ∏è Creating ML pipeline...")

# Converting string label to numeric index
label_indexer = StringIndexer(inputCol="cancer_label", outputCol="label")

# Assembling features: all gene columns -> one vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="raw_features")

# Scaling features (centered and scaled to unit standard deviation)
scaler = StandardScaler(inputCol="raw_features", outputCol="scaledFeatures",
                       withStd=True, withMean=True)

# Applying PCA for dimensionality reduction: the 50 components are written to
# the "features" column
pca = PCA(k=50, inputCol="scaledFeatures", outputCol="features")

print("‚úÖ Pipeline stages defined!")

In [None]:
# Splitting the data: 70% train / 30% test, seeded for repeatability
train_data, test_data = labeled_df.randomSplit([0.7, 0.3], seed=42)
print(f"üìö Training samples: {train_data.count()}")
print(f"üß™ Test samples: {test_data.count()}")

# Defining models
print("ü§ñ Initializing machine learning models...")

# Random Forest, reading the "features" column (the PCA stage's output column)
rf = RandomForestClassifier(featuresCol="features", labelCol="label",
                           numTrees=100, maxDepth=10, seed=42)

# Creating pipelines: label indexing -> vector assembly -> scaling -> PCA -> classifier
pipeline_rf = Pipeline(stages=[label_indexer, assembler, scaler, pca, rf])

# Training Random Forest model (all stages are fitted on the training split only)
print("üå≤ Training Random Forest...")
model_rf = pipeline_rf.fit(train_data)

print("‚úÖ Model training completed!")

In [None]:
# `max` is shadowed by the wildcard import of pyspark.sql.functions, so the
# built-in is reached through the `builtins` module.
import builtins

# Making predictions
print("üìä Evaluating models...")

predictions_rf = model_rf.transform(test_data)

# Evaluating models
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

metrics = ['accuracy', 'weightedPrecision', 'weightedRecall', 'f1']

# results[metric][model name] -> score on the test split
results = {}
for metric in metrics:
    rf_score = evaluator.evaluate(predictions_rf, {evaluator.metricName: metric})
    results[metric] = {'Random Forest': rf_score}

# Displaying results
print("\n" + "="*50)
print("üéØ MODEL PERFORMANCE RESULTS")
print("="*50)

results_df = pd.DataFrame(results)
print(results_df.round(4))

print("\n" + "="*50)
print("üìà BEST PERFORMING MODEL BY METRIC")
print("="*50)
for metric in metrics:
    # Pick the highest-scoring model for this metric (higher is better for all
    # four metrics). With a single model this is that model; if more models
    # are added to `results`, the true best one is reported instead of
    # whichever key happens to come first.
    model_name, model_score = builtins.max(results[metric].items(), key=lambda item: item[1])
    print(f"{metric:>20}: {model_name} ({model_score:.4f})")

In [None]:
# Extracting feature importance from Random Forest
# NOTE(review): the classifier was configured with featuresCol="features",
# which is the PCA stage's output column, so the indices printed below refer
# to principal components rather than to individual genes.
print("\nüå≥ Analyzing feature importance...")

rf_model = model_rf.stages[-1]  # Get the RandomForest model (last pipeline stage)
feature_importances = rf_model.featureImportances

# Getting top 20 most important features: (index, importance) pairs, highest first
importance_list = [(i, float(importance)) for i, importance in enumerate(feature_importances)]
importance_list.sort(key=lambda x: x[1], reverse=True)

print("\nüîù Top 20 Most Important Genomic Features:")
print("Rank | Feature Index | Importance")
print("-" * 40)
for rank, (idx, importance) in enumerate(importance_list[:20], 1):
    print(f"{rank:4} | {idx:13} | {importance:.6f}")

# PCA Analysis: total of the explained-variance values of the retained components
pca_model = model_rf.stages[3]  # Get the PCA model (4th stage of pipeline_rf)
print(f"\nüìâ PCA Explained Variance: {pca_model.explainedVariance.sum():.4f}")

In [None]:
# Creating visualizations
# Plotly is imported inside the try block so that a missing installation only
# skips the charts instead of stopping the notebook.
print("‚ÑπÔ∏è Creating visualizations...")

try:
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots

    # Converting predictions to Pandas for visualization
    pdf_rf = predictions_rf.select("label", "prediction", "probability").toPandas()

    # Confusion Matrix Data: one row per (true label, predicted label) pair with its count
    confusion_data = pdf_rf.groupby(['label', 'prediction']).size().reset_index(name='count')

    # Creating confusion matrix heatmap
    fig = px.density_heatmap(confusion_data, x='prediction', y='label', z='count',
                            title='Confusion Matrix - Random Forest',
                            color_continuous_scale='Blues')
    fig.show()

    # Model comparison bar chart
    # Only Random Forest is available now, so the chart has a single bar
    models = ['Random Forest']
    accuracy_scores = [results['accuracy']['Random Forest']]

    fig2 = px.bar(x=models, y=accuracy_scores,
                 title='Model Accuracy Comparison',
                 labels={'x': 'Model', 'y': 'Accuracy'},
                 color=accuracy_scores, color_continuous_scale='Viridis')
    fig2.show()

except ImportError:
    print("Plotly not available for visualizations")

In [None]:
# Saving the model and results
print("\nüíæ Saving model and results...")

# Persist the fitted pipeline, replacing any earlier copy at this path
model_rf.write().overwrite().save("random_forest_cancer_model")

# Export the per-sample predictions as CSV (with a header row), overwriting
# any earlier export
(
    predictions_rf
    .select("sample_id", "cancer_label", "prediction")
    .write
    .mode("overwrite")
    .csv("cancer_predictions", header=True)
)

print("‚úÖ Model and predictions saved successfully!")


print("\n" + "=" * 60)
print("üéâ PROJECT COMPLETED SUCCESSFULLY!")
print("=" * 60)

In [None]:
# Downloading TCGA data directly to Colab (much faster than uploading)
import urllib.request
import gzip
import shutil
import os

# Creating the directory if it doesn't exist
os.makedirs('/content/tcga_data/', exist_ok=True)

# Downloading a TCGA dataset directly
tcga_urls = {
    "rna_seq": "https://tcga-pancan-atlas-hub.s3.us-east-1.amazonaws.com/download/EB%2B%2BAdjustPANCAN_IlluminaHiSeq_RNASeqV2.geneExp.xena.gz",
    "clinical": "https://tcga-pancan-atlas-hub.s3.us-east-1.amazonaws.com/download/Survival_SupplementalTable_S1_20171025_xena_sp"
}

for name, url in tcga_urls.items():
    # Every download is stored under a ".tsv" name regardless of compression
    # (the rna_seq URL ends in ".gz"); later cells sniff the gzip magic bytes.
    filename = f"/content/tcga_data/{name}.tsv"

    # Skip files that are already present so re-running the notebook does not
    # fetch the data again.
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        print(f"Already downloaded: {filename}")
        continue

    print(f"Downloading {name}...")
    # Download to a temporary name and rename on success, so an interrupted
    # transfer never leaves a truncated file under the final name.
    partial_filename = filename + ".part"
    try:
        urllib.request.urlretrieve(url, partial_filename)
        os.replace(partial_filename, filename)
    finally:
        if os.path.exists(partial_filename):
            os.remove(partial_filename)
    print(f"Downloaded: {filename}")

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import gc
import gzip
import io

# Loading the RNA-Seq data
print("Loading RNA-Seq data...")
rna_seq_path = "/content/tcga_data/rna_seq.tsv"

# Peek at the first three lines of the file, whether or not it is gzipped.
# Any failure is reported and the cell carries on.
try:
    # A gzip stream starts with the two magic bytes 0x1f 0x8b
    with open(rna_seq_path, 'rb') as f:
        first_bytes = f.read(2)

    is_gzipped = first_bytes == b'\x1f\x8b'
    if is_gzipped:
        print("Detected gzipped file. Opening with gzip...")
        text_opener, open_mode = gzip.open, 'rt'
    else:
        print("Detected plain text file. Opening directly...")
        text_opener, open_mode = open, 'r'

    with text_opener(rna_seq_path, open_mode) as f:
        first_lines = [next(f) for _ in range(3)]

    print("File structure preview:")
    for i, line in enumerate(first_lines):
        print(f"Line {i}: {line[:100]}...")
except Exception as e:
    print(f"Error reading file: {e}")

In [None]:
import gzip
import shutil
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import os

# Report the first two bytes of each downloaded file and whether they match
# the gzip signature (0x1f 0x8b).
print("Checking file types...")
for file_path in ('/content/tcga_data/rna_seq.tsv', '/content/tcga_data/clinical.tsv'):
    try:
        with open(file_path, 'rb') as f:
            magic_number = f.read(2)
            print(f"{file_path}: First 2 bytes = {magic_number.hex()}")
            verdict = "  ‚Üí This is a GZIP file!" if magic_number == b'\x1f\x8b' else "  ‚Üí Not a GZIP file"
            print(verdict)
    except Exception as e:
        print(f"Error checking {file_path}: {e}")

In [None]:
def extract_gzip_file(gzip_path, output_path):
    """Decompress a gzip file into ``output_path``.

    Args:
        gzip_path: Path of the gzip-compressed input file.
        output_path: Path the decompressed bytes are written to.

    Returns:
        True if the whole file was decompressed, False otherwise. Errors are
        printed rather than raised.

    On failure, an output file that this call created or truncated is removed,
    so no empty or partial file is left for later cells to pick up. An existing
    output file is left untouched when the input cannot even be opened.
    """
    print(f"Extracting {gzip_path} to {output_path}...")
    output_opened = False
    try:
        with gzip.open(gzip_path, 'rb') as f_in:
            with open(output_path, 'wb') as f_out:
                output_opened = True
                shutil.copyfileobj(f_in, f_out)
    except Exception as e:
        if output_opened:
            # Drop the partial result; ignore a failure to delete it.
            try:
                os.remove(output_path)
            except OSError:
                pass
        print(f"‚úó Extraction failed: {e}")
        return False
    print(f"‚úì Successfully extracted to {output_path}")
    return True

# Extracting both files
# extracted_files maps file type -> path of the decompressed copy and only
# receives an entry when extraction succeeds.
# NOTE(review): the "clinical" download URL has no .gz suffix, so that file is
# presumably not gzipped and may end up missing from extracted_files - confirm.
extracted_files = {}
for file_type in ['rna_seq', 'clinical']:
    gzip_path = f'/content/tcga_data/{file_type}.tsv'
    extracted_path = f'/content/tcga_data/{file_type}_extracted.tsv'

    if extract_gzip_file(gzip_path, extracted_path):
        extracted_files[file_type] = extracted_path

print("Extraction completed!")

In [None]:
def load_tcga_properly(rna_path, sample_fraction=0.3, max_genes=1000, max_samples=2000):
    """Load a genes-by-samples expression table and return it samples-by-genes.

    The file is expected to be tab-separated, with gene identifiers in the
    first column and one column per sample. Only the leading samples and the
    leading genes are read, which keeps memory use bounded on large files.

    Args:
        rna_path: Path to the (uncompressed) tab-separated file.
        sample_fraction: Fraction of the sample columns to load, counted from
            the left.
        max_genes: Maximum number of gene rows to read.
        max_samples: Upper bound on the number of samples loaded, whatever
            ``sample_fraction`` asks for. Defaults to the previous fixed cap.

    Returns:
        pandas.DataFrame indexed by sample ID with one column per gene.
    """
    # `min`/`max` may be shadowed by a wildcard import of pyspark.sql.functions.
    # The `builtins` module is used because `__builtins__` is only a module
    # inside `__main__` (it is a plain dict elsewhere).
    import builtins

    print("Loading TCGA data with proper orientation...")

    # Reading the header to get sample IDs. Only the line terminator is
    # stripped, so an empty first header cell cannot shift the sample columns.
    with open(rna_path, 'r') as f:
        header = f.readline().rstrip('\r\n').split('\t')
        sample_ids = header[1:]  # First column holds gene IDs, rest are sample IDs
        print(f"Total samples: {len(sample_ids)}")
        print(f"Sample IDs preview: {sample_ids[:5]}")

    # Number of samples to load, clamped to what the file actually contains so
    # that a fraction above 1 cannot request non-existent columns.
    requested = int(len(sample_ids) * sample_fraction)
    n_samples_to_load = builtins.max(0, builtins.min(requested, max_samples, len(sample_ids)))

    # Column 0 (gene IDs) plus the first n sample columns
    column_indices = list(range(n_samples_to_load + 1))

    print(f"Loading {n_samples_to_load} samples and up to {max_genes} genes")

    # Loading the data - genes as rows, samples as columns
    df = pd.read_csv(
        rna_path,
        sep='\t',
        usecols=column_indices,
        nrows=max_genes
    )

    # Use whatever the file calls its first column instead of assuming 'sample'
    gene_id_column = df.columns[0]

    print(f"Loaded data shape: {df.shape}")
    print(f"First few gene IDs: {df[gene_id_column].head(5).tolist()}")

    # Setting gene IDs as index and transposing to get samples as rows
    df_transposed = df.set_index(gene_id_column).T

    print(f"After transpose: {df_transposed.shape}")
    print(f"Samples: {df_transposed.shape[0]}, Genes: {df_transposed.shape[1]}")

    # Checking for missing values and data types
    print(f"Missing values: {df_transposed.isnull().sum().sum()}")
    print(f"Data types: {df_transposed.dtypes.value_counts()}")

    return df_transposed

# Loading the data properly
# Reads the extracted RNA-seq file registered in extracted_files by the
# extraction cell; the result has samples as rows and genes as columns.
print("=== LOADING DATA WITH CORRECT ORIENTATION ===")
rna_data_correct = load_tcga_properly(
    extracted_files['rna_seq'],
    sample_fraction=0.3,  # 30% of samples
    max_genes=1500        # First 1500 genes
)

print("\nFirst few rows of corrected data:")
print(rna_data_correct.head(3))
# The :.3f format requires the first cell to be numeric
print(f"Sample of values: {rna_data_correct.iloc[0, 0]:.3f}")

In [None]:
# Handling NA values and convert to numeric
print("=== CLEANING DATA ===")

# Replace literal 'NA' strings with NaN and coerce every gene column to
# numeric in one pass; anything unparseable becomes NaN.
# A top-level `for col in ...` loop is deliberately avoided: it would rebind
# the global name `col`, which the wildcard import of pyspark.sql.functions
# provides to the Spark cells.
rna_data_clean = rna_data_correct.replace('NA', np.nan).apply(pd.to_numeric, errors='coerce')

print(f"Missing values after cleaning: {rna_data_clean.isnull().sum().sum()}")
print(f"Data types after conversion: {rna_data_clean.dtypes.value_counts()}")

# Removing rows (samples) with too many missing values
initial_samples = rna_data_clean.shape[0]
rna_data_clean = rna_data_clean.dropna(thresh=0.8 * rna_data_clean.shape[1])  # Keep samples with at least 80% data
final_samples = rna_data_clean.shape[0]

print(f"Removed {initial_samples - final_samples} samples with too many missing values")
print(f"Final clean data shape: {rna_data_clean.shape}")

In [None]:
# Examining what project codes we actually have
print("=== ANALYZING ACTUAL PROJECT CODES ===")

from collections import Counter

# Getting the second '-'-separated field of every sample ID; IDs with fewer
# than two fields are skipped.
# NOTE(review): in TCGA barcodes this field is presumably the tissue source
# site rather than a project code - confirm before using it as a label.
sample_ids = rna_data_clean.index.tolist()
project_codes = [
    parts[1]
    for parts in (sample_id.split('-') for sample_id in sample_ids)
    if len(parts) >= 2
]

# One counting pass instead of calling list.count() once per distinct code
project_counts = dict(Counter(project_codes))
unique_projects = set(project_counts)
print(f"Unique project codes found: {sorted(unique_projects)}")
print(f"Number of unique projects: {len(unique_projects)}")

# Showing distribution, most frequent first.
# The loop variable is not called `count`, so the Spark `count` function from
# the wildcard import of pyspark.sql.functions is not rebound.
print("\nProject code distribution:")
for project, n_project_samples in sorted(project_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"  {project}: {n_project_samples} samples")

In [None]:
# Creating realistic cancer type labels
print("=== CREATING MEANINGFUL CANCER TYPE LABELS ===")

def create_realistic_cancer_labels(sample_ids, n_cancer_types=5):
    """Assign a synthetic cancer-type label to every sample ID.

    The labels are NOT derived from clinical data: each sample ID is hashed
    into one of 100 buckets and the bucket decides the cancer type. A stable
    CRC-32 checksum is used instead of the built-in ``hash``, whose value for
    strings changes between interpreter runs, so the same ID receives the
    same label in every session.

    Args:
        sample_ids: Iterable of sample identifiers (converted with ``str``).
        n_cancer_types: Number of cancer types to use, taken from the front of
            the built-in list (capped at its length). With 5 types the
            shares are 25/20/18/17/20 percent; any other number gets equal
            shares.

    Returns:
        pandas.DataFrame with columns ``sample_id``, ``cancer_type`` and
        ``patient_id`` (a copy of the sample ID), one row per input ID. Empty
        input yields an empty frame with the same columns.

    Raises:
        ValueError: If ``n_cancer_types`` is smaller than 1.
    """
    # `min`/`round` may be shadowed by a wildcard import of
    # pyspark.sql.functions; `__builtins__` is only a module inside
    # `__main__`, so the `builtins` module is used instead.
    import bisect
    import builtins
    import zlib

    if n_cancer_types < 1:
        raise ValueError("n_cancer_types must be at least 1")

    # Most common cancer types in TCGA (by approximate frequency)
    common_cancers = [
        'Breast Invasive Carcinoma (BRCA)',
        'Lung Adenocarcinoma (LUAD)',
        'Prostate Adenocarcinoma (PRAD)',
        'Colon Adenocarcinoma (COAD)',
        'Kidney Renal Clear Cell Carcinoma (KIRC)',
        'Brain Lower Grade Glioma (LGG)',
        'Head and Neck Squamous Cell Carcinoma (HNSC)',
        'Thyroid Carcinoma (THCA)',
        'Stomach Adenocarcinoma (STAD)',
        'Bladder Urothelial Carcinoma (BLCA)',
        'Ovarian Serous Cystadenocarcinoma (OV)',
        'Skin Cutaneous Melanoma (SKCM)',
        'Liver Hepatocellular Carcinoma (LIHC)',
        'Pancreatic Adenocarcinoma (PAAD)',
        'Esophageal Carcinoma (ESCA)'
    ]

    # Selecting the top N cancer types for our dataset
    selected_cancers = common_cancers[:builtins.min(n_cancer_types, len(common_cancers))]

    # Share of samples per selected type
    if len(selected_cancers) == 5:
        weights = [0.25, 0.20, 0.18, 0.17, 0.20]
    else:
        weights = [1.0 / len(selected_cancers)] * len(selected_cancers)

    # Exclusive upper bucket bound (0-100) of each type, e.g. 25/45/63/80/100.
    # Rounding guards against float drift in the running sum.
    upper_bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight
        upper_bounds.append(builtins.round(cumulative * 100))
    upper_bounds[-1] = 100

    # Kept for backward compatibility: the assignment itself draws no random
    # numbers, but the global NumPy RNG has always been reseeded here.
    np.random.seed(42)

    clinical_data = []
    for sample_id in sample_ids:
        # Stable bucket in [0, 100) derived from the ID text
        bucket = zlib.crc32(str(sample_id).encode('utf-8')) % 100
        # First type whose upper bound lies above the bucket
        cancer_type = selected_cancers[bisect.bisect_right(upper_bounds, bucket)]

        clinical_data.append({
            'sample_id': sample_id,
            'cancer_type': cancer_type,
            'patient_id': sample_id  # Storing the original ID
        })

    # Explicit columns keep the schema intact when there are no samples
    clinical_df = pd.DataFrame(clinical_data, columns=['sample_id', 'cancer_type', 'patient_id'])

    print("Realistic cancer type labels created:")
    print(f"Total samples: {len(clinical_df)}")
    print(f"Cancer types: {len(selected_cancers)}")
    print("\nCancer type distribution:")
    for cancer_type, n_labelled in clinical_df['cancer_type'].value_counts().items():
        percentage = (n_labelled / len(clinical_df)) * 100
        print(f"  {cancer_type}: {n_labelled} samples ({percentage:.1f}%)")

    return clinical_df

# Creating realistic clinical data
# `sample_ids` is the list taken from rna_data_clean.index in the project-code
# cell, so only samples that survived cleaning receive a label.
clinical_data_realistic = create_realistic_cancer_labels(sample_ids, n_cancer_types=5)

print("\nClinical data preview:")
print(clinical_data_realistic.head(10))

In [None]:
# Saving 'clinical_data_realistic' and 'rna_data_correct' directly before loading into Spark
# This ensures the files exist if a previous save operation failed or files were cleaned up.
# Note that the un-cleaned frame (rna_data_correct) is written, not rna_data_clean.
print("üíæ Resaving raw data for Spark ingestion to ensure files exist...")
# reset_index() turns the sample IDs (the index) into a column named 'index',
# which is then renamed to 'sample_id' so it can serve as the join key.
rna_data_correct.reset_index().rename(columns={'index': 'sample_id'}).to_csv(
    "/content/tcga_data/rna_raw_for_spark.csv", index=False
)
clinical_data_realistic.to_csv(
    "/content/tcga_data/clinical_raw_for_spark.csv", index=False
)
print("‚úÖ Raw data resaved!")

print("üì• Loading data into Spark DataFrames...")
rna_df_raw = spark.read.csv("/content/tcga_data/rna_raw_for_spark.csv", header=True, inferSchema=True)
clinical_df_raw = spark.read.csv("/content/tcga_data/clinical_raw_for_spark.csv", header=True, inferSchema=True)

print("üîó Merging DataFrames...")
# Inner join on the shared "sample_id" column: samples without a label row
# (or labels without expression data) are dropped.
full_df = rna_df_raw.join(clinical_df_raw, on="sample_id", how="inner")

# Handling Missing Values: Dropping rows that have any null values
print(f"Original sample count: {full_df.count()}")
clean_df = full_df.na.drop()
print(f"Clean sample count: {clean_df.count()}")

# Assign the cleaned and merged DataFrame to spark_df for subsequent cells
spark_df = clean_df

print("‚úÖ Data loaded and merged into Spark successfully!")
spark_df.printSchema()

In [None]:
 # Identifying columns
# Every column that is neither an identifier nor the label is treated as a
# gene-expression feature.
ignore_cols = ['sample_id', 'cancer_type', 'patient_id']
feature_cols = [c for c in spark_df.columns if c not in ignore_cols]

# Re-imported here so the names below resolve to the Spark ML classes
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
from pyspark.ml.classification import RandomForestClassifier

# Stage A: text label -> numeric "label" column
indexer = StringIndexer(inputCol="cancer_type", outputCol="label")

# Stage B: all gene columns -> a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")

# Stage C: standardise the features ahead of PCA
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True,
)

# Stage D: PCA down to 100 components
pca = PCA(k=100, inputCol="scaled_features", outputCol="pca_features")

# Stage E: Random Forest on the PCA components
rf_spark = RandomForestClassifier(
    featuresCol="pca_features",
    labelCol="label",
    numTrees=100,
    seed=42,
)

# Chain the stages in execution order
pipeline = Pipeline(stages=[indexer, assembler, scaler, pca, rf_spark])

print("‚úÖ Spark ML Pipeline constructed.")

In [None]:
# Splitting Data (70% Training, 30% Test), seeded for repeatability.
# NOTE: this rebinds train_data/test_data, which earlier held the split of the
# synthetic dataset.
train_data, test_data = spark_df.randomSplit([0.7, 0.3], seed=42)

print("üöÄ Training Spark Model (this may take a moment)...")

# Training the model (fits every pipeline stage on the training split)
model = pipeline.fit(train_data)

# Making Predictions
predictions = model.transform(test_data)

# Evaluating accuracy on the held-out test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"\nüéâ Spark Model Accuracy: {accuracy:.4f}")

# Showing confusion matrix equivalent: row counts per (label, prediction) pair
predictions.groupBy("label", "prediction").count().show()

In [None]:
# Saving 'clinical_data_realistic' and 'rna_data_correct'
# Saving them to disk so Spark can load them strictly as "Raw Data"
# (these are the same two files, at the same paths, that the earlier
# "Resaving raw data" cell writes).
print("üíæ Saving raw data for Spark ingestion...")

# Saving the transposed (but still dirty) RNA data
# Resetting the index makes the sample IDs a real column ('index'), which is
# then renamed to 'sample_id'
rna_data_correct.reset_index().rename(columns={'index': 'sample_id'}).to_csv(
    "/content/tcga_data/rna_raw_for_spark.csv", index=False
)

# Saving the clinical labels
clinical_data_realistic.to_csv(
    "/content/tcga_data/clinical_raw_for_spark.csv", index=False
)

print("‚úÖ Handoff complete! Ready for Spark.")

In [None]:
# === SPARK IMPLEMENTATION ===
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count
from pyspark.ml.feature import VectorAssembler, StringIndexer, PCA, StandardScaler, Imputer
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initializing Spark
# NOTE(review): a session is already created earlier in this notebook;
# getOrCreate() presumably returns that existing session, in which case the
# 8g driver-memory setting here may not take effect - confirm.
spark = SparkSession.builder \
    .appName("TCGA_Genomic_Classifier") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

print("üöÄ Spark Session Initialized")

# Loading the CSVs we just saved
print("üì• Loading data into Spark DataFrames...")
rna_df = spark.read.csv("/content/tcga_data/rna_raw_for_spark.csv", header=True, inferSchema=True)
clinical_df = spark.read.csv("/content/tcga_data/clinical_raw_for_spark.csv", header=True, inferSchema=True)

# DATA CLEANING & MERGING
# Merging the features (RNA) with labels (Clinical) on the shared sample_id
# column; the inner join keeps only samples present in both files
print("üîó Merging DataFrames...")
full_df = rna_df.join(clinical_df, on="sample_id", how="inner")

# Handling Missing Values
# Dropping rows that have any null values (simple and effective for this dataset)
print(f"Original count: {full_df.count()}")
clean_df = full_df.na.drop()
print(f"Clean count: {clean_df.count()}")

# PREPARING THE PIPELINE
print("‚öôÔ∏è Building Spark ML Pipeline...")

# Identifying gene columns (excluding metadata)
# 'sample_id' is listed twice; the duplicate has no effect on the membership test.
exclude_cols = ['sample_id', 'cancer_type', 'patient_id', 'sample_id', '_c0'] # _c0 handles potential index artifacts
gene_cols = [c for c in clean_df.columns if c not in exclude_cols]

# Encoding Labels (String -> Index)
indexer = StringIndexer(inputCol="cancer_type", outputCol="label")

# Assembling Vector (Combine all gene columns into one vector)
assembler = VectorAssembler(inputCols=gene_cols, outputCol="raw_features")

# Scaling Features (Required for PCA)
scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features",
                        withStd=True, withMean=True)

# PCA
# Reducing the gene features to 50 principal components
pca = PCA(k=50, inputCol="scaled_features", outputCol="pca_features")

# Classifier (Random Forest)
rf = RandomForestClassifier(featuresCol="pca_features", labelCol="label",
                            numTrees=100, seed=42)

# Creating the full pipeline (defined only; it is not fitted in this section)
pipeline = Pipeline(stages=[indexer, assembler, scaler, pca, rf])

# === DUAL MODEL TRAINING & COMPARISON ===
from pyspark.ml.classification import RandomForestClassifier, LinearSVC, OneVsRest

print("üèãÔ∏è Training & Comparing Models...")

# SPLITTING DATA (Guideline 4.1): 70% train / 30% test, seeded
train_data, test_data = clean_df.randomSplit([0.7, 0.3], seed=42)

# DEFINING MODEL ESTIMATORS
# NOTE: `rf`, `l_svc` and `ovr_svm` are assigned again further down in this
# cell (tuning and SVM sections), which replaces the objects created here.
# Model A: Random Forest (Natively supports multiclass)
rf = RandomForestClassifier(featuresCol="pca_features", labelCol="label",
                            numTrees=100, seed=42)

# Model B: Linear SVM (Requires OneVsRest for multiclass support)
# Wrapping the LinearSVC because standard SVM in Spark is binary-only
l_svc = LinearSVC(maxIter=10, regParam=0.1)
ovr_svm = OneVsRest(classifier=l_svc, featuresCol="pca_features", labelCol="label")

# BUILDING PIPELINES
# Reusing the same preprocessing stages (0-3) for both models to be fair
preprocessing_stages = [indexer, assembler, scaler, pca]

# These two pipelines are only constructed here; the sections below fit the
# preprocessing stages and the classifiers separately.
pipeline_rf = Pipeline(stages=preprocessing_stages + [rf])
pipeline_svm = Pipeline(stages=preprocessing_stages + [ovr_svm])

# ==============================================================================
# TUNING
# ==============================================================================
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml import PipelineModel

print("‚ö° Starting ULTRA-FAST Hyperparameter Tuning...")

# PRE-PROCESSING DATA ONCE (The heavy lifting)
# We fit the preprocessing part (indexer, assembler, scaler, PCA) just one
# time, on the training split only.
print("   -> Running PCA Preprocessing (One-time cost)...")
prep_pipeline = Pipeline(stages=preprocessing_stages)
prep_model = prep_pipeline.fit(train_data)

# Transforming the data into "pca_features"
# We select only the columns we need for the classifier to save memory
train_pca = prep_model.transform(train_data).select("sample_id", "pca_features", "label")
test_pca = prep_model.transform(test_data).select("sample_id", "pca_features", "label")

# CACHING THE TRANSFORMED DATA
# Now the data is small (50 features) and ready for instant training.
# count() is what triggers the computation that fills the cache.
train_pca.cache()
print(f"   -> Pre-processed data cached. Count: {train_pca.count()}")

# DEFINING ONLY THE CLASSIFIER (No Pipeline overhead)
rf = RandomForestClassifier(featuresCol="pca_features", labelCol="label", seed=42)

# LIGHTWEIGHT GRID (1 vs 5 trees)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [1, 5]) \
    .build()

# FAST VALIDATOR
# Tuning ONLY the Random Forest, not the whole pipeline; 80% of train_pca is
# used for fitting and the remainder for validation
tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
                           trainRatio=0.8)

# RUNNING TUNING 
print("   -> Tuning classifier...")
tvsModel = tvs.fit(train_pca)

# EXTRACTING BEST MODEL
# NOTE(review): getNumTrees is referenced without call parentheses - confirm
# it is exposed as a property on the fitted model in the PySpark version used.
best_rf = tvsModel.bestModel
print(f"‚úÖ Best Params Found: NumTrees={best_rf.getNumTrees}")

# PREDICTIONS & EVALUATION
# Note: We use the pre-processed 'test_pca' here
print("   -> Generating final predictions...")
preds_rf = best_rf.transform(test_pca)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc_rf = evaluator.evaluate(preds_rf)
print(f"   -> Random Forest Accuracy: {acc_rf:.2%}")

# UPDATING GLOBAL MODEL VARIABLES
# We need to reconstruct the full pipeline so the Ensemble code works later
# We combine the fitted preprocessing stages with the tuned RF model, which
# rebinds `model_rf` (stage order: indexer, assembler, scaler, PCA, RF)
model_rf = PipelineModel(stages=prep_model.stages + [best_rf])

# SVM SECTION (Standard)
print("‚öîÔ∏è Training Support Vector Machine (OneVsRest)...")
# Using the faster 'train_pca' here!
# The LinearSVC is wrapped in OneVsRest for the multiclass label and fitted
# directly on the cached PCA features (no hyperparameter search).
l_svc = LinearSVC(maxIter=10, regParam=0.1)
ovr_svm = OneVsRest(classifier=l_svc, featuresCol="pca_features", labelCol="label")
model_svm_only = ovr_svm.fit(train_pca)
preds_svm = model_svm_only.transform(test_pca)

# Reconstructing full SVM pipeline for consistency with model_rf
model_svm = PipelineModel(stages=prep_model.stages + [model_svm_only])

# `evaluator` is the accuracy evaluator defined in the tuning section
acc_svm = evaluator.evaluate(preds_svm)
print(f"   -> SVM Accuracy: {acc_svm:.2%}")

# FINAL COMPARISON
print("\nüèÜ MODEL SHOWDOWN RESULTS")
print("="*30)
print(f"Random Forest: {acc_rf:.4f}")
print(f"SVM (OvR):     {acc_svm:.4f}")

In [None]:
# === REPORTING, VISUALIZATION & BIOMARKERS ===
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from pyspark.ml.classification import RandomForestClassificationModel

print("üî¨ GENERATING PROJECT REPORT VISUALIZATIONS...")

# ---------------------------------------------------------
# 1. BIOMARKER IDENTIFICATION (Using Random Forest)
# ---------------------------------------------------------
# We use RF for biomarkers because it gives a global importance score.
# SVM (OneVsRest) gives complex class-specific coefficients.
print("\nüß¨ Identifying Top Biomarkers (derived from Random Forest)...")

# Only needed for the stage lookup below, so it is imported next to its use.
from pyspark.ml.feature import PCAModel


def _find_stage(pipeline_model, stage_type):
    """Return the first stage of `pipeline_model` that is an instance of `stage_type`.

    Looking stages up by type instead of by position keeps this cell working
    when preprocessing steps are added, removed or reordered.

    Raises:
        ValueError: if the pipeline contains no stage of the requested type.
    """
    for stage in pipeline_model.stages:
        if isinstance(stage, stage_type):
            return stage
    raise ValueError(f"No {stage_type.__name__} stage found in the pipeline model.")


# Note: We explicitly use 'model_rf' here, even if SVM won the accuracy battle,
# because RF is better for explaining feature importance.
pca_stage = _find_stage(model_rf, PCAModel)
rf_stage = _find_stage(model_rf, RandomForestClassificationModel)

# A. Getting PCA Component Importance (from RF) and Gene Weights (from PCA)
# pc_importances: one importance value per RF input feature (the PCA components).
# pc_weights: the PCA loading matrix (input features x components).
pc_importances = rf_stage.featureImportances.toArray()
pc_weights = pca_stage.pc.toArray()

# B. Calculating Gene Importance
# Dot product of (Gene weights in PCs) * (Importance of those PCs).
# Absolute loadings are used so positive and negative weights do not cancel.
gene_importance_scores = np.dot(np.abs(pc_weights), pc_importances)

# C. Mapping to Gene Names
# 'gene_cols' must exist from Phase 2. If not, we use generic names.
if 'gene_cols' not in globals():
    print("Warning: 'gene_cols' variable not found. Using generic IDs.")
    gene_cols = [f"Gene_{i}" for i in range(len(gene_importance_scores))]

# Fail with an explicit message when the name list does not line up with the
# PCA input dimension, instead of pandas' generic "arrays must be same length".
if len(gene_cols) != len(gene_importance_scores):
    raise ValueError(
        f"'gene_cols' has {len(gene_cols)} entries but the PCA model produced "
        f"{len(gene_importance_scores)} importance scores."
    )

biomarker_df = pd.DataFrame({
    'Gene': gene_cols,
    'Importance_Score': gene_importance_scores
})

# D. Showing Top 20
top_biomarkers = biomarker_df.sort_values(by='Importance_Score', ascending=False).head(20)
print(top_biomarkers)

# Plotting Biomarkers
plt.figure(figsize=(10, 6))
sns.barplot(x='Importance_Score', y='Gene', data=top_biomarkers, palette='viridis')
plt.title('Top 20 Potential Cancer Biomarkers (RF Derived)')
plt.tight_layout()
plt.show()

# ---------------------------------------------------------
# 2. MODEL COMPARISON VISUALIZATION (RF vs SVM)
# ---------------------------------------------------------
print("\nüìä Generating Confusion Matrices for Comparison...")

def get_confusion_matrix(predictions, model_name, labels=None):
    """Build a square confusion matrix (rows = true label, columns = prediction).

    Args:
        predictions: DataFrame-like object with "label" and "prediction" columns
            that supports ``groupBy(...).count().toPandas()``.
        model_name: Unused; kept so existing callers keep working.
        labels: Optional ordered list of class ids to use for both axes. When
            omitted, the sorted union of all observed labels and predictions
            is used.

    Returns:
        pandas.DataFrame of counts with identical row and column labels.
        (label, prediction) combinations that never occur are filled with 0.
    """
    # Grouping by Label and Prediction in Spark, then collecting the small result
    cm_spark = predictions.groupBy("label", "prediction").count().toPandas()
    # Pivoting to Matrix format
    cm_matrix = cm_spark.pivot(index='label', columns='prediction', values='count')
    if labels is None:
        # A class that is never predicted has no column after the pivot (and a
        # class absent from the true labels has no row). Using the union of
        # both axes keeps the matrix square and in 0,1,2,... order.
        labels = sorted(set(cm_matrix.index) | set(cm_matrix.columns))
    cm_matrix = (
        cm_matrix
        .reindex(index=labels, columns=labels)
        .fillna(0)
        .rename_axis(index='label', columns='prediction')
    )
    return cm_matrix

# Getting matrices for both
cm_rf = get_confusion_matrix(preds_rf, "Random Forest")
cm_svm = get_confusion_matrix(preds_svm, "SVM")

# Getting class names for labeling
# Prefer the label indexer that was fitted as part of the trained pipeline.
# Refitting 'indexer' on 'clean_df' costs an extra pass over the data and may
# order the classes differently if the pipeline was fitted on a subset.
# NOTE(review): assumes the pipeline's label indexer writes to the "label"
# column; when no such stage exists we fall back to the original refit.
from pyspark.ml.feature import StringIndexerModel

fitted_label_indexer = next(
    (stage for stage in model_rf.stages
     if isinstance(stage, StringIndexerModel) and stage.getOutputCol() == "label"),
    None,
)
if fitted_label_indexer is None:
    fitted_label_indexer = indexer.fit(clean_df)
class_names = list(fitted_label_indexer.labels)

# A class that never occurs as a true label or as a prediction is missing from
# the pivoted matrix, leaving fewer rows/columns than tick labels. Reindex both
# matrices onto the full 0..n-1 class index range so cells and names line up.
class_ids = [float(i) for i in range(len(class_names))]
cm_rf = cm_rf.reindex(index=class_ids, columns=class_ids, fill_value=0)
cm_svm = cm_svm.reindex(index=class_ids, columns=class_ids, fill_value=0)

# Plotting Side-by-Side
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Random Forest Plot
sns.heatmap(cm_rf, annot=True, fmt='g', cmap='Blues', ax=axes[0],
            xticklabels=class_names, yticklabels=class_names)
axes[0].set_title(f'Random Forest Confusion Matrix\nAccuracy: {acc_rf:.2%}')
axes[0].set_ylabel('True Label')
axes[0].set_xlabel('Predicted Label')

# SVM Plot
sns.heatmap(cm_svm, annot=True, fmt='g', cmap='Greens', ax=axes[1],
            xticklabels=class_names, yticklabels=class_names)
axes[1].set_title(f'SVM (OneVsRest) Confusion Matrix\nAccuracy: {acc_svm:.2%}')
axes[1].set_ylabel('True Label')
axes[1].set_xlabel('Predicted Label')

plt.tight_layout()
plt.show()

# ---------------------------------------------------------
# 3. PCA VARIANCE EXPLAINED
# ---------------------------------------------------------
# Validates why we chose k=50 components
# 'pca_stage' is the fitted PCA model extracted in the biomarker section above;
# its explainedVariance holds one variance proportion per component.
explained_var = pca_stage.explainedVariance.toArray()
plt.figure(figsize=(8, 4))
plt.plot(range(1, len(explained_var) + 1), np.cumsum(explained_var), marker='o')
plt.title('Cumulative Variance Explained by PCA Components')
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Variance')
plt.grid(True)
plt.show()

print("‚úÖ PROJECT COMPLETE: Results Visualized.")

In [None]:
# ==============================================================================
# ENSEMBLE LEARNING (VOTING CLASSIFIER)
# ==============================================================================

from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, array
from collections import Counter

print("ü§ù Building Ensemble Model (Voting Classifier)...")

# STEP 1 - THIRD MODEL (LOGISTIC REGRESSION) AS TIE-BREAKER
# ------------------------------------------------------------------
# With only RF and SVM a disagreement cannot be resolved by majority, so a
# third voter is trained. It goes through the same preprocessing stages and is
# fitted on the raw training split.
print("   -> Training Tie-Breaker Model (Logistic Regression)...")
lr = LogisticRegression(maxIter=10, labelCol="label", featuresCol="pca_features")
pipeline_lr = Pipeline(stages=[*preprocessing_stages, lr])
model_lr = pipeline_lr.fit(train_data)

# STEP 2 - PREDICTIONS FROM ALL 3 MODELS
# ------------------------------------------------------------------
print("   -> Gathering votes from RF, SVM, and LR...")

# Score the raw test split with every full pipeline and give each model's
# "prediction" column its own name so the three can live in one table.
preds_rf_clean, preds_svm_clean, preds_lr_clean = (
    fitted_pipeline.transform(test_data).withColumnRenamed("prediction", vote_col)
    for fitted_pipeline, vote_col in (
        (model_rf, "pred_rf"),
        (model_svm, "pred_svm"),
        (model_lr, "pred_lr"),
    )
)

# One row per sample: the true label plus one vote per model.
# Rows are matched on 'sample_id' so every vote refers to the same patient.
votes_df = (
    preds_rf_clean.select("sample_id", "label", "pred_rf")
    .join(preds_svm_clean.select("sample_id", "pred_svm"), on="sample_id")
    .join(preds_lr_clean.select("sample_id", "pred_lr"), on="sample_id")
)

# DEFINING THE VOTING LOGIC (UDF)
# ------------------------------------------------------------------
# This function takes a list of votes and returns the most common one (Mode)
def get_majority_vote(votes):
    """Return the most common value in `votes` as a float (the mode).

    Args:
        votes: Sequence of model predictions, e.g. [1.0, 1.0, 2.0] -> 1.0.
            Entries that are None (a model produced no prediction) are ignored.

    Returns:
        The winning prediction as a float. When several values share the top
        count (e.g. a 3-way tie) the one that appears first in `votes` wins,
        i.e. the first model's choice. Returns None when `votes` is None,
        empty, or contains only None, so the UDF yields a null instead of
        failing the whole Spark job.
    """
    if votes is None:
        return None
    valid_votes = [vote for vote in votes if vote is not None]
    if not valid_votes:
        return None
    # Counter keeps first-seen order among equal counts, which gives the
    # documented tie-break in favour of the earliest voter.
    counts = Counter(valid_votes)
    return float(counts.most_common(1)[0][0])

# Registering the UDF with Spark
# The declared return type must match the float produced by get_majority_vote.
vote_udf = udf(get_majority_vote, DoubleType())

# APPLYING VOTING TO DATA
# ------------------------------------------------------------------
print("   -> Calculating majority votes...")

# Creating a new column 'ensemble_prediction' based on the 3 model columns.
# The array order (RF, SVM, LR) is what makes RF the tie-break winner.
ensemble_df = votes_df.withColumn("ensemble_prediction",
                                  vote_udf(array("pred_rf", "pred_svm", "pred_lr")))

# EVALUATING ENSEMBLE PERFORMANCE
# ------------------------------------------------------------------
print("\nüèÜ ENSEMBLE RESULTS")
print("="*30)

# Note: this rebinds the notebook-level 'evaluator' to score the
# 'ensemble_prediction' column instead of 'prediction'.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="ensemble_prediction", metricName="accuracy"
)
ensemble_acc = evaluator.evaluate(ensemble_df)

# Comparing with individual models
print(f"Random Forest Accuracy:     {acc_rf:.2%}")
print(f"SVM Accuracy:               {acc_svm:.2%}")
print(f"Ensemble (Voting) Accuracy: {ensemble_acc:.2%}")

# 'from pyspark.sql.functions import *' shadows Python's max(), so the built-in
# is addressed explicitly. The 'builtins' module is the documented way to do
# that; '__builtins__' is an implementation detail and is a plain dict (without
# a '.max' attribute) whenever this code runs outside of __main__.
import builtins

if ensemble_acc > builtins.max(acc_rf, acc_svm):
    print("‚úÖ SUCCESS: Ensemble outperformed individual models!")
else:
    print("‚ÑπÔ∏è Note: Ensemble performed similarly to the best single model.")

# Showing a sample of the voting process
print("\nSample Voting Breakdown:")
ensemble_df.select("label", "pred_rf", "pred_svm", "pred_lr", "ensemble_prediction").show(10)