<a href="https://colab.research.google.com/github/FG2511/ARE/blob/master/model1_provaPrecisione.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
'''
@File name: model1.ipynb
@Created on 2018-12-20
@Authors: Federica Gerina, Francesca Moi, Silvia Maria Massa
@Description: Given a time-series dataset that contains minute-by-minute data
about different kinds of gases, collected by the uHoo air quality sensor, train
a NN that classifies whether each minute belongs to the class "Pasto" (1) or to
the class "Other" (0).
'''

!pip install liac-arff

import arff
import numpy as np

from keras import optimizers
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense, Dropout, LeakyReLU, BatchNormalization, Activation
from keras.callbacks import EarlyStopping

from sklearn.utils import compute_class_weight
from sklearn.metrics import confusion_matrix

import sys
sys.path.append('local_modules')

import postprocessing_sw

In [0]:
#fix random seed for reproducibility
# np.random.seed seeds NumPy's global random generator only.
# NOTE(review): the Keras initializers ('random_uniform') and Dropout layers
# used further down may draw from the backend's own generator, which is not
# seeded here - confirm whether runs are actually reproducible.
seed = 5
np.random.seed(seed)

In [0]:
def generate_model_leaky(shape, n_features):
  '''
  @Description: build (without compiling) a multilayer perceptron made of an
  initial BatchNormalization layer, three hidden stages and a single sigmoid
  output unit. Every hidden stage is Dense (no bias, 'random_uniform'
  initializer) -> BatchNormalization -> LeakyReLU(alpha = 0.2) -> Dropout(0.5).
  @param:
    - shape : int, forwarded as input_dim to the first Dense layer
    - n_features: int, the number of features given; the hidden stages have
      int(n_features/2), n_features and n_features*2 units respectively
  @return: the (uncompiled) keras Sequential model
  '''

  # Width of each hidden stage, in the order the stages are stacked.
  hidden_units = (int(n_features/2), n_features, n_features*2)

  model = Sequential()
  model.add(BatchNormalization())

  for depth, units in enumerate(hidden_units):
    # Only the first Dense layer is given input_dim.
    first_layer_args = {'input_dim': shape} if depth == 0 else {}
    # use_bias = False: presumably because the BatchNormalization that follows
    # already learns an offset - TODO confirm the intent.
    model.add(Dense(units, kernel_initializer='random_uniform', use_bias = False, **first_layer_args))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha = 0.2))
    model.add(Dropout(0.5))

  # One sigmoid unit: the output is a single value in (0, 1) per sample.
  model.add(Dense(1, activation='sigmoid'))

  return model

In [0]:
#@title CHOOSE

'''
@Description: MAIN
'''

#LOAD DATA
print("Loading data...")

dataset = 'datasetName.arff' #@param {type:"string"}

with open (dataset, encoding='utf-8') as f:
  dataDictionary = arff.load(f)

data = np.array(dataDictionary['data'])
print("DATASET LOADED")

#CONVERTING VALUES
# The last column holds the class name; replace it with its numeric label so
# that the whole table can be cast to float32.
print("\nConverting values...")
for i in data:
  if(i[-1] == 'Other'): i[-1] = 0
  elif(i[-1] == 'Pasto') : i[-1] = 1

# NOTE: from here on `dataset` is the numeric table, no longer the file name.
dataset = data.astype('float32')
print("CONVERSION DONE")

#SPLIT INTO INPUT (X) AND OUTPUT (Y) VARIABLES
s = dataset.shape[-1]
X = dataset[:,0:s-1]
Y = dataset[:,s-1]

n_features = s-1

#SPLIT INTO TRAINING, VALIDATION AND TEST SETS
# The split is positional (no shuffling): first train_rate% of the rows for
# training, next val_rate% for validation, the remainder for testing.
print("\nSplit into training, validation and test sets...")

train_rate = 80
val_rate = 10
train = (dataset.shape[0]*train_rate)//100
val = (dataset.shape[0]*(train_rate+val_rate))//100

train_data = X[:train]
train_label = Y[:train]

# Slices are half-open, so each set starts exactly where the previous one
# ended; starting at train+1 / val+1 would silently drop one row per boundary.
val_data = X[train:val]
val_label = Y[train:val]

test_data = X[val:]
test_label = Y[val:]
print("DATASET SPLITTED")

#COMPUTE CLASS WEIGHT
# Keyword arguments: recent scikit-learn releases reject the positional form.
labels = np.unique(train_label)
classWeight = compute_class_weight(class_weight='balanced', classes=labels, y=train_label)
classWeight = dict(zip(labels,classWeight))

#GENERATE MODEL
print("\nGenerate model...")

model = generate_model_leaky(train_data.shape[-1], n_features)

#OPTIMIZERS
# NOTE(review): newer Keras releases name this argument `learning_rate`;
# `lr` is kept because it is what the Keras version this notebook was written
# for accepts - confirm against the installed version.
adm = optimizers.Adam(lr=0.0001)

#COMPILE MODEL
print("\nCompile model...")
model.compile(loss='binary_crossentropy', optimizer = adm , metrics=['accuracy'])

#EARLY STOPPING
# Stop when val_loss has not improved for 2 consecutive epochs.
es = EarlyStopping(monitor='val_loss', min_delta=0, patience=2, verbose=0, mode='auto')

#FIT MODEL
print("\nFit model...")
history = model.fit(train_data, train_label, epochs=10, validation_data = (val_data, val_label), batch_size = 128, shuffle = True, class_weight = classWeight, verbose=1, callbacks = [es])

#EVALUATE MODEL
print("\nEvaluate model...")
scores_test = model.evaluate(test_data, test_label, batch_size=128, verbose = 1)
# The loss is a cross-entropy value, not a percentage: print it as it is.
print("Test loss: %.4f" % scores_test[0])
print("Test accuracy: %.2f%%" % (scores_test[1] * 100))

#CALCULATE PREDICTIONS
# Threshold the sigmoid output at 0.5. This is what Sequential.predict_classes
# computed for a single-unit output, but it also works on Keras/TensorFlow
# releases where predict_classes no longer exists.
print("\nCalculate predictions...")
pred = (model.predict(test_data, batch_size=128, verbose=0) > 0.5).astype('int32')
# pred has one row per sample; flatten it into a plain per-minute list.
flat_pred = [item for sublist in pred for item in sublist]

#CONFUSION MATRIX BEFORE POST PROCESSING
print("\nCompute confusion matrix BEFORE POST PROCESSING...")
y_true = test_label
y_pred = pred
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
print("TN", tn)
print("FP", fp)
print("FN", fn)
print("TP", tp)
other = 100*tn/(tn+fp)          # share of "Other" minutes classified correctly
pasto = 100*tp/(fn+tp)          # share of "Pasto" minutes classified correctly
precision = 100*(tp/(tp+fp))
print("Other corretti: %.2f %%" % other)
print("Pasto corretti: %.2f %%" % pasto)
print("Precisione: %.2f %%" % precision)

# Fifth-from-last feature of every test row.
# NOTE(review): presumably a time-of-day feature, judging by the name - confirm
# against the dataset schema.
time = [row[-5] for row in test_data]

In [0]:
# Post-process the flattened per-minute predictions.
# NOTE(review): sliding_windows lives in the local module postprocessing_sw,
# which is not visible here; presumably it smooths the predictions with a
# window of size 5 - confirm against that module.
new_pred = postprocessing_sw.sliding_windows(flat_pred,5)

#CONFUSION MATRIX AFTER POST PROCESSING
# Same per-minute metrics as before post-processing, computed on new_pred.
print("\nCompute NEW confusion matrix AFTER POST PROCESSING...")
y_true = test_label
tn, fp, fn, tp = confusion_matrix(y_true, new_pred).ravel()
print("TN", tn)
print("FP", fp)
print("FN", fn)
print("TP", tp)
# Percentage of minutes of each class that were classified correctly.
other = 100*tn/(tn+fp)
pasto = 100*tp/(fn+tp)
print("Other corretti: %.2f %%" % other)
print("Pasto corretti: %.2f %%" % pasto)

In [0]:
def _positive_runs(labels, length):
  '''Return the (start, end) index pairs, end inclusive, of every maximal run
  of consecutive entries equal to 1 among labels[0:length].'''
  runs = []
  start = None
  for i in range(length):
    if labels[i] == 1:
      if start is None:
        start = i
    elif start is not None:
      runs.append((start, i - 1))
      start = None
  if start is not None: # the sequence ends while a run is still open
    runs.append((start, length - 1))
  return runs


def get_precision(p, r, min_meal_length=5):
  '''
  @Description: compute and print meal-level (not minute-level) precision and
  true positive rate. A meal is a maximal run of consecutive 1s.
  Each predicted meal is compared with the first real meal it overlaps:
    - if that real meal has not been matched yet, it is a true positive;
    - if that real meal was already matched by an earlier predicted meal, or
      the predicted meal overlaps no real meal, it is a false positive.
  @param:
    - p : sequence of 0/1, the per-minute predictions
    - r : sequence of 0/1, the per-minute ground truth; only its first len(p)
      entries are read
    - min_meal_length : int, predicted runs are kept only when they are
      strictly longer than this many minutes (default 5)
  @return: None, the results are printed. Precision is reported as 0 when
  there is no predicted meal and TPR as 0 when there is no real meal, instead
  of raising ZeroDivisionError.
  '''
  n = len(p)

  # Build the meal lists: predicted meals (short runs discarded) and real meals
  prediction_meals = [(start, end) for start, end in _positive_runs(p, n)
                      if end - start + 1 > min_meal_length]
  real_meals = _positive_runs(r, n)

  # Look for the intersections
  matched = [False] * len(real_meals) # real meals already credited with a TP
  tp = 0
  fp = 0

  for p_start, p_end in prediction_meals:
    # Two inclusive intervals overlap when each one starts before the other ends.
    hit = next((j for j, (r_start, r_end) in enumerate(real_meals)
                if p_start <= r_end and r_start <= p_end), None)

    if hit is not None and not matched[hit]:
      matched[hit] = True
      tp = tp + 1
    else:
      # Either no real meal overlaps, or the overlapped one was already found
      # by a previous predicted meal and must not be counted twice.
      fp = fp + 1

  # Compute the metrics
  print("\nPrecision: ")
  print("N° pasti reali:", len(real_meals))
  print("N° pasti predetti:", len(prediction_meals))

  print("TP:", tp)
  print("FP:", fp)

  somma = tp+fp
  precision = (tp/somma)*100 if somma else 0.0
  TPR = (tp/len(real_meals))*100 if real_meals else 0.0

  print("Tot = ", somma)
  print("TPR = ", TPR)
  print("Precisione: %.2f %%" % precision)


In [0]:
# Meal-level precision of the post-processed predictions. `new_pred` is the
# list produced by the sliding-window post-processing cell; the previously used
# name `new_pred_1` is not assigned anywhere in this notebook.
get_precision(new_pred,test_label)

In [0]:
def getMeals(pred, real):
  '''
  @Description: split a 0/1 sequence into runs of equal consecutive values and
  return the value (0 or 1) of each run, in order.
  Both pred and real are scanned, but only the run values of pred are
  returned; the index lists and the run values built for real are discarded.
  @param:
    - pred : sequence of 0/1, the predicted labels
    - real : sequence of 0/1, the real labels
  @return: list of 0/1, one entry per run found in pred
  '''

  def scan(values, pending):
    # Walk `values`, collecting the indices of the run in progress in
    # `pending`; a run is closed when the next value is the opposite label or
    # when the sequence ends. Returns the closed runs, their labels and the
    # indices still pending at the end.
    closed_runs = []
    run_labels = []
    size = len(values)

    for pos in range(size):
      at_end = pos + 1 >= size

      for label, opposite in ((0, 1), (1, 0)):
        if values[pos] == label:
          pending.append(pos)

          if at_end or values[pos + 1] == opposite:
            closed_runs.append(pending)
            run_labels.append(label)
            pending = []

    return closed_runs, run_labels, pending

  # Build the predicted runs
  index_pred, prediction_meals, leftover = scan(pred, [])

  # Build the real runs (result not used by the return value)
  index_real, real_meals, leftover = scan(real, leftover)

  return prediction_meals

In [0]:
# NOTE(review): getMeals returns the run values of its FIRST argument only, so
# the list bound to `real_meals` here is derived from flat_pred (the
# predictions), not from test_label - confirm this is what is intended.
real_meals = getMeals(flat_pred, test_label)