<a href="https://colab.research.google.com/github/anasFaleh/distributed-ML-Notbook/blob/main/DistributedML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [12]:
import gradio as gr
import pandas as pd
import time
import os
from google.colab import drive # Added for Google Drive integration

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnull
from pyspark.sql.types import IntegerType, LongType, FloatType, DoubleType, BooleanType, DateType, TimestampType, StringType

from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, ClusteringEvaluator
from pyspark.ml import Pipeline

# Spark Session

In [13]:
# Shared store of ML run timings; other cells in this notebook key it by (task, num_cores).
performance_metrics = {}

try:
    # Build the configuration step by step, then create (or reuse) the session.
    _session_builder = SparkSession.builder.master("local[*]").appName("DataProcessor")
    _session_builder = _session_builder.config("spark.driver.memory", "2g")
    _session_builder = _session_builder.config(
        "spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.1"
    )
    spark = _session_builder.getOrCreate()
    print("‚úÖ Spark session is ready!")
except Exception as init_error:
    # Report the failure and terminate: every later cell needs `spark`.
    print(f"Failed to initialize Spark. Ensure Java is installed correctly. Error: {init_error}")
    exit()

‚úÖ Spark session is ready!


# Google Drive Mounting

In [14]:
def mount_gdrive():
    """
    Mount Google Drive at /content/drive.

    Returns:
        str: A success message, or a failure message that includes the
        raised exception's text. Exceptions are never propagated.
    """
    try:
        drive.mount('/content/drive')
    except Exception as mount_error:
        return f"Failed to mount Google Drive. Error: {mount_error}"
    else:
        return "‚úÖ Google Drive mounted successfully!"
mount_gdrive() # Run once at cell execution so the mount is attempted before any Drive path is used

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


'‚úÖ Google Drive mounted successfully!'

# Helper & Core Functions

In [29]:
def analyze_column_type(df, column_name):
    """Classify a Spark DataFrame column into a coarse logical type.

    Typed columns are classified from the schema. String columns are
    classified from a sample of up to 100 non-null values: if more than 90%
    of the sampled values look like (optionally signed) decimal numbers the
    column is reported as "numeric", otherwise as "text".

    Args:
        df: Spark DataFrame that contains the column.
        column_name: Name of the column to inspect.

    Returns:
        str: One of "integer", "numeric", "boolean", "datetime", "text",
        "empty" (string column with no non-null values), or "unknown"
        (unsupported data type, or any error while inspecting the column).
    """
    sample_size = 100
    numeric_threshold = 0.9

    def _looks_numeric(value):
        # Allow one leading sign so values such as "-3.5" count as numeric.
        text = str(value)
        if text[:1] in ("+", "-"):
            text = text[1:]
        return text.replace('.', '', 1).isdigit()

    try:
        dtype = df.schema[column_name].dataType

        if isinstance(dtype, (IntegerType, LongType)):
            return "integer"
        if isinstance(dtype, (FloatType, DoubleType)):
            return "numeric"
        if isinstance(dtype, BooleanType):
            return "boolean"
        if isinstance(dtype, (DateType, TimestampType)):
            return "datetime"

        if isinstance(dtype, StringType):
            # For strings, infer the type from content. Only a bounded sample is
            # pulled to the driver; no full-table jobs are needed for this decision.
            rows = (df.select(column_name)
                      .where(col(column_name).isNotNull())
                      .limit(sample_size)
                      .collect())
            sample = [row[0] for row in rows]
            if not sample:
                return "empty"

            numeric_count = sum(1 for value in sample if _looks_numeric(value))
            numeric_ratio = numeric_count / len(sample)

            if numeric_ratio > numeric_threshold:
                return "numeric"
            # No "categorical" inference is performed here: every other string
            # column is reported as "text".
            return "text"

        return "unknown"

    except Exception as e:
        print(f"Error analyzing column type for {column_name}: {e}")
        return "unknown"

def get_file_summary(df, filename, column_info):
    """Build the text report shown after a dataset has been loaded.

    Args:
        df: Spark DataFrame holding the loaded data.
        filename: Display name of the source file.
        column_info: Iterable of dicts with keys 'name', 'type' and
            'missing_pct', one per column.

    Returns:
        str: File information, per-column analysis and a markdown preview of
        the first five rows.
    """
    row_total = df.count()
    column_total = len(df.columns)

    header = (
        f"File Information:\n- Name: {filename}\n- Rows: {row_total:,}\n- Columns: {column_total}\n\n"
    )
    column_lines = "".join(
        f"- {entry['name']}: `{entry['type']}` (Missing: {entry['missing_pct']:.1f}%) \n"
        for entry in column_info
    )
    # Only the first five rows are converted to pandas for the preview table.
    preview_table = df.limit(5).toPandas().to_markdown(index=False)

    return (
        header
        + "Column Analysis:\n"
        + column_lines
        + "\nData Preview (First 5 Rows):\n"
        + preview_table
    )

def process_uploaded_file(file_input_or_path):
    """Load a CSV/Excel file into a Spark DataFrame and analyze its columns.

    Paths under /content/drive/ or gs:// are read with Spark readers (with
    schema inference, so numeric columns are not left as strings); any other
    path is read with pandas and converted to a Spark DataFrame.

    Args:
        file_input_or_path: A path string, or an object with a `.name`
            attribute holding the path (e.g. a Gradio file object), or None.

    Returns:
        tuple: (summary, df_spark, column_info, file_path).
        On success, `summary` is the report text, `df_spark` the DataFrame
        and `column_info` a list of {'name', 'type', 'missing_pct'} dicts
        (empty when the DataFrame has no rows).
        On failure, `summary` is an error message and `df_spark` /
        `column_info` are None; `file_path` is None only when no usable
        input was given.
    """
    if file_input_or_path is None:
        return "No file uploaded.", None, None, None

    if isinstance(file_input_or_path, str):
        # Google Drive path or direct path string
        file_path = file_input_or_path
    elif hasattr(file_input_or_path, 'name'):
        # Gradio File object
        file_path = file_input_or_path.name
    else:
        return "Invalid file input type.", None, None, None
    filename = os.path.basename(file_path)

    print(f"Processing file: {filename} from path: {file_path}")

    lowered_name = filename.lower()
    is_csv = lowered_name.endswith('.csv')
    is_excel = lowered_name.endswith(('.xlsx', '.xls'))

    try:
        if file_path.startswith('/content/drive/') or file_path.startswith('gs://'):
            # Spark readers default to all-string columns; inferSchema makes
            # numeric columns usable by the ML stages downstream.
            if is_csv:
                df_spark = (spark.read
                            .option("header", "true")
                            .option("inferSchema", "true")
                            .csv(file_path))
            elif is_excel:
                df_spark = (spark.read
                            .format("com.crealytics.spark.excel")
                            .option("header", "true")
                            .option("inferSchema", "true")
                            .load(file_path))
            else:
                return "Unsupported file type for Google Drive path. Please use CSV or Excel.", None, None, file_path
        else:
            # Pandas-based reading for local uploads
            if is_csv:
                df_pandas = pd.read_csv(file_path)
            elif is_excel:
                df_pandas = pd.read_excel(file_path)
            else:
                return "Unsupported file type for local upload. Please use CSV or Excel.", None, None, file_path
            df_spark = spark.createDataFrame(df_pandas)

        # Count once: every .count() is a separate Spark job over the data.
        total_rows = df_spark.count()

        column_info = []
        if total_rows > 0:
            for col_name in df_spark.columns:
                col_type = analyze_column_type(df_spark, col_name)
                missing_count = df_spark.filter(col(col_name).isNull()).count()
                missing_pct = missing_count / total_rows * 100
                column_info.append({'name': col_name, 'type': col_type, 'missing_pct': missing_pct})
        else:
            print("Warning: DataFrame is empty. Cannot generate column info.")

        summary_report = get_file_summary(df_spark, filename, column_info)

        return summary_report, df_spark, column_info, file_path

    except Exception as e:
        return f"An error occurred while processing the file: {str(e)}", None, None, file_path

def save_df_to_gdrive(df, gdrive_path, file_format):
    """Write a Spark DataFrame to the given path as CSV or Parquet.

    Args:
        df: Spark DataFrame to save, or None if nothing has been loaded.
        gdrive_path: Destination path. For CSV, a trailing ".csv" extension
            (any letter case) is dropped because Spark writes a directory of
            part files rather than a single file.
        file_format: "CSV" or "Parquet".

    Returns:
        str: A status message describing success or the reason for failure.
        Exceptions raised by the writer are reported in the message, not
        propagated.
    """
    if df is None:
        return "No DataFrame to save. Load a file first."
    if not gdrive_path:
        return "Provide a Google Drive path to save the file."

    try:
        if file_format == "CSV":
            # Strip only the trailing extension; ".csv" occurring elsewhere in
            # the path (e.g. in a folder name) must be left untouched.
            output_path = gdrive_path
            if output_path.lower().endswith('.csv'):
                output_path = output_path[:-len('.csv')]
            # coalesce(1) so the output directory holds a single part file.
            df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
            return f"DataFrame saved as CSV to '{output_path}' (as a directory of part files)."
        elif file_format == "Parquet":
            df.write.mode("overwrite").parquet(gdrive_path)
            return f"DataFrame saved as Parquet to '{gdrive_path}'."
        else:
            return "Unsupported save format. Choose CSV or Parquet."
    except Exception as e:
        return f"Error saving DataFrame to Google Drive: {str(e)}"

def run_machine_learning_task(file_path_for_ml, task, target_col, num_cores):
    """Restart Spark with `num_cores` local cores, reload the file and run an ML task.

    The elapsed time of the task itself (excluding the Spark restart and the
    file reload) is stored in the global `performance_metrics` under the key
    (task, cores), and a performance report is generated from that store.

    Args:
        file_path_for_ml: Path of the data file to (re)load, or None.
        task: "Clustering (K-Means)", "Classification" or "Regression".
        target_col: Target column name for supervised tasks.
        num_cores: Number of local cores for the new Spark session; must be
            convertible to a positive integer.

    Returns:
        tuple[str, str]: (result text, performance report). On any failure
        the first element is an error message and the second is "".
    """
    global spark
    global performance_metrics

    if file_path_for_ml is None:
        return "No data file path available for training. Upload a file first.", ""

    # Validate the inputs BEFORE stopping the running session: a bad value
    # must not leave the notebook without a usable Spark session.
    try:
        cores = int(num_cores)
    except (TypeError, ValueError):
        cores = 0
    if cores < 1:
        return f"Invalid number of cores: {num_cores!r}. Select a positive integer.", ""

    supported_tasks = ("Clustering (K-Means)", "Classification", "Regression")
    if task not in supported_tasks:
        return f"Unknown task: {task}", ""

    overall_start_time = time.time()
    print(f"Running ML task '{task}' with {cores} cores.")

    try:
        # Stop current Spark session.
        # NOTE(review): DataFrames created with the previous session (for
        # example one kept in UI state for saving) presumably become unusable
        # after this point -- confirm the save flow after a training run.
        spark.stop()
        print("Old Spark session stopped.")

        # Re-initialize Spark session with the requested number of cores
        spark = (SparkSession.builder
                 .master(f"local[{cores}]")
                 .appName("DataProcessor")
                 .config("spark.driver.memory", "2g")
                 .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.1")
                 .getOrCreate())
        print(f"New Spark session initialized with {cores} cores.")

        # Re-process the file so the DataFrame belongs to the new session
        summary, df_new_session, col_info_new, _ = process_uploaded_file(file_path_for_ml)
        if df_new_session is None:
            return f"Failed to re-load data for ML task: {summary}", ""

        task_specific_start_time = time.time()

        if task == "Clustering (K-Means)":
            result_text = run_kmeans_clustering(df_new_session, col_info_new)
        elif task == "Classification":
            result_text = run_classification_models(df_new_session, target_col, col_info_new)
        else:  # "Regression" -- the task name was validated above
            result_text = run_regression_models(df_new_session, target_col, col_info_new)

        task_execution_time = time.time() - task_specific_start_time
        # Key on the normalized integer so e.g. "1" and 1 share one baseline.
        # NOTE(review): runs that end in a validation message (e.g. no target
        # selected) are timed and recorded as well -- confirm that is intended.
        performance_metrics[(task, cores)] = task_execution_time
        print(f"Model training for '{task}' completed in {task_execution_time:.2f} seconds with {cores} cores.")
        print(f"Current performance metrics: {performance_metrics}")

        # Generate performance report after the task completes
        performance_report_text = generate_performance_report_func(performance_metrics)

        return result_text, performance_report_text

    except Exception as e:
        return f"An error occurred during model training:\n{str(e)}", ""
    finally:
        overall_total_time = time.time() - overall_start_time
        print(f"Total execution time for task (including Spark re-init) '{task}': {overall_total_time:.2f} seconds.")

def run_kmeans_clustering(df, col_info, k=3, seed=42):
    """Cluster the numeric/integer columns of `df` with K-Means.

    Args:
        df: Spark DataFrame with the data.
        col_info: List of dicts with 'name' and 'type' keys; columns whose
            type is "numeric" or "integer" are used as features.
        k: Number of clusters (default 3).
        seed: Random seed passed to KMeans (default 42).

    Returns:
        str: A markdown report with the features used, the silhouette score
        and the size of each cluster, or a message when fewer than two
        usable feature columns exist.
    """
    result_text = "## K-Means Clustering (Unsupervised)\n\n"
    feature_cols = [c['name'] for c in col_info if c['type'] in ("numeric", "integer")]

    if len(feature_cols) < 2:
        return "K-Means requires at least two numeric/integer columns."

    result_text += f"Features Used: {', '.join(feature_cols)}\n\n"

    # A column can be tagged "numeric" while still being stored as strings;
    # cast to double so VectorAssembler accepts every feature column.
    numeric_df = df.select(*[col(name).cast("double").alias(name) for name in feature_cols])

    # handleInvalid="skip" drops rows with null/NaN features, matching the
    # classification and regression functions in this notebook.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
    data = assembler.transform(numeric_df).select("features")

    kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=k, seed=seed)
    model = kmeans.fit(data)
    predictions = model.transform(data)

    evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster")
    silhouette = evaluator.evaluate(predictions)

    cluster_stats = predictions.groupBy("cluster").count().orderBy("cluster").collect()

    result_text += f"Number of Clusters (k): {k}\n"
    result_text += f"Silhouette Score: {silhouette:.4f}\n\n"
    result_text += "Cluster Distribution:\n"
    for row in cluster_stats:
        result_text += f"- Cluster {row['cluster']}: {row['count']:,} samples\n"

    return result_text

def run_classification_models(df, target_col, col_info):
    """Train and evaluate several Spark ML classifiers for one target column.

    Logistic Regression, Decision Tree and Random Forest are each fitted on
    an 80/20 train/test split (seed 42) and scored with accuracy and F1.

    Args:
        df: Spark DataFrame with the data.
        target_col: Name of the target column; empty or "No Target" is rejected.
        col_info: List of dicts with 'name' and 'type' keys. Columns typed
            "numeric" or "integer" (other than the target) become features.

    Returns:
        str: A markdown report, or a message explaining why training could
        not start (no target, unsuitable target type, no features).
    """
    if not target_col or target_col == "No Target":
        return "A target column must be selected for classification."

    target_type = next((c['type'] for c in col_info if c['name'] == target_col), None)
    # "text" targets are accepted because StringIndexer label-encodes them below.
    if target_type not in ("categorical", "boolean", "integer", "text"):
        return f"Target column '{target_col}' is of type '{target_type}', which is not suitable for classification."

    feature_cols = [c['name'] for c in col_info if c['type'] in ("numeric", "integer") and c['name'] != target_col]
    if not feature_cols:
        return "No numeric or integer features found for training."

    # A column can be tagged "numeric" while still being stored as strings;
    # cast features to double so VectorAssembler accepts them.
    selected = [col(target_col)] + [col(name).cast("double").alias(name) for name in feature_cols]
    prepared_df = df.select(*selected)

    indexer = StringIndexer(inputCol=target_col, outputCol="label", handleInvalid="skip")
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")

    pipeline_model = Pipeline(stages=[indexer, assembler]).fit(prepared_df)
    data = pipeline_model.transform(prepared_df).select("features", "label")

    train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

    result_text = f"## Classification Algorithms\n\n"
    result_text += f"Target Column: {target_col}\n"
    result_text += f"Training Samples: {train_data.count():,}\n"
    result_text += f"Test Samples: {test_data.count():,}\n\n"

    models = {
        "Logistic Regression": LogisticRegression(featuresCol="features", labelCol="label"),
        "Decision Tree": DecisionTreeClassifier(featuresCol="features", labelCol="label"),
        "Random Forest": RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
    }

    evaluator = MulticlassClassificationEvaluator(labelCol="label")

    for name, model in models.items():
        result_text += f"### {name}\n"
        fit_start = time.time()
        trained_model = model.fit(train_data)
        # Stop the clock right after fit() so the figure reported as
        # "Training Time" does not include prediction and evaluation.
        training_seconds = time.time() - fit_start

        predictions = trained_model.transform(test_data)
        accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
        f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

        result_text += f"- Training Time: {training_seconds:.2f}s\n"
        result_text += f"- Accuracy: {accuracy:.4f}\n"
        result_text += f"- F1-Score: {f1:.4f}\n\n"

    return result_text

def run_regression_models(df, target_col, col_info):
    """Train and evaluate several Spark ML regressors for one target column.

    Linear Regression, Decision Tree and Random Forest regressors are each
    fitted on an 80/20 train/test split (seed 42) and scored with RMSE and R2.

    Args:
        df: Spark DataFrame with the data.
        target_col: Name of the target column; empty or "No Target" is rejected.
        col_info: List of dicts with 'name' and 'type' keys. The target and
            the features must be typed "numeric" or "integer".

    Returns:
        str: A markdown report, or a message explaining why training could
        not start (no target, unsuitable target type, no features).
    """
    if not target_col or target_col == "No Target":
        return "A target column must be selected for regression."

    target_type = next((c['type'] for c in col_info if c['name'] == target_col), None)
    if target_type not in ("numeric", "integer"):
        return f"Target column '{target_col}' is of type '{target_type}', which is not suitable for regression."

    feature_cols = [c['name'] for c in col_info if c['type'] in ("numeric", "integer") and c['name'] != target_col]
    if not feature_cols:
        return "No numeric or integer features found for training."

    # A column can be tagged "numeric" while still being stored as strings;
    # cast features and target to double so the Spark ML stages accept them.
    selected = [col(name).cast("double").alias(name) for name in feature_cols + [target_col]]
    # Rows without a usable label cannot be trained or evaluated on.
    prepared_df = df.select(*selected).dropna(subset=[target_col])

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
    data = assembler.transform(prepared_df).select("features", target_col)

    train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

    result_text = f"## Regression Algorithms\n\n"
    result_text += f"Target Column: {target_col}\n"
    result_text += f"Training Samples: {train_data.count():,}\n"
    result_text += f"Test Samples: {test_data.count():,}\n\n"

    models = {
        "Linear Regression": LinearRegression(featuresCol="features", labelCol=target_col),
        "Decision Tree Regressor": DecisionTreeRegressor(featuresCol="features", labelCol=target_col, maxDepth=5),
        "Random Forest Regressor": RandomForestRegressor(featuresCol="features", labelCol=target_col, numTrees=10)
    }

    evaluator = RegressionEvaluator(labelCol=target_col)

    for name, model in models.items():
        result_text += f"### {name}\n"
        fit_start = time.time()
        trained_model = model.fit(train_data)
        # Stop the clock right after fit() so the figure reported as
        # "Training Time" does not include prediction and evaluation.
        training_seconds = time.time() - fit_start

        predictions = trained_model.transform(test_data)
        rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
        r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

        result_text += f"- Training Time: {training_seconds:.2f}s\n"
        result_text += f"- RMSE (Root Mean Squared Error): {rmse:.4f}\n"
        result_text += f"- R Squared: {r2:.4f}\n\n"

    return result_text

# Generates a table summarizing performance metrics (time, speedup, efficiency)
def generate_performance_report_func(current_performance_metrics):
    """Render the collected timings as one markdown table per task.

    Args:
        current_performance_metrics: Mapping of (task, cores) -> elapsed
            seconds.

    Returns:
        str: A placeholder message when the mapping is empty; otherwise a
        markdown report. Tasks with a 1-core entry get speedup and
        efficiency columns relative to that entry; tasks without one get a
        plain cores/time table.
    """
    if not current_performance_metrics:
        return "üìä No performance metrics available yet. Please run an ML task first"

    # Regroup the flat {(task, cores): seconds} mapping by task name.
    timings_per_task = {}
    for (task_name, core_count), elapsed in current_performance_metrics.items():
        timings_per_task.setdefault(task_name, {})[core_count] = elapsed

    pieces = ["## üìä Performance Report\n\n"]
    for task_name, timings in timings_per_task.items():
        pieces.append(f"### Task: {task_name}\n")
        has_baseline = 1 in timings

        if has_baseline:
            baseline_seconds = timings[1]
            pieces.append("| Cores | Time (s) | Speedup (vs 1 core) | Efficiency |\n")
            pieces.append("|-------|----------|---------------------|------------|\n")
        else:
            pieces.append("No 1-core baseline available to calculate speedup and efficiency for this task.\n")
            pieces.append("| Cores | Time (s) |\n")
            pieces.append("|-------|----------|\n")

        for core_count in sorted(timings.keys()):
            elapsed = timings[core_count]
            if not has_baseline:
                pieces.append(f"| {core_count} | {elapsed:.2f} |\n")
                continue

            # Guard both divisions so a zero time or core count yields 0.0.
            if elapsed > 0:
                speedup = baseline_seconds / elapsed
            else:
                speedup = 0.0
            if core_count > 0:
                efficiency = (speedup / core_count) * 100
            else:
                efficiency = 0.0
            pieces.append(f"| {core_count} | {elapsed:.2f} | {speedup:.2f} | {efficiency:.2f}% |\n")

        pieces.append("\n")

    return "".join(pieces)

# Gradio UI

In [30]:
# Builds the whole Gradio interface: a left column with the controls (load data,
# configure and run an ML task, save data) and a right column with the outputs.
# Event handlers are defined and wired to the buttons at the end of the block.
with gr.Blocks(title="Distributed ML System") as app:
    gr.Markdown("# ‚òÅÔ∏è Distributed Machine Learning System with PySpark")
    gr.Markdown("Upload a dataset, then select a machine learning task to train a model and analyze its performance.")

    # State management: values carried between button clicks
    df_spark_state = gr.State()      # Spark DataFrame returned by the last successful load
    column_info_state = gr.State()   # per-column info list returned by the last successful load
    file_path_state = gr.State()     # path of the loaded file; this is what the training handler receives

    with gr.Row():
        # Left column: all user controls
        with gr.Column(scale=1):
            gr.Markdown("### 1. Upload & Analyze Data")
            gr.Markdown("#### From Local/Colab Filesystem")
            file_input = gr.File(label="Upload CSV or Excel File", file_types=[".csv", ".xlsx", ".xls"])
            upload_btn = gr.Button("‚úÖ‚ÄÖ Upload & Analyze", variant="primary")

            gr.Markdown("#### From Google Drive")
            mount_gdrive_btn = gr.Button("Mount Google Drive", variant="secondary")
            gdrive_mount_status = gr.Markdown("Google Drive not mounted.")
            gdrive_path_input = gr.Textbox(label="Google Drive File Path (e.g., /content/drive/MyDrive/data.csv)", placeholder="Enter path to CSV or Excel file on Google Drive")
            load_gdrive_btn = gr.Button("‚úÖ‚ÄÖ Load from Drive & Analyze", variant="secondary")

            gr.Markdown("### 2. Configure ML Model")
            # Choices start empty; the load handlers below fill them with the column names
            target_column_dropdown = gr.Dropdown(label="Select Target Column (optional for clustering)", choices=[], interactive=True)
            ml_task_dropdown = gr.Dropdown(
                    label="Select ML Task",
                    choices=["Classification", "Regression", "Clustering (K-Means)"],
                    value="Classification",
                    interactive=True
                )
            # Number of local cores passed to run_machine_learning_task
            num_cores_dropdown = gr.Dropdown(
                    label="Select Number of Cores",
                    choices=[1, 2, 4, 8],
                    value=1,
                    interactive=True
                )
            ml_btn = gr.Button("‚ñ∂Ô∏è Train Model", variant="secondary")

            gr.Markdown("### 3. Save Processed Data (Optional)")
            save_gdrive_path_input = gr.Textbox(label="Save to Google Drive Path (e.g., /content/drive/MyDrive/processed_data)", placeholder="Enter desired output path for processed data")
            save_format_dropdown = gr.Dropdown(label="Save Format", choices=["CSV", "Parquet"], value="CSV")
            save_gdrive_btn = gr.Button("üíæ Save to Drive", variant="secondary")

        # Right column: output areas, one per kind of result
        with gr.Column(scale=2):
            gr.Markdown("### ‚úÖ‚ÄÖ Analysis & Results")
            analysis_output = gr.Markdown("Awaiting file upload...")
            ml_output = gr.Markdown("Model training results will appear here...")
            # Receives the second value returned by run_machine_learning_task (the performance report)
            performance_table_output = gr.Markdown('Performance report will appear here...')
            save_output = gr.Markdown("Save status will appear here...")

    # ==============================================================================
    # 5. EVENT HANDLERS
    # ==============================================================================

    def upload_handler(file):
        """
        Handle the "Upload & Analyze" button click.

        Passes the uploaded file to process_uploaded_file and returns a
        5-tuple in the order of the click outputs: summary text, DataFrame,
        column info, the updated target-column dropdown, and the file path.
        When loading fails, the DataFrame, column info and file path are
        None and the dropdown is reset to no choices.
        """
        summary, df, col_info, file_path = process_uploaded_file(file)
        if df is None:
            return summary, None, None, gr.Dropdown(choices=[]), None # last None clears file_path_state

        column_names = [c['name'] for c in col_info] if col_info else []
        choices_with_none = ["No Target"] + column_names

        # Update dropdown and return states
        return summary, df, col_info, gr.Dropdown(choices=choices_with_none, value="No Target", interactive=True), file_path

    def process_gdrive_file(gdrive_path):
        """
        Handle the "Load from Drive & Analyze" button click.

        Same return layout as upload_handler, but the input is a path string
        typed by the user. Unlike upload_handler, a failed load still returns
        the path that process_uploaded_file reported.
        """
        if not gdrive_path:
            return "‚ùå No Google Drive path provided.", None, None, gr.Dropdown(choices=[]), None # last None clears file_path_state

        # process_uploaded_file accepts a plain string path as well as a file object
        summary, df, col_info, file_path = process_uploaded_file(gdrive_path)
        if df is None:
            return summary, None, None, gr.Dropdown(choices=[]), file_path # Pass the file_path even if df is None

        column_names = [c['name'] for c in col_info] if col_info else []
        choices_with_none = ["No Target"] + column_names

        return summary, df, col_info, gr.Dropdown(choices=choices_with_none, value="No Target", interactive=True), file_path

    # Wiring: each handler's return values map positionally onto `outputs`
    upload_btn.click(
        fn=upload_handler,
        inputs=[file_input],
        outputs=[analysis_output, df_spark_state, column_info_state, target_column_dropdown, file_path_state]
    )

    mount_gdrive_btn.click(
        fn=mount_gdrive,
        inputs=[],
        outputs=[gdrive_mount_status]
    )

    load_gdrive_btn.click(
        fn=process_gdrive_file,
        inputs=[gdrive_path_input],
        outputs=[analysis_output, df_spark_state, column_info_state, target_column_dropdown, file_path_state]
    )

    # Training receives the file path (not the stored DataFrame) and returns
    # two values: the result text and the performance report
    ml_btn.click(
        fn=run_machine_learning_task,
        inputs=[file_path_state, ml_task_dropdown, target_column_dropdown, num_cores_dropdown],
        outputs=[ml_output, performance_table_output]
    )

    # NOTE(review): saving uses the DataFrame kept in df_spark_state, while
    # run_machine_learning_task stops and recreates the Spark session --
    # confirm that saving still works after a training run.
    save_gdrive_btn.click(
        fn=save_df_to_gdrive,
        inputs=[df_spark_state, save_gdrive_path_input, save_format_dropdown],
        outputs=[save_output]
    )

# ==============================================================================
# 6. APPLICATION LAUNCH
# ==============================================================================
# Start the Gradio app. share=True requests a public URL (see the
# "Running on public URL" line in the cell output); quiet=True presumably
# reduces launch-time console output -- confirm against the Gradio docs.
app.launch(share=True, quiet=True)

* Running on public URL: https://e5a1dd8636253f0514.gradio.live


