In [1]:
import pyspark
# Reuse the running SparkContext when there is one: constructing a second
# context in the same kernel (e.g. when this cell is re-run) raises an error.
sc = pyspark.SparkContext.getOrCreate()

In [2]:
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from kneed import KneeLocator
from yellowbrick.cluster import KElbowVisualizer
from pyspark.ml.feature import Normalizer, MinMaxScaler, VectorAssembler, StandardScaler
from pyspark.sql import Row, Column
from pyspark.sql.types import FloatType, DoubleType
import plotly.graph_objects as go
from pyspark.mllib.linalg import DenseVector

from pyspark.ml.classification import LinearSVC

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.tree import LabeledPoint

from sklearn.metrics import confusion_matrix, roc_curve, auc
import numpy as np
import itertools

from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, mean, stddev

from pyspark.ml.feature import StringIndexer, VectorIndexer, StandardScaler, VectorAssembler


from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, ClusteringEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
# Obtain the SparkSession used by every read in this notebook
# (getOrCreate returns the existing session if one is already active).
spark = (
    SparkSession.builder
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [4]:
# HDFS directory shared by the two paths below.
hdfs_user_dir = "hdfs://kddrtserver13.isti.cnr.it:9000/user/hpsa15"
# Prefix of the per-cluster CSV files; the cluster id and ".csv" are appended when reading.
clusters_path = hdfs_user_dir + "/kmeans_cluster_"
# CSV file for the optimal hyper-parameters of the linear SVM.
optimals_path = hdfs_user_dir + "/linearsvm_optimals.csv"

- Numeri dei clusters

In [5]:
# Identifiers of the clusters to process (0 through 5).
prediction_v = list(range(6))

- Lettura dei clusters

In [6]:
# Load every cluster's CSV (header row, inferred schema) into a DataFrame,
# keyed by the cluster id.
clusters_df = {
    cluster_id: spark.read
    .options(inferSchema=True, header=True)
    .csv("{}{}.csv".format(clusters_path, cluster_id))
    for cluster_id in prediction_v
}

In [7]:
# Names of the numeric columns assembled into the model's feature vector.
features_numeric = (
    "age duration campaign pdays previous "
    "emp_var_rate cons_price_idx cons_conf_idx euribor3m nr_employed"
).split()

In [8]:
# Candidate values for each LinearSVC hyper-parameter explored by the grid search.
numIterations = list(range(100, 300, 50))  # maxIter candidates: 100, 150, 200, 250
regParm = [
    0.01,
    0.02,
    0.04,
    0.06,
    0.08,
]
tol = [
    1e-3,
    1e-4,
    1e-5,
    1e-6,
    1e-7,
]

In [9]:
# Build one named configuration ("sharp_<n>") per (maxIter, regParam, tol)
# combination; numbering follows the nesting order maxIter -> regParam -> tol.
tuned_param = {
    "sharp_" + str(index): {"maxIter": max_iter, "regParam": reg_param, "tol": tolerance}
    for index, (max_iter, reg_param, tolerance) in enumerate(
        itertools.product(numIterations, regParm, tol)
    )
}

In [10]:
# Display the generated hyper-parameter grid (rendered as the cell output).
tuned_param

{'sharp_0': {'maxIter': 100, 'regParam': 0.01, 'tol': 0.001},
 'sharp_1': {'maxIter': 100, 'regParam': 0.01, 'tol': 0.0001},
 'sharp_2': {'maxIter': 100, 'regParam': 0.01, 'tol': 1e-05},
 'sharp_3': {'maxIter': 100, 'regParam': 0.01, 'tol': 1e-06},
 'sharp_4': {'maxIter': 100, 'regParam': 0.01, 'tol': 1e-07},
 'sharp_5': {'maxIter': 100, 'regParam': 0.02, 'tol': 0.001},
 'sharp_6': {'maxIter': 100, 'regParam': 0.02, 'tol': 0.0001},
 'sharp_7': {'maxIter': 100, 'regParam': 0.02, 'tol': 1e-05},
 'sharp_8': {'maxIter': 100, 'regParam': 0.02, 'tol': 1e-06},
 'sharp_9': {'maxIter': 100, 'regParam': 0.02, 'tol': 1e-07},
 'sharp_10': {'maxIter': 100, 'regParam': 0.04, 'tol': 0.001},
 'sharp_11': {'maxIter': 100, 'regParam': 0.04, 'tol': 0.0001},
 'sharp_12': {'maxIter': 100, 'regParam': 0.04, 'tol': 1e-05},
 'sharp_13': {'maxIter': 100, 'regParam': 0.04, 'tol': 1e-06},
 'sharp_14': {'maxIter': 100, 'regParam': 0.04, 'tol': 1e-07},
 'sharp_15': {'maxIter': 100, 'regParam': 0.06, 'tol': 0.001},

In [12]:
def grid(df, prediction_v, tuned_param):
    """Select the best LinearSVC configuration for every cluster and report test metrics.

    For each cluster the data is split 70/30 into train+validation and test,
    and the first part is split 70/30 again into train and validation. Every
    configuration in ``tuned_param`` is fitted on the train split and scored
    with F1 on the validation split. The configuration with the highest F1 is
    then refitted on train+validation and evaluated on the test split.

    Parameters
    ----------
    df : mapping
        Maps each cluster id to a Spark DataFrame containing the columns listed
        in the module-level ``features_numeric`` plus the target column ``y``.
    prediction_v : iterable
        Cluster ids to process (keys of ``df``).
    tuned_param : dict
        Maps a configuration name to a dict with the keys ``"maxIter"``,
        ``"regParam"`` and ``"tol"``.

    Returns
    -------
    dict
        Maps each cluster id to the selected
        ``{"maxIter": ..., "regParam": ..., "tol": ...}``.

    Raises
    ------
    ValueError
        If ``tuned_param`` contains no configuration to evaluate.
    """
    optimals = {}
    evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    # NOTE(review): the AUC below is computed from the hard 0/1 "prediction"
    # column, not from the SVM margin ("rawPrediction") - confirm this is intended.
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC')
    # The assembler does not depend on the cluster, so build it once.
    assembler = VectorAssembler().setInputCols(features_numeric).setOutputCol("features")
    for i in prediction_v:
        tmp_df = assembler.transform(df[i]).withColumnRenamed("y", "label")
        # Train+validation / test split
        train1, test = tmp_df.randomSplit([0.7, 0.3], seed = 2)
        # Train / validation split
        train, val = train1.randomSplit([0.7, 0.3], seed = 2)
        # Both splits are reused by every configuration: keep them materialised.
        train.cache()
        val.cache()

        best_f1 = None
        best_key = None
        print("------------ Grid per cluster " + str(i) + " ------------")
        for j in tuned_param:
            params = tuned_param[j]
            lsvc = LinearSVC(maxIter = params["maxIter"], regParam = params["regParam"], tol = params["tol"])
            # Fit the candidate on the train split
            model = lsvc.fit(train)
            print("Effettuato " + str(j))
            # Score the candidate on the validation split
            predictionAndTarget = model.transform(val).select("label", "prediction")
            f1_new = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
            # Maximise F1: remember both the winner and its score, so later
            # candidates are compared against the best score seen so far.
            if best_key is None or f1_new > best_f1:
                best_f1 = f1_new
                best_key = j

        train.unpersist()
        val.unpersist()

        if best_key is None:
            raise ValueError("tuned_param must contain at least one configuration")

        print(" > Parametri migliori ")
        print(best_key)
        # Final metrics: refit the winner on train+validation, evaluate on test
        best_params = tuned_param[best_key]
        optimals[i] = {"maxIter": best_params["maxIter"], "regParam" : best_params["regParam"], "tol": best_params["tol"]}
        print(optimals[i])
        best_lsvc = LinearSVC(maxIter = best_params["maxIter"], regParam = best_params["regParam"], tol = best_params["tol"])
        model = best_lsvc.fit(train1)
        predictionAndTarget = model.transform(test).select("label", "prediction")
        print(" > Valutazioni")
        # Print the coefficients and intercept for linear SVC
        print("- Coefficients: " + str(model.coefficients))
        print("- Intercept: " + str(model.intercept))
        acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
        f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
        weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
        weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
        # Named auc_score so it does not shadow sklearn's auc() imported by the notebook.
        auc_score = evaluator.evaluate(predictionAndTarget)
        print(" - Accuracy = " + str(acc))
        print(" - F1 score = " + str(f1))
        print(" - Weighted Precision = " + str(weightedPrecision))
        print(" - Weighted Recall = " + str(weightedRecall))
        print(" - AUC = " + str(auc_score))
    return optimals

In [None]:
# Run the per-cluster grid search; `optimals` maps each cluster id to the
# LinearSVC parameters (maxIter, regParam, tol) selected by grid().
optimals = grid(clusters_df, prediction_v, tuned_param)

------------ Grid per cluster 0 ------------
Effettuato sharp_0
Effettuato sharp_1
Effettuato sharp_2
Effettuato sharp_3
Effettuato sharp_4
Effettuato sharp_5
Effettuato sharp_6
Effettuato sharp_7
Effettuato sharp_8
Effettuato sharp_9
Effettuato sharp_10
Effettuato sharp_11
Effettuato sharp_12
Effettuato sharp_13
Effettuato sharp_14
Effettuato sharp_15
Effettuato sharp_16
Effettuato sharp_17
Effettuato sharp_18
Effettuato sharp_19
Effettuato sharp_20
Effettuato sharp_21
Effettuato sharp_22
Effettuato sharp_23
Effettuato sharp_24
Effettuato sharp_25
Effettuato sharp_26
Effettuato sharp_27
Effettuato sharp_28
Effettuato sharp_29
Effettuato sharp_30
Effettuato sharp_31
Effettuato sharp_32
Effettuato sharp_33
Effettuato sharp_34
Effettuato sharp_35
Effettuato sharp_36
Effettuato sharp_37
Effettuato sharp_38
Effettuato sharp_39
Effettuato sharp_40
Effettuato sharp_41
Effettuato sharp_42
Effettuato sharp_43
Effettuato sharp_44
Effettuato sharp_45
Effettuato sharp_46
Effettuato sharp_47
Effet

Effettuato sharp_24
Effettuato sharp_25
Effettuato sharp_26
Effettuato sharp_27
Effettuato sharp_28
Effettuato sharp_29
Effettuato sharp_30
Effettuato sharp_31
Effettuato sharp_32
Effettuato sharp_33
Effettuato sharp_34
Effettuato sharp_35
Effettuato sharp_36
Effettuato sharp_37
Effettuato sharp_38
Effettuato sharp_39
Effettuato sharp_40
Effettuato sharp_41
Effettuato sharp_42
Effettuato sharp_43
Effettuato sharp_44
Effettuato sharp_45
Effettuato sharp_46
Effettuato sharp_47
Effettuato sharp_48
Effettuato sharp_49
Effettuato sharp_50
Effettuato sharp_51
Effettuato sharp_52
Effettuato sharp_53
Effettuato sharp_54
Effettuato sharp_55
Effettuato sharp_56
Effettuato sharp_57
Effettuato sharp_58
Effettuato sharp_59
Effettuato sharp_60
Effettuato sharp_61
Effettuato sharp_62
Effettuato sharp_63
Effettuato sharp_64
Effettuato sharp_65
Effettuato sharp_66
Effettuato sharp_67
Effettuato sharp_68
Effettuato sharp_69
Effettuato sharp_70
Effettuato sharp_71
Effettuato sharp_72
Effettuato sharp_73


- Salvataggio valori ottimi (backup)

In [58]:
# Disabled on purpose: the code is wrapped in a string literal so it does not
# execute. When enabled, it stores the per-cluster LinearSVC parameters
# returned by grid() (maxIter, regParam, tol) as a CSV at optimals_path.
"""
# Eseguire solo se è stata eseguita la crossvalidation
def opt_towrite(optimals):
    ll = []
    for i in sorted(optimals):
        x = {}
        x["clusters"] = i
        x["maxIter"] = optimals[i]["maxIter"]
        x["regParam"] = optimals[i]["regParam"]
        x["tol"] = optimals[i]["tol"]
        ll.append(x)
    df_optimals = spark.createDataFrame(ll)
    df_optimals.write.format("csv").save(optimals_path, header = True)

opt_towrite(optimals)
"""

In [61]:
# Disabled on purpose: the read-back of the CSV at optimals_path is wrapped in
# a string literal, so nothing is loaded when this cell runs.
# NOTE(review): the saved output under this cell lists blockSize/layers
# columns, which are not LinearSVC parameters - presumably stale; confirm.
"""
optimals_df = spark.read.options(inferSchema = True, header = True)\
                .csv(optimals_path)    
optimals_df.show()
"""

+---------+--------+------+-------+
|blockSize|clusters|layers|maxIter|
+---------+--------+------+-------+
|      128|       1|10_8_2|    250|
|      128|       2|10_8_2|    250|
|      128|       4|10_8_2|    250|
|      128|       5|10_8_2|    250|
|      128|       0|10_8_2|    250|
|      128|       3|10_8_2|    250|
+---------+--------+------+-------+



- Funzione per plottare la confusion matrix

In [52]:
def plt_matrix(matrix, title):
    """Draw a row-normalised confusion matrix as an annotated heat map.

    Parameters
    ----------
    matrix : numpy array
        Confusion matrix with true labels on the rows and predicted labels on
        the columns; each row is divided by its own sum before plotting.
    title : str
        Title of the figure.
    """
    class_names = ["Aperto", "Non-Aperto"]

    # Turn the counts into per-row fractions.
    row_totals = matrix.sum(axis=1, keepdims=True)
    matrix = matrix.astype('float') / row_totals

    plt.figure()
    plt.imshow(matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()

    positions = np.arange(2)
    plt.xticks(positions, class_names, rotation=45)
    plt.yticks(positions, class_names)

    # Cells darker than half of the maximum get white text so it stays readable.
    half_max = matrix.max() / 2.
    for (row, col), value in np.ndenumerate(matrix):
        text_color = "white" if value > half_max else "black"
        plt.text(col, row, "{:.2f}".format(value),
                 horizontalalignment="center",
                 color=text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.grid()
    plt.tight_layout()
    plt.plot()

- Funzione per stampare le metriche

In [53]:
def performance(df, title):
    """Print classification metrics and plot the confusion matrix.

    Parameters
    ----------
    df : RDD
        Pairs handed to ``MulticlassMetrics``; the visible caller passes
        (prediction, label) pairs.
    title : str
        Title forwarded to ``plt_matrix`` for the confusion-matrix figure.
    """
    metrics = MulticlassMetrics(df)

    # Overall scores first, then precision/recall for label 0 and label 1.
    print('  Accuracy ', metrics.accuracy, sep='')
    print('  F-1 Score         ', metrics.fMeasure(), sep='')
    print('  Precision (False) ', metrics.precision(0), sep='')
    print('  Precision (True) ', metrics.precision(1), sep='')
    print('  Recall (False)   ', metrics.recall(0), sep='')
    print('  Recall (True)    ', metrics.recall(1), sep='')

    print('  Confusion Matrix')
    confusion = metrics.confusionMatrix().toArray()
    print(confusion)
    plt_matrix(confusion, title)

- Funzione per stampare roc_auc

In [54]:
def roc_auc_plot(res, title):

    results_list = res.collect()
    y_score = [i[0] for i in results_list]
    y_test = [i[1] for i in results_list]
    
    fpr, tpr, _ = roc_curve(y_test, y_score)
    roc_auc = auc(fpr, tpr)

    print("AUC MLlib: " + str(BinaryClassificationMetrics(res).areaUnderROC))
    %matplotlib inline
    plt.figure()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.plot()

In [62]:
def get_labels_plot(model, data, title):
    """Predict on ``data`` with ``model`` and report the resulting metrics.

    Parameters
    ----------
    model : object
        Fitted model whose ``predict`` accepts an RDD of feature vectors.
    data : RDD
        Records exposing ``features`` and ``label`` attributes.
    title : str
        Title forwarded to ``performance`` for the confusion-matrix plot.
    """
    feature_vectors = data.map(lambda record: record.features)
    true_labels = data.map(lambda record: record.label)
    # Pair every prediction with its true label: (prediction, label).
    predicted_and_true = model.predict(feature_vectors).zip(true_labels)
    performance(predicted_and_true, title)

'\n    # 4 - Valore target reale\n    actual = test.map(lambda x: x.label)\n    # 5 - Dataframe valori target predetti: (target_predetto, indice)\n    df_prediction = predictions.zipWithIndex().toDF([\'predicted_label\', \'index\'])\n    # 6 - Dataframe valori target attuali: (target_attuale, indice)\n    df_actual = predictions.zipWithIndex().toDF([\'actual_label\', \'index\'])\n    # 7 - JOIN dei dataframe tramite indice\n    join_df = df_actual.join(df_prediction, df_actual.index == df_prediction.index).drop(\'index\')\n\n    # 8 - Count di Predizioni corrette (1) e predizioni errate (0)\n    print("          - Count predizioni corrette (=1) vs errate (=0) - ")\n    x = join_df.withColumn(\'correct_predictions\', when(join_df[\'actual_label\'] == join_df[\'predicted_label\'], 1).otherwise(0))\n    x.groupBy(\'correct_predictions\').count().show()\n\n    # 9 - Classificazioni sbagliate\n    print("          - Count predizioni errate - ")\n    x.filter(x[\'correct_predictions\'] == 0.