# Team information
|S. No|Name|email|
|-|-|-|
|1|Mathews Roy|psymr3@nottingham.ac.uk|
|2|Ewan Ross|psyer1@nottingham.ac.uk|
|3|Soham Talukdar|ppxst3@nottingham.ac.uk|
|4|Srushanth Baride|ppxsb5@nottingham.ac.uk|

# Resources used
[pyspark: Extracting, transforming and selecting features](https://spark.apache.org/docs/latest/ml-features)<br>
[sklearn mutual_info_regression](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.mutual_info_regression.html)<br>
[sklearn chi2](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.chi2.html?highlight=chi2#sklearn.feature_selection.chi2)

# Data processing & plot libraries

In [None]:
import time
import pandas as pd
from sklearn import metrics
from datetime import timedelta
import matplotlib.pyplot as plt

# pyspark ML libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.feature import ChiSqSelector, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier as pyspark_DecisionTreeClassifier

# sklearn ML libraries

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest, chi2, mutual_info_regression
from sklearn.tree import DecisionTreeClassifier as sklearn_DecisionTreeClassifier

In [None]:
# Get the active Spark session, or create one on a local master, named "MLlib lab"
spark = (
  SparkSession.builder
  .master("local[*]")
  .appName("MLlib lab")
  .getOrCreate()
)

In [None]:
# Relative path to the Leukemia (GSE9476) csv file that is loaded into Spark below
_leukemia_dataset_file = "../Datasets/Leukemia_GSE9476.csv"

In [None]:
# Read data from the Leukemia csv file into a Spark DataFrame.
# - "maxColumns" is set to 22285 so the reader accepts this very wide file
# - header=True because the csv file contains a header row
# No schema inference is requested here; the columns are cast to float later on.
csv_reader = spark.read.option("maxColumns", 22285)
sparkDF = csv_reader.csv(_leukemia_dataset_file, header=True)

In [None]:
# Collect the Spark DataFrame into a pandas DataFrame for easier processing
# (the label encoding further down is done on the pandas side),
# then display the first rows as a quick sanity check
pandasDF = sparkDF.toPandas()
pandasDF.head()

[Curse of Dimensionality](https://en.wikipedia.org/wiki/Curse_of_dimensionality)

In [None]:
# Getting the DataFrame shape as (number of rows, number of columns)
# The data suffers from the 'curse of dimensionality': the number of features is far greater than the number of samples
pandasDF.shape

In [None]:
# Distinct values of the "type" label column, captured before the labels are encoded as integers
bone_marrow_type = pandasDF["type"].unique()

In [None]:
# Encode the string class labels in "type" as integer ids, stored as int32
LE = LabelEncoder()
encoded_type = LE.fit_transform(pandasDF['type'])
pandasDF['type'] = encoded_type.astype('int32')

In [None]:
# Rebuild the Spark DataFrame from the label-encoded pandas DataFrame
sparkDF = spark.createDataFrame(pandasDF)

# Cast every column (the "type" label included) to float, keeping the column names
sparkDF = sparkDF.select(
  [F.col(name).cast("float").alias(name) for name in sparkDF.columns]
)

In [None]:
# Vectorising the spark DataFrame for easier processing.
# The label column "type" must NOT be part of the feature vector: if it is,
# the selectors and the classifier can read the answer straight from the
# features, which makes the reported accuracies meaningless.
feature_columns = [c for c in sparkDF.columns if c != "type"]
vecAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# transform() appends the "features" vector column and keeps all original
# columns, so "type" is still available as the label column downstream
vector_sparkDF = vecAssembler.transform(sparkDF)

In [None]:
# Step between successive feature counts tried in the benchmark loops
# (used as the step of the range() calls: 2, 2 + features_mul, ...)
features_mul = 4000

# spark [ChiSqSelector](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.ChiSqSelector.html)
[ChiSqSelector](https://george-jen.gitbook.io/data-science-and-apache-spark/chisqselector) stands for Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose.

In [None]:
def spark_ChiSqSelector(i, _vector_sparkDF, seed=42):
  '''
  Function: spark_ChiSqSelector
  Select the top i features with Spark's ChiSqSelector, then train and
  evaluate a Spark DecisionTreeClassifier on the selected features.

  INPUT:
  ------
  i: Number of Features to select
  _vector_sparkDF: DataFrame from which the Features will be selected
    (read through its "features" vector column and its "type" label column)
  seed: Seed for the 70/30 train/test split (default 42), so repeated calls
    are evaluated on a reproducible split instead of a new random one

  OUTPUT:
  -------
  1. Model accuracy
  2. Execution time (in seconds) for feature selection, building and evaluating the model
  '''
  # Start time for execution time
  start_time = time.monotonic()

  # Selecting the best i features from the entire dataset
  # NOTE(review): the selector is fitted before the train/test split, so the
  # test rows also influence which features are kept - confirm this is intended.
  selector = ChiSqSelector(
    numTopFeatures=i,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="type"
  )
  result = selector.fit(_vector_sparkDF).transform(_vector_sparkDF)
  print(f"Top {selector.getNumTopFeatures()} features selected")

  # Splitting the data into training & testing (70/30).
  # The split is seeded: without a seed every call scores on a different
  # random test set, so accuracies for different i are not comparable.
  (train, test) = result.randomSplit([0.7, 0.3], seed=seed)

  # Using pyspark DecisionTreeClassifier to define and fit the ML model
  dt = pyspark_DecisionTreeClassifier(labelCol="type", featuresCol="selectedFeatures")
  model = dt.fit(train)

  # Make predictions
  predictions = model.transform(test)

  # Evaluating the predictions
  evaluator = MulticlassClassificationEvaluator(
    labelCol="type",
    predictionCol="prediction",
    metricName="accuracy"
  )
  accuracy = evaluator.evaluate(predictions)
  print(f"Test accuracy = {accuracy}")

  # End time for execution time
  end_time = time.monotonic()

  # Return accuracy and execution time
  return accuracy, timedelta(seconds=end_time - start_time).total_seconds()
  

# spark [UnivariateFeatureSelector](https://spark.apache.org/docs/latest/ml-features#univariatefeatureselector)
[UnivariateFeatureSelector](https://spark.apache.org/docs/latest/ml-features#univariatefeatureselector) operates on categorical/continuous labels with categorical/continuous features. The user sets `featureType` and `labelType`, and Spark picks the score function to use based on the specified `featureType` and `labelType`.

In [None]:
def spark_UnivariateFeatureSelector(i, _vector_sparkDF, seed=42):
  '''
  Function: spark_UnivariateFeatureSelector
  Select the top i features with Spark's UnivariateFeatureSelector
  (continuous features, categorical label), then train and evaluate a
  Spark DecisionTreeClassifier on the selected features.

  INPUT:
  ------
  i: Number of Features to select
  _vector_sparkDF: DataFrame from which the Features will be selected
    (read through its "features" vector column and its "type" label column)
  seed: Seed for the 70/30 train/test split (default 42), so repeated calls
    are evaluated on a reproducible split instead of a new random one

  OUTPUT:
  -------
  1. Model accuracy
  2. Execution time (in seconds) for feature selection, building and evaluating the model
  '''
  # Start time for execution time
  start_time = time.monotonic()

  # Selecting the best i features from the entire dataset
  # NOTE(review): the selector is fitted before the train/test split, so the
  # test rows also influence which features are kept - confirm this is intended.
  selector = UnivariateFeatureSelector(
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="type",
    selectionMode="numTopFeatures"
  )
  # In "numTopFeatures" mode the selection threshold is the number of features to keep
  selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(i)
  result = selector.fit(_vector_sparkDF).transform(_vector_sparkDF)

  print("UnivariateFeatureSelector output with top %d features selected using f_classif" % selector.getSelectionThreshold())

  # Splitting the data into training & testing (70/30).
  # The split is seeded: without a seed every call scores on a different
  # random test set, so accuracies for different i are not comparable.
  (train, test) = result.randomSplit([0.7, 0.3], seed=seed)

  # Using pyspark DecisionTreeClassifier to define and fit the ML model
  dt = pyspark_DecisionTreeClassifier(labelCol="type", featuresCol="selectedFeatures")
  model = dt.fit(train)

  # Make predictions
  predictions = model.transform(test)

  # Evaluating the predictions
  evaluator = MulticlassClassificationEvaluator(
    labelCol="type",
    predictionCol="prediction",
    metricName="accuracy"
  )
  accuracy = evaluator.evaluate(predictions)
  print(f"Test accuracy = {accuracy}")

  # End time for execution time
  end_time = time.monotonic()

  # Return accuracy and execution time
  return accuracy, timedelta(seconds=end_time - start_time).total_seconds()
  

In [None]:
# Result lists: one accuracy and one execution time per feature count tried
spark_acc_ChiSqSelector, spark_time_ChiSqSelector = [], []
spark_acc_UnivariateFeatureSelector, spark_time_UnivariateFeatureSelector = [], []

# (benchmark function, accuracy list, time list) - run in this order for every feature count
spark_runs = (
  (spark_ChiSqSelector, spark_acc_ChiSqSelector, spark_time_ChiSqSelector),
  (spark_UnivariateFeatureSelector, spark_acc_UnivariateFeatureSelector, spark_time_UnivariateFeatureSelector),
)

# Feature counts start at 2 and grow in steps of features_mul
for i in range(2, len(vector_sparkDF.columns), features_mul):
  for run_selector, acc_results, time_results in spark_runs:
    acc, exec_time = run_selector(i, vector_sparkDF)
    acc_results.append(acc)
    time_results.append(exec_time)

# sklearn [chi2](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.chi2.html)
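[chi2](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.chi2.html) computes the Chi-Squared statistic between each non-negative feature and the class label; combined with `SelectKBest` it keeps the highest-scoring features. Like Spark's [ChiSqSelector](https://george-jen.gitbook.io/data-science-and-apache-spark/chisqselector), it operates on labeled data and uses the Chi-Squared test of independence to decide which features to choose.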

In [None]:
# X is features: every column of the pandas DataFrame except the "type" label
X = pandasDF.drop(columns='type')

# y is labels: the "type" column
y = pandasDF['type']

In [None]:
def sklearn_chi2(i, _X, _y):
  '''
  Function: sklearn_chi2
  Keep the i features that score best on sklearn's chi2 test, then fit and
  evaluate a sklearn DecisionTreeClassifier on them.

  INPUT:
  ------
  i: Number of Features to select
  _X: Features
  _y: labels

  OUTPUT:
  -------
  1. Model accuracy
  2. Execution time (in seconds) for feature selection, building and evaluating the model
  '''
  # The timer covers feature selection, training and evaluation
  started = time.monotonic()

  # Feature selection is fitted on the entire dataset, keeping the i best features
  k_best = SelectKBest(chi2, k=i)
  selected = k_best.fit_transform(_X, _y)

  # 70/30 train/test split with a fixed random_state
  X_train, X_test, y_train, y_test = train_test_split(
    selected, _y, test_size=0.30, random_state=42
  )

  # Fit the decision tree on the training part
  tree = sklearn_DecisionTreeClassifier().fit(X_train, y_train)

  # Score the predictions made for the held-out part
  accuracy = metrics.accuracy_score(y_test, tree.predict(X_test))
  print(f"Test accuracy = {accuracy}")

  # Elapsed wall time, reported in seconds
  elapsed = timedelta(seconds=time.monotonic() - started)
  return accuracy, elapsed.total_seconds()

# sklearn [mutual_info_regression](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.mutual_info_regression.html?highlight=mutual_info_regression#sklearn.feature_selection.mutual_info_regression)
[Estimate mutual information](https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.mutual_info_regression.html?highlight=mutual_info_regression#sklearn.feature_selection.mutual_info_regression) for a continuous target variable.<br>
Mutual information (MI) between two random variables is a non-negative value, which measures the dependency between the variables. It is equal to zero if and only if two random variables are independent, and higher values mean higher dependency.<br>
The function relies on nonparametric methods based on entropy estimation from k-nearest neighbors distances.

In [None]:
def sklearn_mutual_info_regression(i, _X, _y):
  '''
  Function: sklearn_mutual_info_regression
  Keep the i features with the highest mutual_info_regression score, then
  fit and evaluate a sklearn DecisionTreeClassifier on them.

  INPUT:
  ------
  i: Number of Features to select
  _X: Features
  _y: labels

  OUTPUT:
  -------
  1. Model accuracy
  2. Execution time (in seconds) for feature selection, building and evaluating the model
  '''
  # Start time for execution time
  start_time = time.monotonic()

  # Selecting the best i features from the entire dataset.
  # The selection is applied to the _X argument itself; it previously indexed
  # the notebook-level X, which silently ignored whatever data was passed in.
  # NOTE(review): mutual_info_regression is meant for continuous targets, while
  # this notebook passes label-encoded class ids as _y - mutual_info_classif
  # may be the better fit; confirm before changing.
  selector = SelectKBest(mutual_info_regression, k=i)
  X_new = selector.fit_transform(_X, _y)

  # Splitting the data into training & testing
  train, test, train_labels, test_labels = train_test_split(X_new, _y, test_size=0.30, random_state=42)

  # Using sklearn DecisionTreeClassifier to define and fit the ML model
  clf = sklearn_DecisionTreeClassifier()
  clf = clf.fit(train, train_labels)

  # Make predictions
  predictions = clf.predict(test)

  # Evaluating the predictions
  accuracy = metrics.accuracy_score(test_labels, predictions)
  print(f"Test accuracy = {accuracy}")

  # End time for execution time
  end_time = time.monotonic()

  # Return accuracy and execution time
  return accuracy, timedelta(seconds=end_time - start_time).total_seconds()

In [None]:
# Result lists: one accuracy and one execution time per feature count tried
sklearn_acc_chi2, sklearn_time_chi2 = [], []
sklearn_acc_mutual_info_regression, sklearn_time_mutual_info_regression = [], []

# (benchmark function, accuracy list, time list) - run in this order for every feature count
sklearn_runs = (
  (sklearn_chi2, sklearn_acc_chi2, sklearn_time_chi2),
  (sklearn_mutual_info_regression, sklearn_acc_mutual_info_regression, sklearn_time_mutual_info_regression),
)

# Feature counts start at 2 and grow in steps of features_mul
for i in range(2, X.columns.size, features_mul):
  for run_selector, acc_results, time_results in sklearn_runs:
    acc, exec_time = run_selector(i, X, y)
    acc_results.append(acc)
    time_results.append(exec_time)

# Visualisation

In [None]:
# Feature counts used on the x axis: 2, 2 + features_mul, ... (same range as the sklearn benchmark loop)
x_axis = list(range(2, X.columns.size, features_mul))

In [None]:
# Accuracy of all four selectors against the number of selected features
accuracy_curves = (
  (spark_acc_ChiSqSelector, 'spark_ChiSqSelector'),
  (spark_acc_UnivariateFeatureSelector, 'spark_UnivariateFeatureSelector'),
  (sklearn_acc_chi2, 'sklearn_chi2'),
  (sklearn_acc_mutual_info_regression, 'sklearn_mutual_info_regression'),
)
for curve, curve_label in accuracy_curves:
  plt.plot(x_axis, curve, label=curve_label)

# Fixed y range so the accuracy plots are directly comparable
plt.ylim(0, 1.1)
plt.title("Features vs Accuracy")
plt.xlabel('Number of Features')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# Execution time of all four selectors against the number of selected features
time_curves = (
  (spark_time_ChiSqSelector, 'spark_ChiSqSelector'),
  (sklearn_time_chi2, 'sklearn_chi2'),
  (spark_time_UnivariateFeatureSelector, 'spark_UnivariateFeatureSelector'),
  (sklearn_time_mutual_info_regression, 'sklearn_mutual_info_regression'),
)
for curve, curve_label in time_curves:
  plt.plot(x_axis, curve, label=curve_label)

plt.title("Features vs Execution Time")
plt.xlabel('Number of Features')
plt.ylabel('Execution Time in Seconds')
plt.legend()
plt.show()

In [None]:
# Accuracy: Spark ChiSqSelector vs sklearn chi2
chi2_accuracy_curves = (
  (spark_acc_ChiSqSelector, 'spark_ChiSqSelector'),
  (sklearn_acc_chi2, 'sklearn_chi2'),
)
for curve, curve_label in chi2_accuracy_curves:
  plt.plot(x_axis, curve, label=curve_label)

plt.ylim(0, 1.1)
plt.title("Features vs Accuracy")
plt.xlabel('Number of Features')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# Execution time: Spark ChiSqSelector vs sklearn chi2
chi2_time_curves = (
  (spark_time_ChiSqSelector, 'spark_ChiSqSelector'),
  (sklearn_time_chi2, 'sklearn_chi2'),
)
for curve, curve_label in chi2_time_curves:
  plt.plot(x_axis, curve, label=curve_label)

plt.title("Features vs Execution Time")
plt.xlabel('Number of Features')
plt.ylabel('Execution Time in Seconds')
plt.legend()
plt.show()

In [None]:
# Accuracy: Spark UnivariateFeatureSelector vs sklearn mutual_info_regression
univariate_accuracy_curves = (
  (spark_acc_UnivariateFeatureSelector, 'spark_UnivariateFeatureSelector'),
  (sklearn_acc_mutual_info_regression, 'sklearn_mutual_info_regression'),
)
for curve, curve_label in univariate_accuracy_curves:
  plt.plot(x_axis, curve, label=curve_label)

plt.ylim(0, 1.1)
plt.title("Features vs Accuracy")
plt.xlabel('Number of Features')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# Execution time: Spark UnivariateFeatureSelector vs sklearn mutual_info_regression
univariate_time_curves = (
  (spark_time_UnivariateFeatureSelector, 'spark_UnivariateFeatureSelector'),
  (sklearn_time_mutual_info_regression, 'sklearn_mutual_info_regression'),
)
for curve, curve_label in univariate_time_curves:
  plt.plot(x_axis, curve, label=curve_label)

plt.title("Features vs Execution Time")
plt.xlabel('Number of Features')
plt.ylabel('Execution Time in Seconds')
plt.legend()
plt.show()