In [0]:
pip install mlflow

In [0]:
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import os
import pickle

from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import IsolationForest

## Caricamento dei dati in PySpark e conversione in dataframe Pandas

In [0]:
# File location and type
file_location_group = "dbfs:/FileStore/shared_uploads/marco.sasso@reti.it/group.csv"
file_location_building = "dbfs:/FileStore/shared_uploads/marco.sasso@reti.it/building.csv"
file_location_log = "dbfs:/FileStore/shared_uploads/marco.sasso@reti.it/log2.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
encoder = "cp1252"
virgola = ","
punto_e_virgola = ";"

# Loading files
df_group = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", virgola).option("encoding", encoder).load(file_location_group)
df_group = df_group.toPandas()

df_building = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", virgola).option("encoding", encoder).load(file_location_building)
df_building = df_building.toPandas()

df_log = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", punto_e_virgola).option("encoding", encoder).load(file_location_log)
df_log = df_log.toPandas()

## Definizione delle funzioni principali per il preprocessing e il training del modello

In [0]:
def signals_selection(logs, groups):
    
    # Rimuove le rilevazioni di tipo boolean, quelle non mappate nel dataset "group" e quelle che non presentano un valore nei
    # campi "Data", "Value" e "Id". Restituisce un dataframe di segnali che saranno effettivamente preprocessati e passati
    # come input al modello per l'addestramento
    
    # Unità di misura da tenere e tipologie di rilevazioni da rimuovere
    measures_to_keep = ['ppm', 'C°', '%', 'W', 'Wh']
    description_to_remove = ['Daikin Active Power Total','Consumo enel di E1 + Villa']
    
     # Rimuovo i record associati a rilevazioni di tipo booleano
    logs = logs[logs['ValueType'].isin(measures_to_keep) == True]
    
    # Rimuovo i record associati a rilevazioni di tipo "Daikin Active Power Total" e "Consumo Enel di E1 + Villa"
    logs = logs[logs['Description'].isin(description_to_remove) == False]
    
    # Effettuo il left join con la tabella group per ricavare l'ID della rilevazione
    logs = logs.merge(groups[['Description', 'Id']], how='left', on='Description')
    logs = logs.dropna(subset=['Data', 'Id', 'Value']).reset_index(drop=True)
    
    # Converto la colonna 'Data' in formato datetime
    logs['Data'] = pd.to_datetime(logs['Data'])
    
    # Converto la colonna 'Value' in formato numerico
    logs["Value"] = pd.to_numeric(np.char.replace(logs['Value'].to_numpy().astype(str),',','.'))
    
    return logs

In [0]:
def preprocessing(sel_logs):
    
    # Ricavo il mese e il giorno della settimana della rilevazione
    sel_logs['Month'] = pd.DatetimeIndex(sel_logs['Data']).month
    sel_logs['Weekday'] = pd.DatetimeIndex(sel_logs['Data']).weekday
    
    # 3) Rimuovo le features che non servono ad addestrare il modello
    features_to_drop = ['Data', 'IdBuilding', 'IndividualAddress', 'GroupAddress', 'TelegramType', 'ValueType', 'Description']
    sel_logs = sel_logs.drop(columns=features_to_drop)   
    
    return sel_logs

In [0]:
def forest_train_save(train_logs, num_est=100, cnt_rate=0.01):
    
    
    # INSERIRE AUTOLOG MLFLOW
    
    # Abilito l'autolog di ML Flow 
    mlflow.sklearn.autolog()
    
    with mlflow.start_run():
      iso_forest = IsolationForest(n_estimators=num_est, random_state=19, contamination=cnt_rate, behaviour='deprecated').fit(train_logs)
      
    return True

## Run del training del modello

In [0]:
sel_logs = signals_selection(df_log, df_group)
sel_logs.head()

Unnamed: 0,Data,IdBuilding,IndividualAddress,GroupAddress,TelegramType,Value,ValueType,Description,Id
0,2019-02-22 12:27:53.323333300,1.0,3.1.6,2/2/0,write,-33429.7,W,Produzione totale fotovoltaico,13.0
1,2019-02-22 12:28:56.400000000,1.0,3.1.201,2/0/3,write,35.68625,%,C1/0/M102 Umidità,10.0
2,2019-02-22 12:29:37.176666700,1.0,3.2.202,2/0/0,write,400.96,ppm,C1/0/M101 CO2,1.0
3,2019-02-22 12:30:21.933333300,1.0,3.3.201,2/1/8,write,36.86272,%,C1/0/U105 Umidità,12.0
4,2019-02-22 12:31:23.480000000,1.0,3.3.202,2/1/0,write,536.96,ppm,C1/0/U101 C02,3.0


In [0]:
train_logs = preprocessing(sel_logs)
train_logs.head()

Unnamed: 0,Value,Id,Month,Weekday
0,-33429.7,13.0,2,4
1,35.68625,10.0,2,4
2,400.96,1.0,2,4
3,36.86272,12.0,2,4
4,536.96,3.0,2,4


In [0]:
forest_model = forest_train_save(train_logs)
print(forest_model)