# Modelling with lasagne

## Import and options

In [1]:
from IPython.core.display import HTML
# Read both stylesheets through context managers so the file handles are
# closed deterministically instead of being left to the garbage collector.
css_parts = []
for css_path in ('style-table.css', 'style-notebook.css'):
    with open(css_path) as css_file:
        css_parts.append(css_file.read())
css = ''.join(css_parts)

# Last expression of the cell: injects the combined CSS into the notebook page.
HTML('<style>{}</style>'.format(css))

In [2]:
import pickle

import h5py
import dask
# from dask import array as da
from dask import dataframe as dd
# from dask import delayed
# from dask.multiprocessing import get
import pandas as pd
import pathlib2 as pl
import mmh3  # The hash function used to hash sites. See the preprocessor script.

In [3]:
# Notebook-wide pandas display settings, applied in one pass.
_display_options = (
    ('display.max_colwidth', 100),
    ('display.max_rows', 200),
    ('display.max_columns', 6),
    ('display.width', 1000),
)
for _option_name, _option_value in _display_options:
    pd.set_option(_option_name, _option_value)

# dask.set_options(get=get);  # Due to a bug we can't read files in different processes so set this option after reading.

## Loading data into dask dataframes

Our preprocessor can write its output either as `numpy` arrays or as `pandas` `DataFrame`s; `scikit-learn` supports the latter.

In [4]:
# Rows per chunk; in this part of the notebook it is only referenced by the
# commented-out dask reader.
CHUNK_SIZE = int(2e5)
# NOTE: despite its name this is a glob pattern matching several HDF files, not a directory.
# NOTE(review): the absolute path is machine-specific.
DF_DIR = pl.Path('/Volumes/CompanionEx/Data/dfs_pandas/PP_TS_2016-05-24-00_2016-06-01-00*.hdf')
str(DF_DIR)

'/Volumes/CompanionEx/Data/dfs_pandas/PP_TS_2016-05-24-00_2016-06-01-00*.hdf'

In [5]:
# data = dd.read_hdf(str(DF_DIR), key='dataset', chunksize=CHUNK_SIZE)
# Load one preprocessed HDF file eagerly with plain pandas (the dask reader above is disabled).
# NOTE(review): the absolute path is machine-specific; point it at the local data location.
data = pd.read_hdf('/Volumes/CompanionEx/Data/dfs_pandas/PP_TS_2016-05-24-00_2016-06-01-00_0-200_20160624101922.hdf')
data.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,precipitation mm/h,temperature C,timestamp_start,...,trafficspeed km/h,windspeed m/s,site_hash
site,datetime_start,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
rws01_monibas_0010vwa0056ra,2016-05-23 23:00:00,0.0,10.4,1464044400,...,55.0,3.0,-1854210412049763327
rws01_monibas_0010vwa0056ra,2016-05-24 00:00:00,0.0,10.3,1464048000,...,55.0,6.0,-1854210412049763327
rws01_monibas_0010vwa0056ra,2016-05-24 01:00:00,0.0,10.2,1464051600,...,55.0,4.0,-1854210412049763327
rws01_monibas_0010vwa0056ra,2016-05-24 02:00:00,0.0,10.1,1464055200,...,55.0,4.0,-1854210412049763327
rws01_monibas_0010vwa0056ra,2016-05-24 03:00:00,0.0,10.3,1464058800,...,55.0,3.0,-1854210412049763327


In [6]:
# Load the pickled list of selected site identifiers.
# NOTE: unpickling executes arbitrary code; only load files from a trusted source.
sites_path = '../selected_sites.pkl'
with open(sites_path, 'rb') as fname:
    sites = pickle.load(fname)

# Report how many sites were loaded, then preview the first five.
print(len(sites))
sites[:5]

17650


['rws01_monibas_0010vwa0065ra',
 'rws01_monibas_0010vwa0223ra',
 'rws01_monibas_0010vwa0248ra',
 'rws01_monibas_0010vwa0269ra',
 'rws01_monibas_0010vwa0286ra']

In [7]:
from random import sample

In [8]:
# Draw the site subset from a dedicated, seeded generator so that
# "Restart & Run All" selects the same 2000 sites every time.
from random import Random

SAMPLE_SEED = 42  # arbitrary fixed seed; change it to draw a different subset
samples_sites = Random(SAMPLE_SEED).sample(sites, 2000)

In [9]:
# Keep only the rows of the sampled sites and report the row count before and after.
n_rows_before = len(data)
data = data.query('site in @samples_sites')
print(n_rows_before)
print(len(data))

3790662
426024


In [10]:
# Derive the calendar day of every measurement from the `datetime_start` index level.
datetime_index = data.index.get_level_values('datetime_start')
datetime_index = pd.DatetimeIndex(datetime_index)

# `data` is a filtered selection of the loaded frame, so adding a column in
# place can trigger pandas' SettingWithCopyWarning. `assign` builds a new frame
# instead, which also keeps this cell safe to re-run.
data = data.assign(day=datetime_index.to_period(freq='d'))
data.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,precipitation mm/h,temperature C,timestamp_start,...,windspeed m/s,site_hash,day
site,datetime_start,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
rws01_monibas_0010vwa0065ra,2016-05-23 23:00:00,0.0,10.4,1464044400,...,3.0,8446559662630338889,2016-05-23
rws01_monibas_0010vwa0065ra,2016-05-24 00:00:00,0.0,10.3,1464048000,...,6.0,8446559662630338889,2016-05-24
rws01_monibas_0010vwa0065ra,2016-05-24 01:00:00,0.0,10.2,1464051600,...,4.0,8446559662630338889,2016-05-24
rws01_monibas_0010vwa0065ra,2016-05-24 02:00:00,0.0,10.1,1464055200,...,4.0,8446559662630338889,2016-05-24
rws01_monibas_0010vwa0065ra,2016-05-24 03:00:00,0.0,10.3,1464058800,...,3.0,8446559662630338889,2016-05-24


We can apply a query at this stage to limit the dataset.

## Split into train, test and validation sets

We split the data by holding out one day as the test set and another day as the validation set; all remaining days form the training set.

In [11]:
from datetime import date, datetime, timedelta

In [12]:
# Number of measurements per calendar day, ordered by descending count.
days = data['day'].value_counts()
days

2016-05-24    230472
2016-05-30     27936
2016-05-29     27936
2016-05-28     27936
2016-05-27     27936
2016-05-26     27936
2016-05-25     27936
2016-05-31     26772
2016-05-23      1164
Freq: D, Name: day, dtype: int64

In [13]:
# `days` is ordered by descending row count, so negative positions pick the
# least-populated days: [-2] is the second-smallest, [-3] the third-smallest.
# NOTE(review): the selection therefore depends on the counts (and on how ties
# between equally sized days happen to be ordered), not on the calendar —
# confirm this is intended.
test_day = days.keys()[-2]
validation_day = days.keys()[-3]
print(test_day, validation_day)

2016-05-31 2016-05-25


In [14]:
def filter_for_day(data, day, complement=False):
    """Select the rows of ``data`` measured on ``day`` (or on any other day).

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose index has a level named ``datetime_start``.
    day : object with ``year``, ``month`` and ``day`` attributes
        The calendar day to match against.
    complement : bool, optional
        When true, return the rows that do *not* fall on ``day``.

    Returns
    -------
    pandas.DataFrame
        The selected rows of ``data``.
    """
    stamps = pd.DatetimeIndex(data.index.get_level_values('datetime_start'))

    # One boolean mask for "same calendar day"; the complement is its negation.
    on_day = (stamps.year == day.year) & (stamps.month == day.month) & (stamps.day == day.day)
    return data[~on_day] if complement else data[on_day]


In [15]:
# Hold out one day for testing and one for validation; train on everything else.
test_data = filter_for_day(data, test_day)
validation_data = filter_for_day(data, validation_day)

# Drop the test day first, then the validation day, from the training set.
_without_test_day = filter_for_day(data, test_day, complement=True)
train_data = filter_for_day(_without_test_day, validation_day, complement=True)

In [16]:
# Sanity check: the train, test and validation row counts should add up to the total.
print(data.shape, train_data.shape, test_data.shape, validation_data.shape)

(426024, 8) (371316, 8) (26772, 8) (27936, 8)


In [17]:
# Input columns fed to the network, in this order.
features = ['site_hash', 'timestamp_start', 'precipitation mm/h', 'temperature C', 'windspeed m/s']
# Columns to predict (the traffic flow target is currently disabled).
targets = ['trafficspeed km/h']#, 'trafficflow counts/h']

Note that `site_hash` is the `mmh3.hash64` of the `site` column (more precisely, the last component of the tuple that `hash64` returns):

In [None]:
mmh3.hash64('rws01_monibas_0010vwa0056ra')[-1]

In [None]:
# features.npartitions  # Only for dask dataframes

With a dask dataframe the entire dataset is loaded lazily and distributed over the number of partitions reported above; this does not apply to the plain pandas dataframe loaded in this notebook.

## Modeling

In [18]:
import numpy as np
import theano
import theano.tensor as T
# theano.config.exception_verbosity = 'high'

Using gpu device 0: GeForce GT 750M (CNMeM is disabled, cuDNN 5005)


In [19]:
import lasagne  # Ignore any errors for now

  "downsample module has been moved to the theano.tensor.signal.pool module.")


In [20]:
theano.config.device

'gpu0'

### Prepare batches

We model one sequence as the whole-day series of measurements of a single site. A batch is a set of such sequences, possibly of different lengths. We expect a day-level periodicity and try to fit a model to that behaviour.

We generate mini batches with the following function.

In [21]:
from itertools import product
from random import shuffle

def batches(source_df, sites=None, days=None, max_batches=1000, max_batch_length=100,
            feature_columns=None, target_columns=None, max_sequence_length=None):
    """Generate zero-padded mini batches of per-site, per-day measurement sequences.

    Every (site, day) combination is one sequence. Sequences are taken in the
    order produced by ``itertools.product(sites, days)`` and grouped into
    batches of at most ``max_batch_length`` sequences.

    Parameters
    ----------
    source_df : pandas.DataFrame
        Measurements. Must be queryable by ``site`` and contain a ``day``
        column plus the feature and target columns.
    sites : iterable, optional
        Sites to draw sequences from. Defaults to the distinct values of index
        level 0 of ``source_df``.
    days : iterable, optional
        Days to draw sequences from. Defaults to the distinct values of
        ``source_df['day']``.
    max_batches : int, optional
        Upper bound on the number of batches generated.
    max_batch_length : int, optional
        Upper bound on the number of sequences per batch.
    feature_columns : list of str, optional
        Input columns. Defaults to the notebook-level ``features`` list.
    target_columns : list of str, optional
        Target columns. Defaults to the notebook-level ``targets`` list.
    max_sequence_length : int, optional
        Padded length of every sequence. Defaults to the notebook-level
        ``max_seq_length``.

    Yields
    ------
    tuple
        ``(batch_number, batch, target, mask)`` where ``batch`` has shape
        ``(n_sequences, max_sequence_length, n_features)``, ``target`` has shape
        ``(n_sequences, max_sequence_length, n_targets)`` and ``mask`` has shape
        ``(n_sequences, max_sequence_length)``. ``mask`` is 1 where a real
        measurement is present and 0 where the sequence is padding.

    Raises
    ------
    AssertionError
        If a sequence is longer than ``max_sequence_length``.
    """
    # The defaults must be bound to the names used below; the previous version
    # stored them in unused locals and then called ``product(None, None)``.
    if sites is None:
        sites = set(source_df.index.get_level_values(0))

    if days is None:
        days = set(source_df['day'].unique())

    # Fall back to the notebook-level configuration when not given explicitly.
    if feature_columns is None:
        feature_columns = features
    if target_columns is None:
        target_columns = targets
    if max_sequence_length is None:
        max_sequence_length = max_seq_length

    # Lazy iterator over all (site, day) pairs; materialising or shuffling it
    # was found to take too long.
    sample_bag = product(sites, days)

    for i in range(max_batches):
        samples = list()
        for j in range(max_batch_length):
            try:
                samples.append(next(sample_bag))
            except StopIteration:
                break

        if len(samples) == 0:
            # All samples consumed. A plain ``return`` ends the generator;
            # ``raise StopIteration`` here becomes a RuntimeError under PEP 479.
            return

        # Prepare batch
        batch_length = len(samples)
        batch = np.zeros([batch_length, max_sequence_length, len(feature_columns)], dtype='float64')
        mask = np.zeros([batch_length, max_sequence_length], dtype='float64')
        target = np.zeros([batch_length, max_sequence_length, len(target_columns)], dtype='float64')

        for j in range(batch_length):
            site, period = samples[j]

            # Select the measurements of this site on this day.
            data = source_df.query("site == '%s'" % site)
            data = data[data['day'] == period]

            data_f = data[feature_columns].values
            data_t = data[target_columns].values

            seq_length = data.shape[0]
            assert seq_length <= max_sequence_length, "Error: sequence longer than `max_seq_length` found"

            # Real measurements go at the front; the remainder stays zero-padded.
            batch[j, :seq_length, :] = data_f
            target[j, :seq_length, :] = data_t
            mask[j, :seq_length] = 1.0

        yield i, batch, target, mask


Time the batch generation:

In [22]:
# Distinct sites (index level 0) that occur in the training data.
site_bag = set(train_data.index.get_level_values(0))

In [23]:
# Distinct calendar days (as daily periods) that occur in the training data.
tdt = pd.DatetimeIndex(train_data.index.get_level_values('datetime_start'))
day_bag = set(tdt.to_period(freq='d'))

In [24]:
# Batch configuration used while timing the batch generator.
max_batches = 1000
max_batch_length = 15  # Maximum number of day-long measurement sequences (one site each) per batch

max_seq_length = 24*60 # Maximum number of measurements per site per day

In [None]:
# for batch_num, batch, target, mask in batches(train_data, sites=site_bag, days=day_bag):
#     print(batch_num, batch.size)

### Create the input and target variables

In [25]:
# Symbolic 3D input tensor; the batches fed to it are laid out as (sequence, time step, feature).
input_var = T.tensor3('input', dtype=theano.config.floatX)

In [26]:
# Symbolic 3D target tensor; the targets fed to it are laid out as (sequence, time step, target).
target_values = T.tensor3('target', dtype=theano.config.floatX)
# target_values = T.matrix('target', dtype=theano.config.floatX)

### Define the model

In [27]:
# Input is (batch, time steps, features). The feature dimension is taken from
# the `features` list so the layer stays in sync with the columns in the batches.
l_in = lasagne.layers.InputLayer(shape=(None, None, len(features)), input_var=input_var, name='input_layer')

In [28]:
# Mask input of shape (batch, time steps); the batch generator fills it with 1
# for real measurements and 0 for padding.
l_mask = lasagne.layers.InputLayer(shape=(None,None), name='mask')

In [None]:
help(lasagne.layers.LSTMLayer)

In [29]:
num_lstm_units = 20  # number of units in the LSTM layer
max_grad = 5.0  # passed to the LSTM layer as `grad_clipping`
# Single LSTM layer over the input sequences; `l_mask` tells it which time
# steps hold real data.
l_lstm = lasagne.layers.LSTMLayer(l_in, num_units=num_lstm_units,
                                  gradient_steps=-1, grad_clipping=max_grad, unroll_scan=False,
                                  mask_input=l_mask, name='l_lstm_1')

In [None]:
# We want to combine the LSTM with a dense layer and need to reshape the input. We do this with a `ReshapeLayer`.
help(lasagne.layers.ReshapeLayer)

In [30]:
# First, retrieve symbolic variables for the input shape
# (`n_batch` and `n_time_steps` are reused when reshaping the network output).
n_batch, n_time_steps, n_features = l_in.input_var.shape

# Now, squash the n_batch and n_time_steps dimensions into one, so that every
# time step becomes one row of `num_lstm_units` values for the dense layers.
l_reshape_in = lasagne.layers.ReshapeLayer(l_lstm, (-1, num_lstm_units)) 

In [31]:
# Now, we can apply feed-forward layers as usual.
l_dense_1 = lasagne.layers.DenseLayer(l_reshape_in, num_units=20, nonlinearity=lasagne.nonlinearities.tanh, name='l_dense_1')
# The output layer is linear: the targets are raw measurements (speeds in km/h),
# which a tanh output, bounded to [-1, 1], could never reach. One output unit
# per target column keeps the layer in sync with the `targets` list.
l_dense_2 = lasagne.layers.DenseLayer(l_dense_1, num_units=len(targets), nonlinearity=lasagne.nonlinearities.linear, name='l_dense_2')
# Now, the shape will be n_batch*n_timesteps, n_targets. We can then reshape to
# n_batch, n_timesteps, n_targets to get the predictions for each timestep of each sequence
l_reshape_out = lasagne.layers.ReshapeLayer(l_dense_2, (n_batch, n_time_steps, len(targets)), name='output_layer')

### Training

In [32]:
# lasagne.layers.get_output produces an expression for the output of the net
network_output = lasagne.layers.get_output(l_reshape_out)
# The network output is used unchanged as the prediction; the slice that would
# keep only the final value of each sequence is disabled.
predicted_values = network_output#[:, -1]

In [33]:
# Our cost is the mean-squared error over the *real* time steps only. The
# batches are zero-padded up to the maximum sequence length, so an unmasked
# mean would be dominated by padding positions.
squared_errors = lasagne.objectives.squared_error(predicted_values, target_values)
# (batch, time) -> (batch, time, 1), then broadcast to the shape of the errors.
step_mask = l_mask.input_var.dimshuffle(0, 1, 'x')
mask_per_value = step_mask * T.ones_like(squared_errors)
# Guard the denominator so an all-padding batch yields 0 instead of NaN.
loss = T.sum(squared_errors * mask_per_value) / T.maximum(T.sum(mask_per_value), 1)

In [34]:
# Retrieve the trainable parameters of the network. Filtering on
# `trainable=True` keeps parameters tagged as non-trainable (such as the fixed
# initial LSTM states) out of the optimiser updates.
all_params = lasagne.layers.get_all_params(l_reshape_out, trainable=True)
# all_params

In [35]:
# Compute adam updates for training (default hyperparameters; the signature is
# shown by the help output below).
help(lasagne.updates.adam)
updates = lasagne.updates.adam(loss, all_params)

Help on function adam in module lasagne.updates:

adam(loss_or_grads, params, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-08)
    Adam updates
    
    Adam updates implemented as in [1]_.
    
    Parameters
    ----------
    loss_or_grads : symbolic expression or list of expressions
        A scalar loss expression, or a list of gradient expressions
    params : list of shared variables
        The variables to generate update expressions for
    learning_rate : float
        Learning rate
    beta_1 : float
        Exponential decay rate for the first moment estimates.
    beta_2 : float
        Exponential decay rate for the second moment estimates.
    epsilon : float
        Constant for numerical stability.
    
    Returns
    -------
    OrderedDict
        A dictionary mapping each parameter to its update expression
    
    Notes
    -----
    The paper [1]_ includes an additional hyperparameter lambda. This is only
    needed to prove convergence of the algorit

Theano functions for training, computing the cost, and inference

In [36]:
# One training step: takes (input, target, mask), applies the adam updates and returns the loss.
train = theano.function([l_in.input_var, target_values, l_mask.input_var], loss, updates=updates)

  ret += x.c_compile_args()


In [37]:
# Evaluation only: takes (input, target, mask) and returns the loss without updating parameters.
compute_cost = theano.function([l_in.input_var, target_values, l_mask.input_var], loss)

In [38]:
# Identity function on the target tensor; used below to check that a target batch is accepted.
ide = theano.function([target_values], outputs=[target_values])

In [39]:
# Inference: takes (input, mask) and returns the network predictions in a one-element list.
ff = theano.function([l_in.input_var, l_mask.input_var], outputs=[predicted_values])

Check the shapes

In [None]:
# Take the first training batch (with the generator's default batch limits) for the shape checks below.
batch_num, batch, target, mask = next(iter(batches(train_data, sites=site_bag, days=day_bag)))

In [None]:
print(batch.shape, target.shape, mask.shape)

In [None]:
o = ff(batch, mask)[0]

In [None]:
o.shape

In [None]:
t = ide(target)[0]

In [None]:
t.shape

In [None]:
train(batch, target, mask)

Perform the training

In [40]:
# Distinct sites (index level 0) that occur in the test data.
test_sites = set(test_data.index.get_level_values(0))

In [41]:
# Batch configuration used for the actual training run (overrides the values set for timing).
max_batches = 1000
max_batch_length = 100  # Maximum number of day-long measurement sequences (one site each) per batch

max_seq_length = 24*60 # Maximum number of measurements per site per day

In [None]:
# We'll train the network with 10 epochs of a maximum of `max_batches` each
num_epochs = 10
for epoch in range(num_epochs):
    print('TRAIN', end=' ')
    for batch_num, batch, target, mask in batches(train_data, sites=site_bag, days=day_bag,
                                                  max_batches=max_batches, max_batch_length=max_batch_length):
        train(batch, target, mask)
        # Progress output: a dot every 10 batches, the batch number every 100.
        if batch_num % 10 == 0:
            if batch_num % 100 == 0:
                print(batch_num, end='')
            print(".", end='')

    print('')

    # Evaluate on the held-out test day.
    # NOTE(review): the summary line below says "validation" although the cost
    # is computed on `test_data`; confirm which set is intended here.
    cost_sum = 0.0
    num_eval_batches = 0  # counted explicitly instead of relying on the loop variable
    print('TEST', end=' ')
    for batch_num, batch, target, mask in batches(test_data, days=set((test_day,)), sites=test_sites):
        cost_sum += compute_cost(batch, target, mask)
        num_eval_batches += 1
        if batch_num % 10 == 0:
            print(batch_num, end='')

    # Average over the batches actually evaluated; NaN when there were none
    # (previously a stale `batch_num` from the training loop was used).
    cost_val = cost_sum / num_eval_batches if num_eval_batches else float('nan')
    print('')

    print("Epoch {} validation cost = {}".format(epoch + 1, cost_val))

TRAIN 0..

### Validation