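# Traffic in Bolzano

Forecast the traffic `count` recorded by each Bluetooth station in Bolzano 10 minutes after the latest measurement in the database, using an LSTM trained on the historical measurements stored in MySQL.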
## Connection to MySQL database

In [0]:
%scala

// Connection with MySQL DB
Class.forName("org.mariadb.jdbc.Driver")

val jdbcHostname = "traffic-db.ce2ieg6xrefy.us-east-2.rds.amazonaws.com"
val jdbcPort = 3306
val jdbcDatabase = "bluetoothstations"

// Credentials are read from a Databricks secret scope instead of being hard-coded in the notebook.
// TODO: create this scope with the "username"/"password" keys before running, and rotate the
// password that was previously committed in plain text here.
val jdbcSecretScope = "traffic-db"
val jdbcUsername = dbutils.secrets.get(scope = jdbcSecretScope, key = "username")
val jdbcPassword = dbutils.secrets.get(scope = jdbcSecretScope, key = "password")

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

// Create a Properties() object to hold the parameters used by spark.read.jdbc.
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)

import java.sql.DriverManager
// Plain JDBC connection, opened only if some cell actually uses it (Spark reads go through
// connectionProperties). Close it when done if you use it.
lazy val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)

In [0]:
%scala
// Load the DB tables as Spark DataFrames; both use the same JDBC URL and credentials.
def readTable(table: String) = spark.read.jdbc(jdbcUrl, table, connectionProperties)

val measurement = readTable("measurement")
val station = readTable("station")

In [0]:
%scala
// Register both DataFrames as temporary views so the Python cells can read them with spark.table(...).
Seq("measurement" -> measurement, "station" -> station).foreach { case (viewName, frame) =>
  frame.createOrReplaceTempView(viewName)
}

## Train and test splitting

In [0]:
!pip install keras
!pip install tensorflow

In [0]:
%python
# Libraries

## Model
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import tensorflow as tf
import keras

## Data Transformation
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from numpy import array
import math
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler

## Query
from pyspark.sql.functions import hour, mean
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [0]:
%python
# Creating tables in Python
df = spark.table("measurement")
stations = spark.table('station')

# Dropping useless information for the model
station_codes = stations.drop("longitude","latitude")

# Joining measurements and station to map stations as numerical code
df = df.join(station_codes,df.station == stations.name)\
.drop("name")

In [0]:
# Time-based split: the 48 hours up to the most recent measurement form the test set,
# everything older is the training set.
latest_timestamp = df.agg({"timestamp": "max"}).collect()[0]
cutoff = latest_timestamp[0] - timedelta(hours=48)

test = df.filter(df["timestamp"] >= cutoff)
train = df.filter(df["timestamp"] < cutoff)

## Model for predicting traffic

In [None]:
# Create a subset of dataframe for each different station
def prepare_dataset_for_sequential(dataframe, last_date, stations):
    """Encode station names as numeric codes and split the measurements per station.

    Args:
        dataframe: measurements with ``station`` (station name), ``count`` and
            ``timestamp`` columns. Either a pandas DataFrame (copied, never
            modified) or a Spark DataFrame (collected with ``toPandas``).
        last_date: not used; kept so existing calls keep working.
        stations: station records whose first two fields are ``(code, name)``.
            Accepts a pandas or Spark DataFrame, or an iterable of rows
            (pandas Series, tuples, Spark Rows, lists).
            NOTE(review): the (code, name) field order is an assumption about
            the station table schema — confirm.

    Returns:
        (data_per_station, codes):
            data_per_station -- list of pandas DataFrames with columns
                ``count``, ``timestamp`` and ``station`` (the station replaced by
                its code). There is one frame per distinct code, in ascending
                code order, and each is sorted by ``timestamp``.
            codes -- dict mapping station name -> code.

    Raises:
        KeyError: if a measurement refers to a station name missing from ``stations``.
    """
    if hasattr(dataframe, "toPandas"):
        data = dataframe.toPandas()
    else:
        data = dataframe.copy()

    if hasattr(stations, "toPandas"):
        stations = stations.toPandas()
    if isinstance(stations, pd.DataFrame):
        # Iterating a DataFrame directly yields column labels, not rows.
        stations = stations.itertuples(index=False)

    codes = dict()
    for row in stations:
        code, name = list(row)[:2]
        codes[name] = code

    data['station'] = [codes[x] for x in data['station']]
    # Lags are built positionally later on, so each station's series must be chronological
    # (stable sort keeps the original order of equal timestamps).
    data = data[['count', 'timestamp', 'station']].sort_values('timestamp', kind='mergesort')
    # Split on the actual codes instead of assuming they are exactly 1..N.
    data_per_station = [data[data['station'] == code] for code in sorted(set(codes.values()))]
    return data_per_station, codes

In [None]:
# Build the supervised dataset (by default 5 lagged time steps per sample)
def create_model_dataset(dataframes, n_lags=5, scaler=None):
    """Turn per-station measurement frames into one supervised-learning table.

    For every non-empty frame:
    - the ``count`` column is standardised;
    - it is paired with the ``station`` column;
    - it is framed with ``series_to_supervised`` into ``n_lags`` lagged steps
      plus the current step.

    The current-step station column ``var2(t)`` is dropped, so the last column
    of the result is the target ``var1(t)``.

    Args:
        dataframes: iterable of per-station frames with ``count`` and
            ``station`` columns. pandas DataFrames are used as-is; Spark
            DataFrames are converted with ``toPandas``. Empty frames are skipped.
        n_lags: number of past time steps used as model input (default 5).
        scaler: an already-fitted scaler to reuse, e.g. the one returned for the
            training data. If None, a new ``StandardScaler`` is fitted on the
            counts of all stations together.

    Returns:
        (frame, scaler):
            frame -- the concatenated supervised frame with a fresh RangeIndex
                (empty DataFrame if there is no data).
            scaler -- its ``inverse_transform`` maps the scaled count of any row
                back to raw counts.
    """
    frames = []
    for frame in dataframes:
        if hasattr(frame, "toPandas"):
            frame = frame.toPandas()  # for extracting values
        if not frame.empty:
            frames.append(frame)

    if scaler is None:
        scaler = StandardScaler()
        if frames:
            # A single scaler for all stations, so the one scaler we return can invert every row
            # (previously it was re-fit per station and only matched the last one).
            all_counts = np.concatenate([f['count'].values.astype('float32') for f in frames])
            scaler.fit(all_counts.reshape(-1, 1))

    reframed_parts = []
    for frame in frames:
        count = scaler.transform(frame['count'].values.astype('float32').reshape(-1, 1))
        station = frame['station'].values.astype('float32').reshape(-1, 1)
        scaled = np.concatenate((count, station), axis=1)
        # frame as supervised learning
        reframed = series_to_supervised(scaled, n_lags, 1)
        reframed_parts.append(reframed.drop(columns=['var2(t)']))

    if not reframed_parts:
        return pd.DataFrame(), scaler
    # Concatenate once instead of growing a frame with the removed DataFrame.append.
    return pd.concat(reframed_parts).reset_index(drop=True), scaler

In [0]:
# Convert series to supervised learning
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Frame a (multivariate) time series as a supervised-learning table.

    The result holds, for every variable ``varK``:
    - ``n_in`` lagged copies ``varK(t-n_in) ... varK(t-1)``;
    - followed by ``n_out`` copies ``varK(t), varK(t+1), ...``.

    Rows are aligned positionally with ``shift``, so ``data`` must already be
    in chronological order.

    Args:
        data: a 1-D sequence/array (one variable), or a 2-D list of rows /
            array / DataFrame with one column per variable.
        n_in: number of lag steps (t-n_in ... t-1) to include.
        n_out: number of steps (t ... t+n_out-1) to include.
        dropnan: drop the rows left incomplete by shifting.

    Returns:
        pandas.DataFrame with ``n_vars * (n_in + n_out)`` columns.
    """
    frame = pd.DataFrame(data)
    # Take the variable count from the frame itself. This is correct for flat lists,
    # lists of rows, 1-D and 2-D arrays alike.
    n_vars = frame.shape[1]
    cols, names = [], []
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(frame.shift(i))
        names += ['var%d(t-%d)' % (j + 1, i) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(n_out):
        cols.append(frame.shift(-i))
        suffix = '(t)' if i == 0 else '(t+%d)' % i
        names += ['var%d%s' % (j + 1, suffix) for j in range(n_vars)]
    # put it all together
    agg = pd.concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg = agg.dropna()
    return agg

In [None]:
def data_split(data):
    """Split a supervised 2-D array into LSTM inputs and targets.

    The last column is the target. All other columns become the features of a
    single time step, so the inputs are shaped (samples, 1, features).
    """
    features, target = data[:, :-1], data[:, -1]
    n_samples, n_features = features.shape
    return features.reshape((n_samples, 1, n_features)), target

In [0]:
%python
# Train preparation
data_per_station_train, codes = prepare_dataset_for_sequential(train, latest_timestamp, db)
train, scaler_train = create_model_dataset(data_per_station)
train = train.toPandas().values
train_X, train_y = data_split(train)

# Test preparation
data_per_station_test, _ = prepare_dataset_for_sequential(test, latest_timestamp, db)
test, scaler_test = create_model_dataset(data_per_station)
test = test.toPandas().values
test_X, test_y = data_split(test)

In [0]:
# Neural network with LSTM
# Seed and hyper-parameters in one place so runs are repeatable and easy to tune.
# Seeding is best-effort: some GPU kernels stay non-deterministic.
SEED = 42
LSTM_UNITS = 50
EPOCHS = 2
BATCH_SIZE = 72

keras.utils.set_random_seed(SEED)  # seeds Python `random`, NumPy and the backend

model = Sequential()
# input_shape = (time steps, features), taken from the prepared training array
model.add(LSTM(LSTM_UNITS, input_shape=(train_X.shape[1], train_X.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# Fit the network. The test split is also used as validation data, and shuffle=False
# keeps the samples in the order they were prepared.
history = model.fit(train_X, train_y, epochs=EPOCHS, batch_size=BATCH_SIZE,
                    validation_data=(test_X, test_y),
                    verbose=2, shuffle=False)

In [0]:
# Predictions for the test inputs, still in scaled units (not inverse-transformed)
y_hat = model.predict(x=test_X)

In [0]:
# To better visualize predictions
def obtain_prediction_dataframe(model, X, data, scaler, station_codes, last_date=None):
    """Build one next-step traffic prediction per station.

    The model is run on ``X`` and its predictions are mapped back to raw counts
    with ``scaler``. For every station, the prediction of that station's last
    row in ``data`` is kept.

    Args:
        model: fitted model exposing ``predict(X)``.
        X: model inputs, row-aligned with ``data``.
        data: 2-D array of supervised rows whose column 1 holds the station code
            (``var2(t-n)`` as produced by ``series_to_supervised``).
        scaler: fitted scaler whose ``inverse_transform`` undoes the count scaling.
        station_codes: dict mapping station name -> station code. Its order sets
            the row order of the output.
        last_date: timestamp of the latest measurement. Predictions are stamped
            10 minutes later.

    Returns:
        pandas.DataFrame with columns:
        - ``count``: rounded, clipped at 0; 0 for stations without any row;
        - ``station``: the station name;
        - ``timestamp``.

    Raises:
        ValueError: if ``last_date`` is not given.
    """
    if last_date is None:
        raise ValueError("last_date is required to timestamp the predictions")

    yhat = model.predict(X)
    predictions = scaler.inverse_transform(yhat).ravel()

    # Last row index seen for each station code. Assumes each station's rows are in
    # chronological order, so this is the station's most recent sample.
    last_row = {code: row for row, code in enumerate(data[:, 1])}

    preds = [predictions[last_row[code]] if code in last_row else 0
             for code in station_codes.values()]

    # Assemble the output table (one row per station)
    output = pd.DataFrame()
    output['count'] = [max(int(np.round(x, 0)), 0) for x in preds]
    output['station'] = list(station_codes.keys())
    output['timestamp'] = [last_date + timedelta(minutes=10)] * len(preds)
    return output

predictions = obtain_prediction_dataframe(model, test_X, test, scaler_test, codes,
                                          last_date=latest_timestamp[0])

### Save the ML model

In [0]:
# Write the architecture (JSON) and the weights (HDF5) to local disk, then copy both
# into the DBFS FileStore (served under files/model/...).
# NOTE(review): Keras 3 `save_weights` only accepts file names ending in ".weights.h5";
# "/tmp/model.h5" assumes a Keras 2.x / tf.keras install — confirm the installed version.
model_json = model.to_json()
with open("/tmp/model.json", "w") as json_file:
    json_file.write(model_json)

model.save_weights("/tmp/model.h5")

for local_path, dbfs_path in (
    ('file:/tmp/model.h5', "dbfs:/FileStore/model/model.h5"),
    ('file:/tmp/model.json', "dbfs:/FileStore/model/model.json"),
):
    dbutils.fs.cp(local_path, dbfs_path)