TODO:
- Do CV to pick regParam. Need to write a custom CV, since the dev/test sets must come from the final day.
- user-category interaction.
- user-site interaction.
- hour: morning, afternoon, night; weekend or not.
- Parameters of the logistic regression (lg) model to tune:
    + elasticNetParam & regParam: L2 by default. Set elasticNetParam=1 to use L1.
    + standardization: categorical variables shouldn't need standardization.
    + weightCol: correct for class imbalance.
    
Variations:
- user-category interaction: count per day (site_category, device_ip), (app_category, device_ip)
- user-site interaction (site_id, device_ip), (app_id, device_ip)

Basemodel:
- C1
- banner_pos
- site_category_vec
- app_category_vec
- device_type
- device_conn_type
- C15
- C16
- C18
- C19
- C21

No idea yet:
- hour
- device_model
- high-cardinality anonymous variables: C14, C17, C20

Not using:
- site_domain, app_domain: correlated with site/app_id (check)
- device_id: correlated with device_ip (check)

In [45]:
import pyspark
import math
import time

from pyspark.sql.functions import udf, col, mean as _mean, date_format, to_timestamp
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [46]:
# Exploratory: display the signature of MulticlassMetrics.fMeasure.
MulticlassMetrics.fMeasure

<function pyspark.mllib.evaluation.MulticlassMetrics.fMeasure(self, label=None, beta=None)>

In [3]:
# Single local Spark session for the notebook; Spark scratch files are
# written to the directory given by spark.local.dir.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("CTR Prediction")
    .master("local")
    .config("spark.local.dir", "/home/atkm/nycTaxi/tmp")
    .getOrCreate()
)

In [23]:
def read_csv(path):
    """Load the CSV file at ``path`` into a Spark DataFrame.

    The first row is used as the header and column types are inferred
    from the data.
    """
    reader = spark.read.format("csv")
    reader = reader.option("header", "true")
    reader = reader.option("inferSchema", "true")
    return reader.load(path)

def clean_clicks(df):
    """Return ``df`` with an added ``datetime`` timestamp column.

    The ``hour`` column is cast to a string and parsed with the pattern
    ``yyMMddHH`` (two-digit year, month, day, hour).
    NOTE(review): this assumes ``hour`` is encoded as YYMMDDHH, e.g.
    14102100 — confirm against the dataset description.

    Args:
        df: DataFrame containing an ``hour`` column.

    Returns:
        A new DataFrame with the extra ``datetime`` column.
    """
    # 'MM' is month-of-year. The lowercase 'mm' used previously means
    # minute-of-hour, so the month digits were read as minutes and the
    # resulting timestamps carried the wrong month.
    fmt = "yyMMddHH"
    return df.withColumn('datetime',
                  to_timestamp(col('hour').cast('string'), fmt))

## Second model
Include most features.
How to use user info (device_id, device_ip),
(site id, site_domain), and (app_id, app_domain)?
Deal with categorical features with high cardinality.
Ignore anonymous features with high cardinality.

In [107]:
# Columns listed for the second model; no cell visible here uses this list yet.
str_cols = [
    'site_id', 'site_domain', 'site_category',
    'app_id', 'app_domain', 'app_category',
    'device_id', 'device_ip', 'device_model',
]

## First model (baseline)
Use categorical variables with low cardinality.
In terms of accuracy, the model is only as good as a model that predicts 0 (not click) for any input.

train_small.csv:
10s to read and clean.
90s to train.

In [39]:
%%time
# Read the tiny training sample and add the parsed 'datetime' column.
df = clean_clicks(read_csv('data/train_tiny.csv'))

CPU times: user 5.4 ms, sys: 539 µs, total: 5.94 ms
Wall time: 204 ms


In [47]:
def lg_model(df, last_day=30, eps=1e-15):
    """Fit the baseline logistic-regression pipeline and print its metrics.

    Rows whose ``datetime`` falls on day-of-month ``last_day`` form the test
    set; every other row is used for training.  The pipeline string-indexes
    and one-hot encodes ``site_category`` and ``app_category``, assembles
    them with the remaining baseline columns into ``features`` and fits a
    ``LogisticRegression`` on the ``click`` label.

    Prints the training time, the time spent in ``transform``, the test
    accuracy and the mean test log loss.  Returns ``None``.

    Args:
        df: DataFrame with a ``datetime`` column, the ``click`` label and
            the baseline feature columns.
        last_day: Day of month held out as the test set.
        eps: Predicted probabilities are clipped to ``[eps, 1 - eps]``
            before taking logarithms.
    """
    lg = LogisticRegression(
        featuresCol = 'features',
        labelCol = 'click'
    )

    # handleInvalid='skip' drops rows whose category was not seen when the
    # indexer was fitted.
    str_indexer1 = StringIndexer(
        inputCol='site_category',
        outputCol='site_category_idx',
        handleInvalid='skip'
    )
    str_indexer2 = StringIndexer(
        inputCol='app_category',
        outputCol='app_category_idx',
        handleInvalid='skip'
    )
    encoder = OneHotEncoderEstimator(
        inputCols = ['site_category_idx', 'app_category_idx'],
        outputCols = ['site_category_vec', 'app_category_vec'])

    assembler = VectorAssembler(
        inputCols = ['C1',
             'banner_pos',
             'site_category_vec',
             'app_category_vec',
             'device_type',
             'device_conn_type',
             'C15',
             'C16',
             'C18',
             'C19',
             'C21'],
        outputCol = 'features'
    )

    pipeline = Pipeline(stages = [str_indexer1, str_indexer2, encoder,
                                  assembler, lg])

    # Derive the day of month once, as an integer, so the comparison with
    # last_day does not rely on an implicit string-to-number cast.
    with_day = df.withColumn('day',
                  date_format(col('datetime'), 'dd').cast('int'))
    test = with_day.filter(col('day') == last_day)
    train = with_day.filter(col('day') != last_day)

    train_start = time.time()
    model = pipeline.fit(train)
    train_time = time.time() - train_start
    print('Train time (s): ', train_time)

    # NOTE: transform() is lazy, so this only times building the query
    # plan; the predictions are computed by the evaluation actions below.
    pred_start = time.time()
    pred = model.transform(test)
    pred_time = time.time() - pred_start
    print('Prediction time (s): ', pred_time)

    evaluator = MulticlassClassificationEvaluator(
        labelCol='click', predictionCol='prediction', metricName='accuracy'
    )
    accuracy = evaluator.evaluate(pred.select('click','prediction'))
    print('Prediction accuracy: ', accuracy)

    def log_loss_func(predicted_prob, actual):
        # predicted_prob = [prob_0, prob_1]
        # Clip so a saturated probability (exactly 0 or 1) cannot reach
        # math.log(0), which raises ValueError.
        p = min(max(float(predicted_prob[1]), eps), 1.0 - eps)
        return -(actual * math.log(p) +
                 (1 - actual) * math.log(1.0 - p))

    log_loss = udf(log_loss_func, DoubleType())

    log_loss_score = pred.withColumn('log_loss',
                      log_loss(col('probability'), col('click')))\
    .agg({'log_loss':'avg'}).collect()[0][0]

    print('Log loss: ', log_loss_score)

In [48]:
# Run the baseline end to end on df; the metrics are printed by lg_model.
lg_model(df)

Train time (s):  3.726020336151123
Prediction time (s):  0.12889862060546875
Prediction accuracy:  0.839622641509434
Log loss:  0.45281404296974836


In [38]:
# Baseline accuracy: the score obtained by predicting "no click" (0) for
# every row, i.e. 1 minus the mean of `click` over all of df.
1 - df.agg({'click': 'avg'}).collect()[0][0]

0.830227

## Pipeline development with the tiny dataset

In [None]:
# Rebinds df to the raw tiny sample; clean_clicks is not applied here, so
# this df has no 'datetime' column.
df = read_csv('data/train_tiny.csv')

In [4]:
# Columns used by the baseline model: the label first, then the features.
vars1 = [
    'click', 'C1', 'banner_pos', 'site_category', 'app_category',
    'device_type', 'device_conn_type',
    'C15', 'C16', 'C18', 'C19', 'C21',
]
# The two category columns that get string-indexed in the pipeline.
str_vars = ['site_category', 'app_category']
vars1

['click',
 'C1',
 'banner_pos',
 'site_category',
 'app_category',
 'device_type',
 'device_conn_type',
 'C15',
 'C16',
 'C18',
 'C19',
 'C21']

In [5]:
# Preview the first five rows of the baseline columns.
df.select(*vars1).show(5)

+-----+----+----------+-------------+------------+-----------+----------------+---+---+---+---+---+
|click|  C1|banner_pos|site_category|app_category|device_type|device_conn_type|C15|C16|C18|C19|C21|
+-----+----+----------+-------------+------------+-----------+----------------+---+---+---+---+---+
|    0|1005|         0|     3e814130|    07d7df22|          1|               0|300|250|  0| 35|221|
|    1|1005|         0|     3e814130|    07d7df22|          1|               0|300|250|  2| 35| 23|
|    0|1005|         0|     28905ebd|    07d7df22|          1|               0|320| 50|  0| 35|221|
|    0|1005|         0|     f028772b|    07d7df22|          1|               0|320| 50|  3| 39| 23|
|    0|1005|         1|     f028772b|    07d7df22|          1|               0|320| 50|  0| 35|221|
+-----+----+----------+-------------+------------+-----------+----------------+---+---+---+---+---+
only showing top 5 rows



In [9]:
# Baseline pipeline: index and one-hot encode the two category columns,
# assemble them with the remaining columns, then fit logistic regression.
str_indexer1 = StringIndexer(inputCol='site_category',
                             outputCol='site_category_idx',
                             handleInvalid='skip')
str_indexer2 = StringIndexer(inputCol='app_category',
                             outputCol='app_category_idx',
                             handleInvalid='skip')

encoder = OneHotEncoderEstimator(
    inputCols=['site_category_idx', 'app_category_idx'],
    outputCols=['site_category_vec', 'app_category_vec'],
)

assembler = VectorAssembler(
    inputCols=[
        'C1', 'banner_pos',
        'site_category_vec', 'app_category_vec',
        'device_type', 'device_conn_type',
        'C15', 'C16', 'C18', 'C19', 'C21',
    ],
    outputCol='features',
)

lg = LogisticRegression(featuresCol='features', labelCol='click')

pipeline = Pipeline(
    stages=[str_indexer1, str_indexer2, encoder, assembler, lg]
)

In [10]:
# Fixed seed so the 50/50 split, and every metric computed from it, is the
# same on each run of the notebook.
train, test = df.randomSplit([.5, .5], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

In [12]:
# Compare the label with the predicted class for the first five test rows.
pred.select('click','prediction').show(5)

+-----+----------+
|click|prediction|
+-----+----------+
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows



In [28]:
# List the columns of the prediction frame, including those added by the pipeline.
pred.columns

['id',
 'click',
 'hour',
 'C1',
 'banner_pos',
 'site_id',
 'site_domain',
 'site_category',
 'app_id',
 'app_domain',
 'app_category',
 'device_id',
 'device_ip',
 'device_model',
 'device_type',
 'device_conn_type',
 'C14',
 'C15',
 'C16',
 'C17',
 'C18',
 'C19',
 'C20',
 'C21',
 'site_category_idx',
 'app_category_idx',
 'site_category_vec',
 'app_category_vec',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [47]:
# Inspect the class-probability vectors next to the predicted class.
pred.select('probability', 'prediction').head(10)

[Row(probability=DenseVector([0.8537, 0.1463]), prediction=0.0),
 Row(probability=DenseVector([0.8538, 0.1462]), prediction=0.0),
 Row(probability=DenseVector([0.8539, 0.1461]), prediction=0.0),
 Row(probability=DenseVector([0.8543, 0.1457]), prediction=0.0),
 Row(probability=DenseVector([0.8539, 0.1461]), prediction=0.0),
 Row(probability=DenseVector([0.8538, 0.1462]), prediction=0.0),
 Row(probability=DenseVector([0.8536, 0.1464]), prediction=0.0),
 Row(probability=DenseVector([0.8544, 0.1456]), prediction=0.0),
 Row(probability=DenseVector([0.8539, 0.1461]), prediction=0.0),
 Row(probability=DenseVector([0.8539, 0.1461]), prediction=0.0)]

In [71]:
def log_loss_func(predicted_prob, actual, eps=1e-15):
    """Binary log loss (negative log-likelihood) of one prediction.

    Args:
        predicted_prob: Indexable pair ``[prob_0, prob_1]``; only the
            probability of class 1 is used.
        actual: True label, 0 or 1.
        eps: The probability is clipped to ``[eps, 1 - eps]`` so a
            saturated prediction (exactly 0 or 1) yields a large finite
            loss instead of ``math.log(0)`` raising ValueError.

    Returns:
        The loss as a Python float.
    """
    p = min(max(float(predicted_prob[1]), eps), 1.0 - eps)
    return -(actual * math.log(p) +
             (1 - actual) * math.log(1.0 - p))
    
# Wrap the Python function as a Spark UDF that returns a double, so it can be applied to columns.
log_loss = udf(log_loss_func, DoubleType())

In [100]:
# Mean log loss over the test predictions: add a per-row loss column,
# average it, and pull the single value out of the one-row result.
sc = (
    pred
    .withColumn('log_loss', log_loss(col('probability'), col('click')))
    .agg({'log_loss':'avg'})
    .collect()[0][0]
)
sc

0.4736354066653905

In [14]:
# Accuracy of the predicted class against the click label.
# metricName also accepts 'f1', 'weightedPrecision' and 'weightedRecall'.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='click',
    predictionCol='prediction',
)
accuracy = evaluator.evaluate(pred.select('click', 'prediction'))
accuracy

0.8211382113821138