In [None]:
#INIZIALIZZAIZIONE SESSION E AVVIO SPARK
import pyspark as pys 
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
import pyspark.sql.functions as func
#Spark session
spark = SparkSession.builder.getOrCreate()

#Spark context
sc = spark.sparkContext

#Alcuni moduli sono deprecati o spostati nel ML module che vedremo in seguito

#Caricamento dataset
census_path = './learningPySpark/Data/census_income.csv'

census=spark.read.csv(
    census_path,
    header=True,
    inferSchema=True
)

census.take(5)

census.printSchema()
#Lo schema è corretto

census.take(5)
#È necessario rimuovere gli spazi all'inizio e alla fine delle stringhe
#dtype lista di coppie (NomeColonna,Tipo)
#ltrim rimuove gli spazi all'inizio
#rtrim alla fine 
for col,typ in census.dtypes:
    if typ == "string":
        census = census.withColumn(
            col, func.ltrim(func.rtrim(census[col]))
        )
census.take(5)


In [None]:
#Preparazione Dataset 

In [None]:
#PULIZIA DATASET

#Selezioniamo le colonne di nostro interesse
cols_to_keep = census.dtypes #lista di coppie (nome colonna, tipo)

cols_to_keep=(
    ["label","age","capital-gain","capital-loss","hours-per-week"] +
    [
        #:-1 tranne ultimo elemento. label già presente nella lista sopra
        e[0] for e in cols_to_keep[:-1] if e[1]=="string" #tutte le stringhe
    ]
)
#Prenso il subset create dalle colonne di interesse
census_subset = census.select(cols_to_keep)

#Prendo le colonne con valori numerici di interesse
cols_num = [
    e[0] for e in census_subset.dtypes if e[1]=="int"
]
#Prenso le colonne con valori stringa
cols_cat = [
    e[0] for e in census_subset.dtypes[1:] if e[1]=="string" #1: non voglio label che è particolare
    #lo trattiamo dopo
]

cols_num,cols_cat

In [None]:
#ATTRIBUTI NUMERICI

#Calcolo alcune statistiche su colonne numeriche
import pyspark.mllib.stat as st
import numpy as np

rdd_num = (
    census_subset.select(cols_num).rdd
    .map(lambda row: [e for e in row]) #da Row a lista
)

stats_num =st.Statistics.colStats(rdd_num)

for col, min_, mean_, max_, var_ in zip(
cols_num
, stats_num.min()
, stats_num.mean()
, stats_num.max()
, stats_num.variance()
):
    print('{0}: min->{1:.1f}, mean->{2:.1f}, max->{3:.1f}, stdev->{4:.1f}'.format(col, min_, mean_, max_, np.sqrt(var_)))

In [None]:
#STRINGHE
#Colonne categorizzate. Contiamo le occorrenze

rdd_cat =(
    census_subset.select(cols_cat+['label']).rdd
    .map(lambda row: [e for e in row])
)

results_cat={} #dizionario
#Metto nel dizionario per ogni colonna una coppia (nome,numero ricorrenze)

for i,col in enumerate(cols_cat+ ['label']):
    results_cat[col] = (
        rdd_cat.groupBy(lambda row: row[i])
        .map(lambda el: (el[0],len(el[1])))
        #el[0] nome attributo (raggruppato)
        #el[1] lista valori per attributo raggruppato
        .collect()
    )

#Stampo 
for k in results_cat:
    print(k,sorted(
        results_cat[k]
        ,key=lambda el: el[1] #el[1] numero ricorrenze come elemento su cui ordinare
        ,reverse=True)
        ,"\n")



In [None]:
#CALCOLO MATRICE DI CORRELAZIONE
correlations = st.Statistics.corr(rdd_num) #Ritorna un array numpy (matrice di correlazione)
print(correlations) #mancano i nomi delle colonne

#Stampo la matrice (solo valori con abs()>0.05)
for i,el_i in enumerate(abs(correlations)>0.05): #ritorna matrice di booleani
    print(cols_num[i])

    for j,el_j in (enumerate(el_i)): #el_j è un booleano
        if el_j and j!= i:
            print(" ",cols_num[j],correlations[i][j])
    print("\n") #riga vuota

#Poca correlazione. Bene.

In [None]:
#Test statistici

#Calcoliamo l'indipendenza fra label e occupation

import pyspark.mllib.linalg as ln #algebra lineare

census_occupation = (
    census.groupby("occupation")
    .pivot("label")
    .count()
)

census_occupation_coll = (
    census_occupation.rdd.map(lambda row: (row[1:]))
    .flatMap(lambda row: row).collect()
)

len_row = census_occupation.count()
#DenseMatrix (numeroRighe, NColonne, Dati, Trasposta)
dense_mat = ln.DenseMatrix(
    len_row, 2, census_occupation_coll,True
)

chi_sq = st.Statistics.chiSqTest(dense_mat)

print(chi_sq.pValue) #=0 Le occorrenze sono indipendenti
print(dense_mat)

dense_mat.toArray()

In [None]:
#Valori stringa categorizzati
import pyspark.mllib.feature as feat

#Conto il numero di valori distinti per colonna
len_ftrs =[] #lista (colonna,numeroValoriDistinti)
for col in cols_cat:
    (
        len_ftrs.append(
            (col,census.select(col).distinct().count())
        )
    )
len_ftrs=dict(len_ftrs)

#Codifica dei dati
final_data = (
    census
    .select(cols_to_keep)
    .rdd
    .map(lambda row: [
        list(
            feat.HashingTF(int(len_ftrs[col] / 2.0))
            .transform(row[i])
            .toArray()
        ) if i >= 5
        else [row[i]] 
        for i, col in enumerate(cols_to_keep)]
    )
)

final_data.take(3)


In [None]:
#Codifica label (2 valori possibili)


def labelEncode(label):
    return [int(label[0] == '>50K')]

final_data = (
    final_data
    .map(lambda row: labelEncode(row[0]) 
         + [item 
            for sublist in row[1:] 
            for item in sublist]
        )
)

In [34]:
#NORMALIZZAZIONE (standardizzazione)

#StandardScaler(Media,Stddev) standardizzazione z-score

standardizer = feat.StandardScaler(True, True)
sModel = standardizer.fit(final_data.map(lambda row: row[1:]))
final_data_scaled = sModel.transform(final_data.map(lambda row: row[1:]))

final_data = (
    final_data
    .map(lambda row: row[0])
    .zipWithIndex()
    .map(lambda row: (row[1], row[0]))
    .join(
        final_data_scaled
        .zipWithIndex()
        .map(lambda row: (row[1], row[0]))
    )
    .map(lambda row: row[1])
)

final_data.take(3)




[(0,
  DenseVector([0.0307, 0.1485, -0.2167, -0.0354, -1.2635, 0.008, 1.7796, 1.0001, 0.83, 0.5743, -0.3473, -0.443, 0.6826, -0.4007, -0.3862, -0.4685, -1.1369, -0.4555, 0.4551, -1.1329, 2.0776, 1.8713, -1.0381, -0.3381, -0.2381, -0.775, 0.9805, 1.5207, 0.3083, -0.0634, -0.2574, -0.7031, 0.3208, -0.0901, -0.1263, 0.3355, -0.1223, 0.3378, -0.0853, -0.0937, -0.0887, -0.2104, 0.0, 0.1286, -0.1976, -0.1433, -0.1419, 0.1895, 0.298, 0.2896, 0.1638, 0.1221, 0.0])),
 (0,
  DenseVector([-0.7024, -0.1459, -0.2167, 1.4224, -0.2368, 0.008, -0.5593, -0.2503, 0.83, 1.3423, -0.3473, -0.443, 0.6826, -0.4007, -0.3862, 1.1159, -0.4505, -1.1655, -2.1192, -0.0989, -0.8052, 0.6842, 1.0935, -0.3381, -0.2381, -0.775, 0.9805, 1.5207, 0.3083, -0.0634, -0.2574, -0.7031, 0.3208, -0.0901, -0.1263, 0.3355, -0.1223, 0.3378, -0.0853, -0.0937, -0.0887, -0.2104, 0.0, 0.1286, -0.1976, -0.1433, -0.1419, 0.1895, 0.298, 0.2896, 0.1638, 0.1221, 0.0])),
 (0,
  DenseVector([-0.2626, -0.1459, -0.2167, -0.0354, -2.2902, -2.761

In [35]:
final_data.take(1)

[(0,
  DenseVector([0.0307, 0.1485, -0.2167, -0.0354, -1.2635, 0.008, 1.7796, 1.0001, 0.83, 0.5743, -0.3473, -0.443, 0.6826, -0.4007, -0.3862, -0.4685, -1.1369, -0.4555, 0.4551, -1.1329, 2.0776, 1.8713, -1.0381, -0.3381, -0.2381, -0.775, 0.9805, 1.5207, 0.3083, -0.0634, -0.2574, -0.7031, 0.3208, -0.0901, -0.1263, 0.3355, -0.1223, 0.3378, -0.0853, -0.0937, -0.0887, -0.2104, 0.0, 0.1286, -0.1976, -0.1433, -0.1419, 0.1895, 0.298, 0.2896, 0.1638, 0.1221, 0.0]))]

In [36]:
sModel.mean, sModel.std

(DenseVector([0.0, -0.0, 0.0, -0.0, -0.0, -0.0, 0.0, -0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.0, 0.0, -0.0, -0.0, 0.0, -0.0, -0.0, -0.0, 0.0, -0.0, -0.0, 0.0, 0.0, -0.0, -0.0, 0.0, -0.0, 0.0, -0.0, -0.0, -0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.0, 0.0, 0.0, -0.0, -0.0, 0.0, 0.0, -0.0, -0.0, -0.0, 0.0, 0.0]),
 DenseVector([1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]))

In [38]:
import pyspark.mllib.regression as reg
#Creazione labeled points per la classificazione
final_data_income = (
    final_data
    .map(lambda row: reg.LabeledPoint(
        row[0]
        , row[1]
        )
    )
)
final_data_income.take(2)



[LabeledPoint(0.0, [0.030670086380022974,0.14845061558793732,-0.21665620002803668,-0.03542890292131961,-1.263495506961592,0.007996473732139464,1.7795863357831083,1.000120836900974,0.8300118030815629,0.5743180912245661,-0.34730116635185726,-0.4429929605624822,0.6826214082675929,-0.40070785156170013,-0.3862275949058749,-0.46854980553952685,-1.1368735829205499,-0.4555118517327585,0.45507885276723214,-1.1329160643506926,2.0775754968328806,1.8712975153274591,-1.0380868217125387,-0.33808094251059423,-0.23812708304818486,-0.7750139160707569,0.9804554188515354,1.5207190561281538,0.30834093641720645,-0.06339046129248913,-0.25736785057582295,-0.7030605487269814,0.3207798471547668,-0.09006362818252117,-0.12626697150459493,0.33547741813187393,-0.12230357893453815,0.33781064530673466,-0.08526116800202502,-0.0936707494514088,-0.08866697117953638,-0.21041541274675832,0.0,0.12864878811001434,-0.19757081643280744,-0.1433270085322855,-0.1419322155844281,0.18951895277276848,0.2979804796353552,0.289626591

In [39]:
#RDD di labeled points per la regressione

mu, std = sModel.mean[3], sModel.std[3]

final_data_hours = (
    final_data
    .map(lambda row: reg.LabeledPoint(
        row[1][3] * std + mu
        , ln.Vectors.dense([row[0]] + list(row[1][0:3]) + list(row[1][4:]))
        )
    )
)
final_data_hours.take(2)

[LabeledPoint(-0.03542890292131962, [0.0,0.030670086380022974,0.14845061558793732,-0.21665620002803668,-1.263495506961592,0.007996473732139464,1.7795863357831083,1.000120836900974,0.8300118030815629,0.5743180912245661,-0.34730116635185726,-0.4429929605624822,0.6826214082675929,-0.40070785156170013,-0.3862275949058749,-0.46854980553952685,-1.1368735829205499,-0.4555118517327585,0.45507885276723214,-1.1329160643506926,2.0775754968328806,1.8712975153274591,-1.0380868217125387,-0.33808094251059423,-0.23812708304818486,-0.7750139160707569,0.9804554188515354,1.5207190561281538,0.30834093641720645,-0.06339046129248913,-0.25736785057582295,-0.7030605487269814,0.3207798471547668,-0.09006362818252117,-0.12626697150459493,0.33547741813187393,-0.12230357893453815,0.33781064530673466,-0.08526116800202502,-0.0936707494514088,-0.08866697117953638,-0.21041541274675832,0.0,0.12864878811001434,-0.19757081643280744,-0.1433270085322855,-0.1419322155844281,0.18951895277276848,0.2979804796353552,0.289626591

In [40]:
#Dividiamo il dataset in training e testing

(
    final_data_income_train
    , final_data_income_test
    ) = (
    #Come parametro la proporzione da assegnare ad ogni dataset
    final_data_income.randomSplit([0.7, 0.3])
)

(
    final_data_hours_train
    , final_data_hours_test
    ) = (
    final_data_hours.randomSplit([0.7, 0.3])
)

In [47]:
#Predire ore di lavoro
#con regressione lineare

workhours_model_lm = reg.LinearRegressionWithSGD.train(final_data_hours_train)

#Testiamo il modello
small_sample_hours = sc.parallelize(final_data_hours_test.take(10))

for t,p in zip(
    small_sample_hours
    .map(lambda row: row.label).collect()
    , workhours_model_lm.predict(small_sample_hours
    .map(lambda row: row.features)
).collect()):
    print(t,p)


1.4223644939039817 0.10542444909923396
-0.03542890292131962 0.5115554270579242
-0.03542890292131962 -0.05563934053726413
-0.03542890292131962 0.2857853919345268
0.36951370730793076 0.09397650078312011
-1.2502567336090709 -0.6734044368007291
0.7744563175371811 0.0760273478274319
1.1793989277664314 -0.07481547691060852
0.36951370730793076 0.23583431166140195
0.9364333616288814 0.5147502219910091


In [49]:
#Predire guadagno lavoro
#con SVM support vector machine
import pyspark.mllib.classification as cl

income_model_lr = cl.LogisticRegressionWithSGD.train(final_data_income_train)

#Testiamo il modello
small_sample_income = sc.parallelize(final_data_income_test.take(10))

for t,p in zip(
    small_sample_income
    .map(lambda row: row.label)
    .collect()
    , income_model_lr.predict(
    small_sample_income
    .map(lambda row: row.features)
    ).collect()):
        print(t,p)

0.0 1
0.0 0
0.0 0
0.0 0
1.0 1
0.0 1
1.0 1
1.0 1
0.0 0
0.0 0


In [52]:
#Modello di custering
import pyspark.mllib.clustering as clu

model = clu.KMeans.train(
    final_data.map(lambda row: row[1])
    , 2
    , initializationMode='random'
    , seed=666
)

import sklearn.metrics as m
predicted = (
model
.predict(
final_data.map(lambda row: row[1])
)
)
predicted = predicted.collect()
true = final_data.map(lambda row: row[0]).collect()
print(m.homogeneity_score(true, predicted))
print(m.completeness_score(true, predicted))

0.15347287281512212
0.12233906102086833


In [56]:
#MANCA CALCOLO PERFORMANCE (errori nel modello)

spark.stop()