# Chapter 3 - Building a weather station with TensorFlow Lite for Microcontrollers




### Python libraries

In [None]:
import csv
import datetime
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
import os
import pandas as pd
import seaborn as sns
import sklearn.metrics
import tensorflow as tf

from numpy import mean
from numpy import std
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import activations
from tensorflow.keras import layers

### Constants

In [None]:
# Training hyper-parameters (both are passed to model.fit)
BATCH_SIZE = 64
MIN_SNOW_CM = 5 # Snowfall strictly above this value (in cm) is labelled as snow
NUM_EPOCHS = 20
# Output artefacts
OUTPUT_DATASET_FILE = "snow_dataset.csv"  # CSV export of the prepared dataset
TFL_MODEL_FILE = "snow_model.tflite"      # Quantized TensorFlow Lite model
TFL_MODEL_HEADER_FILE = "model.h"         # C header that holds the model as a byte array
TF_MODEL = "snow_forecast"                # Location of the saved TensorFlow model

## Importing weather data from WorldWeatherOnline

### Install the wwo-hist package

In [None]:
!pip install wwo-hist

### Import retrieve_hist_data function from wwo-hist

In [None]:
from wwo_hist import retrieve_hist_data # WorldWeatherOnline

### Method 1: Acquire data using the wwo_hist Python module

In [None]:
# Request parameters
api_key = 'CHANGE-ME'
location_list = ['canazei']
frequency = 1

# retrieve_hist_data returns a list of dataframe(s), one per requested location
hist_df = retrieve_hist_data(
    api_key,
    location_list,
    '01-JAN-2011',
    '31-DEC-2020',
    frequency,
    location_label=False,
    export_csv=False,
    store_df=True,
)

# Only one location was requested, so the readings live in the first dataframe.
# Extract temperature, humidity and snowfall as lists of floats.
weather_df = hist_df[0]
t_list, h_list, s_list = (
    weather_df[column].astype(float).to_list()
    for column in ("tempC", "humidity", "totalSnow_cm")
)

### Method 2: Acquire data using the Historical Weather API

In [None]:
import calendar
import requests

api_key = 'CHANGE-ME'
city = 'canazei'

# HTTPS keeps the API key out of clear-text network traffic
url_base = 'https://api.worldweatheronline.com/premium/v1/past-weather.ashx'

t_list = []
h_list = []
s_list = []

# The history is requested one calendar month at a time
for year in range(2011, 2021):
  for month in range(1, 13):
    num_days_month = calendar.monthrange(year, month)[1]
    start_date = '{year}-{month:02d}-01'.format(year=year, month=month)
    end_date = '{year}-{month:02d}-{last_day:02d}'.format(year=year, month=month, last_day=num_days_month)

    # Log the date range only: printing the full URL would store the API key
    # in the notebook output
    print(start_date, end_date)

    # Let requests build and escape the query string; the timeout prevents a
    # stalled connection from blocking the cell forever
    response = requests.get(url_base,
                            params={'key': api_key,
                                    'q': city,
                                    'format': 'json',
                                    'date': start_date,
                                    'enddate': end_date,
                                    'tp': 1},
                            timeout=30)

    if response.status_code != 200:
      # A skipped month leaves a gap in the hourly series, so make it visible
      print("WARNING: no data for", start_date, "-", end_date,
            "(HTTP status", response.status_code, ")")
      continue

    payload = response.json()

    for day in payload['data']['weather']:
      # The daily snowfall total is repeated for every hourly sample of that day
      snow_in_cm = float(day['totalSnow_cm'])
      for hour in day['hourly']:
        t_list.append(float(hour['tempC']))
        h_list.append(float(hour['humidity']))
        s_list.append(snow_in_cm)

## Preparing the dataset

### Explore the extracted physical quantities in a 2D scatter chart

In [None]:
# Keep the (temperature, humidity) pairs whose snowfall exceeds the threshold
snow_points = [
    (t, h)
    for snow, t, h in zip(s_list, t_list, h_list)
    if snow > MIN_SNOW_CM
]
t_bin_list = [t for t, _ in snow_points]
h_bin_list = [h for _, h in snow_points]

# Temperature on the x axis, humidity on the y axis
plt.figure(dpi=100)
sc = plt.scatter(x=t_bin_list, y=h_bin_list, c='#000000', label="Snow")
plt.title("Snowfall")
plt.xlabel("Temperature - °C")
plt.ylabel("Humidity - %")
plt.grid(color='#AAAAAA', linestyle='--', linewidth=0.5)
plt.legend()
plt.show()

### Generate the output labels (Yes and No)

In [None]:
def gen_label(snow):
  """Return "Yes" when snow is strictly above MIN_SNOW_CM, otherwise "No"."""
  return "Yes" if snow > MIN_SNOW_CM else "No"

# One label per snowfall reading (the temperature is not needed to label a sample)
labels_list = [gen_label(snow) for snow in s_list]

### Build the dataset

In [None]:
csv_header = ["Temp0", "Temp1", "Temp2", "Humi0", "Humi1", "Humi2", "Snow"]

# Each row holds three consecutive readings (offsets 0, 1 and 2) of both
# quantities, paired with the label of the third reading
temp_windows = (t_list[:-2], t_list[1:-1], t_list[2:])
humi_windows = (h_list[:-2], h_list[1:-1], h_list[2:])
rows = list(zip(*temp_windows, *humi_windows, labels_list[2:]))

dataset_df = pd.DataFrame(rows, columns=csv_header)

### Balance the dataset by undersampling the majority class


In [None]:
# Fixed seed so the undersampling, and everything derived from it (scaling
# statistics, trained model), is the same on every run
UNDERSAMPLING_SEED = 1

def split_by_label(df):
  """Return the ("No" rows, "Yes" rows) of df according to its 'Snow' column."""
  return df[df['Snow'] == "No"], df[df['Snow'] == "Yes"]

def percent_of(part, whole):
  """Return the row count of part as a percentage of whole, rounded to 2 decimals."""
  return round((len(part.index) / (len(whole.index))) * 100, 2)

df0, df1 = split_by_label(dataset_df)

nosnow_samples_old_percent = percent_of(df0, dataset_df)
snow_samples_old_percent   = percent_of(df1, dataset_df)

print(len(df0.index), len(df1.index))
print(nosnow_samples_old_percent, snow_samples_old_percent)

# Random subsampling of the majority class to guarantee 50% split
if len(df1.index) < len(df0.index):
  df0_sub = df0.sample(len(df1.index), random_state=UNDERSAMPLING_SEED)
  dataset_df = pd.concat([df0_sub, df1])
else:
  df1_sub = df1.sample(len(df0.index), random_state=UNDERSAMPLING_SEED)
  dataset_df = pd.concat([df1_sub, df0])

# Recompute the class shares on the balanced dataset
df0, df1 = split_by_label(dataset_df)

nosnow_samples_new_percent = percent_of(df0, dataset_df)
snow_samples_new_percent = percent_of(df1, dataset_df)

# Show the class shares before and after balancing
df_samples_results = pd.DataFrame.from_records(
                [["% No Snow", nosnow_samples_old_percent, nosnow_samples_new_percent],
                ["% Snow", snow_samples_old_percent, snow_samples_new_percent]],
            columns = ["Class", "Before - %", "After - %"], index="Class").round(2)

display(df_samples_results)

### Scale the input features with Z-score independently


In [None]:
# Get all values
t_list = dataset_df['Temp0'].tolist()
h_list = dataset_df['Humi0'].tolist()
t_list = t_list + dataset_df['Temp2'].tail(2).tolist()
h_list = h_list + dataset_df['Humi2'].tail(2).tolist()

# Calculate mean and standard deviation
t_avg = mean(t_list)
h_avg = mean(h_list)
t_std = std(t_list)
h_std = std(h_list)
print("COPY ME!")
print("Temperature - [MEAN, STD]  ", round(t_avg, 5), round(t_std, 5))
print("Humidity - [MEAN, STD]     ", round(h_avg, 5), round(h_std, 5))

# Scaling with Z-score function
def scaling(val, avg, std):
  return (val - avg) / (std)

dataset_df['Temp0'] = dataset_df['Temp0'].apply(lambda x: scaling(x, t_avg, t_std))
dataset_df['Temp1'] = dataset_df['Temp1'].apply(lambda x: scaling(x, t_avg, t_std))
dataset_df['Temp2'] = dataset_df['Temp2'].apply(lambda x: scaling(x, t_avg, t_std))
dataset_df['Humi0'] = dataset_df['Humi0'].apply(lambda x: scaling(x, h_avg, h_std))
dataset_df['Humi1'] = dataset_df['Humi1'].apply(lambda x: scaling(x, h_avg, h_std))
dataset_df['Humi2'] = dataset_df['Humi2'].apply(lambda x: scaling(x, h_avg, h_std))

### Visualize raw/scaled input features distributions


In [None]:
# Value lists read from the current dataset_df, assembled like t_list / h_list
t_norm_list = dataset_df['Temp0'].tolist() + dataset_df['Temp2'].tail(2).tolist()
h_norm_list = dataset_df['Humi0'].tolist() + dataset_df['Humi2'].tail(2).tolist()

# First figure: distributions of the raw values
fig, ax = plt.subplots(1, 2)
plt.subplots_adjust(wspace=0.4)
raw_t_ax, raw_h_ax = ax
sns.histplot(t_list, ax=raw_t_ax, kde=True)
sns.histplot(h_list, ax=raw_h_ax, kde=True)
raw_t_ax.set_title("Raw temperature")
raw_h_ax.set_title("Raw humidity")

# Second figure: distributions of the scaled values
fig, ax = plt.subplots(1, 2)
plt.subplots_adjust(wspace=0.5)
scaled_t_ax, scaled_h_ax = ax
scaled_t_ax.set_title("Scaled temperature")
scaled_h_ax.set_title("Scaled humidity")
sns.histplot(t_norm_list, ax=scaled_t_ax, kde=True)
sns.histplot(h_norm_list, ax=scaled_h_ax, kde=True)

### Export to CSV file

In [None]:
# Write the prepared dataset to disk; index=False leaves the DataFrame index out of the file
dataset_df.to_csv(OUTPUT_DATASET_FILE, index=False)

## Training the ML model with TF

### Extract the input features and output labels from the dataset_df Pandas DataFrame

In [None]:
# The first six columns are the input features, the seventh is the label
columns = dataset_df.columns.values
f_names, l_name = columns[0:6], columns[6:7]

x, y = dataset_df[f_names], dataset_df[l_name]

### Encode the labels to numerical values


In [None]:
# Learn the label -> integer mapping and apply it in a single step
labelencoder = LabelEncoder()
y_encoded = labelencoder.fit_transform(y.Snow)

### Split the dataset into train, validation, and test datasets

In [None]:
# Split 1 (80% vs 20%)
x_train, x_validate_test, y_train, y_validate_test = train_test_split(x, y_encoded, test_size=0.20, random_state = 1)
# Split 2 (50% vs 50%)
x_test, x_validate, y_test, y_validate = train_test_split(x_validate_test, y_validate_test, test_size=0.50, random_state = 3)

### Create the model with Keras API

In [None]:
# Fully connected binary classifier: one hidden layer of 12 units, dropout,
# and a single sigmoid output
model = tf.keras.Sequential([
    layers.Dense(12, activation='relu', input_shape=(len(f_names),)),
    layers.Dropout(0.4),
    layers.Dense(1, activation='sigmoid'),
])
model.summary()

### Compile the model

In [None]:
# Binary cross-entropy pairs with the single sigmoid output of the model;
# accuracy is tracked so it can be plotted per epoch afterwards
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

### Train the model

In [None]:
# Train on the training split and evaluate on the validation split; the
# returned history holds the per-epoch loss/accuracy values plotted later
history = model.fit(x_train, y_train, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE, validation_data=(x_validate, y_validate))

### Analyze the accuracy and loss after each training epoch


In [None]:
loss_train = history.history['loss']
loss_val   = history.history['val_loss']
acc_train  = history.history['accuracy']
acc_val    = history.history['val_accuracy']
# Build the x axis from the recorded history so it always matches the number
# of epochs that actually ran
epochs     = range(1, len(loss_train) + 1)

def plot_train_val_history(x, y_train, y_val, type_txt):
  """Plot a training curve and a validation curve against the epochs.

  x:        epoch numbers for the x axis
  y_train:  per-epoch values measured on the training set
  y_val:    per-epoch values measured on the validation set
  type_txt: name of the plotted quantity, used in the labels and title
  """
  plt.figure(dpi=150)
  plt.plot(x, y_train, 'g', label='Training ' + type_txt)
  plt.plot(x, y_val, 'b', label='Validation ' + type_txt)
  plt.title('Training and Validation ' + type_txt)
  plt.xlabel('Epochs')
  plt.ylabel(type_txt)
  plt.legend()
  plt.show()

plot_train_val_history(epochs, loss_train, loss_val, "Loss")
plot_train_val_history(epochs, acc_train, acc_val, "Accuracy")

### Save the entire TensorFlow model as a SavedModel

In [None]:
# Older tf.keras releases write a SavedModel directory when given a path
# without extension. Keras 3 rejects such a path with a ValueError; in that
# case fall back to model.export(), which writes a SavedModel for inference.
try:
  model.save(TF_MODEL)
except ValueError:
  model.export(TF_MODEL)

## Evaluating the model effectiveness

### Use the trained model to predict the output classes of the test dataset

In [None]:
# Threshold the predicted values at 0.5 to obtain 0/1 class predictions
y_test_pred = (model.predict(x_test) > 0.5).astype("int32")

### Compute the confusion matrix with scikit-learn

In [None]:
# Confusion matrix of the test set: true labels versus thresholded predictions.
# NOTE(review): this rebinds `cm`, hiding the `cm` module imported from
# matplotlib at the top of the notebook.
cm = sklearn.metrics.confusion_matrix(y_test, y_test_pred)

### Visualize the confusion matrix

In [None]:
# Row labels describe the actual class, column labels the predicted class
index_names, column_names = (
    ["Actual No Snow", "Actual Snow"],
    ["Predicted No Snow", "Predicted Snow"],
)
df_cm = pd.DataFrame(data=cm, index=index_names, columns=column_names)

# Annotated heatmap with integer cell counts
plt.figure(dpi=150)
sns.heatmap(data=df_cm, annot=True, fmt='d', cmap="Blues")

### Calculate Recall, Precision, and F-score performance metrics

In [None]:
# Unpack the 2x2 matrix: first row is (TN, FP), second row is (FN, TP)
(TN, FP), (FN, TP) = cm

accur  = (TP + TN) / (TP + TN + FN + FP)
precis = TP / (TP + FP)
recall = TP / (TP + FN)
# F-score: harmonic mean of recall and precision
f_score = (2 * recall * precis) / (recall + precis)

for metric_name, metric_value in (
    ("Accuracy:  ", accur),
    ("Recall:    ", recall),
    ("Precision: ", precis),
    ("F-score:   ", f_score),
):
  print(metric_name, round(metric_value, 3))

## Quantizing the model with TFLite converter

### Select the first 100 samples of the (shuffled) test dataset to calibrate the quantization

In [None]:
def representative_data_gen():
  """Yield the first 100 rows of x_test, one at a time, as float32 tensors.

  Each yielded item is a one-element list holding a batch of a single sample.
  """
  samples = tf.data.Dataset.from_tensor_slices(x_test).batch(1).take(100)
  for sample in samples:
    yield [tf.cast(sample, tf.float32)]

### Import the TensorFlow SavedModel directory into TensorFlow Lite Converter

In [None]:
# Build the TensorFlow Lite converter from the model saved under TF_MODEL
converter = tf.lite.TFLiteConverter.from_saved_model(TF_MODEL)

### Initialize TensorFlow Lite converter for the 8-bit quantization

In [None]:
# Samples used by the converter to calibrate the quantization
converter.representative_dataset = tf.lite.RepresentativeDataset(representative_data_gen)
# Enable the default optimization set
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Restrict the converted model to the int8 built-in operator set
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Use int8 for the model input and output tensors as well
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

### Convert the model to TensorFlow Lite file format (FlatBuffers) and save it as .tflite

In [None]:
tflite_model_quant = converter.convert()

# The context manager closes (and flushes) the file before later cells read it
with open(TFL_MODEL_FILE, "wb") as tflite_file:
  tflite_file.write(tflite_model_quant)

### Convert the TensorFlow Lite model to C-byte array with xxd

In [None]:
!apt-get update && apt-get -qq install xxd
!xxd -i $TFL_MODEL_FILE > model.h
!sed -i 's/unsigned char/const unsigned char/g' model.h
!sed -i 's/const/alignas(8) const/g' model.h

### Get the TensorFlow model size in bytes to estimate the program memory usage

In [None]:
# Length of the serialized model in bytes
size_tfl_model = len(tflite_model_quant)
print(size_tfl_model, "bytes")