In [None]:
import os

from wwo_hist import retrieve_hist_data
import matplotlib.pyplot as plt
from matplotlib import colormaps
import pandas as pd
from numpy import mean, std

# Collect data

In [None]:
# The World Weather Online API key is a credential and must not be committed
# with the notebook. It is read from the WWO_API_KEY environment variable and
# is only needed when re-downloading the data with the commented call below.
api_key = os.environ.get('WWO_API_KEY', '')
location_list = ['M5B0C3']
# Uncomment to re-download the history instead of reading the cached CSV:
#hist_df = retrieve_hist_data(api_key, location_list, '19-NOV-2017', '19-NOV-2023', frequency=1, location_label=False, export_csv=True, store_df=False)
hist_df = pd.read_csv('M5B0C3.csv')

In [None]:
# Pull the temperature, humidity and snowfall columns out of the raw frame as
# plain Python lists of floats.
t_list = hist_df['tempC'].astype(float).tolist()
h_list = hist_df['humidity'].astype(float).tolist()
s_list = hist_df['totalSnow_cm'].astype(float).tolist()

# Process data and prepare dataset

In [None]:
def binarize(snow, thershold):
    """Return 1 when `snow` is strictly greater than `thershold`, otherwise 0."""
    return 1 if snow > thershold else 0
    
# Flag every sample as 1 / 0 using a 0.5 cut-off on the snowfall value.
s_bin_list = list(map(lambda snow: binarize(snow, 0.5), s_list))

# Temperature-vs-humidity scatter, shaded by the binary snow flag.
cm = colormaps['gray_r']
sc = plt.scatter(x=t_list, y=h_list, c=s_bin_list, cmap=cm, label='Snow')
plt.colorbar(sc)
plt.grid(True)
plt.legend()
plt.title('Snowfall')
plt.xlabel('Temperature (*C)')
plt.ylabel('Humidity (%)')

In [None]:
def gen_label(snow, threshold=0.5):
    """Map a snowfall amount to the class label used for training.

    Args:
        snow: Snowfall amount for one sample.
        threshold: Amount that must be strictly exceeded for the sample to be
            labelled as snow. Defaults to 0.5, the cut-off this function
            previously hard-coded, so existing one-argument calls are unchanged.

    Returns:
        "Yes" if `snow` is strictly greater than `threshold`, otherwise "No".
    """
    return "Yes" if snow > threshold else "No"
    
# One "Yes"/"No" label per snowfall sample.
labels_list = list(map(gen_label, s_list))

In [None]:
# Column layout of the training table: three temperature readings, three
# humidity readings, and the snow label.
csv_header = ['Temp0', 'Temp1', 'Temp2', 'Humi0', 'Humi1', 'Humi2', 'Snow']

# Each row is a sliding window over three consecutive list positions
# (i, i+1, i+2), paired with the label found at position i+2.
window_rows = zip(
    t_list[:-2], t_list[1:-1], t_list[2:],
    h_list[:-2], h_list[1:-1], h_list[2:],
    labels_list[2:],
)
dataset_df = pd.DataFrame(list(window_rows), columns=csv_header)

In [None]:
# Balance the two classes by randomly under-sampling the larger one.
df0 = dataset_df[dataset_df['Snow'] == 'No']
df1 = dataset_df[dataset_df['Snow'] == 'Yes']

# A fixed random_state makes the under-sampling reproducible, so the balanced
# frame (and everything computed from it further down) is the same after a
# kernel restart instead of changing on every run.
if len(df1.index) < len(df0.index):
    df0_sub = df0.sample(len(df1.index), random_state=1)
    dataset_df = pd.concat([df0_sub, df1])
else:
    df1_sub = df1.sample(len(df0.index), random_state=1)
    dataset_df = pd.concat([df1_sub, df0])

In [None]:
# Rebuild the value lists from the balanced frame: every 'Temp0' / 'Humi0'
# entry followed by the last two 'Temp2' / 'Humi2' entries.
t_list = [*dataset_df['Temp0'].tolist(), *dataset_df['Temp2'].tail(2).tolist()]
h_list = [*dataset_df['Humi0'].tolist(), *dataset_df['Humi2'].tail(2).tolist()]

Scale input features with Z-score

In [None]:
# Mean and standard deviation of each value list; used below for Z-score scaling.
t_avg, t_std = mean(t_list), std(t_list)
h_avg, h_std = mean(h_list), std(h_list)

# Print the statistics rounded to 5 decimals under a 'COPY ME!' banner.
print('COPY ME!')
print('Temperature - [MEAN, STD] ', round(t_avg, 5), round(t_std, 5))
print('Humidity - [MEAN, STD] ', round(h_avg, 5), round(h_std, 5))

In [None]:
def scaling(val, avg, std):
    """Z-score a value: subtract `avg` from `val`, then divide by `std`."""
    centered = val - avg
    return centered / std

# Z-score the feature columns in place. Note that the columns are overwritten,
# so running this cell a second time rescales already-scaled values.
for temp_col in ('Temp0', 'Temp1', 'Temp2'):
    dataset_df[temp_col] = dataset_df[temp_col].apply(lambda x: scaling(x, t_avg, t_std))

for humi_col in ('Humi0', 'Humi1', 'Humi2'):
    dataset_df[humi_col] = dataset_df[humi_col].apply(lambda x: scaling(x, h_avg, h_std))

Visualize scaled inputs

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [None]:
# The first six columns are the input features; the seventh is the label.
all_columns = dataset_df.columns.values
f_names, l_name = all_columns[:6], all_columns[6:7]
x, y = dataset_df[f_names], dataset_df[l_name]

In [None]:
# Fit the encoder on the 'Snow' column and turn its string labels into integers.
labelencoder = LabelEncoder()
y_encoded = labelencoder.fit(y['Snow']).transform(y['Snow'])

Split into 80% train, 10% validate, 10% test

In [None]:
# Hold out 20% of the samples, then split that hold-out in half to obtain the
# test and validation sets. Both splits are seeded.
x_train, x_validate_test, y_train, y_validate_test = train_test_split(
    x, y_encoded, test_size=0.2, random_state=1
)
x_test, x_validate, y_test, y_validate = train_test_split(
    x_validate_test, y_validate_test, test_size=0.5, random_state=3
)

# Create model (binary classifier)

## Train neural network
Input (6 features) -> [Fully connected layer (12 neurons) + ReLU] -> [Dropout 20%] -> [Fully connected layer (1 neuron) + sigmoid] -> Output

In [None]:
from sklearn.metrics import confusion_matrix
import tensorflow as tf

In [None]:
# 6 inputs -> Dense(12, relu) -> Dropout(0.2) -> Dense(1, sigmoid)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(12, activation='relu', input_shape=(len(f_names),)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(1, activation='sigmoid'),
])
model.summary()

In [None]:
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Early-stopping callback watching 'val_loss' with a patience of 3.
callback = tf.keras.callbacks.EarlyStopping(patience=3, monitor='val_loss')

In [None]:
# Train for up to 75 epochs, validating on the validation split and using the
# early-stopping callback.
history = model.fit(
    x=x_train,
    y=y_train,
    batch_size=64,
    epochs=75,
    validation_data=(x_validate, y_validate),
    callbacks=[callback],
)

In [None]:
# Per-epoch curves recorded by fit(), plus a 1-based epoch axis of matching length.
training_log = history.history
loss_train, loss_val = training_log['loss'], training_log['val_loss']
acc_train, acc_val = training_log['accuracy'], training_log['val_accuracy']
epochs = range(1, len(loss_train) + 1)

def plot_train_val_history(x, train, val, type_txt):
    """Plot a training curve and a validation curve on one new figure.

    Args:
        x: Values for the horizontal axis (labelled 'Epochs').
        train: Training-series values, drawn in green.
        val: Validation-series values, drawn in blue.
        type_txt: Metric name; used as the y-axis label and in both legend entries.
    """
    plt.figure(figsize=(10, 7))
    curves = ((train, 'g', 'Training '), (val, 'b', 'Validation '))
    for series, line_fmt, prefix in curves:
        plt.plot(x, series, line_fmt, label=prefix + type_txt)
    plt.xlabel('Epochs')
    plt.ylabel(type_txt)
    plt.legend()
    plt.show()

# One figure for the loss curves, one for the accuracy curves.
plot_train_val_history(x=epochs, train=loss_train, val=loss_val, type_txt='Loss')
plot_train_val_history(x=epochs, train=acc_train, val=acc_val, type_txt='Accuracy')

In [None]:
# Persist the trained float model under the name 'snow_forecast'.
model.save(filepath='snow_forecast')

In [None]:
# Model outputs for the test set, thresholded at 0.5 into hard 0/1 predictions.
y_test_prob = model.predict(x_test)
y_test_pred = (y_test_prob > 0.5).astype('int32')

## Quantization aware training

In [None]:
import tensorflow_model_optimization as tfmot

In [None]:
quantize_model = tfmot.quantization.keras.quantize_model

# Wrap the trained model for quantization-aware training; the wrapped model is
# compiled with the same loss, optimizer and metric as the float model.
q_aware_model = quantize_model(model)
q_aware_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

q_aware_model.summary()

In [None]:
# Keep 75% of the training data (seeded split) for the quantization-aware
# fine-tuning pass; the other 25% is discarded.
x_train_subset, _, y_train_subset, _ = train_test_split(
    x_train, y_train, test_size=0.25, random_state=8
)

q_aware_model.fit(
    x=x_train_subset,
    y=y_train_subset,
    epochs=1,
    batch_size=64,
    validation_split=0.1,
)


Get operations used in model

In [None]:
@tf.function
def func(x):
    """Run the quantization-aware Keras model on `x` so its graph can be traced."""
    # The original body called `tflite_model_quant`, which is only assigned in a
    # later cell and holds the output of `converter.convert()` rather than a
    # callable model. Tracing `q_aware_model` works on a fresh top-to-bottom run.
    return q_aware_model(x)

# Trace with an explicit tensor signature (one row, one column per feature)
# instead of passing a pandas DataFrame as the example input.
model_func = func.get_concrete_function(
    tf.TensorSpec(shape=(1, len(csv_header[:6])), dtype=tf.float32)
)
ops = model_func.graph.get_operations()

# Keep the first operation of each op type, in graph order. A set of operation
# objects never removes anything (each op is a distinct object) and iterates in
# arbitrary order, so the listing was neither unique nor stable between runs.
seen_op_types = set()
unique_ops = []

for op in ops:
    if op.type not in seen_op_types:
        seen_op_types.add(op.type)
        unique_ops.append(op)

for op in unique_ops:
    print(f'Name: {op.name}')
    print(f'Op: {op.type}')
    print()

## Evaluate model performance

In [None]:
import seaborn as sns

# Confusion matrix of the float model's thresholded test predictions.
cm = confusion_matrix(y_test, y_test_pred)

index_names = ['Actual No Snow', 'Actual Snow']
column_names = ['Predicted No Snow', 'Predicted Snow']
df_cm = pd.DataFrame(data=cm, index=index_names, columns=column_names)

# Render the labelled matrix as an annotated heatmap on a new 150-dpi figure.
plt.figure(dpi=150)
sns.heatmap(data=df_cm, annot=True, fmt='d', cmap='Blues')

In [None]:
# Unpack the 2x2 confusion matrix: row 0 -> (TN, FP), row 1 -> (FN, TP).
(TN, FP), (FN, TP) = cm[0], cm[1]

n_samples = TP + TN + FN + FP
accur = (TP + TN) / n_samples
precis = TP / (TP + FP)
recall = TP / (TP + FN)
specificity = TN / (TN + FP)
f_score = (2 * precis * recall) / (precis + recall)

# Accuracy of the quantization-aware model on the same test set.
_, q_aware_model_acc = q_aware_model.evaluate(x_test, y_test, verbose=0)

print(f'Accuracy: {round(accur, 3)}')
print(f'Quant accuracy: {round(q_aware_model_acc, 3)}')
print(f'Precision: {round(precis, 3)}')
print(f'Recall: {round(recall, 3)}')
print(f'Specificity: {round(specificity, 3)}')
print(f'F-score: {round(f_score, 3)}')


# Convert model to TensorFlow Lite and byte array

In [None]:
def representative_data_gen():
    """Yield up to 100 single-sample batches from `x_test`, each cast to float32.

    Every yielded item is a one-element list holding the cast batch.
    """
    samples = tf.data.Dataset.from_tensor_slices(x_test).batch(1).take(100)
    for sample in samples:
        yield [tf.dtypes.cast(sample, tf.float32)]

In [None]:
# Build the converter from the quantization-aware Keras model.
# (Alternative left disabled: tf.lite.TFLiteConverter.from_saved_model('snow_forecast'))
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)

# Conversion settings: default optimizations, the representative dataset
# generator, the int8 builtin op set, and int8 input/output tensors.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = tf.lite.RepresentativeDataset(representative_data_gen)
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Run the conversion.
tflite_model_quant = converter.convert()


View TFLite model

In [6]:
# Netron is used only in this cell, to open the saved .tflite models in a viewer.
import netron

# NOTE(review): within this notebook the two .tflite files are only written by
# the commented-out cells further down, so they presumably must already exist
# on disk for these calls to have something to serve — confirm before relying
# on a fresh Restart & Run All.
netron.start('snow_model_q_aware.tflite')
netron.start('snow_model.tflite')

Serving 'snow_model_q_aware.tflite' at http://localhost:18518
Serving 'snow_model.tflite' at http://localhost:23479


('localhost', 23479)

rm: cannot remove '/home/kpaps/.config/wslu/baseexec': No such file or directory


In [None]:
# save model and convert it to C byte-array
# open('snow_model.tflite', 'wb').write(tflite_model_quant)
# !xxd -i 'snow_model.tflite' > 'model.h'

In [None]:
# open('snow_model_q_aware.tflite', 'wb').write(tflite_model_quant)
# !xxd -i 'snow_model_q_aware.tflite' > 'q_aware_model.h'