# Massive scale regression

**Warning:** this dataset will occupy 76GB on space your disk. Check that the download location is appropriate for data this size. You will also need a machine with about 16GB of memory to run this code. 

The taxi data set consists of 1.21 billion yellow taxi journeys in New York. We obtained the data from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml

The processing was as follows:
- We extracted the following features: time of day; day of the week; day of the month; month; pick-up latitude and longitude; drop-off latitude and longitude; travel distance; journey time (the target)
- We discarded journeys that are less than 10 s or greater than 5 h, or start/end outside the New York region, which we judge to have squared distance less than $5^o$ from the centre of New York
- As we read in the data we calculated $\sum x$ and $\sum x^2$. These are in the file `taxi_data_stats.p`. We use these for normalizing the data. In the paper we normalise the outputs and restore the scaling, but here we use a mean function and set the variance accordingly. 
- We shuffled the entire data set (we used a machine with 224GB of memory to do this) and then split the data into 101 files each with $10^7$ lines. We use the first 100 chunks for training and final chunk for testing 

To use this data set managably on a standard machine we read in two chunks at a time, the second loading asynchronously as the first chunk is used for training. We have a special `DataHolder` class for this  





In [1]:
import sys
sys.path.append('../src')

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
%matplotlib inline 

from GPflow.likelihoods import Gaussian
from GPflow.kernels import RBF, White
from GPflow.mean_functions import Constant, Zero
from GPflow.svgp import SVGP
from GPflow.param import DataHolder, Parentable

from scipy.cluster.vq import kmeans2
from scipy.stats import norm
from get_data import get_taxi_data, get_taxi_stats

from threading import Thread
from Queue import Queue

from dgp import DGP
import time

data_path = '/mnt/' # requires 76GB of free space. Download size is approx 28GB

In [2]:
def wrapper(func, arg, queue):
    queue.put(func(arg))

class TaxiData(DataHolder):
    def __init__(self, minibatch_size=10000):
        Parentable.__init__(self)
        self._shape = [minibatch_size, 10]
        self.minibatch_size = minibatch_size
        self.counter = 0
        self.chunk_counter = 0
        
        self.num_data = int(10**9)
        self.chunk_size = int(10**7)
        self.num_chunks = int(self.num_data/self.chunk_size)

        self.X_mean, self.X_std = get_taxi_stats(data_path=data_path) 

        self.current_chunk = self.get_chunk(0) # get first chunk
        self.chunk_counter += 1
        self.start_get_chunk(self.chunk_counter) # start loading next one
        
    
    def start_get_chunk(self, i):
        self.next_chunk_queued = Queue() 
        Thread(target=wrapper, args=(self.get_chunk, i, 
                                     self.next_chunk_queued)).start()
    
    def get_chunk(self, i):
        return self.whiten(get_taxi_data(i, data_path=data_path))
    
    def whiten(self, data):
        return (data - self.X_mean)/self.X_std
    
    def _get_type(self):
        return np.float64

    def make_tf_array(self):
        self._tf_array = tf.placeholder(dtype=self._get_type(),
                                        shape=[None, self._shape[1]],
                                        name=self.name)

    @property
    def value(self):
        raise NotImplementedError #can't access this data directly 
        
    @property
    def size(self):
        return np.prod(self.shape)

    @property
    def shape(self):
        return self._shape

    def __str__(self, prepend='Data:'):
        return prepend + \
               '\033[1m' + self.name + '\033[0m' + \
               '\n Data much too large to print!' + \
               '\n First 10 lines of current chunk are: ' + \
                '\n' + str(self.current_chunk[:10, :])
                
    def update_feed_dict(self, key_dict, feed_dict):
        if self.counter + self.minibatch_size > self.chunk_size:
            self.current_chunk = self.next_chunk_queued.get()
            self.chunk_counter = (self.chunk_counter + 1) % self.num_chunks
            self.start_get_chunk(self.chunk_counter)
            self.counter = 0     
       
        start = self.counter
        end = self.counter + self.minibatch_size
        
        self.counter += self.minibatch_size
        
        feed_dict[key_dict[self]] = self.current_chunk[start:end, :]



We'll use the $10^6$ from the first chunk to find the initial inducing locations.

In [3]:
taxi_data = TaxiData(minibatch_size=10000)
Z = kmeans2(taxi_data.current_chunk[:int(1e6), :-1], 100, minit='points')[0]

Y_std = taxi_data.X_std[:, -1]

test_data = taxi_data.get_chunk(101)

Ns = int(1e6)
Xs, Ys = test_data[:Ns, :-1], test_data[:Ns, -1, None]

num_val = int(1e5) # some validation data (from training set) to see what's going on
X_val, Y_val = taxi_data.current_chunk[:num_val, :-1], taxi_data.current_chunk[:num_val, -1, None]


We need some tools to assess the model

In [4]:
def batch_assess(model, assess_model, X, Y):
    n_batches = max(int(X.shape[0]/1000.), 1)
    lik, sq_diff = [], []
    for X_batch, Y_batch in zip(np.array_split(X, n_batches), np.array_split(Y, n_batches)):
        l, sq = assess_model(model, X_batch, Y_batch)
        lik.append(l)
        sq_diff.append(sq)
    lik = np.concatenate(lik, 0)
    sq_diff = np.array(np.concatenate(sq_diff, 0), dtype=float)
    return np.average(lik), np.average(sq_diff)**0.5

def assess_single_layer(model, X_batch, Y_batch):
    mean, var = model.predict_y(X_batch)
    lik = norm.logpdf(Y_std*Y_batch, loc=Y_std*mean, scale=Y_std*var**0.5)
    sq_diff = ((mean - Y_batch)**2)
    return lik, sq_diff * Y_std**2

S = 100
def assess_sampled(model, X_batch, Y_batch):
    mean_samples, var_samples = model.predict_y(X_batch, S)
    Y = Y_batch[None, :, :] * np.ones(S, None, None)
    lik = np.average(norm.logpdf(Y_std*Y, loc=Y_std*mean_samples, scale=Y_std*var_samples**0.5), 0)
    mean = np.average(mean_samples, 0)
    sq_diff = ((mean - Y_batch)**2)
    return lik, sq_diff * Y_std**2


class CB(object):
    def __init__(self, model, assess_model):
        self.model = model
        self.assess_model = assess_model
        self.i = 0
        self.t = time.time()
        self.train_time = 0
        self.ob = []
        self.train_lik = []
        self.train_rmse = []
    def cb(self, x):
        self.i += 1
        if self.i % 10000 == 0:
            # time how long we've be training 
            self.train_time += time.time() - self.t
            self.t = time.time()
            
            # assess the model on the training data
            self.model.set_state(x)
            lik, rmse = batch_assess(self.model, self.assess_model, X_val, Y_val)
            self.train_lik.append(lik)
            self.train_rmse.append(rmse)
            
            # calculate the objective, averaged over 100 samples 
            ob = 0
            num_samples = 100
            for _ in range(num_samples):
                ob += self.model.compute_log_likelihood()/float(num_samples)
            self.ob.append(ob)
            
            st = 'it: {}, ob: {:.1f}, train lik: {:.4f}, train rmse {:.4f}'
            print st.format(self.i, ob, lik, rmse)

To create a single layer model we need to slightly modify the base SVGP

In [5]:
class MassiveDataSVGP(SVGP):
    def __init__(self, dataholder, kernel, likelihood, Z, 
                 q_diag=False, whiten=True, num_latent=1, mean_function=Zero()):
        D = dataholder.shape[1] - 1
        SVGP.__init__(self, np.zeros((1, D)), np.zeros((1, 1)), kernel, likelihood, Z, 
                      q_diag=q_diag, whiten=whiten, num_latent=num_latent)
        del self.X
        del self.Y
        self.dataholder = dataholder
        self.num_data = dataholder.num_data
    
    def build_likelihood(self):
        self.X = self.dataholder[:, :-1]
        self.Y = self.dataholder[:, -1, None]        
        return SVGP.build_likelihood(self)

class TaxiSVGP(MassiveDataSVGP):
    def __init__(self, data=taxi_data):
        D = data.shape[1] - 1
        MassiveDataSVGP.__init__(self, data, RBF(D, ARD=True), Gaussian(), Z.copy())
        self.likelihood.variance = 0.01
        

Train the sgp for 2 epoch (which is $2\times10^5$ iterations, since the minibatch size is $10^4$)

In [6]:
num_iterations = int(2e5)

def train(model, name, assess, Xs, Ys):
    cb = CB(model, assess)
    model.optimize(tf.train.AdamOptimizer(), # 1e-3 is the default here
                   maxiter=num_iterations,
                   callback=cb.cb)

    l, rmse = batch_assess(model, assess, Xs, Ys)
    print '\n{} test lik {:.4f}, test rmse {:.4f}. Train time: {:.4f}'.format(name, l, rmse, cb.train_time)


In [7]:
svgp= TaxiSVGP(taxi_data)
train(svgp, 'sgp', assess_single_layer,  Xs, Ys)


it: 10000, ob: -1008499961.5, train lik: -7.2885, train rmse 367.3656
it: 20000, ob: -831362087.6, train lik: -7.1444, train rmse 313.0104
it: 30000, ob: -797394924.7, train lik: -7.1372, train rmse 309.7279
it: 40000, ob: -794511397.8, train lik: -7.1330, train rmse 308.1813
it: 50000, ob: -790479894.8, train lik: -7.1304, train rmse 306.9690
it: 60000, ob: -791620609.2, train lik: -7.1316, train rmse 307.4513
it: 70000, ob: -789349648.5, train lik: -7.1306, train rmse 307.0009
it: 80000, ob: -794149819.7, train lik: -7.1322, train rmse 307.2403
it: 90000, ob: -782359641.2, train lik: -7.1275, train rmse 305.6874
it: 100000, ob: -790395807.3, train lik: -7.1255, train rmse 305.0272
it: 110000, ob: -788365500.9, train lik: -7.1284, train rmse 305.9350
it: 120000, ob: -784702584.6, train lik: -7.1249, train rmse 304.8569
it: 130000, ob: -792324020.9, train lik: -7.1257, train rmse 304.9313
it: 140000, ob: -788712499.5, train lik: -7.1229, train rmse 303.8370
it: 150000, ob: -781951105.0

Similarly, we can modify the DGP class to work with the dataholder 

In [8]:
class MassiveDataDGP(DGP):
    def __init__(self, dataholder, Z, kernels, likelihood, num_latent_Y=1, mean_function=Zero()):
        DGP.__init__(self, np.zeros((1, 9)), np.zeros((1, 1)), Z, kernels, likelihood, 
                     num_latent_Y=num_latent_Y, mean_function=mean_function)
        del self.X
        del self.Y
        self.dataholder = dataholder
        self.num_data = dataholder.num_data
        
    def build_likelihood(self):
        self.X = self.dataholder[:, :-1]
        self.Y = self.dataholder[:, -1, None]        
        return DGP.build_likelihood(self)

class TaxiDGP(MassiveDataDGP):
    def __init__(self, L, data=taxi_data):
        D = data.shape[1] - 1
        kernels = []
        for _ in range(L):
            kernels.append(RBF(D, ARD=True))
            
        MassiveDataDGP.__init__(self, data, Z.copy(), kernels, Gaussian())
        self.likelihood.variance = 0.01
        
        for layer in self.layers[:-1]:
            layer.q_sqrt = layer.q_sqrt.value * 1e-5


Now we train a 2 layer DGP model, with the RBF kernel

In [9]:
train(TaxiDGP(2), 'dgp 2', assess_sampled, Xs, Ys)


it: 10000, ob: -831684006.7, train lik: -7.1327, train rmse 308.8983
it: 20000, ob: -789538389.3, train lik: -7.1273, train rmse 303.8887
it: 30000, ob: -775163174.0, train lik: -7.1124, train rmse 299.2709
it: 40000, ob: -757446849.7, train lik: -7.1032, train rmse 296.7112
it: 50000, ob: -762769346.6, train lik: -7.1014, train rmse 296.1190
it: 60000, ob: -746506036.4, train lik: -7.0982, train rmse 294.9962
it: 70000, ob: -755334438.2, train lik: -7.1011, train rmse 295.7932
it: 80000, ob: -739731776.6, train lik: -7.0885, train rmse 291.9556
it: 90000, ob: -745961139.5, train lik: -7.0856, train rmse 290.9572
it: 100000, ob: -746717410.4, train lik: -7.0842, train rmse 290.8403
it: 110000, ob: -732544156.2, train lik: -7.0822, train rmse 290.0791
it: 120000, ob: -725234954.4, train lik: -7.0755, train rmse 288.3537
it: 130000, ob: -723717283.0, train lik: -7.0779, train rmse 288.7962
it: 140000, ob: -736924627.5, train lik: -7.0736, train rmse 287.5269
it: 150000, ob: -728163835.1,

And here's the three layer

In [10]:
train(TaxiDGP(3) , 'dgp 3', assess_sampled, Xs, Ys)


it: 10000, ob: -833053149.1, train lik: -7.1659, train rmse 312.8360
it: 20000, ob: -752739757.8, train lik: -7.1060, train rmse 295.0622
it: 30000, ob: -736369740.7, train lik: -7.0865, train rmse 289.6944
it: 40000, ob: -730033940.3, train lik: -7.0770, train rmse 286.8153
it: 50000, ob: -729822623.7, train lik: -7.0781, train rmse 286.8667
it: 60000, ob: -717538402.5, train lik: -7.0711, train rmse 284.8011
it: 70000, ob: -718024639.9, train lik: -7.0657, train rmse 283.3281
it: 80000, ob: -721011115.5, train lik: -7.0617, train rmse 282.1505
it: 90000, ob: -709860263.0, train lik: -7.0624, train rmse 282.2137
it: 100000, ob: -708479977.7, train lik: -7.0624, train rmse 282.2003
it: 110000, ob: -694890143.9, train lik: -7.0579, train rmse 280.9410
it: 120000, ob: -699679708.3, train lik: -7.0562, train rmse 280.4285
it: 130000, ob: -701069667.8, train lik: -7.0558, train rmse 280.2378
it: 140000, ob: -708757433.3, train lik: -7.0559, train rmse 280.3539
it: 150000, ob: -707762715.6,

and the 4:

In [11]:
train(TaxiDGP(4) , 'dgp 4', assess_sampled, Xs, Ys)


it: 10000, ob: -905088531.7, train lik: -7.2492, train rmse 334.4848
it: 20000, ob: -768081018.5, train lik: -7.1158, train rmse 295.1995
it: 30000, ob: -742491827.8, train lik: -7.0965, train rmse 290.1173
it: 40000, ob: -720274156.0, train lik: -7.0800, train rmse 285.6062
it: 50000, ob: -725725617.7, train lik: -7.0784, train rmse 285.3012
it: 60000, ob: -713218710.4, train lik: -7.0698, train rmse 282.9368
it: 70000, ob: -704333463.9, train lik: -7.0621, train rmse 281.1464
it: 80000, ob: -708119434.7, train lik: -7.0658, train rmse 282.2222
it: 90000, ob: -704155779.7, train lik: -7.0585, train rmse 280.3807
it: 100000, ob: -697184788.7, train lik: -7.0568, train rmse 279.9439
it: 110000, ob: -688436584.1, train lik: -7.0523, train rmse 278.8712
it: 120000, ob: -691448553.3, train lik: -7.0569, train rmse 280.3083
it: 130000, ob: -698351611.3, train lik: -7.0508, train rmse 278.6431
it: 140000, ob: -702594860.1, train lik: -7.0481, train rmse 277.8065
it: 150000, ob: -684664023.1,