# FFM Model

## Imports

In [1]:
import random
import pandas as pd
import numpy as np
from hashlib import sha256

from pyspark.sql import types
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

import time
import numpy as np
import matplotlib.pyplot as plt

# set the seed
np.random.seed(1)

In [2]:
import pyspark
# sc = pyspark.SparkContext()
train_parquet = pyspark.read.parquet("data/mediumTrain.parquet")

AttributeError: module 'pyspark' has no attribute 'read'

In [3]:
from pyspark.sql import SparkSession

app_name = "final_project_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [4]:
PWD = !pwd
PWD = PWD[0]

## Generate a small sample to work with

In [5]:
%%writefile sample.txt
1	10	ESPN	Nike
1	15	ESPN	Nike
0	2	ESPN	Gucci
1	10	ESPN	Adidas
1	10	ESPN	Adidas
0	3	Vogue	Nike
1	20	Vogue	Gucci
0	5	Vogue	Adidas
1	50	NBC	Nike
0	0	NBC	Gucci
0	4	NBC	Adidas
0	4	NBC	Adidas

Overwriting sample.txt


In [6]:
sample_RDD = sc.textFile('sample.txt')
split_RDD = sample_RDD.map(lambda line: line.split('\t')).cache()

In [7]:
sample_df = split_RDD.toDF()
sample_df.show()

+---+---+-----+------+
| _1| _2|   _3|    _4|
+---+---+-----+------+
|  1| 10| ESPN|  Nike|
|  1| 15| ESPN|  Nike|
|  0|  2| ESPN| Gucci|
|  1| 10| ESPN|Adidas|
|  1| 10| ESPN|Adidas|
|  0|  3|Vogue|  Nike|
|  1| 20|Vogue| Gucci|
|  0|  5|Vogue|Adidas|
|  1| 50|  NBC|  Nike|
|  0|  0|  NBC| Gucci|
|  0|  4|  NBC|Adidas|
|  0|  4|  NBC|Adidas|
+---+---+-----+------+



# Load Dataset

## 1. Only run this if you haven't generated a train.parquet file

In [23]:
train_data = spark.read.csv(f"{PWD}/data/train.txt", sep="\t")
train_data.write.format("parquet").save(f"{PWD}/data/train.parquet")

AnalysisException: 'path file:/media/notebooks/f19-final-project-f19-team-15/data/train.parquet already exists.;'

## 2. Adjust this value to match the desired level of data to work with

In [8]:
# select which data to load:
# 1->sample.parquet
# 2->smallTrain.parquet
# 3->mediumTrain.parquet
# 4->train.parquet (full dataset)

DATA_TO_LOAD = 3

In [9]:
if DATA_TO_LOAD == 1:
    train_parquet = spark.read.parquet(f"{PWD}/data/sample.parquet")
    cate_field_start = 2
    cate_field_end = 4
else:
    if DATA_TO_LOAD == 2:
        train_parquet = spark.read.parquet(f"{PWD}/data/smallTrain.parquet")
    elif DATA_TO_LOAD == 3:
        train_parquet = spark.read.parquet(f"{PWD}/data/mediumTrain.parquet")
    else:
#         train_parquet = spark.read.parquet(f"{PWD}/data/train.parquet")
        train_parquet = spark.read.parquet(f"{PWD}/data/train.parquet")
    cate_field_start = 14
    cate_field_end = 40

In [10]:
#rename files and recast integer types on the numeric features

oldColNames = train_parquet.schema.names

train_parquet = train_parquet.withColumn("label", train_parquet["_c0"])
for colNum in range(1,cate_field_start): 
    colName = "_c" + str(colNum)
    train_parquet = train_parquet.withColumn("int_feature_"+ str(colNum), train_parquet[colName].cast(types.IntegerType()))
for colNum in range(cate_field_start,cate_field_end): 
    colName = "_c" + str(colNum)
    train_parquet = train_parquet.withColumn("cate_feature_"+ str(colNum-cate_field_start+1), train_parquet[colName])

#drop the old columns
adjusted_labels_train_parquet = train_parquet.drop(*oldColNames)

In [11]:
intFieldNames = [colName for colName, dType in adjusted_labels_train_parquet.dtypes if dType == 'int']
cateFieldNames = [colName for colName, dType in adjusted_labels_train_parquet.dtypes if dType == 'string' and colName != 'label']

In [12]:
adjusted_labels_train_parquet.show(1)

+-----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|label|int_feature_1|int_feature_2|int_feature_3|int_feature_4|int_feature_5|int_feature_6|int_feature_7|int_feature_8|int_feature_9|int_feature_10|int_feature_11|int_feature_12|int_feature_13|cate_feature_1|cate_feature_2|cate_feature_3|cate_feature_4|cate_feature_5|cate_feature_6|cate_feature_7|cate_feature_8|cate_feature_9|cate_feature_10|cate_feature_11|cate_feature_12|cate_feature_13|cate_fe

# Feature Engineering
## Categorical Variables

In [13]:
threshold = 10

train_parquet_MD = adjusted_labels_train_parquet

for col in cateFieldNames:
    valuesToKeep = adjusted_labels_train_parquet.groupBy(col).count().filter(f"count >= {threshold}").select(col)
    valuesToKeep = valuesToKeep.withColumn("_"+col, adjusted_labels_train_parquet[col])
    valuesToKeep = valuesToKeep.drop(col)

    train_parquet_MD = train_parquet_MD.join(F.broadcast(valuesToKeep), train_parquet_MD[col] == valuesToKeep["_"+col], 'leftouter')
    train_parquet_MD = train_parquet_MD.withColumn(col, F.when(F.col("_"+col).isNull(), "***").otherwise(F.col("_"+col)))
    train_parquet_MD = train_parquet_MD.drop("_"+col)

In [14]:
# view data after the replacement
start = time.time()
train_parquet_reduced_dimensions = train_parquet_MD
train_parquet_reduced_dimensions.show(5)
print(f'categorical columns processed in {time.time() - start} seconds.')

+-----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|label|int_feature_1|int_feature_2|int_feature_3|int_feature_4|int_feature_5|int_feature_6|int_feature_7|int_feature_8|int_feature_9|int_feature_10|int_feature_11|int_feature_12|int_feature_13|cate_feature_1|cate_feature_2|cate_feature_3|cate_feature_4|cate_feature_5|cate_feature_6|cate_feature_7|cate_feature_8|cate_feature_9|cate_feature_10|cate_feature_11|cate_feature_12|cate_feature_13|cate_fe

## Numeric Variables

In [15]:
for col in intFieldNames:
    train_parquet_reduced_dimensions = train_parquet_reduced_dimensions.withColumn(col, F.floor(F.log(F.col(col) + 1)))

In [16]:
start = time.time()
train_parquet_reduced_dimensions.show()
print(f'... completed job in {time.time() - start} seconds.')

+-----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|label|int_feature_1|int_feature_2|int_feature_3|int_feature_4|int_feature_5|int_feature_6|int_feature_7|int_feature_8|int_feature_9|int_feature_10|int_feature_11|int_feature_12|int_feature_13|cate_feature_1|cate_feature_2|cate_feature_3|cate_feature_4|cate_feature_5|cate_feature_6|cate_feature_7|cate_feature_8|cate_feature_9|cate_feature_10|cate_feature_11|cate_feature_12|cate_feature_13|cate_fe

## Feature Hashing

In [17]:
n_features = 10000
n_fields = len(intFieldNames) + len(cateFieldNames)

In [18]:
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher()
hasher.setCategoricalCols(intFieldNames)
hasher.setNumFeatures(n_features)

for col in intFieldNames + cateFieldNames:
    hasher.setInputCols([col])
    hasher.setOutputCol(col+"_hashed")
    train_parquet_reduced_dimensions = hasher.transform(train_parquet_reduced_dimensions)

In [19]:
start = time.time()
train_parquet_reduced_dimensions.show(2)
print(f'... completed job in {time.time() - start} seconds.')

+-----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+-------------

In [20]:
hashed_columns = train_parquet_reduced_dimensions.schema.names[-n_fields:]

## Adjust the dataframe to only contain the hashed value

In [21]:
def parse_sparse_vectors(vector, field_ind):
    if vector.indices.size > 0:
        return int(vector.indices[0])
    else:
        return None

vector_parser = F.udf(parse_sparse_vectors, types.IntegerType())

In [22]:
train_parquet_hashed = train_parquet_reduced_dimensions
for field_ind, col in enumerate(hashed_columns):
    
    train_parquet_hashed = train_parquet_hashed.withColumn(col, vector_parser(col, F.lit(field_ind)))

train_parquet_hashed = train_parquet_hashed.drop(*(intFieldNames + cateFieldNames))

In [23]:
start = time.time()
train_parquet_hashed.show(1)
print(f'... completed job in {time.time() - start} seconds.')

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|label|int_feature_1_hashed|int_feature_2_hashed|int_feature_3_hashed|int_feature_4_hashed|int_feature_5_hashed|int_feature_6_

In [24]:
# Change labels to be -1 and 1
train_parquet_hashed = train_parquet_hashed.withColumn("label", F.when(F.col("label") == 0, -1).otherwise(F.col("label")))

## Model

### FFM
Mathematically, FMM can be expressed as:

$ \phi_{FFM}(w, x) = \sum\limits^{n}_{j_1=1} \sum\limits^{n}_{j_2=j_1+1}(w_{j_1} \cdot w_{j_2})x_{j_1}x_{j_2}$

However, in the models considered, either all variables are categorical or all integer values are binned effectively making them categorical and the $x_{j_1}$ and $x_{j_2}$ are both equal to 1. This reduces the formula to:

$ \phi_{FFM}(w, x) = \sum\limits^{n}_{j_1=1} \sum\limits^{n}_{j_2=j_1+1}(w_{j_1} \cdot w_{j_2})$

The optimization function considered for this model is log loss with regularization and the following formula is to be minimized.

$\underset{w}{min}$   $\dfrac{\lambda}{2}||w||_2^2 + \sum\limits^{m}_{i=1}log(1 + exp(-y_i\phi_{FFM}(w,x_i)))$

Currently, a closed-form solution for minimizing log loss is not known and therefore gradient descent is applied. The gradients for $\phi_{FFM}(w, x)$ are:

$g_{j_1,f_2} = \triangledown_{w_{j_1,f_2}} f(w) = \lambda \cdot w_{j_1,f_2} + \kappa \cdot w_{j_2,f_1}$

$g_{j_2,f_1} = \triangledown_{w_{j_2,f_1}} f(w) = \lambda \cdot w_{j_2,f_1} + \kappa \cdot w_{j_1,f_2}$

where,

$\kappa = \dfrac{\partial log(1 + exp(-y\phi_{FFM}(w,x)))}{\partial \phi_{FFM}(w, x)} = \dfrac{-y}{1 + exp(y\phi_{FFM}(w,x))}$

Initially we define two helper function for $\phi_{FFM}$ and $\kappa$

In [26]:
def phi(x, W):
    total = 0
    for i in range(len(x) - 1):
        if not x[i]:
            continue
            
        for j in range(i + 1, len(x)):
            if x[j]:
                total += np.dot(W[x[i], j, :], W[x[j], i, :])
                            
    return total

def kappa(y, features, W):
    return -int(y)/(1 + np.exp(y*phi(features, W)))

## Evaluation
In addition to these helper functions, an associated cost function to minimize and evaluate the performance of the model during training and validation was required. As noted above, the evaluation method used is the log loss. A helper function is created to evaluate model performance.

In [27]:
def log_loss(dataRDD, W):
    return dataRDD.map(lambda x: np.log(1 + np.exp(-int(x[0]) * phi(x[1:], W)))).mean()

## Implementation 1
The first implementation of the algorithm applies batch gradient descent to the model and the algorithm takes the following form for $t$ epochs:

1. Map over instance, calculate all subgradients and emit the form ((feature, field) $g_{j_1,f_2}$) and ((feature, field) $g_{j_2,f_1}$) for all field pair combinations
2. Reduce by the key to summing values for each key and number of occurances and incrementing a count variable so an average can be calculated. Emit ((feature, field), gradient, count).
3. Map over each value, diving the gradient by the count to find an average gradient step.
4. Create a tensor of zeroes $R^{n x f x k}$ where $n$ is the number of features, $f$ is the number of fields and $k$ is the size of each latent vector.
5. Add each latent vector gradient step to the tensor of zeroes
6. Subtract the gradient step multiplied by a learning rate from the original model: $W - eta * gradient$ where W is the original model.

In [28]:
train_parquet_hashed = train_parquet_hashed.cache()

The model is first split into train and test for later evaluation before the training set is split into 100 mini batches.

In [29]:
# Test Train Split
train_df, test_df = train_parquet_hashed.randomSplit([0.8, 0.2])
batches = train_df.randomSplit([0.01] * 100)

In [30]:
# Initialize model parameters
k = 2
n_features = 10000
n_fields = 39
eta = 0.3
reg_c = 0.1
sc.broadcast(k)
sc.broadcast(n_features)
sc.broadcast(n_fields)
sc.broadcast(reg_c)
sc.broadcast(eta)

<pyspark.broadcast.Broadcast at 0x7fcda50495f8>

In [31]:
def gradient(x, W):
    y = int(x[0])
    features = x[1:]
    kap = kappa(y, features, W)
    
    for i in range(len(features) - 1):
        if not features[i]:
            continue
            
        for j in range(i+1, len(features)):
            if features[j]:
                yield ((features[i], j), (kap * W[int(features[j]), i, :], 1))
                yield ((features[j], i), (kap * W[int(features[i]), j, :], 1))
            
    

In [32]:
def log_loss(dataRDD, W):
    return dataRDD.map(lambda x: np.log(1 + np.exp(-int(x[0]) * phi(x[1:], W)))).mean()

def gd_update(dataRDD, W):
    grad = dataRDD.flatMap(lambda x: gradient(x, W))\
                .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
                .map(lambda x: ((x[0][0], x[0][1]), x[1][0] / x[1][1]))\
                .collect()
    
    grad_update = np.zeros(shape=(n_features, n_fields, k))
    
    for indices, vector in grad:
        feature_index = indices[0]
        field_index = indices[1]
        
        grad_update[int(feature_index), field_index, :] += vector
    
    new_model = W - eta * grad_update
    
    return new_model

In [33]:
def gradient_descent(split_data, w_init, n_steps = 10):
    
    model = sc.broadcast(w_init)

    start = time.time()
    for i in range(n_steps):
        train_rdd = split_data[i].rdd
        print("----------")
        print(f"STEP: {i+1}")
        new_model = gd_update(train_rdd, model.value)
        model = sc.broadcast(new_model)
        train_loss = log_loss(train_rdd, model.value)
        print(f"Training Loss: {train_loss}")
        test_loss = log_loss(split_data[i + 50].rdd, model.value)
        print(f"Test Loss: {test_loss}")
    print(f"\n... trained {n_steps} iterations in {time.time() - start} seconds")

np.random.seed(1)
w_init = np.random.uniform(0, 1/np.sqrt(k), size=(n_features, n_fields, k))
gradient_descent(batches, w_init)

----------
STEP: 1
Training Loss: 62.73936057037036
Test Loss: 60.10183410156437
----------
STEP: 2
Training Loss: 22.328150602460546
Test Loss: 36.45676808231048
----------
STEP: 3
Training Loss: 4.307155265608798
Test Loss: 13.901616519936255
----------
STEP: 4
Training Loss: 0.8664094322811013
Test Loss: 3.973589613149072
----------
STEP: 5
Training Loss: 1.475896230765925
Test Loss: 4.674988641481719
----------
STEP: 6
Training Loss: 2.440370214571568
Test Loss: 2.0451115558223574
----------
STEP: 7
Training Loss: 0.882557069689214
Test Loss: 5.096213680944408
----------
STEP: 8
Training Loss: 0.018436663839698046
Test Loss: 3.3981814430053627
----------
STEP: 9
Training Loss: 1.6968265521501424
Test Loss: 2.6162523317611956
----------
STEP: 10
Training Loss: 0.24230267188769136
Test Loss: 3.4301517522897385

... trained 10 iterations in 31.028324842453003 seconds


## Implementation 1 Evaluation
As can be seen from the outputs, the gradient steps are performing correctly with the log loss improving on each iteration. In this case the training occurs over the first 10 batches and testing is performed on batches 50-59. The training loss unsurprisingly converges a lot more than the test loss due to the presence of the most recent gradient step being performed on the training data that was just tested. A suggestion would be to add regularization, which we tried, but it didn't improve performance all that much, which is similar to the conclusions noted in the paper this model is based off of. It should be noted that the losses shown above are for a locally run model and performance here is particularly bad because there are as many as 200,000 unique features and only 10000 features are used for the hashing. Moreover, the number of latent vectors is set to 2, which is fairly low.

The example above is on a 5000 instance training set, however, the full dataset was evaluated similarly broken into 100 equal blocks. However, training on just one of the blocks (450k instances) took close to 3 hours with the following computational set up:
 - 1 n1-standard driver 
 - 6 n1-standard workers
     - 8 cores each
     - 30GB of RAM

That said, it was calculating logloss that took close to an hour each time. It was determined that our implementation was too computational inefficient to continue to attempt on the full dataset and we decided to pivot.

<img src="image (3).png">


## Implementation 2
A second implementation was devised that would split the data into separate partitions, performing stochastic gradient descent locally before collecting and summing all gradients at the driver. The hope here was that having the model coefficients locally would enable SGD, but also reduce the amount of emitting done by the mappers, improving performance of the model.

In [34]:
k = 2
n_features = 10000
n_fields = 39
eta = 0.3
reg_c = 0.1
sc.broadcast(k)
sc.broadcast(n_features)
sc.broadcast(n_fields)
sc.broadcast(reg_c)
sc.broadcast(eta)

def update_gradient(x, w_old, w_new, G, reg_c = 0.1):
    """Update the gradient of a FFM
    
    Args:
        x: the input row from the RDD, assume that it has been hashed and that 
            the first entry is the label the update takes values from w_old and 
            updates w_new. The format of x is [label, h_1, ..., h_F]
            where h_i is the hashed value of the field i and F is the total number of fields.
        w_old: the old value of the coefficient matrix (this is usually a broadcast variable
            from the driver)
        w_new: the new coefficient matrix, this will be mutated/updated within this function
        n_features: number of features
        n_fiels: number of fields
        reg_c: regularization parameter
    
    Implements steps from Algorithm 1 (lines 5 - 11) in paper 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf
    """
    eta = 0.3
    y = x[0]
    features = x[1:]
    # Keep a mapping from the hashed value back to the field it came from
    w_v = w_old.value
#     D = {h:i for i, h in enumerate(H)}
    kappa = -int(y)/(1 + np.exp(int(y)*phi(features, w_v)))
    # Iterate through the features and update the gradient, use D for lookups
    for f1 in range(len(features) - 1):
        j1 = features[f1]
        if not j1:
            continue
            
        for f2 in range(1, len(features)):
            j2 = features[f2]
            if not j2:
                continue         
            
            g1 = kappa*w_v[j2,f1,:] + reg_c*w_v[j1, f2, :]
            g2 = kappa*w_v[j1,f2,:] + reg_c*w_v[j2, f1, :]
#             G[j1,f2, :] += g1**2
#             G[j2,f1, :] += g2**2
            w_new[j1,f2,:] = w_v[j1,f2,:] - eta * g1 # eta*g1/np.sqrt(G[j1,f2,:])
            w_new[j2,f1,:] = w_v[j2,f1,:] - eta * g2 # eta*g2/np.sqrt(G[j2,f1,:])

def apply_sgd_step(rdd, W_old, n_features, n_fields, k, reg_c):
    """
    Implements the sampling part of Algorithm 1 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf, lines 3 -- 4
    
    Args:
        rdd: The RDD on each partition
        W_old: the old value of the coefficient matrix (this is usually a broadcast variable
            from the driver)
        n_features: number of features
        n_fiels: number of fields
        reg_c: regularization parameter
    """
    
    G = np.ones((n_features, n_fields, k))
    W_new = np.zeros((n_features, n_fields, k))
    for x in rdd: 
        #= rdd.sample(False, 0.1)
        if np.random.uniform() < 0.1:
            update_gradient(x, w_old=W_old, w_new=W_new, G = G, reg_c = reg_c)
    yield W_new
    
def run_ffm_train(rdd_hashed, n_features, n_fields, k, n_iters=10, learning_rate=0.1, reg_c=1):
    """Runs the FFM algorithm and returns the update coefficient matrix. 
    
    
    Implements the SGD parallelization in Algorithm 1 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf, lines 1 - 2
    
    Args:
        rdd_hashed: An RDD of hashed values in the format where each row is of the form
            [label, feature_hash_1, feature_hash_2, ..., feature_hash_n]
        n_features: The number of features, for example 10**6
        n_fields: The number of fields, e.g. 39 for Criteo dataset
        k: dimension of the latent vector
        n_iters: number of iterations of gradient descent
        reg_c: regulization parameter 
        
    Returns:
        The returned matrix is of the type W[j,f,k] where the first index is the feature, 
        the second is the field and the third is the latent component 
        of the vector. So if there are 1,000,000 features, 39 fields and 10 latent vectors 
        the return value is a 1,000,000 x 39 x 10 tensor

    """
    W_old = sc.broadcast(np.random.uniform(0, 1/np.sqrt(k), size=(n_features, n_fields, k)))
    for i in range(n_iters):
        W_new = rdd_hashed.mapPartitions(lambda rdd, w_old=W_old, n_features=n_features, n_fields=n_fields, k=k, reg_c = reg_c: apply_sgd_step(rdd, W_old, n_features, n_fields, k, reg_c))\
        .sum()
        train_loss = log_loss(rdd_hashed, W_new)
        print(f"Training Loss: {train_loss}")
        W_old = sc.broadcast(W_new)
    return W_new

In [35]:
run_ffm_train(train_df.rdd , 10000, 39, 2, n_iters=10)

Training Loss: 51.371761288573545
Training Loss: 23.607517955943024
Training Loss: 10.84718644190816
Training Loss: 5.006273222880837
Training Loss: 2.3819632135011095
Training Loss: 1.3357208715515054
Training Loss: 0.973598711605685
Training Loss: 0.8419300960457365
Training Loss: 0.7821930373896475
Training Loss: 0.7496892202336619


array([[[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       ...,

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]]])

## Implementation 2 Evaluation
The second implementation struggled with similar constraints as the first in that it simply wasn't performany enough. Even on much smaller subsets of the full dataset (1%) it still took close to an hour when run on GCP.

## Future Work

Beyond the two main implementations above, the team read some other literature to better understand how to boost performance of the model. First, the libFFM implementation that won the kaggle competition was compiled and run on a 1% subset of the dataset, which it was able to complete in some minutes. This 

Tried to get it compiled in the cloud and create a python wrapper to call the C++ library, but time constraints and complexity limited this solutions. 

After noting that performance of numpy arrays and pandas are close to that of C++. Another possible solution, yet to be attempted, is to install pyarrows and enable arrows in the spark environment. This allows writing of pandas UDFs, which allows for vectorization of method across all rows at once that can greatly improve performance. That said, the dataset is too large to hold in memory requiring the data to be chunked into smaller pieces that are run on their own partition.

# Anu's implementation

In [338]:
# def phi(W, H):  
#     """Calculate the factorization function $\phi_{FMM}"""
#     H = sorted(H)
#     D = {h:i for i, h in enumerate(H)}
#     return sum(np.dot(W[j,D[l],:], W[l,D[j],:]) for j in H for l in H if l >= j+1)

k = 2
n_features = 10000
n_fields = 39
eta = 0.3
reg_c = 0.1
sc.broadcast(k)
sc.broadcast(n_features)
sc.broadcast(n_fields)
sc.broadcast(reg_c)
sc.broadcast(eta)

def update_gradient(x, w_old, w_new, G, reg_c = 0.1):
    """Update the gradient of a FFM
    
    Args:
        x: the input row from the RDD, assume that it has been hashed and that 
            the first entry is the label the update takes values from w_old and 
            updates w_new. The format of x is [label, h_1, ..., h_F]
            where h_i is the hashed value of the field i and F is the total number of fields.
        w_old: the old value of the coefficient matrix (this is usually a broadcast variable
            from the driver)
        w_new: the new coefficient matrix, this will be mutated/updated within this function
        n_features: number of features
        n_fiels: number of fields
        reg_c: regularization parameter
    
    Implements steps from Algorithm 1 (lines 5 - 11) in paper 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf
    """
    eta = 0.3
    y = x[0]
    features = x[1:]
    # Keep a mapping from the hashed value back to the field it came from
    w_v = w_old.value
#     D = {h:i for i, h in enumerate(H)}
    kappa = -int(y)/(1 + np.exp(int(y)*phi(features, w_v)))
    # Iterate through the features and update the gradient, use D for lookups
    for f1 in range(len(features) - 1):
        j1 = features[f1]
        if not j1:
            continue
            
        for f2 in range(1, len(features)):
            j2 = features[f2]
            if not j2:
                continue         
            
            g1 = kappa*w_v[j2,f1,:] + reg_c*w_v[j1, f2, :]
            g2 = kappa*w_v[j1,f2,:] + reg_c*w_v[j2, f1, :]
#             G[j1,f2, :] += g1**2
#             G[j2,f1, :] += g2**2
            w_new[j1,f2,:] = w_v[j1,f2,:] - eta * g1 # eta*g1/np.sqrt(G[j1,f2,:])
            w_new[j2,f1,:] = w_v[j2,f1,:] - eta * g2 # eta*g2/np.sqrt(G[j2,f1,:])

def apply_sgd_step(rdd, W_old, n_features, n_fields, k, reg_c):
    """
    Implements the sampling part of Algorithm 1 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf, lines 3 -- 4
    
    Args:
        rdd: The RDD on each partition
        W_old: the old value of the coefficient matrix (this is usually a broadcast variable
            from the driver)
        n_features: number of features
        n_fiels: number of fields
        reg_c: regularization parameter
    """
    
    G = np.ones((n_features, n_fields, k))
    W_new = np.zeros((n_features, n_fields, k))
    for x in rdd: 
        #= rdd.sample(False, 0.1)
        if np.random.uniform() < 0.1:
            update_gradient(x, w_old=W_old, w_new=W_new, G = G, reg_c = reg_c)
    yield W_new
    
def run_ffm_train(rdd_hashed, n_features, n_fields, k, n_iters=10, learning_rate=0.1, reg_c=1):
    """Runs the FFM algorithm and returns the update coefficient matrix. 
    
    
    Implements the SGD parallelization in Algorithm 1 
    https://www.csie.ntu.edu.tw/~cjlin/papers/ffm.pdf, lines 1 - 2
    
    Args:
        rdd_hashed: An RDD of hashed values in the format where each row is of the form
            [label, feature_hash_1, feature_hash_2, ..., feature_hash_n]
        n_features: The number of features, for example 10**6
        n_fields: The number of fields, e.g. 39 for Criteo dataset
        k: dimension of the latent vector
        n_iters: number of iterations of gradient descent
        reg_c: regulization parameter 
        
    Returns:
        The returned matrix is of the type W[j,f,k] where the first index is the feature, 
        the second is the field and the third is the latent component 
        of the vector. So if there are 1,000,000 features, 39 fields and 10 latent vectors 
        the return value is a 1,000,000 x 39 x 10 tensor

    """
    W_old = sc.broadcast(np.random.uniform(0, 1/np.sqrt(k), size=(n_features, n_fields, k)))
    for i in range(n_iters):
        W_new = rdd_hashed.mapPartitions(lambda rdd, w_old=W_old, n_features=n_features, n_fields=n_fields, k=k, reg_c = reg_c: apply_sgd_step(rdd, W_old, n_features, n_fields, k, reg_c))\
        .sum()
        train_loss = log_loss(rdd_hashed, W_new)
        print(f"Training Loss: {train_loss}")
        W_old = sc.broadcast(W_new)
    return W_new

In [339]:
run_ffm_train(train_df.rdd , 10000, 39, 2, n_iters=10)

Training Loss: 51.32719269429185
Training Loss: 23.286917665494684
Training Loss: 10.763089486067768
Training Loss: 5.004336898513017
Training Loss: 2.395190453154146
Training Loss: 1.3326793380997923
Training Loss: 0.9781263711459354
Training Loss: 0.8444198268814129
Training Loss: 0.7846071228212534
Training Loss: 0.7514901123707874


array([[[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       ...,

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]],

       [[0., 0.],
        [0., 0.],
        [0., 0.],
        ...,
        [0., 0.],
        [0., 0.],
        [0., 0.]]])

In [183]:
train_df.take(1)

[Row(label='-1', int_feature_1_hashed=None, int_feature_2_hashed=None, int_feature_3_hashed=None, int_feature_4_hashed=None, int_feature_5_hashed=None, int_feature_6_hashed=None, int_feature_7_hashed=None, int_feature_8_hashed=None, int_feature_9_hashed=None, int_feature_10_hashed=None, int_feature_11_hashed=None, int_feature_12_hashed=None, int_feature_13_hashed=None, cate_feature_1_hashed=4192, cate_feature_2_hashed=8153, cate_feature_3_hashed=9906, cate_feature_4_hashed=8694, cate_feature_5_hashed=4640, cate_feature_6_hashed=3340, cate_feature_7_hashed=1202, cate_feature_8_hashed=5036, cate_feature_9_hashed=855, cate_feature_10_hashed=3494, cate_feature_11_hashed=7462, cate_feature_12_hashed=7894, cate_feature_13_hashed=8815, cate_feature_14_hashed=3612, cate_feature_15_hashed=7855, cate_feature_16_hashed=7102, cate_feature_17_hashed=6795, cate_feature_18_hashed=2289, cate_feature_19_hashed=2179, cate_feature_20_hashed=5247, cate_feature_21_hashed=6487, cate_feature_22_hashed=2721, 

In [147]:
sorted([None, None])

TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'

# Section 4 Write up

# Pandas Implementation

In [347]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [346]:
!pip install pyarrow

Collecting pyarrow
[?25l  Downloading https://files.pythonhosted.org/packages/dd/77/5865b367a6792da2f811ae49391c1f85c29b29663555aac0a118fe8e153e/pyarrow-0.15.1-cp36-cp36m-manylinux1_x86_64.whl (59.0MB)
[K    100% |################################| 59.0MB 295kB/s 
[31mtwisted 18.7.0 requires PyHamcrest>=1.9.0, which is not installed.[0m
Installing collected packages: pyarrow
Successfully installed pyarrow-0.15.1
[33mYou are using pip version 10.0.1, however version 19.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [371]:
train_small_df = train_df.limit(10)
train_from_pd = spark.createDataFrame(train_small_df.toPandas())

In [370]:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import IntegerType

def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=IntegerType())

In [376]:
train_small_pd = train_small_df.toPandas()
multiply_func(train_small_pd["cate_feature_17_hashed"], train_small_pd["cate_feature_18_hashed"])

0    49875300
1    15553755
2    49970430
3    15553755
4    15553755
5    36808515
6    15553755
7    15553755
8    36808515
9    36808515
dtype: int32

In [378]:
train_from_pd.select(multiply(col("cate_feature_17_hashed"), col("cate_feature_18_hashed"))).collect()

Py4JJavaError: An error occurred while calling o14813.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1868.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1868.0 (TID 146738, localhost, executor driver): java.lang.NullPointerException
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
	at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
	at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.GeneratedMethodAccessor382.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
	at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
	at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


# Playground

In [3]:
sample_data = spark.read.csv("sample.txt", sep="\t")
sample_data.write.format("parquet").save("sample.parquet")

In [6]:
sample_df = spark.read.parquet("sample.parquet")

In [7]:
sample_df.show()

+---+---+-----+------+
|_c0|_c1|  _c2|   _c3|
+---+---+-----+------+
|  1| 10| ESPN|  Nike|
|  1| 15| ESPN|  Nike|
|  0|  2| ESPN| Gucci|
|  1| 10| ESPN|Adidas|
|  1| 10| ESPN|Adidas|
|  0|  3|Vogue|  Nike|
|  1| 20|Vogue| Gucci|
|  0|  5|Vogue|Adidas|
|  1| 50|  NBC|  Nike|
|  0|  0|  NBC| Gucci|
|  0|  4|  NBC|Adidas|
|  0|  4|  NBC|Adidas|
+---+---+-----+------+



In [17]:
def feature_hash(x, modulo=10**6):
    """
    A function that can be used to hash the features in each observation in the RDD. 
    We replace the label with 1, -1 and we hash all other features using sha256 
    and then we take modulo some power of 10. 
    """
    print(x)
#     x[0] = 2*int(x[0]) - 1
#     for i, value in enumerate(x[1:], 1):
#         h = sha256("{i}-{val}".format(i=i,val=value).encode('ascii'))
#         hashed_value = int(h.hexdigest(), base=16) 
#         hashed_value_mod = hashed_value % modulo
#         x[i] = hashed_value_mod
#     return x

# from pyspark.sql.functions import col
# sample_df.select(*(feature_hash(col(c)).alias(c) for c in sample_df.columns)).show()
sample_df.withColumn("_c0", feature_hash(sample_df["_c0"]))

Column<b'_c0'>


AssertionError: col should be Column

# Running Full Models

In [311]:
train_data = spark.read.csv("data/dac/train.txt", sep="\t")
train_data.write.format("parquet").save(f"data/dac/train.parquet")
full_rdd = sc.textFile('data/dac/train.txt')
train_rdd, test_rdd = full_rdd.randomSplit([0.8,0.2], seed = 2018)

In [47]:
train_parquet = spark.read.parquet("data/dac/train.parquet")

In [48]:
from pyspark.sql import types

oldColNames = train_parquet.schema.names
train_parquet = train_parquet.withColumn("label", train_parquet["_c0"])
for colNum in range(1,14): 
    colName = "_c" + str(colNum)
    train_parquet = train_parquet.withColumn("int_feature_"+ str(colNum), train_parquet[colName].cast(types.IntegerType()))
for colNum in range(14,40): 
    colName = "_c" + str(colNum)
    train_parquet = train_parquet.withColumn("cate_feature_"+ str(colNum-13), train_parquet[colName])

#drop the old columns
train_parquet = train_parquet.drop(*oldColNames)

In [49]:
n_features = 100
n_fields = 3

In [50]:
intFieldNames = [colName for colName, dType in train_parquet.dtypes if dType == 'int']
cateFieldNames = [colName for colName, dType in train_parquet.dtypes if dType == 'string' and colName != 'label']

In [51]:
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher()
hasher.setCategoricalCols(intFieldNames)
hasher.setNumFeatures(n_features)

# for col in intFieldNames + cateFieldNames:
hasher.setInputCols(intFieldNames + cateFieldNames)
hasher.setOutputCol("hashed_features")
train_parquet = hasher.transform(train_parquet)

In [53]:
train_parquet.show(1)

+-----+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------------+
|label|int_feature_1|int_feature_2|int_feature_3|int_feature_4|int_feature_5|int_feature_6|int_feature_7|int_feature_8|int_feature_9|int_feature_10|int_feature_11|int_feature_12|int_feature_13|cate_feature_1|cate_feature_2|cate_feature_3|cate_feature_4|cate_feature_5|cate_feature_6|cate_feature_7|cate_feature_8|cate_feature_9|cate_feature_10|cate_feature_11|cate_feature_12|ca

In [56]:
train_parquet.select("int_feature_1").collect()

Py4JJavaError: An error occurred while calling o2531.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:282)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [338]:
for col in intFieldNames + cateFieldNames:

['cate_feature_1',
 'cate_feature_2',
 'cate_feature_3',
 'cate_feature_4',
 'cate_feature_5',
 'cate_feature_6',
 'cate_feature_7',
 'cate_feature_8',
 'cate_feature_9',
 'cate_feature_10',
 'cate_feature_11',
 'cate_feature_12',
 'cate_feature_13',
 'cate_feature_14',
 'cate_feature_15',
 'cate_feature_16',
 'cate_feature_17',
 'cate_feature_18',
 'cate_feature_19',
 'cate_feature_20',
 'cate_feature_21',
 'cate_feature_22',
 'cate_feature_23',
 'cate_feature_24',
 'cate_feature_25',
 'cate_feature_26']

In [152]:
sample_hashed = train_rdd.map(lambda x: feature_hash(x, 10000))
sample_hashed.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 776.0 failed 1 times, most recent failure: Lost task 0.0 in stage 776.0 (TID 52515, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/rdd.py", line 1371, in takeUpToNumLeft
    yield next(iterator)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-152-eaadc752e283>", line 1, in <lambda>
NameError: name 'feature_hash' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:149)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/rdd.py", line 1371, in takeUpToNumLeft
    yield next(iterator)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-152-eaadc752e283>", line 1, in <lambda>
NameError: name 'feature_hash' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


# Model Assessment
We then consider how well the model is performing on the training vs test set to check if the model is tending to overfit.

In [240]:
sample_hashed.take(1)

[[1, 1, 16, 4]]

In [241]:
import itertools
list(itertools.combinations([1, 16, 41], 2))

[(1, 16), (1, 41), (16, 41)]

In [242]:
wj0f1_wj1f0 = np.dot(W[1, 1, :], W[16, 0, :])
wj0f2_wj2f0 = np.dot(W[1, 2, :], W[4, 0, :])
wj1f2_wj2f1 = np.dot(W[16, 2, :], W[4, 1, :])
total = wj0f1_wj1f0 + wj0f2_wj2f0 + wj1f2_wj2f1
print(f"Expected value is the sum of these three: {total}")

Expected value is the sum of these three: 0.7879988184229724


In [243]:
sample_hashed.map(lambda x: phi(x[1:])).collect()[0]

0.7879988184229724

In [256]:
sys.getsizeof(np.random.uniform(0,1,size=(10000000,10)))

800000112

# Extras to consider
## Develop classes for the model

In [None]:
# class FFM:
#     def __init__(self, n_features, k = 10, eta = 0.1, reg_c = 0.1):
#         self.n_features = n_features
#         self.k = k
#         self.eta = eta
#         self.reg_c = reg_c
        
        
# ffm = FFM(25, k=3)
# ffm.n_features

### Notes
- We develop the paper this way to mimic homework from throughout the semester and build the model up sequentially
- Should we write tests and show them in the presentation for simple function like $\phi_{FFM}$

In [94]:
import itertools
a = np.random.randint(5, size=39)


In [95]:
%%timeit
combs = itertools.combinations(a, r=2)
np.sum([np.dot(i, j) for i, j in combs])
# print(f'itertools processed in {time.time() - start} seconds.')


1.23 ms ± 19.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [96]:
%%timeit
def phi_test(x):
    total = 0
    for i in range(len(x) - 1):
#         if not x[i]:
#             continue
            
        for j in range(i + 1, len(x)):
#             if x[j]:
                total += np.dot(int(x[i]), int(x[j]))
    return total

phi_test(a)
# print(f'phi_test processed in {time.time() - start} seconds.')

1.63 ms ± 14.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
