# HMS-PySpark-exercise
### by Iulian Cozma

## Toools and Libraries

In [1]:
import os
import sys
import numpy as np

root = "/scratch/eecs545w24_class_root/eecs545w24_class/shared_data/hms_data/raw_data/" 
#root = "/kaggle/input/hms-harmful-brain-activity-classification" 
ENVIRONMENT = 'kaggle'

experiment_params = {
    # "N": 1_000,
    "sample_frac": .10,     # fraction of the dataset to be used, -1 to use 'all' data
    "seed": 42,
    "flags": { "eval", ENVIRONMENT},
    "spectogram_freq_sparse_N": 10,
    "eeg_freq_sparse_N": 20
}
FLAGS = experiment_params["flags"] 

print(f"Running with experiment params: {experiment_params}")

Running with experiment params: {'sample_frac': 0.1, 'seed': 42, 'flags': {'eval', 'kaggle'}, 'spectogram_freq_sparse_N': 10, 'eeg_freq_sparse_N': 20}


In [2]:
#pip install mlflow

### Install pyspark

In [3]:
import shutil
#src_path = r"/kaggle/input/pyspark-package/pyspark-3.5.0.tar.gz.mp4"
#dst_path = r"/kaggle/working/pyspark-3.5.0.tar.gz"
#src_path = r"/Users/sohamdas/Desktop/EECS 545/Project"
#dst_path = r"/Users/sohamdas/Desktop/EECS 545/Project"
#shutil.copy(src_path, dst_path)
#!pip install pyspark
#!pip install /kaggle/working/pyspark-3.5.0.tar.gz

### ML Flow - setup

In [4]:
#if "eval" in FLAGS:
#    import os
#    # Set the environment variable
#    os.environ["PYSPARK_PIN_THREAD"] = "False"
#    spark.builder.config("spark.jars.packages", "org.mlflow.mlflow-spark")
#    import mlflow
#
#     # mlflow.set_tracking_uri("http://127.0.0.0:5000")
#    mlflow.set_tracking_uri("http://localhost:5000")
#    mlflow.autolog()

### PySpark app

In [5]:
import pyspark as ps
from pyspark.sql import SparkSession
import faulthandler

faulthandler.enable()
ps.__version__

'3.5.1'

In [6]:
spark = (
    SparkSession.builder.master("local[*]")
    # .config("spark.jars.packages", "org.mlflow.mlflow-spark")
    .config("spark.driver.memory", "15g")
    .config("spark.sql.adaptive.enabled", "true")  # Enable adaptive query execution
    .config(
        "spark.debug.maxToStringFields", 20_000
    )  # For msg: truncated the string representation of a plan since it was too large.
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .appName("brain-spark-1")
    .getOrCreate()
)
# Access the Spark UI URL
print("Spark UI: ", spark.sparkContext.uiWebUrl)

sc = spark.sparkContext

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    LongType,
    DoubleType,
    ArrayType,
)

from pyspark.ml.functions import vector_to_array

from pyspark.sql.functions import (
    input_file_name as input_file_name,
    regexp_extract as regexp_extract,
    collect_list as collect_list,
    col,
    lit,
    expr,
    slice,
    udf,
)
from pyspark.sql.functions import array

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/28 09:37:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark UI:  http://gl3205.arc-ts.umich.edu:4040


## Configs

In [7]:
import time
start = time.time()

In [8]:
# Train & Test columns and schema
train_columns = eval(
    """['eeg_id', 'eeg_sub_id', 'eeg_label_offset_seconds', 'spectrogram_id', 'spectrogram_sub_id', 'spectrogram_label_offset_seconds', 'label_id', 'patient_id', 'expert_consensus', 'seizure_vote', 'lpd_vote', 'gpd_vote', 'lrda_vote', 'grda_vote', 'other_vote']"""
)
train_schema = StructType(
    [
        StructField("eeg_id", LongType(), True),
        StructField("eeg_sub_id", IntegerType(), True),
        StructField("eeg_label_offset_seconds", DoubleType(), True),
        StructField("spectrogram_id", IntegerType(), True),
        StructField("spectrogram_sub_id", IntegerType(), True),
        StructField("spectrogram_label_offset_seconds", DoubleType(), True),
        StructField("label_id", LongType(), True),
        StructField("patient_id", IntegerType(), True),
        StructField("expert_consensus", StringType(), True),
        StructField("seizure_vote", IntegerType(), True),
        StructField("lpd_vote", IntegerType(), True),
        StructField("gpd_vote", IntegerType(), True),
        StructField("lrda_vote", IntegerType(), True),
        StructField("grda_vote", IntegerType(), True),
        StructField("other_vote", IntegerType(), True),
    ]
)
test_schema = StructType(
    [
        StructField("spectrogram_id", IntegerType(), True),
        StructField("eeg_id", LongType(), True),
        StructField("patient_id", IntegerType(), True),
    ]
)


# EEG columns and schema
eeg_columns = eval(
    """[
        'Fp1', 'F3', 'C3', 'P3', 'F7', 'T3', 'T5', 'O1', 'Fz', 'Cz', 'Pz', 'Fp2', 'F4', 'C4', 'P4', 'F8', 'T4', 'T6', 'O2', 'EKG', 'eeg_id']"""
)
eeg_columns_data = eeg_columns[:-1]  # eeg columns containing data only (no eeg_id)
eeg_schema = StructType(
    [
        StructField("Fp1", FloatType(), True),
        StructField("F3", FloatType(), True),
        StructField("C3", FloatType(), True),
        StructField("P3", FloatType(), True),
        StructField("F7", FloatType(), True),
        StructField("T3", FloatType(), True),
        StructField("T5", FloatType(), True),
        StructField("O1", FloatType(), True),
        StructField("Fz", FloatType(), True),
        StructField("Cz", FloatType(), True),
        StructField("Pz", FloatType(), True),
        StructField("Fp2", FloatType(), True),
        StructField("F4", FloatType(), True),
        StructField("C4", FloatType(), True),
        StructField("P4", FloatType(), True),
        StructField("F8", FloatType(), True),
        StructField("T4", FloatType(), True),
        StructField("T6", FloatType(), True),
        StructField("O2", FloatType(), True),
        StructField("EKG", FloatType(), True),
        StructField("eeg_id", IntegerType(), True),
    ]
)

# Spectrogram columns and schema
spectrogram_columns_prefix = eval("""['LL', 'RL', 'RP', 'LP']""")
spectrogram_columns_sufix = eval(
    """['0.59', '0.78', '0.98', '1.17', '1.37', '1.56', '1.76', '1.95', '2.15', '2.34', '2.54', '2.73', '2.93', '3.13', '3.32', '3.52', '3.71', '3.91', '4.1', '4.3', '4.49', '4.69', '4.88', '5.08', '5.27', '5.47', '5.66', '5.86', '6.05', '6.25', '6.45', '6.64', '6.84', '7.03', '7.23', '7.42', '7.62', '7.81', '8.01', '8.2', '8.4', '8.59', '8.79', '8.98', '9.18', '9.38', '9.57', '9.77', '9.96', '10.16', '10.35', '10.55', '10.74', '10.94', '11.13', '11.33', '11.52', '11.72', '11.91', '12.11', '12.3', '12.5', '12.7', '12.89', '13.09', '13.28', '13.48', '13.67', '13.87', '14.06', '14.26', '14.45', '14.65', '14.84', '15.04', '15.23', '15.43', '15.63', '15.82', '16.02', '16.21', '16.41', '16.6', '16.8', '16.99', '17.19', '17.38', '17.58', '17.77', '17.97', '18.16', '18.36', '18.55', '18.75', '18.95', '19.14', '19.34', '19.53', '19.73', '19.92']"""
)
spectrogram_columns_data = [
    f"{prefix}_{suffix}"
    for prefix in spectrogram_columns_prefix
    for suffix in spectrogram_columns_sufix
]


# Create a StructType for the schema from a list of StructFields
spectrogram_schema = StructType(
    [StructField("time", IntegerType())]
    + [
        StructField(prefix + "_" + suffix, FloatType(), True)
        for prefix in spectrogram_columns_prefix
        for suffix in spectrogram_columns_sufix
    ]
)

NFreq = experiment_params.get("spectogram_freq_sparse_N", 10)
spectrogram_columns_data = (
    [x for x in spectrogram_columns_data if x.startswith("LL")][::NFreq]
    + [x for x in spectrogram_columns_data if x.startswith("RL")][::NFreq]
    + [x for x in spectrogram_columns_data if x.startswith("RP")][::NFreq]
    + [x for x in spectrogram_columns_data if x.startswith("LP")][::NFreq]
)

spectrogram_columns_data_dot = [
    col.replace(".", "__") for col in spectrogram_columns_data
]


votes_columns = eval(
    """[  'seizure_vote', 'lpd_vote', 'gpd_vote', 'lrda_vote', 'grda_vote', 'other_vote']"""
)

In [9]:
len(eeg_columns_data), len(spectrogram_columns_data), len(votes_columns)

(20, 40, 6)

## Load Data

In [10]:
N = experiment_params.get("N", -1)
sample_frac = experiment_params.get("sample_frac", -1)
seed = experiment_params.get("seed", 42)

sum_of_votes_expr = "(" + "+".join(votes_columns) + ")"

In [11]:
df_train = (
    spark.read.csv(root + "train.csv", header=True, schema=train_schema)
    # .alias("train")
    # .groupBy("train.eeg_id")
    # .agg(*[first(col(column)).alias(column) for column in train_columns])
    .filter("eeg_sub_id=0")
    # cast offest to integer
    .withColumn(
        "eeg_label_offset_seconds",
        col("eeg_label_offset_seconds").cast("integer"),
    ).withColumn(
        "spectrogram_label_offset_seconds",
        col("spectrogram_label_offset_seconds").cast("integer"),
    )
    # label
    .withColumn("label", col("expert_consensus"))
    # percent of votes for each label for model evaluation,necesary on 'eval' flag
    .withColumns(
        {
            f"{column}_actual": expr(f"{column} / {sum_of_votes_expr}")
            for column in votes_columns
        },
    )
)

In [12]:
if N > 0:
    df_train = df_train.limit(N)
if sample_frac > 0:
    df_train = df_train.sample(
        fraction=sample_frac,
        withReplacement=False,
        seed=experiment_params.get("seed", 42),
    )
    print(f"getting a fraction of the data: {sample_frac}")

# Split the data into training and test sets (30% held out for testing)
train_df, eval_df = df_train.randomSplit([0.7, 0.3], seed=seed)

getting a fraction of the data: 0.1


### EEGs and Spectrograms parquet files

In [13]:
def get_parquet_files(df, folder, root=root):
    """
    Returns a list of file paths for EEG and Spectrograns, based on the given ids.
    Function is needed because multiple EEGs & Spectrogram are stored into single parquet file.

    Args:
        df (DataFrame): The DataFrame containing the EEG/Spectrogram id as a column named 'id'.
        folder (str): The folder where the EEG/Spectrogram files are stored. Defaults to "train_eegs".
        root (str, optional): The root directory where the folder is located. Defaults to root.

    Returns:
        list: A list of parquet file paths
    """

    paths = [
        os.path.join(root, folder, f"{x.id}.parquet")
        for x in df.select("id").distinct().collect()
    ]
    print(f"Found {len(paths)} files")
    return paths

In [14]:
train_eeg_files_paths = get_parquet_files(
    train_df.select(col("eeg_id").alias("id")), folder="train_eegs", root=root
)
train_spectro_files_paths = get_parquet_files(
    train_df.select(col("spectrogram_id").alias("id")),
    folder="train_spectrograms",
    root=root,
)


if "eval" in FLAGS:
    eval_eeg_files_paths = get_parquet_files(
        eval_df.select(col("eeg_id").alias("id")), folder="train_eegs", root=root
    )

    eval_spectro_files_paths = get_parquet_files(
        eval_df.select(col("spectrogram_id").alias("id")),
        folder="train_spectrograms",
        root=root,
    )

                                                                                

Found 1277 files
Found 1157 files
Found 500 files
Found 474 files


### UDF for Summary Statistics

In [15]:
from pyspark.ml.linalg import Vectors, VectorUDT

# Convert array columns to vectors
to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

from scipy.stats import skew, kurtosis
from pyspark.sql.functions import udf
import math
import numpy as np


# Define the UDF for stats summary
@udf(ArrayType(DoubleType()))
def stats_summary_udf(arr):
    """
    It splits the array in 3 parts and calculate some stats for each segment:
    - mean
    - std
    - variance ...
    """

    def stats_sub_array(x):
        if len(x) == 0:
            return [0] * 7

        return [
            float(np.mean(x)),
            float(np.std(x)),
            float(np.var(x)),
            float(np.median(x)),
            float(np.max(x)),
            float(np.min(x)),
            float(np.max(x) - np.min(x)),
        ]

    if not arr or len(arr) == 0:
        return [0] * 7 * 3
    else:
        arr = np.array(arr)
        split_point1 = len(arr) // 3
        split_point2 = 2 * (len(arr) // 3)
        ret = (
            stats_sub_array(arr[:split_point1])
            + stats_sub_array(arr[split_point1:split_point2])
            + stats_sub_array(arr[split_point2:])
        )
        return ret

### EEGs

In [16]:
eeg_freq_sparse_N = experiment_params.get("eeg_freq_sparse_N", 20)


def load_eegs(df, eeg_files_paths, eeg_schema, cols_to_array):
    """
    Load EEG data from parquet files and join it with the train/test data.

    Args:
        df (DataFrame): The train/test data DataFrame.
        eeg_files_paths (list): List of file paths for the EEG parquet files.
        schema (StructType): The schema of the EEG data.

    Returns:
        DataFrame: The DataFrame with the EEG data joined with the train/test data.
    """
    initial_columns = df.columns
    return (
        df.alias("train")
        .join(
            # read the eeg data from parquet files
            spark.read.parquet(*eeg_files_paths, schema=eeg_schema)
            .withColumn(
                "eeg_id", regexp_extract(input_file_name(), r"(\d+).parquet", 1)
            )
            .groupBy("eeg_id")
            # collect all eeg data into a single row / array
            .agg(
                # i.e. collect_list("Fp1").alias("Fp1")
                *([collect_list(column).alias(column) for column in cols_to_array])
            )
            # join the eeg data with the train/test data
            .alias("eegs"),  # join alias is needed to avoid ambiguous column names
            "eeg_id",
            "inner",
        )
        .withColumns(  # slice the eeg data to 50 seconds, starting from eeg_label_offset_seconds, data recording frequency is 200Hz
            {
                column: expr(f"slice({column}, 200*eeg_label_offset_seconds+1, 200*50)")
                for column in cols_to_array
                if "eeg_label_offset_seconds" in initial_columns
            }
        )
        .withColumns(  # decrease frequency to ...Hz
            {
                column: expr(
                    f"FILTER({column}, (element, index) -> index % {eeg_freq_sparse_N} = 0)"
                )
                for column in cols_to_array
            }
        )
        .withColumns(  # apply tranformation to the eeg data
            {
                column: to_vector_udf(stats_summary_udf(f"{column}"))
                for column in cols_to_array
            }
        )
    )

In [17]:
df_train_eegs = load_eegs(train_df, train_eeg_files_paths, eeg_schema, eeg_columns_data)

if "eval" in FLAGS:
    df_eval_eegs = load_eegs(
        eval_df, eval_eeg_files_paths, eeg_schema, eeg_columns_data
    )

                                                                                

### Spectrograms

In [18]:
def load_spectrograms(df, files_paths, schema, cols_to_array):
    """ """
    initial_columns = df.columns
    return (
        df.alias("train")
        .join(
            # read the eeg data from parquet files
            spark.read.parquet(*files_paths, schema=schema)
            .withColumn(
                "spectrogram_id", regexp_extract(input_file_name(), r"(\d+).parquet", 1)
            )
            .selectExpr(
                # "." in column name does not help, "." will be replaced with "__"
                *(
                    ["spectrogram_id", "time"]
                    + [
                        f"`{column.replace('__', '.')}` as {column}"
                        for column in cols_to_array
                    ]
                ),
            )
            .na.fill(0, subset=cols_to_array)
            .groupBy("spectrogram_id")
            # collect all eeg data into a single array
            .agg(
                *(
                    [
                        collect_list(f"`{column}`").alias(f"{column}")
                        for column in cols_to_array
                    ]
                )
            )
            # join the spectrogram to the train/test data
            .alias("spectrograms"),
            "spectrogram_id",
            "inner",
        )
        .withColumns(  # slice the eeg data to 600 seconds, starting from ..._offset_seconds
            {
                f"{column.replace('.', '__')}": slice(
                    col(f"`{column.replace('.', '__')}`"),
                    col("spectrogram_label_offset_seconds") + 1,
                    600,
                )
                for column in cols_to_array
                if "spectrogram_label_offset_seconds" in initial_columns
            }
        )
        .withColumns(  # apply tranformation to the spectrogram data
            {
                column: to_vector_udf(stats_summary_udf(f"{column}"))
                for column in cols_to_array
            }
        )
    )

In [19]:
df_train_spectrograms = load_spectrograms(
    df_train_eegs,
    train_spectro_files_paths,
    schema=spectrogram_schema,
    cols_to_array=spectrogram_columns_data_dot,
)

if "eval" in FLAGS:
    df_eval_spectrograms = load_spectrograms(
        df_eval_eegs,
        eval_spectro_files_paths,
        schema=spectrogram_schema,
        cols_to_array=spectrogram_columns_data_dot,
    )

                                                                                

### Execution plan - new the query makes sense :-)

In [20]:
df_train_spectrograms.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (35)
+- Project (34)
   +- BatchEvalPython (33)
      +- Project (32)
         +- SortMergeJoin Inner (31)
            :- Sort (22)
            :  +- Exchange (21)
            :     +- Project (20)
            :        +- BatchEvalPython (19)
            :           +- Project (18)
            :              +- SortMergeJoin Inner (17)
            :                 :- Sort (9)
            :                 :  +- Exchange (8)
            :                 :     +- Filter (7)
            :                 :        +- Sample (6)
            :                 :           +- Sort (5)
            :                 :              +- Sample (4)
            :                 :                 +- Project (3)
            :                 :                    +- Filter (2)
            :                 :                       +- Scan csv  (1)
            :                 +- Sort (16)
            :                    +- Exchange (15)
            :            

24/03/28 09:37:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## Model

In [21]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Model the brain problem

In [22]:
# Convert string labels to numeric using StringIndexer
label_indexer = StringIndexer(inputCol="label", outputCol="indexed_label")

inputCols = eeg_columns_data + spectrogram_columns_data_dot
# VectorAssembler for the discrete value and vectorized arrays
vector_assembler = VectorAssembler(
    inputCols=inputCols,
    outputCol="feature_vector",
)

# Normalize features using StandardScaler
standard_scaler = StandardScaler(
    inputCol="feature_vector",
    outputCol="normalized_features",
    withMean=True,
    withStd=True,
)

# RandomForestClassifier
random_forest_classifier = RandomForestClassifier(
    featuresCol="feature_vector",
    labelCol="indexed_label",
    numTrees=16,#255
    maxDepth=8,#30
    maxBins=16,#32
    bootstrap=True,
    minInstancesPerNode=1,
    minInfoGain=0.0,
    subsamplingRate=1.0,
    featureSubsetStrategy="auto",
    seed=experiment_params.get("seed", 42),
)

# Create a pipeline
pipeline = Pipeline(
    stages=[
        label_indexer,
        # my_custom_transformer,
        vector_assembler,
        standard_scaler,
        # dt_classifier,
        # logistic_regression,
        # gbt_classifier,
        random_forest_classifier,
    ]
)


# Fit the pipeline to the DataFrame
model = pipeline.fit(df_train_spectrograms)  # df_train_eegs

----------------------------------------                         (13 + 23) / 36]
Exception occurred during processing of request from ('127.0.0.1', 55330)
Traceback (most recent call last):
  File "/sw/pkgs/arc/python3.10-anaconda/2023.03/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/sw/pkgs/arc/python3.10-anaconda/2023.03/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/sw/pkgs/arc/python3.10-anaconda/2023.03/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/sw/pkgs/arc/python3.10-anaconda/2023.03/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/sohamdas/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/home/sohamdas/.local/lib/python3.10/site-packages/p

Py4JError: An error occurred while calling o1969.fit

## Evaluation

In [None]:
if "eval" in FLAGS:

    predictions = model.transform(df_eval_spectrograms)

    # Evaluate the model using MulticlassClassificationEvaluator:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexed_label", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions)

    print(f"Accuracy: {accuracy}")

## Predict & Submit

### Load Test & EEGs

In [None]:
df_test = (
    spark.read.csv(os.path.join(root, "test.csv"), header=True, schema=test_schema)
    # cast offest to integer
    # .withColumn(
    #     "eeg_label_offset_seconds",
    #     col("eeg_label_offset_seconds").cast("integer"),
    # )
    # label
    # .withColumn("label", col("expert_consensus"))
    # percent of votes for each label for model evaluation,necesary on 'eval' flag
    # .withColumns(
    #     {
    #         f"{column}_actual": expr(f"{column} / {sum_of_votes_expr}")
    #         for column in votes_columns
    #     },
    # )
)

In [None]:
test_eegs_files_paths = get_parquet_files(
    df_test.select(col("eeg_id").alias("id")), folder="test_eegs", root=root
)
test_spectro_files_paths = get_parquet_files(
    df_test.select(col("spectrogram_id").alias("id")),
    folder="test_spectrograms",
    root=root,
)

df_test_eegs = load_eegs(df_test, test_eegs_files_paths, eeg_schema, eeg_columns_data)
df_test_spectrograms = load_spectrograms(
    df_test_eegs,
    test_spectro_files_paths,
    schema=spectrogram_schema,
    cols_to_array=spectrogram_columns_data_dot,
)
print(f"Test eegs paths: {test_eegs_files_paths[:3]} ... ")

In [None]:
end = time.time()
print("sample fraction =",sample_frac)
print((end-start)/60)

### Predict & Submit

In [None]:
# Make predictions
predictions = model.transform(df_test_spectrograms)

# Extract individual probability values into separate columns
labels = [
    x.lower() for x in model.stages[0].labels
]  # Get labels from the StringIndexer

exprs = [
    vector_to_array("probability")[i].alias(f"{labels[i]}_vote")
    for i in range(len(labels))
]

# Show the predictions, including probabilities for each class
predictions.select(
    "eeg_id",
    *exprs,
).toPandas().to_csv("submission.csv", index=False)