In [3]:
import os
import shutil
import uuid
from google.colab import files

In [4]:
def mount_drive():
    """
    Mount Google Drive at /content/drive in the current Colab runtime.

    The Colab-only import is kept inside the function so that merely defining
    it does not require the google.colab package.
    """
    from google.colab import drive as colab_drive

    colab_drive.mount('/content/drive')

In [5]:
def list_drive_datasets(drive_dir="/content/drive/MyDrive/colab_datasets"):
    """
    List the regular files stored directly inside a Google Drive folder.

    The folder is created when it does not exist yet, so a first run returns
    an empty list instead of raising.

    Args:
        drive_dir: Folder to scan. Sub-directories are skipped (no recursion).

    Returns:
        File names (not full paths) in sorted order. The operating system
        gives no ordering guarantee for directory listings, so sorting keeps
        any numbering shown to the user stable between runs.
    """
    os.makedirs(drive_dir, exist_ok=True)
    with os.scandir(drive_dir) as entries:
        return sorted(entry.name for entry in entries if entry.is_file())

In [6]:
def upload_dataset_to_drive(drive_dir="/content/drive/MyDrive/colab_datasets"):
    """
    Upload a dataset through the Colab file picker and store it in Drive.

    The stored copy gets an 8-character random suffix inserted before the
    extension so repeated uploads of the same file never overwrite each other.
    When several files are selected only the first one is saved.

    Args:
        drive_dir: Destination folder; created if missing.

    Returns:
        Full path of the file written to ``drive_dir``.

    Raises:
        ValueError: If the upload dialog returned no file.
    """
    os.makedirs(drive_dir, exist_ok=True)

    uploaded = files.upload()
    if not uploaded:
        raise ValueError("No file was uploaded.")

    original_name = next(iter(uploaded))

    # splitext copes with names that have no extension; unpacking the result
    # of rsplit(".", 1) raises for such names.
    stem, ext = os.path.splitext(original_name)
    unique_id = str(uuid.uuid4())[:8]
    new_file_name = f"{stem}_{unique_id}{ext}"

    drive_path = os.path.join(drive_dir, new_file_name)

    with open(drive_path, "wb") as f:
        f.write(uploaded[original_name])

    print(f" Uploaded to Drive: {new_file_name}")
    return drive_path

In [7]:


def restore_file_to_colab(drive_file_path, target_colab_dir="/content/datasets"):
    """
    Copy a file from Drive into the Colab workspace and describe the copy.

    Args:
        drive_file_path: Path of the source file.
        target_colab_dir: Destination folder; created if missing.

    Returns:
        dict with:
            file_name: base name of the file
            file_path: path of the copy inside ``target_colab_dir``
            drive_path: the source path as given
            file_extension: lower-cased extension without the dot, or ""
                when the file name has no extension
            file_size_bytes: size of the copy in bytes
    """
    os.makedirs(target_colab_dir, exist_ok=True)
    file_name = os.path.basename(drive_file_path)
    colab_path = os.path.join(target_colab_dir, file_name)

    shutil.copy(drive_file_path, colab_path)

    # Splitting on "." and taking the last piece would report the entire
    # file name as the "extension" for names without a dot.
    extension = os.path.splitext(file_name)[1].lstrip(".").lower()

    metadata = {
        "file_name": file_name,
        "file_path": colab_path,
        "drive_path": drive_file_path,
        "file_extension": extension,
        "file_size_bytes": os.path.getsize(colab_path)
    }

    print(f" File restored to Colab: {colab_path}")
    return metadata

In [8]:
def persist_file_to_drive(colab_file_path, drive_dir="/content/drive/MyDrive/colab_datasets"):
    """
    Copy a file from the Colab workspace into a Drive folder.

    The copy gets an 8-character random suffix inserted before the extension
    so earlier copies of the same file are never overwritten.

    Args:
        colab_file_path: Path of the file to save.
        drive_dir: Destination folder; created if missing.

    Returns:
        dict with:
            original_colab_path: the source path as given
            drive_path: full path of the new copy
            file_name: name of the new copy (including the random suffix)
            file_extension: lower-cased extension without the dot, or ""
                when the source has no extension
            file_size_bytes: size of the copy in bytes
    """
    os.makedirs(drive_dir, exist_ok=True)

    file_name = os.path.basename(colab_file_path)
    # splitext copes with names that have no extension; unpacking the result
    # of rsplit(".", 1) raises for such names.
    stem, ext = os.path.splitext(file_name)
    unique_id = str(uuid.uuid4())[:8]
    drive_file_name = f"{stem}_{unique_id}{ext}"
    drive_path = os.path.join(drive_dir, drive_file_name)

    shutil.copy(colab_file_path, drive_path)

    metadata = {
        "original_colab_path": colab_file_path,
        "drive_path": drive_path,
        "file_name": drive_file_name,
        "file_extension": ext.lstrip(".").lower(),
        "file_size_bytes": os.path.getsize(drive_path)
    }

    print(f" File persisted to Drive: {drive_path}")
    return metadata

In [9]:
def storage_ui_pipeline(colab_dir="/content/datasets", drive_dir="/content/drive/MyDrive/colab_datasets"):
    """
    Interactive storage step: pick or upload a dataset, then copy it to Colab.

    Steps:
        1. Mount Drive and list the datasets already stored in ``drive_dir``.
        2. Let the user choose an existing dataset or upload a new one
           (upload is forced when the folder is empty).
        3. Copy the chosen file into ``colab_dir``.

    Invalid answers are re-prompted instead of being silently interpreted:
    any menu choice other than "1" used to trigger an upload, and dataset
    number 0 used to select the last file through a negative index.

    Args:
        colab_dir: Workspace folder that receives the working copy.
        drive_dir: Drive folder holding the stored datasets.

    Returns:
        The metadata dict produced by ``restore_file_to_colab``.
    """
    mount_drive()

    existing_files = list_drive_datasets(drive_dir)

    if existing_files:
        print("Existing datasets in Google Drive:")
        for i, f in enumerate(existing_files, 1):
            print(f"{i} - {f}")
        print("\nOptions:")
        print("1 - Use existing dataset")
        print("2 - Upload new dataset")
        choice = input("Enter choice (1 or 2): ").strip()
        while choice not in ("1", "2"):
            print("Invalid choice. Please enter 1 or 2.")
            choice = input("Enter choice (1 or 2): ").strip()
    else:
        print("No existing datasets found. You must upload a new dataset.")
        choice = "2"

    if choice == "1":
        # Keep asking until the answer is a number that matches a listed file.
        idx = None
        while idx is None:
            answer = input("Enter dataset number: ").strip()
            try:
                number = int(answer)
            except ValueError:
                number = 0  # falls outside the valid range below
            if 1 <= number <= len(existing_files):
                idx = number - 1
            else:
                print(f"Invalid dataset number. Enter a value between 1 and {len(existing_files)}.")
        drive_path = os.path.join(drive_dir, existing_files[idx])
    else:
        drive_path = upload_dataset_to_drive(drive_dir)
        print(" Dataset saved permanently in Drive.")

    # Work on a local copy inside the Colab workspace.
    dataset_meta = restore_file_to_colab(drive_path, colab_dir)

    return dataset_meta

In [10]:


def validate_dataset(metadata):
    """
    Check the dataset's file type and derive how Spark should read it.

    Args:
        metadata: dict carrying a "file_extension" entry (lower-case, no dot).

    Returns:
        {"valid": True, "spark_read_config": {"format": ..., "options": {...}}}

    Raises:
        ValueError: If the extension is not one of csv, json, txt, pdf.

    NOTE(review): "pdf" is passed through as the Spark format name; confirm a
    data source with that name is actually available in the target runtime.
    """
    extension = metadata["file_extension"]

    # (extension, Spark format, reader options) - rebuilt on every call so the
    # returned options dict is never shared between callers.
    reader_table = [
        ("csv", "csv", {"header": "true", "inferSchema": "true"}),
        ("json", "json", {}),
        ("txt", "text", {}),
        ("pdf", "pdf", {}),
    ]

    for known_extension, spark_format, reader_options in reader_table:
        if extension == known_extension:
            break
    else:
        raise ValueError(f"Unsupported file type: .{extension}")

    spark_read_config = {"format": spark_format, "options": reader_options}

    if extension == "pdf":
        print(" PDF detected: requires preprocessing before Spark ML")

    print(" Dataset validated")
    print(spark_read_config)

    return {"valid": True, "spark_read_config": spark_read_config}

In [11]:
def _prompt_for_subtasks(heading, available_tasks):
    """Print a numbered menu and return the validated, de-duplicated picks."""
    print(heading)
    for number, task in enumerate(available_tasks, start=1):
        print(f"{number} - {task}")

    raw_answer = input("Enter your choices: ")

    chosen = []
    for token in raw_answer.split(","):
        token = token.strip()
        if not token:
            # Tolerate stray commas such as "1,2," or "1,,2".
            continue
        try:
            number = int(token)
        except ValueError:
            raise ValueError(
                f"Invalid selection '{token}': expected a number."
            ) from None
        # Reject 0 and negatives explicitly: Python would otherwise treat
        # them as indexes counted from the end of the list.
        if not 1 <= number <= len(available_tasks):
            raise ValueError(
                f"Invalid selection '{token}': choose a number between 1 "
                f"and {len(available_tasks)}."
            )
        task = available_tasks[number - 1]
        if task not in chosen:
            chosen.append(task)

    if not chosen:
        raise ValueError("No subtasks selected.")
    return chosen


def select_job_and_subtasks_numbered():
    """
    Ask the user for the main job type and its subtasks.

    The job type is chosen with 1 (descriptive statistics) or 2 (machine
    learning); subtasks are entered as comma separated menu numbers such as
    ``1,2,4``. Repeated numbers are collapsed and the order of first
    appearance is kept.

    Returns:
        dict with "mode" ("descriptive" or "ml") and "tasks" (list of task
        names).

    Raises:
        ValueError: If the job type is not 1 or 2, if a subtask entry is not
            a number or lies outside the menu, or if nothing was selected.
    """
    print("Select Main Job Type:")
    print("1 - Descriptive Statistics")
    print("2 - Machine Learning")
    choice = input("Enter choice (1 or 2): ").strip()

    config = {}

    if choice == "1":
        config["mode"] = "descriptive"
        config["tasks"] = _prompt_for_subtasks(
            "\nSelect Descriptive Subtasks (comma separated numbers, e.g., 1,3,5):",
            [
                "row_count",
                "column_count",
                "data_types",
                "min_max_mean",
                "null_percentage",
                "unique_counts",
            ],
        )
    elif choice == "2":
        config["mode"] = "ml"
        config["tasks"] = _prompt_for_subtasks(
            "\nSelect ML Subtasks (comma separated numbers, e.g., 1,2):",
            ["regression", "kmeans", "fpgrowth", "timeseries"],
        )
    else:
        raise ValueError("Invalid selection. Choose 1 or 2.")

    print("\n Job Configuration Complete")
    return config

In [12]:
from pyspark.sql.functions import (
    col, count, countDistinct, isnan, when,
    min, max, mean, sum as spark_sum
)

def run_descriptive_subtasks(df, selected_tasks):
    """
    Compute the requested descriptive statistics for a Spark DataFrame.

    Args:
        df: Spark DataFrame to profile.
        selected_tasks: Collection of task names. Recognised names are
            "row_count", "column_count", "data_types", "min_max_mean",
            "null_percentage" and "unique_counts"; other names are ignored.

    Returns:
        dict keyed by task name, holding only the tasks that were requested:
            row_count: number of rows
            column_count: number of columns
            data_types: {column: Spark type as string}
            min_max_mean: {numeric column: {"min", "max", "mean"}}
                (omitted when the frame has no numeric columns)
            null_percentage: {column: percent of null values, 2 decimals};
                NaN also counts as missing for float/double columns
            unique_counts: {column: result of countDistinct}

    Note:
        ``min``, ``max`` and ``mean`` below are the Spark aggregate functions
        imported from pyspark.sql.functions, not the Python builtins.
    """
    output = {}

    # "row_count" and "null_percentage" both need the row total; run the
    # count action once and share the result.
    total_rows = None
    if "row_count" in selected_tasks or "null_percentage" in selected_tasks:
        total_rows = df.count()

    # ---------- Basic metadata ----------
    if "row_count" in selected_tasks:
        output["row_count"] = total_rows

    if "column_count" in selected_tasks:
        output["column_count"] = len(df.columns)

    if "data_types" in selected_tasks:
        output["data_types"] = {
            f.name: str(f.dataType) for f in df.schema.fields
        }

    # ---------- Numeric column detection ----------
    # simpleString() yields SQL type names: LongType is "bigint" (not "long"),
    # ShortType "smallint", ByteType "tinyint", DecimalType "decimal(p,s)".
    integral_types = {"tinyint", "smallint", "int", "bigint"}
    floating_types = {"float", "double"}

    numeric_cols = []
    floating_cols = set()  # only these can actually contain NaN
    for field in df.schema.fields:
        type_name = field.dataType.simpleString()
        if type_name in floating_types:
            numeric_cols.append(field.name)
            floating_cols.add(field.name)
        elif type_name in integral_types or type_name.startswith("decimal"):
            numeric_cols.append(field.name)

    # ---------- Min / Max / Mean ----------
    if "min_max_mean" in selected_tasks and numeric_cols:
        agg_exprs = []
        for c in numeric_cols:
            agg_exprs.extend([
                min(col(c)).alias(f"{c}__min"),
                max(col(c)).alias(f"{c}__max"),
                mean(col(c)).alias(f"{c}__mean")
            ])

        # One aggregation pass for every numeric column.
        row = df.agg(*agg_exprs).collect()[0].asDict()

        output["min_max_mean"] = {
            c: {
                "min": row.get(f"{c}__min"),
                "max": row.get(f"{c}__max"),
                "mean": row.get(f"{c}__mean")
            }
            for c in numeric_cols
        }

    # ---------- Null percentage ----------
    if "null_percentage" in selected_tasks:
        null_exprs = []
        for c in df.columns:
            is_missing = col(c).isNull()
            if c in floating_cols:
                is_missing = is_missing | isnan(col(c))
            null_exprs.append(
                spark_sum(when(is_missing, 1).otherwise(0)).alias(c)
            )

        null_counts = df.agg(*null_exprs).collect()[0].asDict()

        if total_rows:
            output["null_percentage"] = {
                # "or 0": a sum can come back as None when nothing was summed
                c: round(((null_counts[c] or 0) / total_rows) * 100, 2)
                for c in df.columns
            }
        else:
            # Empty frame: report 0.0 instead of dividing by zero.
            output["null_percentage"] = {c: 0.0 for c in df.columns}

    # ---------- Unique counts ----------
    if "unique_counts" in selected_tasks:
        unique_exprs = [
            countDistinct(col(c)).alias(c) for c in df.columns
        ]

        output["unique_counts"] = df.agg(*unique_exprs).collect()[0].asDict()

    return output

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

def run_kmeans(df, k=3):
    """
    Cluster the rows of ``df`` with KMeans using every numeric column.

    Args:
        df: Spark DataFrame.
        k: Number of clusters.

    Returns:
        The list of cluster centers from the fitted model, or the string
        "Skipped: no numeric columns" when there is nothing to cluster on.
    """
    # simpleString() yields SQL type names: LongType is "bigint",
    # ShortType "smallint", ByteType "tinyint", DecimalType "decimal(p,s)".
    plain_numeric = {"tinyint", "smallint", "int", "bigint", "float", "double"}
    numeric_cols = [
        f.name for f in df.schema.fields
        if f.dataType.simpleString() in plain_numeric
        or f.dataType.simpleString().startswith("decimal")
    ]
    if not numeric_cols:
        return "Skipped: no numeric columns"

    # handleInvalid="skip" drops rows with null/NaN features; the default
    # ("error") makes the transform fail on the first such row.
    assembler = VectorAssembler(
        inputCols=numeric_cols, outputCol="features", handleInvalid="skip"
    )
    data = assembler.transform(df)

    # Fixed seed keeps the clustering reproducible between runs.
    model = KMeans(k=k, seed=1).fit(data)
    return model.clusterCenters()

In [14]:
from pyspark.ml.regression import LinearRegression

def run_regression(df):
    """
    Fit a linear regression predicting the last numeric column from the rest.

    The last numeric column (in schema order) is the target and all earlier
    numeric columns are the features. Rows with a missing target or missing
    feature values are left out of the fit.

    Args:
        df: Spark DataFrame.

    Returns:
        The fitted coefficients as a list of floats, or the string
        "Skipped: not enough numeric columns" when fewer than two numeric
        columns exist.
    """
    # simpleString() yields SQL type names: LongType is "bigint",
    # ShortType "smallint", ByteType "tinyint", DecimalType "decimal(p,s)".
    plain_numeric = {"tinyint", "smallint", "int", "bigint", "float", "double"}
    numeric_cols = [
        f.name for f in df.schema.fields
        if f.dataType.simpleString() in plain_numeric
        or f.dataType.simpleString().startswith("decimal")
    ]
    if len(numeric_cols) < 2:
        return "Skipped: not enough numeric columns"

    *feature_cols, label_col = numeric_cols

    # handleInvalid="skip" drops rows with null/NaN features instead of
    # failing the transform.
    assembler = VectorAssembler(
        inputCols=feature_cols, outputCol="features", handleInvalid="skip"
    )
    data = assembler.transform(df.na.drop(subset=[label_col]))

    # Point labelCol at the target directly; renaming it to "label" would
    # clash with a DataFrame that already has a column of that name.
    model = LinearRegression(featuresCol="features", labelCol=label_col).fit(data)
    return model.coefficients.tolist()

In [15]:
from pyspark.ml.fpm import FPGrowth

def run_fpgrowth(df, min_support=0.5):
    """
    Mine frequent itemsets from the "items" column with FP-Growth.

    Args:
        df: Spark DataFrame; must contain a column named "items".
        min_support: Minimum support passed to FPGrowth.

    Returns:
        The collected frequent itemsets, or the string
        "Skipped: 'items' column required" when the column is absent.
    """
    if "items" in df.columns:
        miner = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=0.6)
        fitted = miner.fit(df)
        return fitted.freqItemsets.collect()

    return "Skipped: 'items' column required"

In [16]:
def run_timeseries(df, timestamp_col="timestamp", freq="day"):
    """
    Count rows per time bucket.

    Args:
        df: Spark DataFrame.
        timestamp_col: Name of the column holding the timestamps.
        freq: Bucket size. "day" (the default) groups by calendar date via
            ``to_date``. Any other value is handed to Spark's ``date_trunc``
            as its format argument (e.g. "hour", "week", "month", "year").

    Returns:
        List of Rows with the fields "date" (the bucket) and "count", sorted
        by bucket, or the string "Skipped: timestamp column required" when
        ``timestamp_col`` is not a column of ``df``.
    """
    if timestamp_col not in df.columns:
        return "Skipped: timestamp column required"

    # Imported only once the column check has passed, as in the early-exit
    # path no Spark function is needed.
    from pyspark.sql.functions import col, date_trunc, to_date

    if freq == "day":
        bucket = to_date(col(timestamp_col))
    else:
        bucket = date_trunc(freq, col(timestamp_col))

    ts_df = df.withColumn("date", bucket)
    # groupBy gives no ordering guarantee; sort so the series reads in time order.
    return ts_df.groupBy("date").count().orderBy("date").collect()

In [17]:
def run_ml_subtasks(df, selected_tasks):
    """
    Run each requested ML subtask on ``df`` and gather the results.

    Args:
        df: Spark DataFrame handed to every subtask.
        selected_tasks: Iterable of task names ("kmeans", "regression",
            "fpgrowth", "timeseries").

    Returns:
        dict mapping each task name to that subtask's return value; names
        that are not recognised map to the string "Unknown ML task".
    """
    # Lambdas delay the lookup of each runner until its task is requested.
    runners = {
        "kmeans": lambda: run_kmeans(df),
        "regression": lambda: run_regression(df),
        "fpgrowth": lambda: run_fpgrowth(df),
        "timeseries": lambda: run_timeseries(df),
    }

    results = {}
    for task_name in selected_tasks:
        runner = runners.get(task_name)
        results[task_name] = runner() if runner is not None else "Unknown ML task"

    return results

In [18]:
from pyspark.sql import SparkSession

def create_spark_session(nodes=1):
    """
    Return a SparkSession configured for local mode.

    Args:
        nodes: Value placed in the ``local[N]`` master URL and in the
            application name.

    Returns:
        The session from ``getOrCreate()``.
        NOTE(review): getOrCreate() may hand back an already running session;
        confirm callers stop the previous session before asking for a
        different ``nodes`` value.
    """
    builder = SparkSession.builder
    builder = builder.appName(f"JobExecutor_{nodes}_nodes")
    builder = builder.master(f"local[{nodes}]")
    return builder.getOrCreate()

In [19]:
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, isnan, when, min, max, mean

def run_pipeline_cached(dataset_meta, spark_read_config, job_config, node_list=(1, 2, 4, 8)):
    """
    Run the selected job once per entry in ``node_list`` and time each run.

    For every node count a Spark session is created, the dataset is loaded
    and cached, and the job is executed. Only the job itself is timed;
    loading and caching happen before the clock starts.

    Args:
        dataset_meta: dict with "file_path", the dataset location.
        spark_read_config: dict with "format" and optional "options" for the
            Spark reader.
        job_config: dict with "mode" ("descriptive" runs the descriptive
            subtasks, anything else the ML subtasks) and "tasks".
        node_list: Node counts to benchmark, in execution order. The first
            entry is the baseline for the speedup figures.

    Returns:
        (job_outputs, perf_table):
            job_outputs: {"<n>_nodes": job result} for every node count
            perf_table: pandas DataFrame with the columns nodes, time_sec,
                speedup (baseline time / time) and efficiency
                (speedup / nodes). Both are empty when ``node_list`` is empty.
    """
    perf_records = []
    job_outputs = {}

    for nodes in node_list:
        print(f"\n--- Running job on {nodes} node(s) ---")
        spark = create_spark_session(nodes)

        # try/finally: the session must be stopped even when loading or the
        # job raises, otherwise it stays alive for the rest of the notebook.
        try:
            # Load dataset
            reader = spark.read.format(spark_read_config["format"])
            for k, v in spark_read_config.get("options", {}).items():
                reader = reader.option(k, v)
            df = reader.load(dataset_meta["file_path"])

            # Cache, then force materialisation with an action
            df.cache()
            df.count()
            print(" Dataset cached in memory")

            # Run job; perf_counter is a monotonic clock meant for measuring
            # intervals.
            start_time = time.perf_counter()
            if job_config["mode"] == "descriptive":
                output = run_descriptive_subtasks(df, job_config["tasks"])
            else:
                output = run_ml_subtasks(df, job_config["tasks"])
            duration = time.perf_counter() - start_time
        finally:
            spark.stop()

        job_outputs[f"{nodes}_nodes"] = output
        perf_records.append({"nodes": nodes, "time_sec": duration})
        print(f"Execution time: {duration:.2f} sec")

    # Speedup and efficiency relative to the first run
    if perf_records:
        base_time = perf_records[0]["time_sec"]
        for r in perf_records:
            elapsed = r["time_sec"]
            # Guard against a zero reading from the clock.
            r["speedup"] = round(base_time / elapsed, 2) if elapsed > 0 else float("nan")
            r["efficiency"] = round(r["speedup"] / r["nodes"], 2)

    perf_table = pd.DataFrame(
        perf_records, columns=["nodes", "time_sec", "speedup", "efficiency"]
    )
    return job_outputs, perf_table

In [20]:
import pandas as pd
import datetime

def persist_pipeline_results(
    job_outputs: dict,
    perf_table: pd.DataFrame,
    show_node: str,
    drive_dir: str = "/content/drive/MyDrive/cloud_results"
):
    """
    Show the benchmark results and save them in a fresh run folder.

    A folder named ``run_<timestamp>_<id>`` is created under ``drive_dir``.
    The performance table is displayed and written to
    ``performance_metrics.csv``; the output of ``show_node`` is printed; the
    output of every node is written to ``node_<key>_output.txt`` as text.

    Args:
        job_outputs: Mapping of node key to that node's job output.
        perf_table: Performance metrics as a pandas DataFrame.
        show_node: Key in ``job_outputs`` whose output is printed.
        drive_dir: Parent folder for run folders; created if missing.

    Returns:
        dict with run_folder, performance_table_path, node_output_paths,
        shown_node, timestamp and run_id.

    Raises:
        TypeError: If ``perf_table`` is not a pandas DataFrame.
        ValueError: If ``show_node`` is not a key of ``job_outputs``.
    """
    # Validate before touching the file system.
    if not isinstance(perf_table, pd.DataFrame):
        raise TypeError("perf_table must be a pandas DataFrame")
    if show_node not in job_outputs:
        raise ValueError(f"Node '{show_node}' not found in job_outputs")

    os.makedirs(drive_dir, exist_ok=True)

    # Timestamp plus random id keeps every run in its own folder.
    run_id = str(uuid.uuid4())[:8]
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = os.path.join(drive_dir, f"run_{stamp}_{run_id}")
    os.makedirs(run_dir, exist_ok=True)

    print("Performance Metrics")
    display(perf_table)

    metrics_path = os.path.join(run_dir, "performance_metrics.csv")
    perf_table.to_csv(metrics_path, index=False)

    print(f"Result from node: {show_node}")
    print(job_outputs[show_node])

    saved_outputs = {}
    for node_key, node_result in job_outputs.items():
        target = os.path.join(run_dir, f"node_{node_key}_output.txt")
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(str(node_result))
        saved_outputs[node_key] = target

    print("\nAll results saved successfully")
    print(f"Drive path: {run_dir}")

    return {
        "run_folder": run_dir,
        "performance_table_path": metrics_path,
        "node_output_paths": saved_outputs,
        "shown_node": show_node,
        "timestamp": stamp,
        "run_id": run_id
    }

In [21]:
def pipeline_ui_app():
    """
    Run the whole interactive workflow, one phase after another.

    Phases: choose/upload the dataset, validate it, select the job and its
    subtasks, execute the job for each node count, then display and save the
    results. Every phase delegates to the function defined for it elsewhere
    in this notebook; nothing is returned.
    """
    # Phase 1 - dataset selection / upload
    print("=== Phase 1: Upload Dataset ===")
    meta = storage_ui_pipeline()

    # Phase 2 - file type check and Spark reader settings
    print("\n=== Phase 2: Validate Dataset ===")
    read_config = validate_dataset(meta)["spark_read_config"]

    # Phase 3 - job type and subtasks
    print("\n=== Phase 3: Select Job and Subtasks ===")
    job = select_job_and_subtasks_numbered()

    # Phase 4 - timed execution for each node count
    print("\n=== Phase 4: Execute Pipeline on Nodes ===")
    outputs, timings = run_pipeline_cached(meta, read_config, job)

    # Phase 5 - show the single-node output, save everything
    print("\n=== Phase 5: Display and Save Results ===")
    persist_pipeline_results(outputs, timings, show_node="1_nodes")

In [24]:
# Start the interactive pipeline; it prompts for the dataset and the job to run.
pipeline_ui_app()

=== Phase 1: Upload Dataset ===
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Existing datasets in Google Drive:
1 - customers-100000 (1)_8d8a3627.csv

Options:
1 - Use existing dataset
2 - Upload new dataset
Enter choice (1 or 2): 1
Enter dataset number: 1
 File restored to Colab: /content/datasets/customers-100000 (1)_8d8a3627.csv

=== Phase 2: Validate Dataset ===
 Dataset validated
{'format': 'csv', 'options': {'header': 'true', 'inferSchema': 'true'}}

=== Phase 3: Select Job and Subtasks ===
Select Main Job Type:
1 - Descriptive Statistics
2 - Machine Learning
Enter choice (1 or 2): 2

Select ML Subtasks (comma separated numbers, e.g., 1,2):
1 - regression
2 - kmeans
3 - fpgrowth
4 - timeseries
Enter your choices: 2

 Job Configuration Complete

=== Phase 4: Execute Pipeline on Nodes ===

--- Running job on 1 node(s) ---
 Dataset cached in memory
Execution time: 12.16 sec

--- Running job on 2 node

Unnamed: 0,nodes,time_sec,speedup,efficiency
0,1,12.163591,1.0,1.0
1,2,8.780622,1.39,0.69
2,4,8.628088,1.41,0.35
3,8,8.211227,1.48,0.18


Result from node: 1_nodes
{'kmeans': [array([49969.5]), array([16651.5]), array([83318.5])]}

All results saved successfully
Drive path: /content/drive/MyDrive/cloud_results/run_20260111_145106_42136076


In [23]:
import os
import multiprocessing

# Logical processors (hardware threads) reported for this runtime.
logical_cores = os.cpu_count()
print("Logical cores (threads):", logical_cores)

# The physical core count comes from psutil, which may not be installed.
try:
    import psutil
except ImportError:
    print("psutil not installed, physical cores info not available")
else:
    physical_cores = psutil.cpu_count(logical=False)
    print("Physical cores:", physical_cores)

Logical cores (threads): 2
Physical cores: 1
