In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by the rest of the notebook, registered
# under the application name 'classification'.
spark = (
    SparkSession.builder
    .appName('classification')
    .getOrCreate()
)

In [2]:
# Load the dataset: the first row provides the column names and Spark is
# asked to infer each column's type.
cancer_df = spark.read.csv(
    "data.csv",
    header=True,
    inferSchema=True,
)
# Display the column names as the cell output.
cancer_df.columns

['id',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst',
 '_c32']

In [3]:
# Number of columns in the loaded frame (identifier and target included).
n_columns = len(cancer_df.columns)
n_columns

33

In [4]:
# Print the distinct values of the target column (up to 7 rows).
diagnosis_values = cancer_df.select("diagnosis").distinct()
diagnosis_values.show(7)

+---------+
|diagnosis|
+---------+
|        M|
|        B|
+---------+



In [5]:
## Data preprocessing

In [6]:
# Start from every column name, then strip the ones that are not model inputs:
# the row identifier, the target, and "_c32" (presumably an artifact column
# from the CSV parse -- TODO confirm it carries no data).
features_cols = list(cancer_df.columns)
for non_feature in ("id", "diagnosis", "_c32"):
    # list.remove raises ValueError if the column is unexpectedly missing.
    features_cols.remove(non_feature)

In [7]:
from pyspark.ml.feature import VectorAssembler

# Pack the selected feature columns into one vector column named "features".
assembler = VectorAssembler(
    inputCols=features_cols,
    outputCol="features",
)
# Derive the working frame from the raw one; cancer_df itself is left untouched.
data_df = assembler.transform(cancer_df)

In [8]:
# Print the first 5 assembled feature vectors without truncating them.
data_df.select("features").show(n=5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,1.095,0.9053,8.589,153.4,0.006399,0.04904,0.05373,0.01587,0.03003,0.006193,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189] |
|[20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,0.5435,0.7339,3.398,74.08,0.005225,0.01308,0.0186,0.0134,0.01389,0.

In [9]:
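## Scale the features with StandardScaler.
## With the default settings used below (withStd=True, withMean=False) each feature is
## divided by its standard deviation and is not centered; scaling changes the range of
## the data, it does not turn it into a normal distribution.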

In [10]:
from pyspark.ml.feature import StandardScaler

# Scale the assembled "features" vectors into a new "scaled_features" column.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Make the cell safe to re-run: fit/transform fail when the output column
# already exists, which is the case once this cell has reassigned data_df.
# drop() is a no-op when the column is absent, so the first run is unaffected.
unscaled_df = data_df.drop("scaled_features")

# NOTE(review): the scaler is fitted on the whole dataset, before the
# train/test split performed later in the notebook, so test rows contribute
# to the scaling statistics. Consider fitting on the training split only.
scaler_model = scaler.fit(unscaled_df)
data_df = scaler_model.transform(unscaled_df)

In [11]:
# Print the first 5 scaled feature vectors without truncating them.
data_df.select("scaled_features").show(n=5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaled_features                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [12]:
## Encode the target: map malignant (M) to 1 and benign (B) to 0

In [13]:
from pyspark.sql.functions import col, when

# Numeric target column: 1 where diagnosis equals 'M', 0 otherwise.
is_m = col("diagnosis") == 'M'
data_df = data_df.withColumn("label", when(is_m, 1).otherwise(0))

In [14]:
# Check the encoding by printing the label next to the original diagnosis
# for the first 20 rows.
data_df.select("label", "diagnosis").show(n=20)

+-----+---------+
|label|diagnosis|
+-----+---------+
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    1|        M|
|    0|        B|
+-----+---------+
only showing top 20 rows



In [15]:
## Create the training and test DataFrames

In [16]:
# Random split with weights 0.7 (train) and 0.3 (test), using a fixed seed.
split_weights = [0.7, 0.3]
train_df, test_df = data_df.randomSplit(split_weights, seed=0)

In [17]:
# Number of rows assigned to the training split.
n_train_rows = train_df.count()
n_train_rows

397

In [18]:
# Number of rows assigned to the test split.
n_test_rows = test_df.count()
n_test_rows

172

In [19]:
## Build the model

In [20]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression that reads the scaled feature vectors and the numeric
# label; all other hyperparameters are left at the library defaults.
lr = LogisticRegression(
    featuresCol='scaled_features',
    labelCol="label",
)
# Fit on the training split only.
model = lr.fit(train_df)

In [21]:
## Evaluate the model

In [22]:
# Evaluate the fitted model on the test split; the returned summary object
# exposes the metrics read in the following cells.
evaluation = model.evaluate(dataset=test_df)

In [23]:
## Evaluate the accuracy on the test set

In [24]:
# Accuracy of the model on the test split.
test_accuracy = evaluation.accuracy
test_accuracy

0.9593023255813954

In [25]:
# Precision for each label on the test split.
# NOTE(review): presumably ordered by label value (0 first, then 1) -- confirm.
test_precision_by_label = evaluation.precisionByLabel
test_precision_by_label

[0.9629629629629629, 0.953125]

In [26]:
# Recall for each label on the test split.
# NOTE(review): presumably ordered by label value (0 first, then 1) -- confirm.
test_recall_by_label = evaluation.recallByLabel
test_recall_by_label

[0.9719626168224299, 0.9384615384615385]

In [27]:
## Test the model on new data

In [46]:
# Load the new, unlabeled exam results with the same reader options used for
# the training data (header row, inferred column types).
exams_df = spark.read.csv(
    "exam_results.csv",
    header=True,
    inferSchema=True,
)

In [47]:
# Build the "features" vector for the new rows with the assembler configured
# earlier, so the same input columns are used in the same order.
new_data_df = assembler.transform(dataset=exams_df)

In [48]:
# Scale the new rows with the scaler model fitted earlier in the notebook.
# Dropping "scaled_features" first keeps this cell re-runnable: transform()
# fails when its output column already exists (as it does after the first
# run, because new_data_df is reassigned), while drop() is a no-op when the
# column is absent.
new_data_df = scaler_model.transform(new_data_df.drop("scaled_features"))

In [49]:
# Run the fitted model on the prepared rows; the result keeps the input
# columns and gains the model's output columns.
pred_data_df = model.transform(dataset=new_data_df)

In [50]:
# Show which columns the prediction frame contains.
prediction_columns = pred_data_df.columns
prediction_columns

['id',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst',
 'features',
 'scaled_features',
 'rawPrediction',
 'probability',
 'prediction']

In [51]:
# Print the identifier and the model outputs for up to 6 rows, untruncated.
# NOTE(review): the recorded output shows raw scores of magnitude 1e5-1e6 and
# fully saturated probabilities, which looks suspicious. Verify that
# exam_results.csv uses the same columns and units as data.csv, and re-run
# the notebook from a fresh kernel: the execution counts jump from 27 to 46,
# so `model` or `scaler_model` may have been changed by cells not shown.
output_cols = ["id", "rawPrediction", "probability", "prediction"]
pred_data_df.select(output_cols).show(6, False)

+--------+----------------------------------------+-----------+----------+
|id      |rawPrediction                           |probability|prediction|
+--------+----------------------------------------+-----------+----------+
|150010  |[256248.44161786587,-256248.44161786587]|[1.0,0.0]  |0.0       |
|150011  |[322967.2947435423,-322967.2947435423]  |[1.0,0.0]  |0.0       |
|91594602|[108710.36147982215,-108710.36147982215]|[1.0,0.0]  |0.0       |
|11842302|[-1034019.646648631,1034019.646648631]  |[0.0,1.0]  |1.0       |
+--------+----------------------------------------+-----------+----------+

