In [188]:
import findspark ; findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Classificazione").getOrCreate()

spark

In [189]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as Tester
import os
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, explode, array, lit
from pyspark.ml.feature import StringIndexer,MinMaxScaler,VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BinaryEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression as LogisticRegressor
from pyspark.ml.classification import RandomForestClassifier as RF,NaiveBayes,LinearSVC,MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as Tester
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [190]:
dname = 'salary.csv'

In [191]:
biglab = os.environ['BIGLAB'] ; datapath = os.path.join(biglab,"datasets")

In [192]:
df = spark.read.csv(os.path.join(datapath,dname), header=True, inferSchema=True)

In [193]:
df.count()

32561

In [194]:
df.dropna()
df.count()

32561

In [195]:
df.show(5)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|salary|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|            13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|     

In [196]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)



## PREPARIAMO PER L'INDEX

#### creiamo due variabili, una con le colonne da indicizzare, la seconda con il nome delle variabili indicizzate.   Successivamente le utilizziamo i per crearci la variabile "features" da inserire nell'assembler

In [197]:
variables_to_index = [col for col in list(set(df.columns)) if
                  dict(df.dtypes)[col] == "string"]
variables_indexed=[col+'_I' for col in list(set(df.columns)) if
                  dict(df.dtypes)[col] == "string" and col!='salary']

In [198]:
numeric_columns = [col for col in df.columns if col not in variables_to_index]
features = variables_indexed + numeric_columns

## DEFINIAMO LA PIPELINE

In [199]:
Indexed=StringIndexer(inputCols=variables_to_index, outputCols=[col + "_I" for col in variables_to_index])

In [200]:
assembler= VectorAssembler(inputCols= features,outputCol='features')

In [201]:
scaler= MinMaxScaler(inputCol='features',outputCol='scaledFeatures')

In [202]:
pipeline=Pipeline(stages=[Indexed,assembler,scaler])

In [203]:
Scaled_data=pipeline.fit(df).transform(df).select('scaledFeatures','salary_I')

In [204]:
Scaled_data=Scaled_data.withColumnRenamed('salary_I','target')

## SPLIT DATASET IN TRAINING AND TEST

In [205]:
training_set, test_set = Scaled_data.randomSplit([0.7,0.3], seed=0)

In [206]:
training_set.show(5)

+--------------------+------+
|      scaledFeatures|target|
+--------------------+------+
|(14,[0,1,2,3,4,8,...|   0.0|
|(14,[0,1,2,3,4,8,...|   0.0|
|(14,[0,1,2,3,4,8,...|   0.0|
|(14,[0,1,2,3,4,8,...|   0.0|
|(14,[0,1,2,3,4,8,...|   0.0|
+--------------------+------+
only showing top 5 rows



## LOGISTIC REGRESSION

In [207]:
regressor = LogisticRegressor(featuresCol="scaledFeatures",labelCol='target',threshold=0.27)
model = regressor.fit(training_set)


In [208]:
performance = model.evaluate(test_set)

In [209]:
performance.predictions.show(5)

+--------------------+------+--------------------+--------------------+----------+
|      scaledFeatures|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(14,[0,1,2,3,4,8,...|   0.0|[4.57007365056571...|[0.98974897500791...|       0.0|
|(14,[0,1,2,3,4,8,...|   0.0|[6.25892079298582...|[0.99809034422271...|       0.0|
|(14,[0,1,2,3,4,9,...|   0.0|[5.2515401661391,...|[0.99478786610651...|       0.0|
|(14,[0,1,2,3,5,8,...|   0.0|[6.60602966310936...|[0.99864963579290...|       0.0|
|(14,[0,1,2,3,5,9,...|   0.0|[5.01464106847764...|[0.99340378405210...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows



In [210]:
predictions = performance.predictions.select('target','prediction')

In [211]:
evaluator = BinaryEvaluator(rawPredictionCol='prediction', labelCol='target')

In [212]:
accuracy = evaluator.evaluate(predictions)

In [213]:
print(f"Logistic Accuracy: {round(accuracy,2)}")
# senza threshold accuracy al 76.92

Logistic Accuracy: 0.8


In [214]:
def confusion_matrix(predictions):

  sensitivity=predictions.filter((predictions.target ==0) & (predictions.prediction==0)) # TRUE POSITIVE
  false_pos=predictions.filter((predictions.target ==1)& (predictions.prediction==0))    # FALSE POSITIVE
  false_neg=predictions.filter((predictions.target ==0)& (predictions.prediction==1))    # FALSE NEGATIVE
  specificity=predictions.filter((predictions.target ==1) & (predictions.prediction==1)) # TRUE NEGATIVE

  print(f"Sensitivity: {round(sensitivity.count()/(false_neg.count()+sensitivity.count()),2)}") # tasso veri positvi predetti
  print(f"Specificity: {round(specificity.count()/(specificity.count()+false_pos.count()),2)}") # tasso di veri neg. predetti

In [215]:
confusion_matrix(predictions)

Sensitivity: 0.82
Specificity: 0.78


## RANDOM FOREST

In [216]:
classifier = RF(maxDepth=12,numTrees=20,labelCol="target",featuresCol="scaledFeatures")

model = classifier.fit(training_set)

predictions = model.transform(test_set)
predictions.show(5)
tester = Tester(predictionCol='prediction', labelCol='target')

accuracy = tester.evaluate(predictions)

print(f"Final accuracy is: {accuracy:5.3f}")

+--------------------+------+--------------------+--------------------+----------+
|      scaledFeatures|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(14,[0,1,2,3,4,8,...|   0.0|[19.7001752069805...|[0.98500876034902...|       0.0|
|(14,[0,1,2,3,4,8,...|   0.0|[19.9406151079278...|[0.99703075539639...|       0.0|
|(14,[0,1,2,3,4,9,...|   0.0|[19.9825531732326...|[0.99912765866163...|       0.0|
|(14,[0,1,2,3,5,8,...|   0.0|[19.9808090493495...|[0.99904045246747...|       0.0|
|(14,[0,1,2,3,5,9,...|   0.0|[19.9286792033629...|[0.99643396016814...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows

Final accuracy is: 0.849


In [217]:
confusion_matrix(predictions) 

Sensitivity: 0.94
Specificity: 0.59


## SPOSTIAMO IL CONFINE DECISIONALE DEL RANDOM FOREST

In [218]:
probabilities = [row['probability'][0] for row in predictions.collect()]
pred_strict = [0 if prob >0.70 else 1 for prob in probabilities]
y_true = [row['target'] for row in predictions.collect()]
predictions= spark.createDataFrame(zip(pred_strict, y_true), schema=['prediction', 'target'])

In [219]:
predictions=predictions.withColumn("prediction", predictions.prediction.cast(DoubleType()))
predictions=predictions.withColumn("target", predictions.target.cast(DoubleType()))   

In [220]:
tester = Tester(predictionCol='prediction', labelCol='target')
accuracy = tester.evaluate(predictions)

print(f"Final accuracy is: {accuracy:5.2f}")

Final accuracy is:  0.84


In [221]:
confusion_matrix(predictions)    # 0.30== 84  e 82  # 0.35== 88 and 75   # 0.40 == 90 and 71   # 0.45== 92 and 66

Sensitivity: 0.83
Specificity: 0.81


## MULTILAYER PERCEPTRON CLASSIFIER

In [222]:
### RESAMPLING DATA

major_df = Scaled_data.filter(Scaled_data.target == 0)
minor_df = Scaled_data.filter(Scaled_data.target  == 1)
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(0,2)]))).drop('dummy')
minor_df.count()
oversampled_df.count()
## DATASET UNION
combined_df = major_df.unionAll(oversampled_df)

In [223]:
training_set_resampled, test_set_resampled = combined_df.randomSplit([0.7,0.3], seed=0)

In [224]:
layers = [len(features),20,10,2]
print(layers)
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=100,
                                         labelCol="target",featuresCol="scaledFeatures" )

[14, 20, 10, 2]


In [228]:
model = trainer.fit(training_set_resampled)

In [232]:
result = model.transform(test_set_resampled)

predictions = result.select("prediction", "target")
evaluator = MulticlassClassificationEvaluator(labelCol = 'target', predictionCol = 'prediction', metricName = 'accuracy')
accuracy = evaluator.evaluate(predictions)

print(f"Final accuracy is: {accuracy:5.2f}")

#print("Test set accuracy = " + str(evaluator.evaluate(predictions)))

Final accuracy is:  0.82


In [230]:
confusion_matrix(predictions)   ### SENS:0.94 SPEC: 0.56

Sensitivity: 0.84
Specificity: 0.78
