# Install Apache Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install scikit-plot

# Spark initialization (must run before any Spark code)

In [None]:
# Spark bootstrap: creates the SparkContext `sc` used by every later cell.
import os

# Locations of the Java runtime and of the unpacked Spark distribution.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

# findspark is initialised before pyspark is imported.
import findspark
findspark.init()

import pyspark

conf = pyspark.SparkConf().setMaster('local[4]').setAppName('Naive-Kernel')
# getOrCreate returns the already running context (if any), so re-executing this
# cell does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [None]:
#Import for data analysis and drive
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import numpy as np
import pandas as pd
from scipy.spatial.distance import euclidean
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import scikitplot as skplt
 # DRIVE CONNECTION
from google.colab import drive
# Mount Google Drive inside the Colab file system.
drive.mount('/content/drive/')
# Work from the project folder: the dataset CSV is read and the figures are
# saved through relative paths, which therefore resolve inside this directory.
os.chdir("/content/drive/My Drive/SDA")

In [None]:
# DATASET IMPORT (BREAST CANCER)
dataset = pd.read_csv("breast_cancer.csv")

# Discard the `id` and "Unnamed: 32" columns (presumably a record identifier and
# an empty artifact column -- TODO confirm), then drop rows with missing values.
# A dedicated name is used so the builtin `filter` is not shadowed.
kept_columns = [c for c in dataset.columns if c != 'id' and c != "Unnamed: 32"]
dataset = dataset[kept_columns].dropna()

# get_dummies yields the diagnosis_B / diagnosis_M indicator columns; only
# diagnosis_B is kept and used as the binary target.
dataset = pd.get_dummies(dataset)
dataset = dataset[[c for c in dataset.columns if c != "diagnosis_M"]]

# Every remaining column except the target is a predictor.
columns = [c for c in dataset.columns if c != "diagnosis_B"]
X = dataset[columns]
y = dataset['diagnosis_B']

# Train/test split on ALL predictors. It is left disabled, exactly as before,
# but written as comments so the cell no longer echoes a stray string as output.
# Enable it to evaluate the all-predictors configuration; the feature-selection
# cells overwrite X_train / X_test / RDD when they are executed.
#
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# D_train = np.column_stack((y_train,X_train))
# if type(X_test) != np.ndarray:
#   X_test = X_test.to_numpy()
# RDD = sc.parallelize(D_train).cache()

In [None]:
# Predictor subset selected with the "correlation < 0.6" criterion
columns = [
  'radius_mean', 'smoothness_mean', 'symmetry_mean', 'fractal_dimension_mean',
  'texture_se', 'smoothness_se', 'compactness_se', 'symmetry_se', 'texture_worst',
]
X, y = dataset[columns], dataset['diagnosis_B']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Training matrix layout: label in column 0, predictors in the remaining columns.
D_train = np.column_stack((y_train, X_train))
# The classification loops iterate X_test row by row, so expose it as an ndarray.
X_test = X_test if type(X_test) == np.ndarray else X_test.to_numpy()
# Distribute the training rows and keep them cached for the repeated queries.
RDD = sc.parallelize(D_train).cache()

In [None]:
# Predictor subset selected with the "correlation < 0.3" criterion
columns = ['radius_mean', 'smoothness_mean', 'texture_se', 'fractal_dimension_se']
y = dataset['diagnosis_B']
X = dataset[columns]
X_train, X_test, y_train, y_test = train_test_split(
  X, y, test_size=0.2, random_state=42
)
# Each training row becomes [label, predictor_1, ..., predictor_k].
D_train = np.column_stack((y_train, X_train))
# Convert the test predictors to an ndarray unless they already are one.
if not (type(X_test) == np.ndarray):
  X_test = X_test.to_numpy()
# Cached distributed copy of the training rows.
RDD = sc.parallelize(D_train).cache()

In [None]:
# Correlation matrix of the predictors
# Preview the first rows only instead of dumping the whole frame.
print(dataset.head())
# The last column is excluded from the matrix (presumably the diagnosis_B
# target, which get_dummies appends after the numeric columns -- TODO confirm).
data_corr = dataset.iloc[:, :-1].corr()
plt.figure(figsize=(18, 12))

sns.heatmap(
  data_corr,
  center=0,
  xticklabels=True,
  yticklabels=True,
  square=True,
  cmap='coolwarm',
)
plt.title("Correlazione tra le  variabili")
# Save before show(): the figure is written to disk while it is still current.
plt.savefig('corr_all.png')
plt.show()

In [None]:
# Correlation heatmaps for three consecutive groups of ten columns.
# Each entry: (column slice, figure title, output file).
heatmap_specs = [
  (slice(None, 10), "Correlazione tra le prime 10 variabili", 'first_10.png'),
  (slice(10, 20), "Correlazione tra le seconde 10 variabili", 'second_10.png'),
  (slice(20, 30), "Correlazione tra le terze 10 variabili", 'third_10.png'),
]

for col_slice, title, out_file in heatmap_specs:
  data_set = dataset.iloc[:, col_slice]
  data_corr = data_set.corr()
  plt.figure(figsize=(9, 6))
  sns.set()
  sns.heatmap(data_corr, square=True, annot=True, center=0, fmt='.2f', cmap='coolwarm')
  plt.title(title)
  plt.savefig(out_file)
  plt.show()

In [None]:
# Correlation heatmaps for the two hand-picked predictor subsets.
# Each entry: (column names, figure title, output file).
subset_specs = [
  (
    ['radius_mean','smoothness_mean','symmetry_mean','fractal_dimension_mean', 'texture_se', 'smoothness_se', 'compactness_se','symmetry_se', 'texture_worst'],
    "Correlazione tra le variabili con soglia inferiore a 0.6",
    '0.6soglia.png',
  ),
  (
    ['radius_mean','smoothness_mean', 'texture_se', 'fractal_dimension_se'],
    "Correlazione tra le variabili con soglia inferiore a 0.3",
    '0.3soglia.png',
  ),
]

for subset_columns, title, out_file in subset_specs:
  data_set = dataset.loc[:, subset_columns]
  data_corr = data_set.corr()
  plt.figure(figsize=(9, 6))
  sns.set()
  sns.heatmap(data_corr, square=True, annot=True, center=0, fmt='.2f', cmap='coolwarm')
  plt.title(title)
  plt.savefig(out_file)
  plt.show()

In [None]:
##Function to compute classification 
def is_inside(single_partition):
  """Return the labels of the rows that fall inside the query hypersphere.

  Intended to be passed to ``RDD.mapPartitions``. The query point and the radius
  are not parameters: they are read from the ``.value`` of the globals ``x_br``
  and ``radius_br``, which the calling cell sets before launching the job.

  Args:
    single_partition: iterable of rows laid out as ``[label, feature_1, ...]``.

  Returns:
    list containing ``row[0]`` for every row whose Euclidean distance between
    ``row[1:]`` and the query point is less than or equal to the radius.
  """
  # With a single predictor the query point is a 0-d scalar, which recent SciPy
  # releases reject ("Input vector should be 1-D"); promote it to a 1-D vector.
  query = np.atleast_1d(x_br.value)
  radius = radius_br.value
  return [row[0] for row in single_partition if euclidean(query, row[1:]) <= radius]

In [None]:
# Coarse sweep of the hypersphere radius ("all predictors" experiment).
# Uses whatever RDD / X_test / y_test the most recently run data cell produced.
radiuses = [0.1, 0.5, 1, 5, 10, 50, 100, 500, 10**3]
accuracies = []
for radius in radiuses:
  print('radius ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    if len(NK) != 0:
      YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
    else:
      YY_estimated.append(0)
  radius_br.destroy()
  # Dict form to extract the accuracy, text form for the printed report.
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))

# Best accuracy and the first radius that reaches it (annotated on the plot).
y_coord = max(accuracies)
x_coord = radiuses[accuracies.index(y_coord)]
plt.plot(radiuses, accuracies, label='accuracy', marker='o')
plt.xticks(list(range(0, 1000, 100)))
plt.xlabel('Raggio dell\'ipersfera')
plt.ylabel('Accuracy')
plt.annotate(
  '({},{})'.format(x_coord, round(y_coord, 2)),
  xy=(x_coord, y_coord),
  xytext=(x_coord + 50, y_coord + 0.1),
  arrowprops=dict(facecolor='black', shrink=0.05),
)
plt.legend()
plt.title('Accuracy al variare del raggio')
plt.savefig('first_instance_accuracy.png')
plt.show()


In [None]:
# Zoom: radii 30.5, 31.5, ..., 70.5 (original note: "35 predictors")
radiuses = [i + 0.5 for i in range(30, 71)]
sensitivities = []
specificities = []
accuracies = []
for radius in radiuses:
  # The radius itself is printed; the separate hand-maintained counter that
  # mirrored it has been removed.
  print('Radius value: ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    YY_estimated.append(1 if len(NK) != 0 and np.average(NK) > 0.5 else 0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  # Sensitivity = recall of class 'benignant', specificity = recall of class 'malignant'.
  sensitivities.append(d['benignant']['recall'])
  specificities.append(d['malignant']['recall'])
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))

plt.plot(radiuses, accuracies, label='accuracy')
plt.plot(radiuses, sensitivities, label='sensitivity')
plt.plot(radiuses, specificities, label='specificity')
plt.yticks(np.linspace(0.4, 1, 7))
plt.ylabel('Performance')
plt.xlabel('Raggio dell\'ipersfera')
plt.title('Accuracy,Sensitivity e Specificity in funzione dell\'iperraggio')
plt.legend(loc='best')
plt.savefig('second_instance.png')
plt.show()

In [None]:
# Classification at the selected radius (49.5) and confusion matrix
radius_br = sc.broadcast(49.5)
YY_estimated = []
for x_test in X_test:
  x_br = sc.broadcast(x_test)
  # Labels of the training rows that is_inside finds within the radius of x_test.
  NK = RDD.mapPartitions(is_inside).collect()
  # A new broadcast is created for every test point: release it once the job is done.
  x_br.destroy()
  # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
  if len(NK) != 0:
    YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
  else:
    YY_estimated.append(0)
radius_br.destroy()

print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))
skplt.metrics.plot_confusion_matrix(y_test, YY_estimated)
plt.title('Matrice di confusione')
plt.xlabel('Classe predetta')
plt.ylabel('Classe vera')
plt.savefig('confusion_matrix.png')
plt.show()

In [None]:
# Coarse radius sweep for the "correlation < 0.6" predictor subset.
# Expects RDD / X_test / y_test built by the corresponding data cell.
radiuses = [0.1, 0.5, 1, 5, 10, 50, 100, 500, 10**3]
accuracies = []
for radius in radiuses:
  print('radius ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    if len(NK) != 0:
      YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
    else:
      YY_estimated.append(0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))


plt.plot(radiuses, accuracies, label='accuracy', marker='o')
plt.title('Accuracy in funzione dell\'iperragio \n(correlazione minore di 0.6)')
plt.xticks(list(range(0, 1000, 100)))
plt.yticks(np.linspace(0.4, 1, 7))
plt.xlabel('Raggio dell\'ipersfera')
plt.ylabel('Accuracy')
# Best accuracy and the first radius that reaches it (annotated on the plot).
y_coord = max(accuracies)
x_coord = radiuses[accuracies.index(y_coord)]
plt.annotate(
  '({},{})'.format(x_coord, round(y_coord, 2)),
  xy=(x_coord, y_coord),
  xytext=(x_coord + 50, y_coord + 0.1),
  arrowprops=dict(facecolor='black', shrink=0.05),
)
plt.legend(loc='best')
plt.savefig('accuracy_0.6_features.png')
plt.show()

In [None]:
# Zoom (9 predictors): integer radii from 1 to 10
radiuses = list(range(1, 11))
sensitivities = []
specificities = []
accuracies = []

for radius in radiuses:
  print('Radius value: ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    YY_estimated.append(1 if len(NK) != 0 and np.average(NK) > 0.5 else 0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  # Sensitivity = recall of class 'benignant', specificity = recall of class 'malignant'.
  sensitivities.append(d['benignant']['recall'])
  specificities.append(d['malignant']['recall'])
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))

plt.plot(radiuses, accuracies, label='accuracy')
plt.plot(radiuses, sensitivities, label='sensitivity')
plt.plot(radiuses, specificities, label='specificity')
plt.ylabel('Performance')
plt.xlabel('Raggio dell\'ipersfera')
plt.yticks(np.linspace(0.4, 1, 7))
plt.title('Accuracy,Sensitivity e Specificity in funzione dell\'iperraggio\n (correlazione minore di 0.6)')
plt.legend(loc='best')
plt.savefig('acc_sens_spec_0.6.png')
plt.show()

In [None]:
# Classification at the selected radius (2) and confusion matrix
radius_br = sc.broadcast(2)
YY_estimated = []
for x_test in X_test:
  x_br = sc.broadcast(x_test)
  # Labels of the training rows that is_inside finds within the radius of x_test.
  NK = RDD.mapPartitions(is_inside).collect()
  # A new broadcast is created for every test point: release it once the job is done.
  x_br.destroy()
  # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
  if len(NK) != 0:
    YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
  else:
    YY_estimated.append(0)
radius_br.destroy()

print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))
skplt.metrics.plot_confusion_matrix(y_test, YY_estimated)
plt.title('Matrice di confusione')
plt.xlabel('Classe predetta')
plt.ylabel('Classe vera')
plt.savefig('confusion_matrix_06.png')
plt.show()

In [None]:
# Coarse radius sweep for the "correlation < 0.3" predictor subset.
# Expects RDD / X_test / y_test built by the corresponding data cell.
radiuses = [0.1, 0.5, 1, 5, 10, 50, 100, 500, 10**3]
accuracies = []
for radius in radiuses:
  print('radius ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    if len(NK) != 0:
      YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
    else:
      YY_estimated.append(0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))


plt.plot(radiuses, accuracies, label='accuracy', marker='o')
plt.title('Accuracy in funzione dell\'iperragio \n(correlazione i minore di 0.3)')
plt.xticks(list(range(0, 1000, 100)))
plt.xlabel('Raggio dell\'ipersfera')
plt.ylabel('Accuracy')
# Best accuracy and the first radius that reaches it (annotated on the plot).
y_coord = max(accuracies)
x_coord = radiuses[accuracies.index(y_coord)]
plt.annotate(
  '({},{})'.format(x_coord, round(y_coord, 2)),
  xy=(x_coord, y_coord),
  xytext=(x_coord + 70, y_coord + 0.05),
  arrowprops=dict(facecolor='black', shrink=0.05),
)
plt.legend(loc='best')
plt.yticks(np.linspace(0.4, 1, 7))
plt.savefig('accuracy_03.png')
plt.show()


In [None]:
# Zoom (4 predictors): radii from 0.5 to 5.0 in steps of 0.5
radiuses = np.arange(0.5, 5.5, 0.5)
sensitivities = []
specificities = []
accuracies = []

for radius in radiuses:
  print('Radius value: ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    x_br = sc.broadcast(x_test)
    # Labels of the training rows that is_inside finds within `radius` of x_test.
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    YY_estimated.append(1 if len(NK) != 0 and np.average(NK) > 0.5 else 0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  # Sensitivity = recall of class 'benignant', specificity = recall of class 'malignant'.
  sensitivities.append(d['benignant']['recall'])
  specificities.append(d['malignant']['recall'])
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))

plt.plot(radiuses, accuracies, label='accuracy')
plt.plot(radiuses, sensitivities, label='sensitivity')
plt.plot(radiuses, specificities, label='specificity')
plt.ylabel('Performance')
plt.xlabel('Raggio dell\'ipersfera')
plt.yticks(np.linspace(0.4, 1, 7))
plt.title('Accuracy,Sensitivity e Specificity in funzione dell\'iperraggio \n (correlazione minore di 0.3)')
plt.legend(loc='best')
plt.savefig('acc_sens_spec_0.3.png')
plt.show()

In [None]:
# Classification at the selected radius (0.5) and confusion matrix
# (the previous header said "correlation matrix", but a confusion matrix is plotted)
radius_br = sc.broadcast(0.5)
YY_estimated = []
for x_test in X_test:
  x_br = sc.broadcast(x_test)
  # Labels of the training rows that is_inside finds within the radius of x_test.
  NK = RDD.mapPartitions(is_inside).collect()
  # A new broadcast is created for every test point: release it once the job is done.
  x_br.destroy()
  # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
  if len(NK) != 0:
    YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
  else:
    YY_estimated.append(0)
radius_br.destroy()

print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))
skplt.metrics.plot_confusion_matrix(y_test, YY_estimated)
plt.title('Matrice di confusione')
plt.xlabel('Classe predetta')
plt.ylabel('Classe vera')
plt.savefig('confusion_matrix_03.png')
plt.show()

In [None]:
# PCA analysis: classify on the first principal component only
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Reload and clean the data exactly as in the dataset-import cell, so this cell
# does not depend on the state left behind by the feature-selection cells.
dataset = pd.read_csv("breast_cancer.csv")
# A dedicated name is used so the builtin `filter` is not shadowed.
kept_columns = [c for c in dataset.columns if c != 'id' and c != "Unnamed: 32"]
dataset = dataset[kept_columns].dropna()
dataset = pd.get_dummies(dataset)
dataset = dataset[[c for c in dataset.columns if c != "diagnosis_M"]]
columns = [c for c in dataset.columns if c != "diagnosis_B"]
X = dataset[columns]

# NOTE(review): the scaler and the PCA are fitted on the full dataset before the
# train/test split, so the test rows influence the projection. Fitting on the
# training split only would avoid this leakage; left unchanged here because it
# would alter the reported results.
scaler = StandardScaler()
pca = PCA()
X_pca = pca.fit_transform(scaler.fit_transform(X))
X = X_pca[:, 0]  # first principal component -> 1-D array
y = dataset['diagnosis_B']

# TRAIN/TEST SPLIT
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Training rows are laid out as [label, first principal component].
D_train = np.column_stack((y_train, X_train))
if not isinstance(X_test, np.ndarray):
  X_test = X_test.to_numpy()
RDD = sc.parallelize(D_train).cache()

radiuses = [0.1, 0.5, 1, 5, 10, 50, 100, 500, 10**3]
accuracies = []
for radius in radiuses:
  print('radius ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    # X_test is 1-D, so x_test is a scalar: send it as a 1-element vector because
    # recent SciPy releases reject 0-d input in `euclidean`.
    x_br = sc.broadcast(np.atleast_1d(x_test))
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    if len(NK) != 0:
      YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
    else:
      YY_estimated.append(0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))


plt.plot(radiuses, accuracies, label='accuracy', marker='o')
plt.title('Accuracy in funzione dell\'iperragio \n(analisi PCA)')
plt.xticks(list(range(0, 1000, 100)))
plt.yticks(np.linspace(0.4, 1, 7))
plt.xlabel('Raggio dell\'ipersfera')
plt.ylabel('Accuracy')
# Best accuracy and the first radius that reaches it (annotated on the plot).
y_coord = max(accuracies)
x_coord = radiuses[accuracies.index(y_coord)]
plt.annotate(
  '({},{})'.format(x_coord, round(y_coord, 2)),
  xy=(x_coord, y_coord),
  xytext=(x_coord + 90, y_coord),
  arrowprops=dict(facecolor='black', shrink=0.05),
)
plt.legend(loc='best')
plt.savefig('accuracy_PCA_features.png')
plt.show()

In [None]:
# Zoom in (PCA experiment): radii from 0.05 up to (excluding) 2.5 in steps of 0.05
radiuses = np.arange(0.05, 2.5, 0.05)
sensitivities = []
specificities = []
accuracies = []

for radius in radiuses:
  print('Radius value: ', radius)
  radius_br = sc.broadcast(radius)
  YY_estimated = []
  for x_test in X_test:
    # np.atleast_1d keeps vector test points as they are and turns a scalar one
    # (1-D X_test) into a 1-element vector, which recent SciPy requires.
    x_br = sc.broadcast(np.atleast_1d(x_test))
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    YY_estimated.append(1 if len(NK) != 0 and np.average(NK) > 0.5 else 0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  # Sensitivity = recall of class 'benignant', specificity = recall of class 'malignant'.
  sensitivities.append(d['benignant']['recall'])
  specificities.append(d['malignant']['recall'])
  accuracies.append(d['accuracy'])
  print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))

plt.plot(radiuses, accuracies, label='accuracy')
plt.plot(radiuses, sensitivities, label='sensitivity')
plt.plot(radiuses, specificities, label='specificity')
plt.ylabel('Performance')
plt.xlabel('Raggio dell\'ipersfera')
plt.xticks(np.arange(0.05, 2.5, 0.2))
plt.yticks(np.linspace(0.4, 1, 7))
plt.title('Accuracy,Sensitivity e Specificity in funzione dell\'iperraggio \n (analisi PCA)')
plt.legend(loc='best')
plt.savefig('acc_sens_spec_PCA.png')
plt.show()

In [None]:
# Classification at the selected radius (0.3) and confusion matrix (PCA experiment)
radius_br = sc.broadcast(0.3)
YY_estimated = []
for x_test in X_test:
  # np.atleast_1d keeps vector test points as they are and turns a scalar one
  # (1-D X_test) into a 1-element vector, which recent SciPy requires.
  x_br = sc.broadcast(np.atleast_1d(x_test))
  NK = RDD.mapPartitions(is_inside).collect()
  # A new broadcast is created for every test point: release it once the job is done.
  x_br.destroy()
  # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
  if len(NK) != 0:
    YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
  else:
    YY_estimated.append(0)
radius_br.destroy()

print(classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant']))
skplt.metrics.plot_confusion_matrix(y_test, YY_estimated)
plt.title('Matrice di confusione')
plt.xlabel('Classe predetta')
plt.ylabel('Classe vera')
plt.savefig('confusion_matrix_PCA.png')
plt.show()

In [None]:
# PCA visualization: predicted class along the first principal component,
# drawn as a filled background behind the test points.
radius_br = sc.broadcast(0.3)
# Query positions along the component axis; reused below for the mesh so the
# prediction grid and the plotted grid cannot drift apart.
grid_x = np.linspace(-5.5, 13, 100)
n_rows_y = 30  # vertical resolution of the background; predictions are constant along y
YY_estimated = []
for x_test in grid_x:
  # Each grid value is a scalar: send it as a 1-element vector because recent
  # SciPy releases reject 0-d input in `euclidean`.
  x_br = sc.broadcast(np.atleast_1d(x_test))
  NK = RDD.mapPartitions(is_inside).collect()
  # A new broadcast is created for every grid point: release it once the job is done.
  x_br.destroy()
  # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
  predicted = 1 if len(NK) != 0 and np.average(NK) > 0.5 else 0
  YY_estimated.append([predicted] * n_rows_y)
radius_br.destroy()

plt.figure(figsize=(9, 6))
plt.ylim((-0.2, 1.2))
ax = plt.gca()
ylim = ax.get_ylim()
yy, xx = np.meshgrid(np.linspace(ylim[0], ylim[1], n_rows_y), grid_x)
# Only `levels=` is passed: the extra positional level count that used to be
# given alongside it was ignored by matplotlib.
ax.contourf(xx, yy, YY_estimated, cmap="RdBu", alpha=0.6, levels=[0,0.1,0.2,0.4,0.5,0.6,0.8,0.9,1], zorder=0)

# Test points: first principal component on x, true label on y.
plt.scatter(X_test, y_test, color='black')
plt.savefig('PCA_visualisation.png')
plt.show()



In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Repeat the PCA experiment (radius 0.3) on several train/test splits and
# collect the accuracy of each split.
seeds = [42, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
performance = list()

dataset = pd.read_csv("breast_cancer.csv")
# A dedicated name is used so the builtin `filter` is not shadowed.
kept_columns = [c for c in dataset.columns if c != 'id' and c != "Unnamed: 32"]
dataset = dataset[kept_columns].dropna()
dataset = pd.get_dummies(dataset)
dataset = dataset[[c for c in dataset.columns if c != "diagnosis_M"]]
columns = [c for c in dataset.columns if c != "diagnosis_B"]
X = dataset[columns]

# NOTE(review): scaler and PCA are fitted once on the full dataset, before any
# split, so the test rows influence the projection (possible leakage).
scaler = StandardScaler()
pca = PCA()
X_pca = pca.fit_transform(scaler.fit_transform(X))
X = X_pca[:, 0]  # first principal component -> 1-D array
y = dataset['diagnosis_B']

# enumerate supplies the 1-based progress number that was counted by hand before.
for run_number, seed in enumerate(seeds, start=1):
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)
  print('seed numero: ', run_number)
  # Training rows are laid out as [label, first principal component].
  D_train = np.column_stack((y_train, X_train))
  if not isinstance(X_test, np.ndarray):
    X_test = X_test.to_numpy()
  RDD = sc.parallelize(D_train).cache()
  radius_br = sc.broadcast(.3)
  YY_estimated = []
  for x_test in X_test:
    # X_test is 1-D, so x_test is a scalar: send it as a 1-element vector because
    # recent SciPy releases reject 0-d input in `euclidean`.
    x_br = sc.broadcast(np.atleast_1d(x_test))
    NK = RDD.mapPartitions(is_inside).collect()
    # A new broadcast is created for every test point: release it once the job is done.
    x_br.destroy()
    # Predict 1 when the mean neighbour label exceeds 0.5; no neighbours -> 0.
    if len(NK) != 0:
      YY_estimated.append(1 if np.average(NK) > 0.5 else 0)
    else:
      YY_estimated.append(0)
  radius_br.destroy()
  d = classification_report(y_test, YY_estimated, target_names=['malignant', 'benignant'], output_dict=True)
  performance.append(d['accuracy'])

# One accuracy per seed, printed as a dash-prefixed list.
for p in performance:
  print('-', round(p, 2), sep="")
