# Massive scale regression: 
**Warning:** this dataset will occupy 76GB on space your disk. Check that the download location is appropriate for data this size. You will also need about 16GB of memory to run this code. 

The taxi data set consists of 1.21 billion yellow taxi journeys in New York. We obtained the data from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml

The processing was as follows:
- We extracted the following features: time of day; day of the week; day of the month; month; pick-up latitude and longitude; drop-off latitude and longitude; travel distance; journey time (the target)
- We discarded journeys that are less than 10 s or greater than 5 h, or start/end outside the New York region, which we judge to have squared distance less than $5^o$ from the centre of New York
- As we read in the data we calculated $\sum x$ and $\sum x^2$. These are in the file `taxi_data_stats.p`. We use these for normalizing the data. In the paper we normalise the outputs and restore the scaling, but here we use a mean function and set the variance accordingly. 
- We shuffled the entire data set (we used a machine with 224GB of memory to do this) and then split the data into 101 files each with $10^7$ lines. We use the first 100 chunks for training and final chunk for testing 

To use this data set managably on a standard machine we read in two chunks at a time, the second loading asynchronously as the first chunk is used for training. We have a special `DataHolder` class for this  





In [1]:
import sys
sys.path.append('../src')

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
%matplotlib inline 

from GPflow.likelihoods import Gaussian
from GPflow.kernels import RBF, White
from GPflow.mean_functions import Constant, Zero
from GPflow.svgp import SVGP
from GPflow.param import DataHolder, Parentable

from scipy.cluster.vq import kmeans2
from get_data import get_taxi_data, get_taxi_stats

from threading import Thread
from Queue import Queue

from dgp import DGP
import time

data_path = '/mnt/' # requires 76GB of free space. Download size is approx 28GB

In [2]:
def wrapper(func, arg, queue):
    queue.put(func(arg))

class TaxiData(DataHolder):
    def __init__(self, minibatch_size=10000):
        Parentable.__init__(self)
        self._shape = [minibatch_size, 10]
        self.minibatch_size = minibatch_size
        self.counter = 0
        self.chunk_counter = 0
        
        self.num_data = int(10**9)
        self.chunk_size = int(10**7)
        self.num_chunks = int(self.num_data/self.chunk_size)

        self.X_mean, self.X_std = get_taxi_stats(data_path=data_path) 

        self.current_chunk = self.get_chunk(0) # get first chunk
        self.chunk_counter += 1
        self.start_get_chunk(self.chunk_counter) # start loading next one
        
    
    def start_get_chunk(self, i):
        self.next_chunk_queued = Queue() 
        Thread(target=wrapper, args=(self.get_chunk, i, 
                                     self.next_chunk_queued)).start()
    
    def get_chunk(self, i):
        return self.whiten_X(get_taxi_data(i, data_path=data_path))
    
    def whiten_X(self, data):
        X = data[:, :-1]
        Xw = (X - self.X_mean)/self.X_std
        return np.concatenate([Xw, data[:, -1, None]], 1)
    
    def _get_type(self):
        return np.float64

    def make_tf_array(self):
        self._tf_array = tf.placeholder(dtype=self._get_type(),
                                        shape=[None, self._shape[1]],
                                        name=self.name)

    @property
    def value(self):
        raise NotImplementedError #can't access this data directly 
        
    @property
    def size(self):
        return np.prod(self.shape)

    @property
    def shape(self):
        return self._shape

    def __str__(self, prepend='Data:'):
        return prepend + \
               '\033[1m' + self.name + '\033[0m' + \
               '\n Data much too large to print!' + \
               '\n First 10 lines of current chunk are: ' + \
                '\n' + str(self.current_chunk[:10, :])
                
    def update_feed_dict(self, key_dict, feed_dict):
        if self.counter + self.minibatch_size > self.chunk_size:
            self.current_chunk = self.next_chunk_queued.get()
            self.chunk_counter = (self.chunk_counter + 1) % self.num_chunks
            self.start_get_chunk(self.chunk_counter)
            self.counter = 0     
       
        start = self.counter
        end = self.counter + self.minibatch_size
        
        self.counter += self.minibatch_size
        
        feed_dict[key_dict[self]] = self.current_chunk[start:end, :]



In [3]:
taxi_data = TaxiData()
test_data = taxi_data.get_chunk(101)
Ns = int(1e6)
Xs, Ys = test_data[:Ns, :-1], test_data[:Ns, -1, None]

We'll use the $10^6$ from the first chunk to find the initial inducing locations.

In [4]:
Z = kmeans2(taxi_data.current_chunk[:int(1e6), :-1], 100, minit='points')[0]

To create a single layer model we need to slightly modify the base SVGP

In [5]:
class MassiveDataSVGP(SVGP):
    def __init__(self, dataholder, kernel, likelihood, Z, q_diag=False, whiten=True, num_latent=1, mean_function=Zero()):
        SVGP.__init__(self, np.zeros((1, 9)), np.zeros((1, 1)), kernel, likelihood, Z, 
                      q_diag=q_diag, whiten=whiten, num_latent=num_latent)
        del self.X
        del self.Y
        self.dataholder = dataholder
        self.num_data = dataholder.num_data
        
    def build_likelihood(self):
        self.X = self.dataholder[:, :-1]
        self.Y = self.dataholder[:, -1, None]        
        return SVGP.build_likelihood(self)


The noise variance on this set is sufficiently large it overflows the transform (we don't trust the behaviour of `tf.sotfplus` for large values). We make a new Gaussian likeliood with no transform. In this situation this should be pretty safe

In [6]:
# from GPflow.likelihoods import Likelihood
# from GPflow.param import Param
# class NoTransformGaussian(Gaussian):
#     def __init__(self):
#         Likelihood.__init__(self)
#         self.variance = Param(1.0)
np.seterr(over='ignore')


{'divide': 'warn', 'invalid': 'warn', 'over': 'warn', 'under': 'ignore'}

In [7]:
Y_mean, Y_std = np.average(taxi_data.current_chunk[:, -1]), np.std(taxi_data.current_chunk[:, -1])

num_val = int(1e5)
X_val, Y_val = taxi_data.current_chunk[:num_val, :-1], taxi_data.current_chunk[:num_val, -1, None]

m_sgp = MassiveDataSVGP(taxi_data, RBF(9, ARD=True, variance=Y_std**2), Gaussian(), Z.copy(), 
                        mean_function=Constant(Y_mean.copy()))
m_sgp.likelihood.variance = 0.1*Y_std**2


We need some tools to assess the model

In [8]:
def batch_assess(model, assess_model, X, Y):
    n_batches = max(int(X.shape[0]/1000.), 1)
    lik, sq_diff = [], []
    for X_batch, Y_batch in zip(np.array_split(X, n_batches), np.array_split(Y, n_batches)):
        l, sq = assess_model(model, X_batch, Y_batch)
        lik.append(l)
        sq_diff.append(sq)
    lik = np.concatenate(lik, 0)
    sq_diff = np.array(np.concatenate(sq_diff, 0), dtype=float)
    return np.average(lik), np.average(sq_diff)**0.5

def assess_single_layer(model, X_batch, Y_batch):
    lik = model.predict_density(X_batch, Y_batch)
    mean, var = model.predict_y(X_batch)
    sq_diff = ((mean - Y_batch)**2)
    return lik, sq_diff 

S = 100
def assess_sampled(model, X_batch, Y_batch):
    lik = model.predict_density(X_batch, Y_batch, S)
    mean_samples, var_samples = model.predict_y(X_batch, S)
    mean = np.average(mean_samples, 0)
    sq_diff = ((mean - Y_batch)**2)
    return lik, sq_diff 

class CB(object):
    def __init__(self, model, assess_model):
        self.model = model
        self.assess_model = assess_model
        self.i = 0
        self.t = time.time()
        self.train_time = 0
        self.ob = []
        self.train_lik = []
        self.train_rmse = []
    def cb(self, x):
        self.i += 1
        if self.i % 10000 == 0:
            # time how long we've be training 
            self.train_time += time.time() - self.t
            self.t = time.time()
            
            # assess the model on the training data
            self.model.set_state(x)
            lik, rmse = batch_assess(self.model, self.assess_model, X_val, Y_val)
            self.train_lik.append(lik)
            self.train_rmse.append(rmse)
            
            # calculate the objective, averaged over S samples 
            ob = 0
            for _ in range(100):
                ob += self.model.compute_log_likelihood()/float(100)
            self.ob.append(ob)
            
            st = 'it: {}, ob: {:.1f}, train lik: {:.4f}, train rmse {:.4f}'
            print st.format(self.i, ob, lik, rmse)

Train the sgp for 1 epoch (which is $10^5$ iterations, since the minibatch size is $10^4$)

In [9]:
num_iterations = int(2e4)
cb_sgp = CB(m_sgp, assess_single_layer)
_ = m_sgp.optimize(tf.train.AdamOptimizer(0.01), 
               maxiter=num_iterations,
               callback=cb_sgp.cb)


it: 10000, ob: -8696774181.2, train lik: -8.3145, train rmse 411.2833
it: 20000, ob: -8614634441.0, train lik: -8.2654, train rmse 406.2074


In [10]:
l, rmse = batch_assess(m_sgp, assess_single_layer, Xs, Ys)
print 'sgp test lik {:.4f}, test rmse {:.4f}. Train time: {:.4f}'.format(l, rmse, cb_sgp.train_time)

sgp test lik -8.2373, test rmse 404.4808. Train time: 305.8784


Similarly, we can modify the DGP class to work with the dataholder 

In [11]:
class MassiveDataDGP(DGP):
    def __init__(self, dataholder, Z, kernels, likelihood, num_latent_Y=1, mean_function=Zero()):
        DGP.__init__(self, np.zeros((1, 9)), np.zeros((1, 1)), Z, kernels, likelihood, 
                     num_latent_Y=num_latent_Y, mean_function=mean_function)
        del self.X
        del self.Y
        self.dataholder = dataholder
        self.num_data = dataholder.num_data
        
    def build_likelihood(self):
        self.X = self.dataholder[:, :-1]
        self.Y = self.dataholder[:, -1, None]        
        return DGP.build_likelihood(self)


Now we train a 2 layer DGP model, with the RBF kernel

In [12]:
kernels_2 = [RBF(9, ARD=True) + White(9, variance=1e-5), RBF(9, ARD=True, variance=Y_std**2)]
m_dgp_2 = MassiveDataDGP(taxi_data, Z.copy(), kernels_2, Gaussian(), mean_function=Constant(Y_mean.copy()))
m_dgp_2.likelihood.variance = 0.1*Y_std**2

In [13]:
cb_dgp_2 = CB(m_dgp_2, assess_sampled)
_ = m_dgp_2.optimize(tf.train.AdamOptimizer(0.01), maxiter=num_iterations, callback=cb_dgp_2.cb)


it: 10000, ob: -7841586373.2, train lik: -7.6574, train rmse 338.2384
it: 20000, ob: -7798820462.1, train lik: -7.6387, train rmse 334.2714


In [None]:
l, rmse = batch_assess(m_dgp_2, assess_sampled, Xs, Ys)
print 'dgp 2 lik {:.4f}, rmse {:.4f}. Train time: {:.4f}'.format(l, rmse, cb_dgp_2.train_time)


dgp 2 lik -7.6225, rmse 333.3533. Train time: 1107.6005


And here's the three layer

In [None]:
kernels_3 = [RBF(9, ARD=True) + White(9, variance=1e-5),
             RBF(9, ARD=True) + White(9, variance=1e-5), 
             RBF(9, ARD=True, variance=Y_std**2)]

m_dgp_3 = MassiveDataDGP(taxi_data, Z.copy(), kernels_3, Gaussian(), mean_function=Constant(Y_mean.copy()))
m_dgp_3.likelihood.variance = 0.01*Y_std**2

t = time.time()
m_dgp_3.optimize(tf.train.AdamOptimizer(0.01), maxiter=num_iterations)
l, rmse = batch_assess(m_dgp_3, assess_sampled, Xs, Ys)
print 'dgp 3 lik {:.4f}, rmse {:.4f}. Train time: {:.4f}'.format(l, rmse, time.time() - t)


In [None]:
print m_dgp_2.likelihood
print m_sgp.likelihood

