In [2]:
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras

import time
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd
import json
from collections import defaultdict
import random as rn

from sklearn.model_selection import train_test_split

# 1. Data input pipeline

The data comes from the Kaggle competition Porto Seguro's Safe Driver Prediction.

The data input pipeline achieves the following goals:

1. Re-encode the values of each column as global feature ids, so that embeddings can be fetched conveniently.
2. Transform the data from the standard CSV format into the "feat_id:feat_value" format, as implemented in the write_to_csv function.
3. Distinguish categorical columns from numerical columns and handle them differently.
4. Split the training data into two partitions: training data and validation data.


In [13]:
def read_ori_data(root_dir="../data/porto-seguro-safe-driver-prediction", train_fn="train.csv", test_fn="test.csv"):
    '''
    Load the raw competition CSV files.

    Both files are indexed by their 'id' column. The 'target' column is removed
    from the training frame and returned on its own.

    Returns:
        (train features DataFrame, train target Series, test DataFrame)
    '''
    train_path, test_path = (os.path.join(root_dir, name) for name in (train_fn, test_fn))

    train = pd.read_csv(train_path, index_col='id')
    test = pd.read_csv(test_path, index_col='id')

    # pop() hands back the column and removes it from the frame in one step
    train_label = train.pop("target")

    return train, train_label, test

In [4]:
def split_cols_by_type(data, threshold):
    '''
    Partition the columns of `data` by their number of distinct values.

    A column with more than `threshold` distinct values (NaN counts as a value,
    because `unique()` keeps it) is treated as numerical; every other column is
    treated as categorical. Column order is preserved in both outputs.

    Returns:
        (numerical column names, categorical column names) as numpy arrays.
    '''
    cardinality = {name: len(data[name].unique()) for name in data.columns}
    numerical = [name for name in data.columns if cardinality[name] > threshold]
    categorical = [name for name in data.columns if cardinality[name] <= threshold]
    return np.array(numerical), np.array(categorical)

In [5]:
def write_to_csv(fn, data, field_size, feat_size, label=None, label_name=""):
    '''
    Write an encoded data frame to a space-separated text file.

    File layout:
      line 1:  "<field_size> <feat_size>"
      line 2:  header -- index name, the column names, then `label_name` if given
      line 3+: one row per sample: "<index> <feat_id>:<feat_value> ... [<label>]"

    Parameters:
        fn: output path (overwritten).
        data: DataFrame whose cells are (feat_id, feat_value) pairs, as produced by
              encode_data. Column names must be strings.
        field_size, feat_size: integers written on the first line.
        label: Series of labels, matched to the rows of `data` by position. Only
               used when `label_name` is non-empty.
        label_name: name of the label column; the label column is written only
                    when this is non-empty.
    '''
    # An unnamed index would make the header concatenation fail (None + str);
    # fall back to 'id', the index column the readers in this notebook expect.
    index_name = data.index.name if data.index.name is not None else "id"
    label_name_str = " {}".format(label_name) if label_name else ""
    # Extract the labels once instead of calling .iloc on every row.
    label_values = label.values if label_name else None

    with open(fn, 'w') as f:
        f.write("{} {}\n".format(field_size, feat_size))
        f.write(index_name + " " + " ".join(data.columns) + label_name_str + "\n")
        for row_idx, (row_id, row) in enumerate(zip(data.index, data.values)):
            label_val_str = " " + str(label_values[row_idx]) if label_name else ""
            pairs = " ".join("{}:{}".format(val[0], val[1]) for val in row)
            f.write(str(row_id) + " " + pairs + label_val_str + "\n")

In [6]:
def write_mapping_tabel(fn, data):
    '''Serialize `data` as JSON into the file `fn`, overwriting any existing content.'''
    with open(fn, 'w') as out_file:
        out_file.write(json.dumps(data))

In [7]:
def encode_data(root_dir="../data/porto-seguro-safe-driver-prediction", train_ds_fn="tmp/train_ds.csv", test_ds_fn="tmp/test_ds.csv"):
    '''
    Encode the raw train/test data into "feat_id:feat_value" files plus mapping tables.

    Train and test are encoded together so that both share one feature-id space.
    - Categorical columns (at most 100 distinct values): every distinct value, in
      sorted order, gets its own feature id and the cell becomes (feat_id, 1).
    - Numerical columns: the whole column gets a single feature id and the cell
      becomes (feat_id, raw_value).

    Files written (paths relative to `root_dir`; missing directories are created):
    - `train_ds_fn` (with a 'target' column) and `test_ds_fn`, via write_to_csv
    - tmp/featidx_to_onehot.csv and tmp/onehot_to_featidx.csv: JSON mapping tables
      (JSON content despite the .csv extension)
    '''
    train_ds_fn = os.path.join(root_dir, train_ds_fn)
    test_ds_fn = os.path.join(root_dir, test_ds_fn)
    featidx2onehot_fn = os.path.join(root_dir, "tmp/featidx_to_onehot.csv")
    onehot2featidx_fn = os.path.join(root_dir, "tmp/onehot_to_featidx.csv")

    print("reading raw data...")
    train, train_label, test = read_ori_data(root_dir)

    train_test = pd.concat([train, test], axis=0)

    field_size = train_test.shape[1]
    feat_size = 0  # running count of assigned feature ids
    featidx2onehot = defaultdict(dict)  # column -> {feature id: original value (0 for numerical)}
    onehot2featidx = defaultdict(dict)  # column -> {original value (0 for numerical): feature id}

    _, cat_cols = split_cols_by_type(train_test, 100)
    cat_col_set = set(cat_cols)  # O(1) membership instead of scanning a numpy array

    print("encoding data...")
    for column in train_test.columns:
        start = time.time()
        if column in cat_col_set:
            # create mapping table
            value_to_id = {}
            for value in np.sort(train_test[column].unique()):
                value_to_id[value] = feat_size
                featidx2onehot[column][feat_size] = value.item()
                onehot2featidx[column][value.item()] = feat_size
                feat_size += 1

            # Bind the table as a default argument so the lambda never depends on
            # names that are reassigned on later loop iterations.
            train_test[column] = train_test[column].map(lambda x, table=value_to_id: (table[x], 1))
        else:
            feat_id = feat_size
            featidx2onehot[column][feat_id] = 0
            onehot2featidx[column][0] = feat_id
            feat_size += 1

            # Bind feat_id explicitly instead of closing over the mutating feat_size.
            train_test[column] = train_test[column].map(lambda x, fid=feat_id: (fid, x))

        print("column: {} time: {}".format(column, time.time() - start))

    processed_train = train_test.loc[train.index]
    processed_test = train_test.loc[test.index]

    # open(..., 'w') does not create missing directories such as <root_dir>/tmp.
    for out_fn in (train_ds_fn, test_ds_fn, featidx2onehot_fn, onehot2featidx_fn):
        out_dir = os.path.dirname(out_fn)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    print("Write data to files...")
    write_to_csv(train_ds_fn, processed_train, field_size, feat_size, train_label, "target")
    write_to_csv(test_ds_fn, processed_test, field_size, feat_size)

    write_mapping_tabel(featidx2onehot_fn, dict(featidx2onehot))
    write_mapping_tabel(onehot2featidx_fn, dict(onehot2featidx))

    print("Done!")

In [8]:
def parse_line(is_training):
    '''
    Build a tf.data map function that parses one encoded text line.

    A line looks like "<id> <feat_id>:<feat_value> ... [<label>]". The leading id
    token is dropped. When `is_training` is true, the last token is parsed as an
    int32 label.

    Returns:
        parse(line) -> {"feat_id": int32 1-D, "feat_values": float32 1-D[, "label": int32]}
    '''
    def parse(line):
        tokens = tf.strings.split(line)
        pair_tokens = tokens[1:-1] if is_training else tokens[1:]
        pairs = tf.strings.split(pair_tokens, ":")

        parsed = {
            "feat_id": tf.strings.to_number(pairs[:, 0:1].values, out_type=tf.int32),
            "feat_values": tf.strings.to_number(pairs[:, 1:2].values, out_type=tf.float32),
        }
        if is_training:
            parsed["label"] = tf.strings.to_number(tokens[-1], out_type=tf.int32)
        return parsed

    return parse

In [9]:
def read_by_dataset(root_dir="../data/porto-seguro-safe-driver-prediction", fn="tmp/train_ds.csv", is_training=True):
    '''
    Read an encoded data file lazily as a tf.data.TextLineDataset.

    The first line ("field_size feat_size") is read eagerly. The dataset skips the
    first two lines (that line plus the column header), and every remaining line is
    parsed by parse_line.

    is_training: True means the file has a trailing label column, which is then
                 parsed into the "label" entry of each element.

    Returns:
        (field_size, feat_size, dataset of {"feat_id", "feat_values"[, "label"]} dicts)
    '''
    ds_fn = os.path.join(root_dir, fn)

    with open(ds_fn, 'r') as f:
        field_size, feat_size = map(int, f.readline().split())

    data = tf.data.TextLineDataset(ds_fn).skip(2)
    # Honour is_training: parsing a label-less file in training mode would treat its
    # last feature pair as the label.
    data = data.map(parse_line(is_training), num_parallel_calls=4)

    return field_size, feat_size, data

In [10]:
def read_by_pd(root_dir="../data/porto-seguro-safe-driver-prediction", fn="tmp/train_ds.csv", is_training=True):
    '''
    Read a whole encoded data file into memory with pandas and convert it to tensors.

    is_training: True means the file has a 'target' column, returned as "label".

    Returns:
        (field_size, feat_size, res) where res holds
          "feat_id":     int32 tensor [num_rows, num_columns]
          "feat_values": float32 tensor [num_rows, num_columns]
          "label":       int32 tensor [num_rows]   (only when is_training)
    '''
    ds_fn = os.path.join(root_dir, fn)

    with open(ds_fn, 'r') as f:
        field_size, feat_size = map(int, f.readline().split())

    # Skip the "field_size feat_size" line; the next line is the column header.
    data = pd.read_csv(ds_fn, skiprows=1, sep=" ", index_col='id')

    label = None
    if is_training:
        label = data['target']
        data = data.drop(['target'], axis=1)

    # Every cell is "feat_id:feat_value". Split each column once (vectorised) and
    # convert to numbers in pandas instead of relying on string coercion in TF.
    feat_ids = {}
    feat_values = {}
    for col in data.columns:
        parts = data[col].str.split(":", n=1, expand=True)
        feat_ids[col] = parts[0].astype(np.int32)
        feat_values[col] = parts[1].astype(np.float32)

    res = {
        "feat_id": tf.convert_to_tensor(pd.DataFrame(feat_ids, columns=data.columns).values, dtype=tf.int32),
        "feat_values": tf.convert_to_tensor(pd.DataFrame(feat_values, columns=data.columns).values, dtype=tf.float32),
    }
    if label is not None:
        res["label"] = tf.convert_to_tensor(label.values, dtype=tf.int32)

    return field_size, feat_size, res

In [11]:
def train_test_stratification(root_dir="../data/porto-seguro-safe-driver-prediction", fn="tmp/train_ds.csv", train_fn="tmp/train_train.csv", val_fn="tmp/train_val.csv", test_size=0.1, random_state=200):
    '''
    Split an encoded training file into stratified training and validation files.

    The split is stratified on 'target', so both parts keep the class ratio. The first
    line of the input ("field_size feat_size") is copied to both outputs, followed by
    the space-separated frame (header and 'id' index included), so the outputs can be
    read back by read_by_dataset / read_by_pd.

    test_size: fraction of rows placed in the validation file (default 0.1).
    random_state: seed passed to train_test_split (default 200, reproducible split).
    '''
    input_fn = os.path.join(root_dir, fn)
    train_fn = os.path.join(root_dir, train_fn)
    val_fn = os.path.join(root_dir, val_fn)

    with open(input_fn, 'r') as f:
        first_line = f.readline()

    data = pd.read_csv(input_fn, skiprows=1, sep=" ", index_col='id')
    train, validation = train_test_split(data, test_size=test_size, random_state=random_state, stratify=data['target'])

    def write_to(out_fn, frame, first_lines):
        # Write the size line, then append the frame with its header.
        with open(out_fn, 'w') as out:
            out.write(first_lines)
        frame.to_csv(out_fn, sep=" ", mode='a')

    write_to(train_fn, train, first_line)
    write_to(val_fn, validation, first_line)

In [14]:
# Run the preprocessing end to end: encode raw train/test into tmp/*.csv (plus the JSON
# mapping tables), then split the encoded training file into train/validation files.
encode_data()
train_test_stratification()

reading raw data...
encoding data...
column: ps_ind_01 time: 0.8766317367553711
column: ps_ind_02_cat time: 0.746898889541626
column: ps_ind_03 time: 0.719944953918457
column: ps_ind_04_cat time: 0.7144441604614258
column: ps_ind_05_cat time: 0.6973628997802734
column: ps_ind_06_bin time: 0.6936299800872803
column: ps_ind_07_bin time: 0.7055909633636475
column: ps_ind_08_bin time: 0.6214621067047119
column: ps_ind_09_bin time: 1.1251420974731445
column: ps_ind_10_bin time: 0.9410827159881592
column: ps_ind_11_bin time: 0.794482946395874
column: ps_ind_12_bin time: 1.069087028503418
column: ps_ind_13_bin time: 0.8360259532928467
column: ps_ind_14 time: 0.7426760196685791
column: ps_ind_15 time: 0.6626708507537842
column: ps_ind_16_bin time: 0.601754903793335
column: ps_ind_17_bin time: 0.5626480579376221
column: ps_ind_18_bin time: 0.6173157691955566
column: ps_reg_01 time: 0.5989120006561279
column: ps_reg_02 time: 0.5386519432067871
column: ps_reg_03 time: 0.4512488842010498
column: p

Note: The original data is imbalanced. One solution is to down-sample the majority class.

In [12]:
def down_sample(root_dir, fn, output_fn, random_state=None):
    '''
    Balance an encoded training file by down-sampling the negative class.

    All rows with target == 1 are kept. The same number of rows with target == 0 are
    drawn without replacement. Positives are written first, then the sampled negatives.
    The first line of the input ("field_size feat_size") is copied to the output, so it
    stays readable by read_by_dataset / read_by_pd.

    random_state: optional seed forwarded to DataFrame.sample. The default None uses
                  numpy's global random state, so results are only reproducible if
                  np.random.seed was called beforehand.

    Raises:
        ValueError (from DataFrame.sample) if there are more positives than negatives.
    '''
    input_fn = os.path.join(root_dir, fn)
    output_fn = os.path.join(root_dir, output_fn)

    with open(input_fn, 'r') as f:
        first_line = f.readline()

    data = pd.read_csv(input_fn, skiprows=1, sep=" ", index_col='id')

    positive_data = data[data['target'] == 1]
    negative_data = data[data['target'] == 0]
    negative_data = negative_data.sample(n=positive_data.shape[0], random_state=random_state)
    balanced = pd.concat([positive_data, negative_data], axis=0)

    # Write the size line, then append the frame with its header.
    with open(output_fn, 'w') as out:
        out.write(first_line)
    balanced.to_csv(output_fn, sep=" ", mode='a')

In [14]:
# The next cell reads tmp/train_train_balanced.csv, so make sure it exists on a fresh run.
# Regenerate it only when it is missing or older than tmp/train_train.csv, so repeated
# runs keep the same balanced sample.
balanced_root_dir = '../data/porto-seguro-safe-driver-prediction'
balanced_src_fn = os.path.join(balanced_root_dir, 'tmp/train_train.csv')
balanced_dst_fn = os.path.join(balanced_root_dir, 'tmp/train_train_balanced.csv')
if not os.path.exists(balanced_dst_fn) or os.path.getmtime(balanced_dst_fn) < os.path.getmtime(balanced_src_fn):
    down_sample(balanced_root_dir, 'tmp/train_train.csv', 'tmp/train_train_balanced.csv')

Note: Training data is read through the TextLineDataset provided by TensorFlow, which processes data efficiently and produces batches automatically. Validation and test data are read with pandas for ease of programming.

In [15]:
# Training data is streamed via tf.data from the down-sampled (balanced) file; validation
# and test data are loaded fully into memory as tensors via pandas.
field_size, feat_size, train = read_by_dataset(fn="tmp/train_train_balanced.csv", is_training=True)
_, _, validation = read_by_pd(fn="tmp/train_val.csv", is_training=True)
_, _,  test = read_by_pd(fn="tmp/test_ds.csv", is_training=False)

# 2. Model

In [30]:
class DeepPart(keras.layers.Layer):
    '''
    Deep (DNN) component of DeepFM: a stack of Dense(relu) -> BatchNorm -> Dropout blocks.

    deep_hidden_units: number of units of each Dense layer, in order.
    deep_drop_probs: dropout rate applied after each Dense/BatchNorm pair. It must have
                     the same length as deep_hidden_units.

    Raises:
        ValueError if the two lists have different lengths.
    '''
    def __init__(self, deep_hidden_units, deep_drop_probs):
        super(DeepPart, self).__init__()
        if len(deep_hidden_units) != len(deep_drop_probs):
            raise ValueError(
                "deep_hidden_units and deep_drop_probs must have the same length, got {} and {}".format(
                    len(deep_hidden_units), len(deep_drop_probs)))
        self.dense_layers = [keras.layers.Dense(hidden_units, activation='relu') for hidden_units in deep_hidden_units]
        self.dropout_layers = [keras.layers.Dropout(dropout_prob) for dropout_prob in deep_drop_probs]
        # One BatchNorm per Dense layer, so zip() in call() never truncates the stack.
        self.bn_layers = [keras.layers.BatchNormalization() for _ in deep_hidden_units]

    def call(self, inputs, training=None):
        outputs = inputs
        for dense_layer, dropout_layer, bn_layer in zip(self.dense_layers, self.dropout_layers, self.bn_layers):
            outputs = dense_layer(outputs)
            outputs = bn_layer(outputs, training=training)
            outputs = dropout_layer(outputs, training=training)
        return outputs

class DeepFM(keras.Model):
    '''
    DeepFM-style model. It combines a first-order FM term, the second-order FM
    interaction term and a DNN over the field embeddings through a final Dense(1).
    The output is a logit of shape [batch, 1] (no sigmoid applied).

    field_size: number of fields (columns) per sample.
    feat_size: total number of feature ids (embedding vocabulary size).
    embedding_dim: size of the latent vectors shared by the FM and deep parts.
    deep_hidden_units / deep_drop_probs: forwarded to DeepPart. The last hidden layer
        must have 1 unit, because its output is flattened to one value per sample.
    '''

    def __init__(self, field_size, feat_size, embedding_dim, deep_hidden_units, deep_drop_probs):
        super(DeepFM, self).__init__()

        self.field_size = field_size
        self.feat_size = feat_size
        self.embedding_dim = embedding_dim

        # First-order weights: one scalar per feature id.
        self.one_order_embed = keras.layers.Embedding(self.feat_size, 1)
        # Latent vectors, used by both the second-order FM term and the deep part.
        self.embedding = keras.layers.Embedding(self.feat_size, embedding_dim)

        self.deep = DeepPart(deep_hidden_units, deep_drop_probs)

        self.output_dense = keras.layers.Dense(1)

    def call(self, feat_idx, feat_values, training=None):
        # feat_idx: [batch, field_size] feature ids; feat_values: [batch, field_size] values

        # FM first-order part: sum_i w_i * x_i
        one_order_weights = tf.squeeze(self.one_order_embed(feat_idx), axis=2)  # [batch, field_size]
        first_order = tf.reduce_sum(one_order_weights * feat_values, axis=1)  # [batch]

        # FM second-order part: 0.5 * sum_k ((sum_i v_ik x_i)^2 - sum_i (v_ik x_i)^2)
        embeddings = self.embedding(feat_idx)  # [batch, field_size, embedding_dim]
        weighted_embeddings = embeddings * tf.expand_dims(feat_values, axis=2)  # [batch, field_size, embedding_dim]
        square_of_sum = tf.square(tf.reduce_sum(weighted_embeddings, axis=1))  # [batch, embedding_dim]
        sum_of_square = tf.reduce_sum(tf.square(weighted_embeddings), axis=1)  # [batch, embedding_dim]
        second_order = 0.5 * tf.reduce_sum(square_of_sum - sum_of_square, axis=1)  # [batch]

        # Deep part, fed with the (unweighted) embeddings concatenated over all fields.
        deep_input = tf.reshape(embeddings, shape=(-1, self.field_size * self.embedding_dim))  # [batch, field_size * embedding_dim]
        deep_out = tf.reshape(self.deep(deep_input, training=training), [-1])  # [batch] (last deep layer has 1 unit)

        stacked = tf.stack([first_order, second_order, deep_out], axis=1)  # [batch, 3]
        return self.output_dense(stacked)  # [batch, 1]
    
class DeepFMWrapper:
    '''
    Training, evaluation and weight persistence helper around a DeepFM model.

    The wrapped model must output logits. Sigmoid is applied only when computing
    metrics; callers of predict() receive raw logits.
    '''
    def __init__(self, model):
        self.model = model

    def __loss_function(self, real, prediction):
        # Mean binary cross-entropy over the batch (axis=0 keeps a shape-[1] result).
        return tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=real, logits=prediction), axis=0)
        # Deal with imbalanced Data
        # return tf.reduce_mean(tf.nn.weighted_cross_entropy_with_logits(real, prediction, 10), axis=0)

    def __evaluate(self, real, prediction):
        # Fresh metric objects on every call, so the numbers describe this batch only.
        prediction = tf.nn.sigmoid(prediction)

        metric = keras.metrics.AUC()
        auc = metric(real, prediction)

        round_pred = tf.round(prediction)  # 0.5 threshold for recall/precision

        metric_recall = keras.metrics.Recall()
        metric_precision = keras.metrics.Precision()
        recall = metric_recall(real, round_pred)
        precision = metric_precision(real, round_pred)

        return {"auc": auc.numpy(), "recall": recall.numpy(), "precision": precision.numpy()}

    def __run_on_single_batch(self, batch_data, is_training=False, optimizer=None):
        '''Run one forward pass (plus an update when is_training); return (loss, metrics).'''
        batch_label = tf.cast(tf.reshape(batch_data['label'], shape=[-1, 1]), dtype=tf.float32)
        if is_training:
            if optimizer is None:
                raise ValueError("an optimizer is required when is_training=True")
            with tf.GradientTape() as tape:
                prediction = self.model(batch_data['feat_id'], batch_data['feat_values'], training=True)
                loss = self.__loss_function(batch_label, prediction)
            gradients = tape.gradient(loss, self.model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        else:
            # No tape: evaluation does not need gradients, and recording the whole
            # validation forward pass would only waste memory.
            prediction = self.model(batch_data['feat_id'], batch_data['feat_values'], training=False)
            loss = self.__loss_function(batch_label, prediction)

        # Metrics are not differentiated, so they are computed outside the tape.
        evaluation = self.__evaluate(batch_label, prediction)
        return loss, evaluation

    def fit(self, train, validation, epochs=10, learning_rate=1e-4, batch_size=128, optimizer=None):
        '''
        train: unbatched tf.data.Dataset of {'feat_id', 'feat_values', 'label'} dicts;
               it is shuffled (buffer 50000) and batched on every epoch.
        validation: dict {'feat_id': tensor, 'feat_values': tensor, 'label': tensor},
               evaluated as a single batch at the end of every epoch.
        learning_rate: used only when `optimizer` is None, to build a default Adam optimizer.
        optimizer: keras optimizer that applies the gradient updates.
        '''
        if optimizer is None:
            optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        start = time.time()
        for epoch in range(epochs):
            for batch, batch_data in enumerate(train.shuffle(50000).batch(batch_size)):
                loss, evaluation = self.__run_on_single_batch(batch_data, is_training=True, optimizer=optimizer)

                if batch % 100 == 0:
                    end = time.time() - start
                    print("Time: {}, Epoch: {}, Batch: {}, training loss: {}, training evaluation: {}".format(end, epoch, batch, loss, evaluation))

            val_loss, val_eval = self.__run_on_single_batch(validation)
            print("Epoch: {}, validation loss: {}, validation evaluation: {}".format(epoch, val_loss, val_eval))

    def predict(self, test):
        '''Return raw logits of shape [num_rows, 1] for a dict with 'feat_id' and 'feat_values'.'''
        return self.model(test['feat_id'], test['feat_values'], training=False)

    def save_model(self, fn):
        '''Save only the model weights, in TensorFlow checkpoint format, under prefix `fn`.'''
        self.model.save_weights(fn, save_format='tf')

    def load_model(self, fn):
        '''Load weights written by save_model into the wrapped (identically configured) model.'''
        self.model.load_weights(fn)

# 3. Training

In [31]:
# To reproduce a result, the seeds must be set to the same values before training starts.
# In Jupyter specifically, both this seeding code and the model code below must be rerun.
np.random.seed(1000)
rn.seed(30)
tf.random.set_seed(100)

epochs = 20
learning_rate = 1e-4
batch_size = 256

embedding_dim = 50
# The last deep layer must have 1 unit: DeepFM flattens the deep output to one value per sample.
deep_hidden_units = [200, 100, 1]
deep_drop_probs = [0.5, 0.5, 0.5]  # one dropout rate per hidden layer

model = DeepFM(field_size, feat_size, embedding_dim, deep_hidden_units, deep_drop_probs)
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)  
model_wrapper = DeepFMWrapper(model)
model_wrapper.fit(train, validation, epochs=epochs, learning_rate=learning_rate, batch_size=batch_size, optimizer=optimizer)

Time: 5.173868894577026, Epoch: 0, Batch: 0, training loss: [0.9974171], training evaluation: {'auc': 0.48882443, 'recall': 0.55725193, 'precision': 0.49324325}
Time: 16.367104053497314, Epoch: 0, Batch: 100, training loss: [0.81736386], training evaluation: {'auc': 0.54886156, 'recall': 0.4651163, 'precision': 0.5555556}
Epoch: 0, validation loss: [0.53288054], validation evaluation: {'auc': 0.56550205, 'recall': 0.20470263, 'precision': 0.049854033}
Time: 35.46135187149048, Epoch: 1, Batch: 0, training loss: [0.78109705], training evaluation: {'auc': 0.5650735, 'recall': 0.45833334, 'precision': 0.52380955}
Time: 48.46153116226196, Epoch: 1, Batch: 100, training loss: [0.7517545], training evaluation: {'auc': 0.53317446, 'recall': 0.5, 'precision': 0.48818898}
Epoch: 1, validation loss: [0.5812465], validation evaluation: {'auc': 0.575358, 'recall': 0.23928078, 'precision': 0.049736463}
Time: 66.0647759437561, Epoch: 2, Batch: 0, training loss: [0.73345333], training evaluation: {'au

Time: 506.766236782074, Epoch: 18, Batch: 100, training loss: [0.6817192], training evaluation: {'auc': 0.61473346, 'recall': 0.5620438, 'precision': 0.6581197}
Epoch: 18, validation loss: [0.69145375], validation evaluation: {'auc': 0.62234527, 'recall': 0.5578608, 'precision': 0.05250141}
Time: 521.9950368404388, Epoch: 19, Batch: 0, training loss: [0.65253425], training evaluation: {'auc': 0.6545088, 'recall': 0.62903225, 'precision': 0.58208954}
Time: 534.2088561058044, Epoch: 19, Batch: 100, training loss: [0.66510916], training evaluation: {'auc': 0.63361436, 'recall': 0.5645161, 'precision': 0.5882353}
Epoch: 19, validation loss: [0.6243641], validation evaluation: {'auc': 0.6197378, 'recall': 0.4472107, 'precision': 0.056874815}


In [32]:
# Raw logits for the test set, shape [num_test_rows, 1]; sigmoid is applied in the submission step.
test_prediction = model_wrapper.predict(test)

# 4. Model Saving and loading

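As noted in the official TensorFlow documentation, a subclassed model cannot be saved as an entire model, because its architecture is defined by the Python code in its `call` method. In contrast, the Keras functional API represents a model as a data structure (a graph of layers) that can be serialized. Therefore only the weights are saved here (`save_weights`), and the model must be re-created from code before the weights are loaded.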

In [33]:
# Only the weights are saved (TensorFlow checkpoint format, path prefix below); a DeepFM
# with the same hyper-parameters must be constructed before calling load_model.
model_fn = '../models/deepFM/deepfm'
model_wrapper.save_model(model_fn)

In [34]:
# new_model = DeepFM(field_size, feat_size, embedding_dim, deep_hidden_units, deep_drop_probs)
# new_model_wrapper = DeepFMWrapper(new_model)
# new_model_wrapper.load_model(model_fn)

In [35]:
# new_prediction = new_model_wrapper.predict(test)

In [36]:
# new_prediction[:10]

# 5. Submission

Kaggle Competition: https://www.kaggle.com/c/porto-seguro-safe-driver-prediction/leaderboard#score

In [37]:
fn = "../data/porto-seguro-safe-driver-prediction/sample_submission.csv"
submission = pd.read_csv(fn)
submission['target'] = tf.sigmoid(test_prediction).numpy()

In [38]:
out_fn = "../data/porto-seguro-safe-driver-prediction/submission.csv"
submission.to_csv(out_fn, index=False)