# Analysis of CERN Beam Position Dataset

## 4] Big Data Analysis - Full Dataset [Spark]

In [None]:
"""
Machine Specifications:
Processor: Intel Core i9-10900K; clocked to 4.7 GHz; water-cooled
Motherboard: ASUS ROG STRIX Z490-F
Memory: Corsair Vengeance 32GB 3200 MHz DDR4
SSD: Samsung 970 EVO 1TB
GPU: EVGA NVIDIA GeForce RTX 3080 XC3 BLACK 10GB
OS: Windows 10 [10.0.19042 Build 19042]
"""

#import the libraries required for data ingestion, plotting and Spark clustering
import os
import numpy as np #version 1.19.2
import pandas as pd #version 1.1.3
import time

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
pio.templates.default = "simple_white"

import findspark #version 1.4.2
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Names of the raw data files in ./data (read later by wide_format_ingest).
# Forward slash instead of ".\data": "\d" is an invalid string escape
# (DeprecationWarning; SyntaxWarning on Python 3.12+) and a backslash only
# works as a path separator on Windows.
# os.listdir returns entries in arbitrary order, so sort for a reproducible
# file order (and therefore reproducible FILE_LIST[0:n] selection and row order).
FILE_LIST = sorted(os.listdir("./data"))

In [None]:
def wide_format_ingest(file_count, file_list=None, data_dir="./data"):
    """Read beam-position CSV files and stack them into one wide DataFrame.

    Each selected file is read with ``pd.read_csv``. Every CSV row becomes one
    output row, and the row's position within its own file is kept as the
    ``sensor`` number. The experiment id and axis letter are sliced from fixed
    positions in the file name.

    Parameters
    ----------
    file_count : int
        Number of files to read, taken from the start of ``file_list``.
    file_list : list of str, optional
        Candidate file names (relative to ``data_dir``). Defaults to the
        notebook-level ``FILE_LIST``.
    data_dir : str, optional
        Directory holding the files. Defaults to ``"./data"``.

    Returns
    -------
    pandas.DataFrame
        Columns, in this order:

        * ``sensor`` (int)
        * ``turn_1`` ... ``turn_N``: the CSV data columns, renamed in their
          original order
        * ``experiment`` (str)
        * ``axis_x`` / ``axis_y``: float 0.0/1.0 indicators

        Both axis columns are always present, even when every selected file
        has the same axis.

    Raises
    ------
    ValueError
        If the selection contains no files.
    """
    candidates = FILE_LIST if file_list is None else file_list
    selected = candidates[0:file_count]
    if not selected:
        raise ValueError("wide_format_ingest: no input files selected")

    frames = []
    for file_name in selected:
        # print the active file for status check
        print(f"Reading file name...{file_name}")
        raw_input = pd.read_csv(os.path.join(data_dir, file_name), delimiter=",")

        # NOTE(review): assumes the file-name layout puts the experiment id in
        # characters 11-33 and the axis letter at character 34 - confirm
        # against the data naming scheme.
        raw_input["experiment"] = file_name[11:34]
        raw_input["axis"] = file_name[34]
        frames.append(raw_input)

    # A single concat instead of DataFrame.append per file. append copied the
    # whole accumulated frame on every call (quadratic) and was removed in
    # pandas 2.0. Each frame keeps its own 0..n-1 row index, so the index
    # repeats per file.
    master_input = pd.concat(frames)

    # Expose that per-file row position as a column named "sensor".
    master_input = master_input.reset_index().rename({"index": "sensor"}, axis=1)

    # Every non-bookkeeping column is a reading; number them turn_1..turn_N in
    # their existing order. Deriving them from the data (rather than assuming
    # exactly 6600 positions) ensures "experiment"/"axis" are never renamed.
    reading_cols = [
        col for col in master_input.columns
        if col not in ("sensor", "experiment", "axis")
    ]
    column_mapper = {old: f"turn_{n}" for n, old in enumerate(reading_cols, start=1)}
    master_input = master_input.rename(columns=column_mapper)

    # One-hot encode the axis letter (adds axis_<letter> columns at the end).
    master_input = pd.get_dummies(master_input, columns=["axis"])
    # get_dummies only creates columns for letters that actually occur, so a
    # single-axis selection would lack one of them; guarantee both exist.
    for axis_col in ("axis_x", "axis_y"):
        if axis_col not in master_input.columns:
            master_input[axis_col] = 0

    # set the datatypes for ease of analysis
    return master_input.astype({"axis_x": float, "axis_y": float, "sensor": int})

In [None]:
#findspark used to locate spark folder
# NOTE(review): findspark.init() is normally called before pyspark is first
# imported. The pyspark imports in the first cell only succeed if pyspark is
# already importable (e.g. pip-installed) - confirm the intended setup.
findspark.init('C:/Users/carlw/spark-3.1.2-bin-hadoop3.2')

# Local Spark session using 8 of the machine's 10 cores and 16 GB of its 32 GB RAM.
# NOTE(review): in local mode the executor runs inside the driver JVM, so
# spark.driver.memory is the setting that takes effect; spark.executor.memory
# is likely inert here.
spark = (
    SparkSession.builder
    .appName("CERN_Analysis")
    .master("local[8]")
    .config("spark.executor.memory", "16G")
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "4G")
    .getOrCreate()
)

# Arrow speeds up the pandas -> Spark conversion done by spark.createDataFrame.
# Since Spark 3.0 the key is "spark.sql.execution.arrow.pyspark.enabled";
# the old "spark.sql.execution.arrow.enabled" is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark  # last expression: the notebook renders the session summary with a Spark UI link

In [None]:
%%time
#read in the full dataset
full_dataset_df=wide_format_ingest(66)

#print the dataset shape
print(f"Dataframe shape: {full_dataset_df.shape}")

#examine the memory usage
full_dataset_df.info(memory_usage="deep")

In [None]:
#convert the pandas dataframe to a spark dataframe
cern_dataset = spark.createDataFrame(full_dataset_df)

# Feature columns: every "turn_<n>" reading, in dataframe column order
# (turn_1 ... turn_6600 for the full dataset). Reading them from the schema
# keeps this list in sync with the ingested data instead of repeating a
# hard-coded column count.
input_cols_for_vector_assembler = [
    col for col in cern_dataset.columns if col.startswith("turn_")
]

# Combine each observation's turn readings into a single vector column "features".
assembler = VectorAssembler(
    inputCols=input_cols_for_vector_assembler,
    outputCol="features")

# Silhouette analysis (squared Euclidean distance) to assess how well KMeans separates the clusters.
evaluator = ClusteringEvaluator(
    predictionCol="prediction",
    featuresCol="features",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean")

# New dataframe = cern_dataset plus the assembled "features" vector column.
assembled_dataset = assembler.transform(cern_dataset)

In [None]:
# Fit a 5-cluster KMeans model on the assembled feature vectors and score it.
# time.perf_counter() is monotonic and intended for measuring elapsed time,
# unlike the wall-clock time.time().
t0 = time.perf_counter()  # start the timer
KMeans_algo = KMeans(featuresCol="features", k=5, seed=13)  # 5 clusters; fixed seed for repeatable runs
KMeans_fit = KMeans_algo.fit(assembled_dataset)  # fit the model to the feature vectors
output = KMeans_fit.transform(assembled_dataset)  # adds the "prediction" column (cluster id 0-4)
score = evaluator.evaluate(output)  # silhouette score of the clustering
t1 = time.perf_counter()  # stop the timer
total_time = t1 - t0  # elapsed seconds

print(f"Silhouette Score: {score:.3f}")  # print the silhouette score
print(f"Time to solve: {total_time:.1f}s")  # seconds, matching the per-cluster "Time taken" messages

In [None]:
# Summary statistics per cluster. Rows are gathered as dicts and converted into
# a DataFrame once after the loop: DataFrame.append was removed in pandas 2.0
# and copied the entire frame on every call.
RESULT_COLUMNS = ["cluster",           # cluster id (1-based)
                  "count",             # number of observations
                  "unique_sensors",    # number of unique sensor ID's in the cluster
                  "x_axis_count",      # number of x axis observations in the cluster
                  "y_axis_count",      # number of y axis observations in the cluster
                  "experiment_count"]  # number of unique experiment ID's in the cluster
cluster_rows = []

for i in range(0, 5):
    #status message the computation is starting
    print(f"Analysing cluster: {i+1}")

    t0 = time.perf_counter()

    # Restrict to this cluster once and filter *before* projecting, so every
    # statistic below is computed on the same subset.
    cluster_df = output.filter(output.prediction == i)

    count = cluster_df.count()
    unique_sensors = cluster_df.select("sensor").distinct().count()
    x_axis = cluster_df.filter(cluster_df.axis_x == 1).count()
    y_axis = cluster_df.filter(cluster_df.axis_y == 1).count()
    experiment = cluster_df.select("experiment").distinct().count()

    t1 = time.perf_counter()

    cluster_rows.append({
        "cluster": i + 1,
        "count": count,
        "unique_sensors": unique_sensors,
        "x_axis_count": x_axis,
        "y_axis_count": y_axis,
        "experiment_count": experiment,
    })
    #status message that the computation is complete
    print(f"Time taken: {t1-t0:.1f}s")
    print("===============================")

SPARK_RESULTS_DF = pd.DataFrame(cluster_rows, columns=RESULT_COLUMNS)

In [None]:
# Show the per-cluster summary statistics table (rendered because it is the cell's last expression)
SPARK_RESULTS_DF

In [None]:
#examine the sensor numbers in cluster number 2 [cluster 1 in the 0-4 space]
# Filter on the prediction before selecting, so the predicate column is still
# part of the projection's input. collect() + sorted lists every sensor in
# ascending order; show() stops at 20 rows and distinct() has no defined order.
cluster_2_sensors = sorted(
    row["sensor"]
    for row in output.filter(output.prediction == 1).select("sensor").distinct().collect()
)
print(f"Cluster 2: {len(cluster_2_sensors)} unique sensors")
print(cluster_2_sensors)

In [None]:
#examine the sensor numbers in cluster number 4 [cluster 3 in the 0-4 space]
# Filter on the prediction before selecting, so the predicate column is still
# part of the projection's input. collect() + sorted lists every sensor in
# ascending order; show() stops at 20 rows and distinct() has no defined order.
cluster_4_sensors = sorted(
    row["sensor"]
    for row in output.filter(output.prediction == 3).select("sensor").distinct().collect()
)
print(f"Cluster 4: {len(cluster_4_sensors)} unique sensors")
print(cluster_4_sensors)

In [None]:
# Which cluster id(s) (0-4 space) contain sensor 526? Restrict the rows first,
# then keep only the distinct prediction values.
output.where(output["sensor"] == 526).select("prediction").distinct().show()

## 5] Validation

In [None]:
# Silhouette score and fit time for each k in cluster_checks. Rows are
# collected in a list and converted once after the loop, because
# DataFrame.append was removed in pandas 2.0.
validation_rows = []

#list of clusters to examine as part of validation
cluster_checks = [5, 6, 7]

for i in cluster_checks:
    t0 = time.perf_counter()  # monotonic clock for elapsed time
    KMeans_algo_val = KMeans(featuresCol="features", k=i, seed=13)
    # "_val" names keep the main k=5 model (KMeans_fit) and its score intact.
    KMeans_fit_val = KMeans_algo_val.fit(assembled_dataset)
    validation_output = KMeans_fit_val.transform(assembled_dataset)
    score_val = evaluator.evaluate(validation_output)
    t1 = time.perf_counter()
    total_time = t1 - t0  # elapsed seconds for fit + evaluation
    validation_rows.append({
        "cluster_no": i,
        "silhouette_score": score_val,
        "time": total_time,
    })

SPARK_VALIDATION_DF = pd.DataFrame(
    validation_rows, columns=["cluster_no", "silhouette_score", "time"])

In [None]:
#create a scatter plot with two y-axes, one for silhouette and one for time taken
k_values = SPARK_VALIDATION_DF.cluster_no.astype(int)

validation_plot = make_subplots(specs=[[{"secondary_y": True}]])

validation_plot.add_trace(
    go.Scatter(
        x=k_values,
        y=SPARK_VALIDATION_DF.silhouette_score,
        name="Silhouette Score"),
    secondary_y=False)

validation_plot.add_trace(
    go.Scatter(
        x=k_values,
        y=SPARK_VALIDATION_DF.time,
        name="Time"),
    secondary_y=True)

# Title follows the k values actually evaluated instead of a hard-coded range.
validation_plot.update_layout(
    title_text=f"PySpark Validation:k={k_values.min()} to {k_values.max()}")

validation_plot.update_xaxes(
    title="k")

validation_plot.update_yaxes(
    title="Silhouette Score [-]",
    secondary_y=False)

# The "time" column holds elapsed seconds from the validation loop.
validation_plot.update_yaxes(
    title="Time [s]",
    secondary_y=True)

validation_plot.show()

In [None]:
#stop the spark context once all analysis is complete
# Spark DataFrames created from this session (e.g. cern_dataset, output) cannot
# be evaluated afterwards; re-run the session cell before any further Spark work.
spark.stop()