<font size="5">**Breve introduzione agli alberi decisionali ed alle foreste casuali**</font><br>

> (c) 2023 Antonio Piemontese 

Gli alberi decisionali sono una famiglia di algoritmi in grado di gestire naturalmente predittori sia categorici che numerici. La costruzione di un singolo albero può essere eseguita utilizzando il calcolo parallelo e molti alberi possono essere costruiti in parallelo contemporaneamente. Gli alberi sono robusti rispetto ai valori anomali nei dati, il che significa che alcuni punti dati estremi (*outlier*) e possibilmente errati possono non influenzare affatto le previsioni.<br>

Gli alberi possono elaborare dati di tipi diversi e su scale diverse, senza la necessità di pre-elaborazione o normalizzazione (o di standardizzazione). Gli algoritmi basati sull'albero decisionale hanno il vantaggio di essere relativamente intuitivi da comprendere e da ragionarci sopra. In effetti, spesso le persone usano uno schema di ragionamento simile a quello incorporato negli alberi decisionali, nella vita di tutti i giorni. Ad esempio, mi siedo per prendere il caffèlatte mattutino. Prima di scegliere il latte e aggiungerlo al caffè, voglio prevedere: il latte è andato a male? Non lo so per certo. Potrei controllare se la data di scadenza è trascorsa. In caso contrario, prevedo di no, non è andato a male. Se la data è trascorsa, ma erano due o meno giorni fa, corro il rischio e prevedo di no, non è andato a male. Altrimenti annuso il latte. Se ha un odore strano, prevedo di sì, altrimenti di no. Questa serie di decisioni sì/no, che porta appunto ad una previsione, è ciò che incorporano gli alberi decisionali. Ogni decisione porta a uno dei due risultati, ovvero una previsione o un'altra decisione, come mostrato nella seguente figura.

![](Figure_4_1.png)

E' naturale pensare al processo decisionale come ad **un albero di decisioni**, dove ogni nodo interno all'albero è una decisione, e ogni **nodo foglia** è una risposta finale.<br>

Consideriamo un <u>altro esempio</u>. Un robot ha trovato lavoro in un negozio di animali domestici esotici. Vuole sapere, prima che il negozio apra, quali animali nel negozio sarebbero adatti per un bambino, prima che egli venga in negozio. Il proprietario del negozio elenca nove animali domestici che, a suo parere, sono o non sono adatti al bambino, e poi se ne va. Il robot esamina gli animali e ne raccoglie alcune informazioni, elencate nella seguente tabella:

![](Table_4_1.png)

Il robot può scegliere uno dei 9 animali elencati; tuttavia, ci sono molti altri animali domestici disponibili nel negozio. Il robot ha bisogno di un metodo per decidere quali animali tra tutti gli altri possono essere adatti per un bambino.<br>

Possiamo presumere che nel negozio siano disponibili animali per tutte le caratteristiche esaminate (*name*,*weight*, *#legs*, *color*) ed anche per le loro combinazioni. Utilizzando i dati decisionali forniti dal proprietario del negozio ed un algoritmo di albero decisionale, possiamo aiutare il robot a capire che aspetto ha un animale domestico per un bambino.<br>

Il nome non è una caratteristica utile per questa previsione, e dunque non sarà incluso nel modello di albero decisionale. Infatti, è difficile pensare che il nome da solo sia predittivo; "Felix" potrebbe nominare un gatto o una tarantola velenosa, per quanto ne sa il robot. Ci sono due caratteristiche numeriche (peso, numero di zampe) e una caratteristica categorica (colore) che possono aiutare a prevedere la risposta categorica (è/non è un animale domestico adatto per un bambino).<br>

Il modo in cui funziona un albero decisionale è: **prendere una o più decisioni in sequenza in base ai predittori forniti**. Per iniziare, il robot può provare ad adattare (*fit*) un semplice albero decisionale a questi dati di training, costituito da una singola decisione, come mostrato qui sotto:

![](Figure_4_2.png)

La logica dell'albero decisionale è di facile comprensione: gli animali di 500 kg sembrano certamente inadatti come animali domestici. Questa regola decisionale, infatti, prevede il valore corretto in <u>cinque dei nove casi</u>. Una rapida occhiata suggerisce che potremmo migliorare la regola abbassando la soglia di peso a 100 kg. Questa regola ottiene <u>sei esempi su nove corretti</u>. Gli animali pesanti ora sono previsti correttamente; gli animali più leggeri sono corretti solo in parte. E' allora possibile costruire **una seconda regola decisionale** per affinare ulteriormente la previsione per animali con peso inferiore a 100 kg. E' utile scegliere un predittore che modifichi alcune delle previsioni Sì errate in No. Ad esempio, c'è un piccolo animale verde, che dal nome potrebbe essere un serpente (!) e che è stato etichettato come non-adatto dal proprietario, che invece è classificato dal nostro primo modello come adatto! Il robot può allora correttamente prevedere come non-adatto semplicemente aggiungendo una decisione basata sul colore, come mostrato nella seguente figura:

![](Figure_4_3.png)

Ora, sette esempi su nove sono classificati correttamente. Naturalmente, altre regole decisionali (ad esempio sul numero di zampe) potrebbero essere aggiunte fino a quando tutte e nove gli esempi non siano previsti correttamente. Tuttavia, la logica incorporata nell'albero decisionale risultante sarebbe probabilmente poco plausibile se tradotta in parole comuni: "Se il peso dell'animale è inferiore a 100 kg, il suo colore è marrone anziché verde e ha meno di 10 zampe, allora sì, è un animale domestico adatto ad un bambino. Pur adattandosi perfettamente agli esempi forniti, un albero decisionale come questo non riuscirebbe a prevedere che un piccolo lupo marrone a quattro zampe non sia un animale domestico adatto. È necessario un bilanciamento per evitare questo fenomeno, noto come **overfitting**.

Gli alberi decisionali generalizzano in un algoritmo più potente, chiamato foreste casuali. Le foreste casuali addestrano molti alberi decisionali separatamente e poi li combinano per ridurre il rischio di overfitting. L'algoritmo inserisce casualità nel processo di addestramento in modo che ogni albero decisionale sia leggermente diverso. La combinazione delle previsioni riduce la varianza delle previsioni, rende il modello risultante più generalizzabile e migliora le prestazioni sui dati di test.

# Impostazioni

In [2]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local') # la creazione del contesto Spark 
spark = SparkSession(sc)   # la creazione della sessione Spark. Servono una decina di secondi di esecuzione.
                           # NON si possono creare 2 sessioni contemporaneamente - dà errore!
                           

# La preparazione dei dati

Usiamo il famoso dataset **covtype** [che sta per (forest) *cover type*], disponibile [qui](https://archive.ics.uci.edu/dataset/31/covertype).<br>

Questo dataset registra i tipi di appezzamenti di terreno in Colorado (USA) coperti da foreste. Ogni riga del dataset contiene diverse caratteristiche che descrivono l'appezzamento di terreno, come: **l'altitudine, la pendenza, la distanza dall'acqua, l'ombra e il tipo di suolo**; inoltre per ogni riga (di training e di test) è noto il **tipo di foresta effettivo** che copre il terreno (<u>la risposta categorica</u>). Il tipo di copertura forestale deve appunto essere previsto in base alle caratteristiche dell'appezzamento, <u>54 in totale</u>. Questo dataset è stato utilizzato nella ricerca e persino in una competizione Kaggle. È un dataset interessante da esplorare anche perché contiene caratteristiche (predittori) sia categoriche che numeriche. Ci sono **581.012 righe**: non è esattamente un esempio di "big data", ma è abbastanza per evidenziare alcuni problemi di scala.<br>

Per fortuna, i dati sono già in un semplice formato CSV (ancorchè il tipo file sia *.data* e non *.csv*) e non richiedono molta pulizia o altra preparazione per essere utilizzati con **PySpark MLlib**. Il file *covtype.data* deve essere estratto dallo zip e copiato sul PC locale oppure nel cloud (ad esempio, AWS S3).<br>

Il file *covtype.info* fornisce molte importanti informazioni, utili da leggere.

La creazione di foreste decisionali può richiedere molte risorse computazionali. Se si ha memoria RAM, conviene configurare una sessione Spark con almeno 8GB, cioè: --driver-memory 8g.

In [3]:
spark = SparkSession.builder.config("spark.driver.memory", "8g").appName('tree_forest').getOrCreate()  # circa 8" di esecuz.

In [4]:
spark

In [5]:
data_without_header = spark.read.option("inferSchema", True)\
                      .option("header", False).csv("covtype/covtype.data")   # circa 4" di esecuzione
                      # questo codice non parsifica la prima linea come header; il data-type delle colonne è inferito
                      # dall'esame del file; 
data_without_header.printSchema() # correttamente inferisce che tutte le colonne sono numeriche, e, più esattamente, interi;
                                  # non può inferire il nome delle colonne (perchè assente nel file) e dunque assegna
                                  # genericamente: _c0, _c1, _c2, ecc.
                                  # il significato delle colonne è riportato e ben descritto nel file 'covtype.info'.        

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [None]:
%pwd                              # comando "magic" (cioè che interagisce con il sistema operativo sottostante) per
                                  # conoscere la directory corrente (in genere quella dove si trova il notebook che è stato
                                  # aperto)

![](covtype/covtype_metadata.png)

Alcune colonne sono effettivamente numeriche:
* l'elevazione è un'elevazione in metri; 
* la pendenza (*slope*) è misurata in gradi.

Tuttavia, *Wilderness_Area* è qualcosa di diverso, perché si dice che si estenda su quattro colonne, ognuna delle quali è uno 0 o 1. In realtà, Wilderness_Area è un valore categorico, non numerico. Queste quattro colonne sono in realtà una [**codifica one-hot**](https://en.wikipedia.org/wiki/One-hot), aka codifica 1-of-N. Quando questa forma di codifica è utilizzata per una caratteristica categoriale, una caratteristica categoriale che assume N valori distinti diventa N caratteristiche numeriche, ciascuna assumendo il valore 0 o 1. Esattamente uno solo degli N valori ha valore 1 e tutti gli altri sono 0. Ad esempio, una caratteristica categorica per il tempo che può essere nuvoloso, piovoso o sereno diventerebbe tre caratteristiche numeriche, dove nuvoloso è rappresentato da 1,0,0; piovoso di 0,1,0; e così via. Queste tre caratteristiche numeriche potrebbero essere considerate come caratteristiche "è_nuvoloso", "è_piovoso" ed "è_chiaro". Quindi, 40 colonne sono in realtà una unica caratteristica categorica *Soil_Type*. La codifica *one_hot* non è l'unico modo possibile per codificare una caratteristica categorica. Un'altra possibile codifica assegna semplicemente un valore numerico distinto a ciascun possibile valore della caratteristica categorica. Ad esempio, nuvoloso può diventare 1.0, piovoso 2.0 e così via. Quindi, nel caso covtype, potremmo avere un'unica variabile numerica *Wilderness_Area* che può assumere uno tra i quattro valori [1,2,3,4]. Il rischio di questa codifica è di sembrare ordinabile, anche se non lo è. Il target *Cover_Type* è un valore categorico codificato come valore da 1 a 7.

Prima di procedere, è bene aggiungere al DataFrame i nomi delle colonne per facilitare la comprensione: 

In [6]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

colnames = ["Elevation", "Aspect", "Slope", \
            "Horizontal_Distance_To_Hydrology", \
            "Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways", \
            "Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", \
            "Horizontal_Distance_To_Fire_Points"] + \
[f"Wilderness_Area_{i}" for i in range(4)] + \
[f"Soil_Type_{i}" for i in range(40)] + \
["Cover_Type"]

data = data_without_header.toDF(*colnames).\
                          withColumn("Cover_Type",
                                    col("Cover_Type").cast(DoubleType()))

data.head()

Row(Elevation=2596, Aspect=51, Slope=3, Horizontal_Distance_To_Hydrology=258, Vertical_Distance_To_Hydrology=0, Horizontal_Distance_To_Roadways=510, Hillshade_9am=221, Hillshade_Noon=232, Hillshade_3pm=148, Horizontal_Distance_To_Fire_Points=6279, Wilderness_Area_0=1, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=0, Soil_Type_0=0, Soil_Type_1=0, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=1, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=5.0)

Note sul codice della cella:
* il "+" concatena le collezioni
* un loop Python genera tutti i nomi delle colonne wilderness
* un loop Python genera tutti i nomi delle colonne soil

Le colonne relative alla natura ed al suolo sono denominate *Wilderness_Area_0*, *Soil_Type_0*, ecc. La colonna  target *Cover_Type* è convertita ad un valore *double* (anzichè *int*), cioè un floating point di due byte, perchè tale è il formato richiesto da tutte le API PySpark di MLlib.<br> 

In [7]:
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- Soil_Type

Si può ora eseguire `data.show` per vedere alcune righe del dataset, ma il display è così ampio che è difficile leggerlo tutto. `data.head` lo visualizza come un oggetto *Row* non elaborato, che in questo caso è leggibile. 

In [8]:
data.show()

+---------+------+-----+--------------------------------+------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+-----------------+-----------------+-----------------+-----------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+----------+
|Elevation|Aspect|Slope|Horizontal_Distance_To_Hydrology|Vertical_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|Wilderness

In [12]:
data.head()   # in parentesi il numero di righe di head (cioè all'inizio del dataframe) che si vogliono leggere.
              # il default è 1.

Row(Elevation=2596, Aspect=51, Slope=3, Horizontal_Distance_To_Hydrology=258, Vertical_Distance_To_Hydrology=0, Horizontal_Distance_To_Roadways=510, Hillshade_9am=221, Hillshade_Noon=232, Hillshade_3pm=148, Horizontal_Distance_To_Fire_Points=6279, Wilderness_Area_0=1, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=0, Soil_Type_0=0, Soil_Type_1=0, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=1, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=5.0)

Ora che abbiamo un pò di familiarità con il nostro dataset, possiamo **addestrare un modello di albero decisionale**.

# Il primo albero di decisione

Suddivisione del dataset in training e test. Come misura di valutazione della qualità predittiva del modello (sui dati di test) useremo l'accuratezza (*accuracy*). Ecco una [rassegna](https://towardsdatascience.com/metrics-to-evaluate-your-machine-learning-algorithm-f10ba6e38234) delle misure disponibili.

In [15]:
(train_data, test_data) = data.randomSplit([0.9, 0.1],seed=1)  
                                  # le % 0.9/0.1 sono accettabili considerate le grandi dimensioni
                                  # del dataset (circa 60.000 righe di test non sono poche).
                                  # il seed (qualsiasi numero!) è essenziale per garantire le RI-PRODUCIBILITA' tra PC.
train_data.cache() 
test_data.cache()

DataFrame[Elevation: int, Aspect: int, Slope: int, Horizontal_Distance_To_Hydrology: int, Vertical_Distance_To_Hydrology: int, Horizontal_Distance_To_Roadways: int, Hillshade_9am: int, Hillshade_Noon: int, Hillshade_3pm: int, Horizontal_Distance_To_Fire_Points: int, Wilderness_Area_0: int, Wilderness_Area_1: int, Wilderness_Area_2: int, Wilderness_Area_3: int, Soil_Type_0: int, Soil_Type_1: int, Soil_Type_2: int, Soil_Type_3: int, Soil_Type_4: int, Soil_Type_5: int, Soil_Type_6: int, Soil_Type_7: int, Soil_Type_8: int, Soil_Type_9: int, Soil_Type_10: int, Soil_Type_11: int, Soil_Type_12: int, Soil_Type_13: int, Soil_Type_14: int, Soil_Type_15: int, Soil_Type_16: int, Soil_Type_17: int, Soil_Type_18: int, Soil_Type_19: int, Soil_Type_20: int, Soil_Type_21: int, Soil_Type_22: int, Soil_Type_23: int, Soil_Type_24: int, Soil_Type_25: int, Soil_Type_26: int, Soil_Type_27: int, Soil_Type_28: int, Soil_Type_29: int, Soil_Type_30: int, Soil_Type_31: int, Soil_Type_32: int, Soil_Type_33: int, S

Per essere utilizzabile dalle librerie di *MLlib*, il dataframe ha bisogno di un pò di pre-elaborazione: le colonne predittore (ma non la risposta) devono essere raccolte in un [**feature vector**](https://en.wikipedia.org/wiki/Feature_(machine_learning)) (che funziona come un'array di *double*). Ovviamente alcune feature sono concettualmente categoriche, anche se fisicamente sono rappresentate come numeri. [feature vs feature set vs feature vector](https://stats.stackexchange.com/questions/192873/difference-between-feature-feature-set-and-feature-vector).

In [16]:
from pyspark.ml.feature import VectorAssembler                   # la funzione di vettorizzazione

input_cols = colnames[:-1]                                       # esclude la risposta, il cover_type  (la risposta)

# l'inizializzazione:
vector_assembler = VectorAssembler(inputCols=input_cols,         # due argomenti in input: le colonne da combinare ed il 
                                    outputCol="featureVector")   # nome della nuova colonna che contiene il feature vector
# la trasformazione:
assembled_train_data = vector_assembler.transform(train_data)    # --> il dataframe dei dati di training con le feature 
                                                                 #     vettorizzate (letter. "assemblate")

assembled_train_data.select("featureVector").show(truncate = False) # --> un vettore SPARSO (la maggior parte dei 54 
                                                                    #     valori è infatti 0) che contiene solo i valori 
                                                                    #     <> 0 ed i loro indici



+----------------------------------------------------------------------------------------------------+
|featureVector                                                                                       |
+----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1859.0,18.0,12.0,67.0,11.0,90.0,211.0,215.0,139.0,792.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1860.0,18.0,13.0,95.0,15.0,90.0,210.0,213.0,138.0,780.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1861.0,35.0,14.0,60.0,11.0,85.0,218.0,209.0,124.0,832.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1866.0,23.0,14.0,85.0,16.0,108.0,212.0,210.0,133.0,819.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1867.0,20.0,15.0,108.0,19.0,120.0,208.0,206.0,132.0,808.0,1.0,1.0])|
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1868.0,27.0,16.0,67.0,17.0,95.0,212.0,2

`VectorAssembler` è un esempio di `Transformer`. Trasforma il DataFrame in input in un altro DataFrame. Può essere anche usato dentro le "pipeline* di trasformazione (vedi più avanti).

Costruiamo ora l'albero decisionale.

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier              # classe "classificatori di tipo albero decis."

# creazione dell'istanza (maiusc+tab dentro la parentesi tonde fornisce la documentazione on-line del classificatore)
classifier = DecisionTreeClassifier(seed = 1234, labelCol="Cover_Type",   # la risposta effettiva
                                    featuresCol="featureVector",          # il feature vector (i predittori)
                                    predictionCol="prediction")           # il nome della colonna che conterrà la previsione

# fit del modello
model = classifier.fit(assembled_train_data)  # circa 5" di esecuzione

# display dell'albero fittato:
print(model.toDebugString)    # --> una struttura ad albero composta da una serie di IF innestate. 
                              #     Ogni IF confronta i valori della feature con un threshold.
                              #     Purtroppo l'albero riporta solo il NUMERO della feature, e non il suo NOME

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_478880a8b56b, depth=5, numNodes=43, numClasses=8, numFeatures=54
  If (feature 0 <= 3047.5)
   If (feature 0 <= 2561.5)
    If (feature 10 <= 0.5)
     If (feature 0 <= 2450.5)
      If (feature 3 <= 15.0)
       Predict: 4.0
      Else (feature 3 > 15.0)
       Predict: 3.0
     Else (feature 0 > 2450.5)
      If (feature 17 <= 0.5)
       Predict: 2.0
      Else (feature 17 > 0.5)
       Predict: 3.0
    Else (feature 10 > 0.5)
     Predict: 2.0
   Else (feature 0 > 2561.5)
    If (feature 0 <= 2952.5)
     If (feature 15 <= 0.5)
      If (feature 17 <= 0.5)
       Predict: 2.0
      Else (feature 17 > 0.5)
       Predict: 3.0
     Else (feature 15 > 0.5)
      Predict: 3.0
    Else (feature 0 > 2952.5)
     If (feature 3 <= 214.0)
      If (feature 36 <= 0.5)
       Predict: 2.0
      Else (feature 36 > 0.5)
       Predict: 1.0
     Else (feature 3 > 214.0)
      Predict: 2.0
  Else (feature 0 > 3047.5)
   If (feature 0 <= 

La classe di creazione dell'istanza ha molti argomenti, alcuni dei quali importanti da capire. [Se si posiziona il cursore dentro le parentesi tonde del creatore dell'istanza e si preme `shift+tab`, compare l'help.]<br>
Segue una slide sulla metrica dell'entropia, alternativa a quella di Gini:

![](dsfb_0303.png)

Una caratteristica importante degli alberi decisionali è la loro capacità di [**feature selection**](https://en.wikipedia.org/wiki/Feature_selection). Cioè, possono misurare quanto ogni feature contribuisce o meno all'accuratezza delle previsioni. Queste informazioni sono disponibili nel modello, e si possono facilmente visualizzare in *pandas*:

In [21]:
import pandas as pd

# per convertire il DataFrame da Spark a pandas useremo una nuova tecnica (che non avevamo usato nell'esempio dei 
# record linkage):
pd.DataFrame(model.featureImportances.toArray(),
            index=input_cols, columns=['importance']).\
            sort_values(by='importance', ascending=False) # --> elenco delle feature dall'importanza più alta in giù;
                                                          #     l'importanza della feature è un numero tra 0 ed 1,
                                                          #     ed è determinato soprattutto dalla "altezza" del nodo.

Unnamed: 0,importance
Elevation,0.803527
Horizontal_Distance_To_Hydrology,0.037767
Wilderness_Area_0,0.031511
Soil_Type_3,0.030654
Hillshade_Noon,0.02525
Soil_Type_1,0.023805
Soil_Type_31,0.01777
Wilderness_Area_2,0.01178
Soil_Type_22,0.006761
Vertical_Distance_To_Hydrology,0.003478


La *elevation* è di gran lunga la feature più importante per prevedere il *cover_type*. La maggior parte delle feature sembra non avere alcuna importanza per la previsione.

E' interessante vedere cosa il modello prevede per i dati di training e confrontarlo con il cover_type corretto (noto!):

In [28]:
predictions = model.transform(assembled_train_data)
predictions.select("Cover_Type", "prediction", "probability").\
            show(20, truncate = False)

+----------+----------+-------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                      |
+----------+----------+-------------------------------------------------------------------------------------------------+
|3.0       |3.0       |[0.0,0.0,0.03419660454341136,0.6303032398603091,0.05127761833961481,0.0,0.2842225372566647,0.0]  |
|3.0       |3.0       |[0.0,0.0,0.03419660454341136,0.6303032398603091,0.05127761833961481,0.0,0.2842225372566647,0.0]  |
|3.0       |3.0       |[0.0,0.0,0.03419660454341136,0.6303032398603091,0.05127761833961481,0.0,0.2842225372566647,0.0]  |
|6.0       |3.0       |[0.0,0.0,0.03419660454341136,0.6303032398603091,0.05127761833961481,0.0,0.2842225372566647,0.0]  |
|3.0       |3.0       |[0.0,0.0,0.03419660454341136,0.6303032398603091,0.05127761833961481,0.0,0.2842225372566647,0.0]  |
|3.0       |3.0       |[

In [33]:
predictions.head(3)  # --> 1. l'elenco dei valori dei predittori (feature) per la riga corrente
                    #     2. l'elenco delle 7 probbailità di classe per la riga corrente
                    #     3. la classe prevista per la riga corrente in base alla probabilità maggiore

[Row(Elevation=1859, Aspect=18, Slope=12, Horizontal_Distance_To_Hydrology=67, Vertical_Distance_To_Hydrology=11, Horizontal_Distance_To_Roadways=90, Hillshade_9am=211, Hillshade_Noon=215, Hillshade_3pm=139, Horizontal_Distance_To_Fire_Points=792, Wilderness_Area_0=0, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=1, Soil_Type_0=0, Soil_Type_1=1, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=0, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=3.0, featureVector=SparseVe

Calcolo dell'**accuratezza predittiva** e della **matrice di confusione** sul <u>training set</u>:

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type",
                                        predictionCol="prediction")

evaluator.setMetricName("accuracy").evaluate(predictions)   # l'accuratezza delle previsioni --> 0.70 (non alta)
                                                            # formula: (135917 + 194948 + 25609 + 997 + 0 + 0 + 9433)/522967

0.7015815529469355

In [35]:
confusion_matrix = predictions.groupBy("Cover_Type").\
  pivot("prediction", range(1,8)).count().\
  na.fill(0.0).\
  orderBy("Cover_Type")

confusion_matrix.show()

+----------+------+------+-----+---+---+---+----+
|Cover_Type|     1|     2|    3|  4|  5|  6|   7|
+----------+------+------+-----+---+---+---+----+
|       1.0|135917| 51129|  166|  0|  0|  0|3584|
|       2.0| 55190|194948| 4315|100|  0|  0| 408|
|       3.0|     0|  5820|25609|644|  0|  0|   0|
|       4.0|     0|    22| 1483|997|  0|  0|   0|
|       5.0|    14|  7794|  714|  0|  0|  0|   0|
|       6.0|     0|  6156| 8935|552|  0|  0|   0|
|       7.0|  8964|    16|   57|  0|  0|  0|9433|
+----------+------+------+-----+---+---+---+----+



Le probabilità delle classi permettono il **ranking** delle previsioni, come si vede in questa slide; ad ogni valore di threshold è associata una **differente matrice di confusione**:

![](dsfb_0801.png)

Scelto un valore di threshold (e quindi la relativa matrice di confusione), se si dispone dei **costi / benefici** <u>indicativi</u> dei vari esiti della classificazione (TP,TN,FP,FN), che sono forniti dal **business** e non dal Data Scientist - dove i benefici sono positivi ed i costi negativi - si può prevedere il valore atteso del beneficio economico dell'utilizzo di un dato classificatore. Ovviamente questo valore atteso dev'essere almeno > 0!

![](dsfb_0702.png)

In [40]:
from pyspark.sql import DataFrame

def class_probabilities(data):
    total = data.count()
    return data.groupBy("Cover_Type").count().\
    orderBy("Cover_Type").\
    select(col("count").cast(DoubleType())).\
    withColumn("count_proportion", col("count")/total).\
    select("count_proportion").collect()


train_prior_probabilities = class_probabilities(train_data)
test_prior_probabilities = class_probabilities(test_data)

train_prior_probabilities

[Row(count_proportion=0.36483372755833543),
 Row(count_proportion=0.4875278937294323),
 Row(count_proportion=0.061328917503398875),
 Row(count_proportion=0.004784240688226982),
 Row(count_proportion=0.016295483271411008),
 Row(count_proportion=0.02991202121739995),
 Row(count_proportion=0.0353177160317955)]

In [41]:
train_prior_probabilities = [p[0] for p in train_prior_probabilities]
test_prior_probabilities = [p[0] for p in test_prior_probabilities]

sum([train_p * cv_p for train_p, cv_p in zip(train_prior_probabilities,
                                              test_prior_probabilities)])

0.3766123651454656

# Il tuning dell'albero

In [42]:
from pyspark.ml import Pipeline

assembler = VectorAssembler(inputCols=input_cols, outputCol="featureVector")
classifier = DecisionTreeClassifier(seed=1234, labelCol="Cover_Type",
                                    featuresCol="featureVector",
                                    predictionCol="prediction")

pipeline = Pipeline(stages=[assembler, classifier])

In [43]:
from pyspark.ml.tuning import ParamGridBuilder  # classe costruttore (builder) della GRIGLIA dei PARAMETRI

paramGrid = ParamGridBuilder(). \
  addGrid(classifier.impurity, ["gini", "entropy"]). \
  addGrid(classifier.maxDepth, [1, 20]). \
  addGrid(classifier.maxBins, [40, 300]). \
  addGrid(classifier.minInfoGain, [0.0, 0.05]). \
  build()

multiclassEval = MulticlassClassificationEvaluator(). \
  setLabelCol("Cover_Type"). \
  setPredictionCol("prediction"). \
  setMetricName("accuracy")

In [44]:
from pyspark.ml.tuning import TrainValidationSplit

validator = TrainValidationSplit(seed=1234,
  estimator=pipeline,
  evaluator=multiclassEval,
  estimatorParamMaps=paramGrid,
  trainRatio=0.9)

validator_model = validator.fit(train_data)  # circa 1.5 minuti di esecuzione sul mio PC (4GHz, 16 GB RAM);
                                             # sono 16 fit (2^4 = numero di combinazioni dei parametri di input dell'albero)

In [45]:
from pprint import pprint

best_model = validator_model.bestModel           # estrazione best model, cioè dell'albero migliore (rispetto ad una metrica)
pprint(best_model.stages[1].extractParamMap())   # display dei parametri ottimali (quelli dell'albero migliore)

{Param(parent='DecisionTreeClassifier_ef0d3d720050', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction',
 Param(parent='DecisionTreeClassifier_ef0d3d720050', name='minWeightFractionPerNode', doc='Minimum fraction of the weighted sample count that each child must have after split. If a split causes the fraction of the total weight in the left or right child to be less than minWeightFractionPerNode, the split will be discarded as invalid. Should be in interval [0.0, 0.5).'): 0.0,
 Param(parent='DecisionTreeClassifier_ef0d3d720050', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='DecisionTreeClassifier_ef0d3d720050', name='seed', doc='random seed.'): 1234,
 Param(parent='DecisionTreeClassifier_ef0d3d720050', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidenc

Dall'esame (non banale) delle info della cella precedente, si ricava che la miglior combinazione di parametri dell'albero è data da: maxDepth = 20, impurity = 'entropy', maxBins = 40, minInfoGain = 0.

In [46]:
validator_model = validator.fit(train_data)   # circa 1.5 minuti di esecuzione

metrics = validator_model.validationMetrics
params = validator_model.getEstimatorParamMaps()
metrics_and_params = list(zip(metrics, params))

metrics_and_params.sort(key=lambda x: x[0], reverse=True)
metrics_and_params

[(0.909049299132259,
  {Param(parent='DecisionTreeClassifier_ef0d3d720050', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'entropy',
   Param(parent='DecisionTreeClassifier_ef0d3d720050', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 20,
   Param(parent='DecisionTreeClassifier_ef0d3d720050', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 40,
   Param(parent='DecisionTreeClassifier_ef0d3d720050', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0}),
 (0.9050824830742824,
  {Param(parent='DecisionTreeClassifier_ef0d3d720050', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy,

In [47]:
metrics.sort(reverse=True)
print(metrics[0])

0.909049299132259


In [48]:
multiclassEval.evaluate(best_model.transform(test_data)) # l'accuratezza predittiva del miglior albero sui dati di test.

0.9049875096907571

# Le variabili categoriche

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def unencode_one_hot(data):
    wilderness_cols = ['Wilderness_Area_' + str(i) for i in range(4)]
    wilderness_assembler = VectorAssembler().\
                            setInputCols(wilderness_cols).\
                            setOutputCol("wilderness")

    unhot_udf = udf(lambda v: v.toArray().tolist().index(1))

    with_wilderness = wilderness_assembler.transform(data).\
      drop(*wilderness_cols).\
      withColumn("wilderness", unhot_udf(col("wilderness")).cast(IntegerType()))

    soil_cols = ['Soil_Type_' + str(i) for i in range(40)]
    soil_assembler = VectorAssembler().\
                      setInputCols(soil_cols).\
                      setOutputCol("soil")
    with_soil = soil_assembler.\
                transform(with_wilderness).\
                drop(*soil_cols).\
                withColumn("soil", unhot_udf(col("soil")).cast(IntegerType()))

    return with_soil

In [None]:
unenc_train_data = unencode_one_hot(train_data)
unenc_train_data.printSchema()

In [None]:
unenc_train_data.groupBy('wilderness').count().show()

In [None]:
from pyspark.ml.feature import VectorIndexer

cols = unenc_train_data.columns
input_cols = [c for c in cols if c!='Cover_Type']

assembler = VectorAssembler().setInputCols(input_cols).setOutputCol("featureVector")

indexer = VectorIndexer().\
  setMaxCategories(40).\
  setInputCol("featureVector").setOutputCol("indexedVector")

classifier = DecisionTreeClassifier().setLabelCol("Cover_Type").\
                                      setFeaturesCol("indexedVector").\
                                      setPredictionCol("prediction")

pipeline = Pipeline().setStages([assembler, indexer, classifier])

# Foreste di alberi (*random forest*)
La costruzione di ogni albero della foresta è indipendente dalle altre ed utilizza un sottoinsieme casuale dei predittori.

In [49]:
from pyspark.ml.classification import RandomForestClassifier

# creazione dell'istanza:
classifier = RandomForestClassifier(seed=1234, labelCol="Cover_Type",
                                    featuresCol="indexedVector",
                                    predictionCol="prediction")

In [53]:
unenc_train_data.show

NameError: name 'unenc_train_data' is not defined

In [54]:
# Skipped in book

cols = unenc_train_data.columns
input_cols = [c for c in cols if c!='Cover_Type']

assembler = VectorAssembler().setInputCols(input_cols).setOutputCol("featureVector")

indexer = VectorIndexer().\
  setMaxCategories(40).\
  setInputCol("featureVector").setOutputCol("indexedVector")

pipeline = Pipeline().setStages([assembler, indexer, classifier])

paramGrid = ParamGridBuilder(). \
  addGrid(classifier.impurity, ["gini", "entropy"]). \
  addGrid(classifier.maxDepth, [1, 20]). \
  addGrid(classifier.maxBins, [40, 300]). \
  addGrid(classifier.minInfoGain, [0.0, 0.05]). \
  build()

multiclassEval = MulticlassClassificationEvaluator(). \
  setLabelCol("Cover_Type"). \
  setPredictionCol("prediction"). \
  setMetricName("accuracy")

validator = TrainValidationSplit(seed=1234,
  estimator=pipeline,
  evaluator=multiclassEval,
  estimatorParamMaps=paramGrid,
  trainRatio=0.9)

validator_model = validator.fit(unenc_train_data)

best_model = validator_model.bestModel

NameError: name 'unenc_train_data' is not defined

In [55]:
forest_model = best_model.stages[1]  # e non [2] (nota mia)

feature_importance_list = list(zip(input_cols,
                                  forest_model.featureImportances.toArray()))
feature_importance_list.sort(key=lambda x: x[1], reverse=True)

pprint(feature_importance_list)

[('Elevation', 0.4461487371081402),
 ('Horizontal_Distance_To_Fire_Points', 0.11037833749499719),
 ('Horizontal_Distance_To_Roadways', 0.10441123449330059),
 ('Horizontal_Distance_To_Hydrology', 0.0539079699518013),
 ('Vertical_Distance_To_Hydrology', 0.03757878547476739),
 ('Hillshade_Noon', 0.0267531803902),
 ('Aspect', 0.02597171743170898),
 ('Wilderness_Area_0', 0.025184332508522863),
 ('Hillshade_9am', 0.022950425272983688),
 ('Slope', 0.019093146216324422),
 ('Hillshade_3pm', 0.013008164267475177),
 ('Soil_Type_3', 0.008735886291470128),
 ('Wilderness_Area_2', 0.008294107551219232),
 ('Soil_Type_21', 0.008117574971734605),
 ('Soil_Type_22', 0.007624249436234323),
 ('Soil_Type_31', 0.007233492036864359),
 ('Soil_Type_1', 0.006660088011108301),
 ('Wilderness_Area_1', 0.006367980167698458),
 ('Soil_Type_28', 0.00620464903456658),
 ('Soil_Type_30', 0.006164553726460858),
 ('Soil_Type_38', 0.00536598920615657),
 ('Soil_Type_32', 0.005176133107166592),
 ('Soil_Type_9', 0.00485033312388

# Le previsioni

In [56]:
unenc_test_data = unencode_one_hot(test_data)

best_model.transform(unenc_test_data.drop("Cover_Type")).\
                    select("prediction").show(1)   # --> l'accuratezza del random forest (rispetto all'albero singolo)
                                                   #     aumenta leggermente

NameError: name 'unencode_one_hot' is not defined

# Chiusura della sessione Spark

E' sempre bene chiudere la sessione Spark, come anche suggerito [qui](https://stackoverflow.com/questions/44058122/what-happens-if-sparksession-is-not-closed).

In [None]:
spark.stop()