In [1]:
import numpy as np
import pandas as pd

from google.cloud import bigquery

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Create BQ table with data for model

In [2]:
# No project or credentials are passed explicitly, so the client uses its defaults.
bq_client = bigquery.Client()
job_config = bigquery.QueryJobConfig()

# Write the query result into instacart.reorder_model, replacing any previous
# contents (WRITE_TRUNCATE) so this cell can be re-run without manual cleanup.
# The reference is built with the DatasetReference constructor because
# Client.dataset() is deprecated in google-cloud-bigquery; the resulting
# TableReference is the same (client's project, dataset 'instacart').
table_ref = bigquery.DatasetReference(bq_client.project, 'instacart').table('reorder_model')
job_config.destination = table_ref
job_config.write_disposition = 'WRITE_TRUNCATE'

# BigQuery SQL that builds the modelling table. CTEs, in order:
#   users                 - per user, over "prior" orders: order count and the sum of
#                           days_since_prior_order.
#   user_product          - per (user, product), over "prior" orders: times ordered,
#                           times reordered, first/last order number, first/last order
#                           day (days_since_first_order is the running sum of
#                           days_since_prior_order with NULL treated as 0), and the
#                           average add_to_cart_order.
#   user_product_features - user_product ratios relative to the user's totals; the
#                           SAFE_DIVIDE columns come out NULL when the divisor is 0.
#   user_features         - per-user aggregates over the joined order/product rows.
#   product_features      - per-product counts divided by the totals over all
#                           "prior" orders.
#   all_features          - inner join of the three feature sets, with columns
#                           prefixed upf_ / uf_ / pf_ respectively.
# The final SELECT pairs every (user, product) feature row with that user's order
# whose eval_set is 'train' or 'test', and adds two features that depend on that order.
# `reordered` is NULL for 'test' rows; otherwise it is 1 when the product appears in
# order_products__train for that order and 0 when it does not.
query = """
    WITH users AS (
      SELECT user_id, COUNT(*) AS num_orders, SUM(days_since_prior_order) AS days_bw_first_last_order
      FROM instacart.orders
      WHERE eval_set = "prior"
      GROUP BY 1
    ), user_product AS (
      SELECT orders.user_id, op.product_id, 
        COUNT(*) AS num_orders, SUM(op.reordered) AS num_reorders,
        MIN(orders.order_number) AS first_order_number, MIN(days_since_first_order) AS first_order_day,
        MAX(orders.order_number) AS last_order_number, MAX(days_since_first_order) AS last_order_day,
        AVG(op.add_to_cart_order) AS avg_cart_order
      FROM instacart.order_products__prior AS op
      INNER JOIN (
        SELECT *,
          SUM(COALESCE(days_since_prior_order,0)) OVER (PARTITION BY user_id ORDER BY order_number ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `days_since_first_order`
        FROM instacart.orders 
        WHERE eval_set = "prior"
      ) AS orders USING(order_id)
      GROUP BY 1,2
    ), user_product_features AS (
      SELECT up.user_id, up.product_id,
        up.num_orders / users.num_orders AS perc_all_orders,
        SAFE_DIVIDE(up.num_reorders, users.num_orders - up.first_order_number) AS perc_reorder,
        SAFE_DIVIDE(up.num_orders, users.days_bw_first_last_order) AS orders_per_day,
        SAFE_DIVIDE(up.num_reorders, users.days_bw_first_last_order - up.first_order_day) AS reorders_per_day,
        up.first_order_number, up.first_order_day, up.last_order_number, up.last_order_day, up.avg_cart_order, 
        users.days_bw_first_last_order
      FROM user_product AS up
      INNER JOIN users AS users USING(user_id)
    ), user_features AS (
      SELECT orders.user_id,
        ANY_VALUE(users.num_orders) AS num_orders,
        ANY_VALUE(users.days_bw_first_last_order) AS days_bw_first_last_order,
        ANY_VALUE(users.days_bw_first_last_order) / ANY_VALUE(users.num_orders) AS avg_days_bw_orders,
        COUNT(*) / ANY_VALUE(users.num_orders) AS num_products_per_order,
        SUM(op.reordered) / SUM(CASE WHEN orders.order_number > 1 THEN 1 ELSE 0 END) AS perc_reorder,
        COUNT(DISTINCT op.product_id) AS num_products,
        COUNT(DISTINCT products.aisle_id) AS num_aisles,
        COUNT(DISTINCT products.department_id) AS num_departments
      FROM instacart.orders AS orders
      INNER JOIN instacart.order_products__prior AS op USING(order_id)
      INNER JOIN instacart.products AS products USING(product_id)
      INNER JOIN users USING(user_id)
      GROUP BY 1
    ), product_features AS (
      SELECT product_id, aisle_id, department_id,
        num_users / num_users_tot AS perc_users,
        num_orders / num_orders_tot AS perc_all_orders,
        num_reorder / num_reorder_tot AS perc_reorder
      FROM (
        SELECT products.product_id, products.aisle_id, products.department_id,
          COUNT(DISTINCT orders.user_id) AS num_users,
          COUNT(*) AS num_orders, 
          SUM(op.reordered) AS num_reorder
        FROM instacart.orders AS orders
        INNER JOIN instacart.order_products__prior AS op USING(order_id)
        INNER JOIN instacart.products AS products USING(product_id)
        GROUP BY 1,2,3
      ) AS x
      INNER JOIN (
        SELECT COUNT(DISTINCT user_id) AS num_users_tot,
          COUNT(*) AS num_orders_tot, 
          SUM(CASE WHEN order_number > 1 THEN 1 ELSE 0 END) AS num_reorder_tot
        FROM instacart.orders
        WHERE eval_set = "prior"
      ) AS y ON 1=1
    ), all_features AS (
      SELECT
        upf.user_id,
        upf.product_id,
        pf.aisle_id,
        pf.department_id,
        upf.perc_all_orders AS upf_perc_all_orders,
        upf.perc_reorder AS upf_perc_reorder,
        upf.orders_per_day AS upf_orders_per_day,
        upf.reorders_per_day AS upf_reorders_per_day,
        upf.first_order_number AS upf_first_order_number,
        upf.first_order_day AS upf_first_order_day,
        upf.last_order_number AS upf_last_order_number,
        upf.last_order_day AS upf_last_order_day,
        upf.avg_cart_order AS upf_avg_cart_order,
        uf.num_orders AS uf_num_orders,
        uf.num_products_per_order AS uf_num_products_per_order,
        uf.perc_reorder AS uf_perc_reorder,
        uf.days_bw_first_last_order AS uf_days_bw_first_last_order,
        uf.avg_days_bw_orders AS uf_avg_days_bw_orders,
        uf.num_products AS uf_num_products,
        uf.num_aisles AS uf_num_aisles,
        uf.num_departments AS uf_num_departments,
        pf.perc_users AS pf_perc_users,
        pf.perc_all_orders AS pf_perc_all_orders,
        pf.perc_reorder AS pf_perc_reorder
      FROM user_product_features AS upf
      INNER JOIN user_features AS uf USING(user_id)
      INNER JOIN product_features AS pf USING(product_id)
    )
    SELECT af.*, 
      # a few other features that need to computed based on order
      af.uf_days_bw_first_last_order - af.upf_last_order_day + o.days_since_prior_order AS upf_days_since_last_order,
      o.order_number - af.upf_last_order_number AS upf_orders_since_last_order,
      # train vs. test and reordered (only for train)
      o.eval_set,
      o.order_id,
      CASE WHEN o.eval_set = "test" THEN NULL ELSE LEAST(COALESCE(op_train.order_id,0),1) END AS reordered
    FROM all_features AS af
    INNER JOIN instacart.orders AS o ON af.user_id = o.user_id AND o.eval_set IN ('train','test')
    LEFT JOIN instacart.order_products__train AS op_train ON o.order_id = op_train.order_id AND af.product_id = op_train.product_id
"""

# Submit the query with job_config, i.e. with the destination table and the
# WRITE_TRUNCATE disposition set on it.
query_job = bq_client.query(query, job_config=job_config)
# result() returns the RowIterator echoed as this cell's output; the rows are not
# consumed here because the data is read back from the destination table later.
# NOTE(review): presumably this call is also what waits for the job to finish
# before the Spark read of the table further down - confirm before removing it.
query_job.result()

<google.cloud.bigquery.table.RowIterator at 0x7f3e5dd84400>

## Pull data from BQ into Spark DF

In [3]:
# for deleting temp files when we're done
def cleanup(sess, input_directory):
    """Delete ``input_directory`` through the Hadoop FileSystem API.

    Args:
        sess: Spark session; its ``_jvm`` gateway and ``_jsc`` Java context are
            used to reach the Hadoop classes and configuration.
        input_directory: Path (e.g. a ``gs://`` URI) to remove.

    Returns:
        None. The outcome reported by the filesystem's ``delete`` is discarded.
    """
    hadoop_path_cls = sess._jvm.org.apache.hadoop.fs.Path
    target = hadoop_path_cls(input_directory)
    hadoop_conf = sess._jsc.hadoopConfiguration()
    # Resolve the filesystem that owns this path from the cluster configuration.
    filesystem = target.getFileSystem(hadoop_conf)
    # Second argument is FileSystem.delete's `recursive` flag.
    filesystem.delete(target, True)

In [4]:
# Create (or reuse) the Spark session, then read the GCS bucket and project id
# out of the Hadoop configuration attached to its context.
sess = SparkSession.builder.appName("Model builder").getOrCreate()
hadoop_conf = sess._sc._jsc.hadoopConfiguration()
bucket = hadoop_conf.get('fs.gs.system.bucket')
project = hadoop_conf.get('fs.gs.project.id')

# Temporary location handed to the BigQuery connector, and the final CSV destination.
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
output = 'gs://instacart-data/outputs/reorder_test_pred.csv'

In [5]:
# Read instacart.reorder_model through the BigQuery Hadoop input format and
# parse the JSON records into a DataFrame.
conf = {
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project,
    'mapred.bq.input.dataset.id': 'instacart',
    'mapred.bq.input.table.id': 'reorder_model',
}

# Clear whatever a previous run left in the temporary directory.
cleanup(sess, input_directory)

spark_context = sess._sc
data_raw = spark_context.newAPIHadoopRDD(
    inputFormatClass='com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    keyClass='org.apache.hadoop.io.LongWritable',
    valueClass='com.google.gson.JsonObject',
    conf=conf,
)

# Each record is a (key, json) pair; only the JSON payload is needed.
data_json = data_raw.values()

# Spread the data over twice the default parallelism.
num_partitions = spark_context.defaultParallelism * 2
data_df = sess.read.json(data_json).repartition(num_partitions)

In [6]:
# Cast the id / count / day columns to integer. 'label' is added as the integer
# form of 'reordered'; 'reordered' itself is left untouched.
integer_columns = [
    'aisle_id', 'department_id', 'user_id', 'product_id', 'order_id',
    'uf_num_orders', 'uf_days_bw_first_last_order', 'uf_num_aisles',
    'uf_num_departments', 'uf_num_products',
    'upf_first_order_day', 'upf_first_order_number',
    'upf_last_order_day', 'upf_last_order_number',
    'upf_orders_since_last_order', 'upf_days_since_last_order',
]

data_df = data_df.withColumn('label', data_df['reordered'].cast('integer'))
for column_name in integer_columns:
    data_df = data_df.withColumn(column_name, data_df[column_name].cast('integer'))

In [7]:
# Cache data_df: the train/test frames derived from it are scanned repeatedly below
# (null counting, cross-validation, the final fit and scoring).
# cache() returns the DataFrame, which is why its schema is echoed as cell output.
data_df.cache()

DataFrame[aisle_id: int, department_id: int, eval_set: string, order_id: int, pf_perc_all_orders: double, pf_perc_reorder: double, pf_perc_users: double, product_id: int, reordered: string, uf_avg_days_bw_orders: double, uf_days_bw_first_last_order: int, uf_num_aisles: int, uf_num_departments: int, uf_num_orders: int, uf_num_products: int, uf_num_products_per_order: double, uf_perc_reorder: double, upf_avg_cart_order: double, upf_days_since_last_order: int, upf_first_order_day: int, upf_first_order_number: int, upf_last_order_day: int, upf_last_order_number: int, upf_orders_per_day: double, upf_orders_since_last_order: int, upf_perc_all_orders: double, upf_perc_reorder: double, upf_reorders_per_day: double, user_id: int, label: int]

## Hyperparameter tuning

In [8]:
# Labelled rows vs. rows to be scored, as marked by eval_set.
train = data_df.where(data_df.eval_set == 'train')
test = data_df.where(data_df.eval_set == 'test')

# Hold out ~20% of *users* (not rows) for validation, so that every row of a
# given user lands on the same side of the split. Seeded for repeatability.
distinct_users = train.select('user_id').distinct()
train_user, validate_user = distinct_users.randomSplit([0.8, 0.2], seed=1)
train2 = train.join(train_user, on='user_id')
validate = train.join(validate_user, on='user_id')

In [9]:
# construct pipeline
# Feature groups, by column prefix: upf_ = user x product, uf_ = user, pf_ = product.
xvar1 = ["upf_perc_all_orders", "upf_perc_reorder", "upf_orders_per_day", "upf_reorders_per_day",
         "upf_first_order_number", "upf_first_order_day", "upf_last_order_number", "upf_last_order_day",
         "upf_avg_cart_order", "upf_days_since_last_order", "upf_orders_since_last_order"]

xvar2 = ["uf_num_orders", "uf_num_products_per_order", "uf_perc_reorder",
         "uf_days_bw_first_last_order", "uf_avg_days_bw_orders", "uf_num_products", "uf_num_aisles",
         "uf_num_departments"]

xvar3 = ["pf_perc_users", "pf_perc_all_orders", "pf_perc_reorder"]

xvar = xvar1 + xvar2 + xvar3

# Count nulls per feature column in a single pass over the training rows.
# `sum` is pyspark.sql.functions.sum (brought in by the star import), not the builtin.
# Only the model features are inspected: the Imputer exists to feed the
# VectorAssembler, so nulls in non-feature columns (ids, eval_set, ...) are irrelevant
# and must not be handed to it.
null_counts = train.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in xvar)).\
              toPandas().transpose()
# Boolean mask instead of Series.nonzero(), which was removed in pandas 1.0.
null_col = list(null_counts.index[null_counts[0] > 0])

# Median-impute the features that have nulls (in place, same column names), assemble
# all features into one vector, standardise it, then fit a logistic regression on the
# standardised vector.
imp = Imputer(strategy="median", inputCols=null_col, outputCols=null_col)
va = VectorAssembler(inputCols=xvar, outputCol="features")
ss = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="features2")
lr = LogisticRegression(maxIter=100, featuresCol="features2")
pipeline = Pipeline(stages=[imp, va, ss, lr])

In [10]:
# hyperparameter tuning
# Grid: regParam in {0.01, 0.1} x elasticNetParam in {0.0 (L2), 0.5, 1.0 (L1)}.
param_grid = ParamGridBuilder()\
    .addGrid(lr.regParam, [10**k for k in range(-2, 0)]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

eva = BinaryClassificationEvaluator(metricName='areaUnderROC')
# Seed the fold assignment so the reported CV score and the chosen parameters are
# reproducible across runs (same seed value as the user split).
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    numFolds=3,
                    evaluator=eva,
                    seed=1)

cv_model = cv.fit(train2)

# avgMetrics is aligned with param_grid, so the position of the best metric
# identifies the best parameter map.
best_func = np.argmax if eva.isLargerBetter() else np.argmin
best_idx = int(best_func(cv_model.avgMetrics))
best_score = cv_model.avgMetrics[best_idx]
# Copy the winning map so that later additions to best_param do not modify param_grid.
best_param = dict(param_grid[best_idx])

print("Best CV score: {}".format(best_score))
print("Best CV param: {}".format(best_param))

Best CV score: 0.8202132454179041
Best CV param: {Param(parent='LogisticRegression_4ec08603d307f531fe6b', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_4ec08603d307f531fe6b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}


In [11]:
# Choose the probability cutoff with the highest F1 on the held-out users.
# F1 is computed over all validation rows pooled together.

# Last entry of the probability vector (the positive class for this binary model).
true_prob = udf(lambda prob_vector: float(prob_vector[-1]))

validate_scored = cv_model.transform(validate)
validate_pred = validate_scored.select(
    true_prob('probability').cast('float').alias('probability'),
    'label',
)
validate_pred_df = validate_pred.toPandas()

labels = validate_pred_df.label
scores = validate_pred_df.probability
total_positives = np.sum(labels)

# Sweep cutoffs 0.00, 0.01, ..., 0.99. A cutoff that selects no rows yields NaN
# precision (and NaN F1), which the nan-aware reductions below skip.
thresholds = np.arange(0, 1, 0.01)
precision_values = []
recall_values = []
for cutoff in thresholds:
    picked = labels[scores > cutoff]
    precision_values.append(np.mean(picked))
    recall_values.append(np.sum(picked) / total_positives)
precision = np.array(precision_values)
recall = np.array(recall_values)

f1 = (2*precision*recall) / (precision + recall)
optimal_threshold = thresholds[np.nanargmax(f1)]

print("Optimal threshold: {}".format(optimal_threshold))
print("Optimal threshold F1: {}".format(np.nanmax(f1)))

# Carry the cutoff along with the CV-selected parameters for the final fit.
best_param[lr.threshold] = optimal_threshold

Optimal threshold: 0.19
Optimal threshold F1: 0.42373350509153046


## Final model and prediction output

In [12]:
# Refit the pipeline on every labelled row (train2 and validate together) using the
# CV-selected parameters plus the tuned classification threshold.
model = pipeline.fit(train, params=best_param)

In [13]:
# Score the test rows and build one space-separated product list per order.
collapse = udf(lambda ids: ' '.join(str(pid) for pid in ids))

test_scored = model.transform(test)
predicted_reorders = test_scored.filter(test_scored.prediction == 1)
products_by_order = predicted_reorders.groupBy('order_id')\
                                      .agg(collect_list('product_id').alias('products'))
products_by_order = products_by_order.withColumn('products', collapse('products'))

# Left join from the full set of test orders so that orders with no predicted
# product still appear (with a null 'products' value).
all_test_orders = test.select('order_id').distinct()
test_pred = all_test_orders.join(products_by_order, on='order_id', how='left')

In [14]:
# Remove any previous export at the destination, then write the predictions as
# CSV with a header row from a single partition.
cleanup(sess, output)
csv_writer = test_pred.repartition(1).write.option('header', 'true')
csv_writer.csv(output)

In [15]:
# cleanup
# Delete the temporary GCS directory (input_directory) that was given to the
# BigQuery connector as 'mapred.bq.temp.gcs.path' when the data was loaded.
cleanup(sess, input_directory)