In [None]:
from collections import defaultdict
from datetime import datetime
from itertools import combinations

import ml_metrics as metrics
import numpy as np
import pandas as pd
import predictionio
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from tqdm import tqdm

In [None]:
# 
# Data files are stored in HDFS.
# For convenience the full HDFS URL is used.
# A URL consists of four parts: base URL, dataset name, dataset purpose (train/test) and version.
# The full (unsplit) dataset is expected to have the .all extension.
# TODO: replace the ??? placeholders below before running the notebook.
#

BASE_URL = ???
DATASET_NAME = ???
VERSION = "1"

path_to_source_data = BASE_URL + "/" + DATASET_NAME + ".all"
path_to_train_data = BASE_URL + "/" + DATASET_NAME + ".train." + VERSION
path_to_test_data = BASE_URL + "/" + DATASET_NAME + ".test." + VERSION

#
# The event list could be obtained from the data, but for reporting
# purposes it's more convenient to define the order of events explicitly.
# The primary event must also be set: the event we're
# going to predict.
# The split event is used to determine the
# moment in time at which the dataset is split into train and test subsets.
# It's often the same as the primary event.
#

PRIMARY_EVENT_NAME = ???
SPLIT_EVENT_NAME = ??? # often the same as PRIMARY_EVENT_NAME
eventsList = [???]

report_file_prefix = DATASET_NAME + "." + VERSION

In [None]:
sqlContext = SQLContext(sc)
# Raw events from the full dataset, plus a "Date" timestamp column built from
# the eventTime field via from_utc_timestamp(..., "UTC").
df = (sqlContext.read
      .json(path_to_source_data)
      .withColumn("Date", F.from_utc_timestamp("eventTime", "UTC")))

In [None]:
# Get number of records
df.count()

In [None]:
# Number of records per event type
df.groupBy('event').count().toPandas()

In [None]:
# Number of events per user (entityId is exposed as "user")
users_with_event_count = df.groupBy(F.col("entityId").alias("user")).count()

In [None]:
#
# How many users have only one event
#

users_with_event_count.filter("count = 1").count()

In [None]:
#
# Drop users that have fewer than `min_events` events.
# The result goes to df1; df itself is left untouched.
#

min_events = 10
users_with_few_events = (users_with_event_count
                         .where(F.col("count") < min_events)
                         .select(F.col("user").alias("user_with_few_events")))
ndf = df.join(users_with_few_events,
              on=F.col("entityId") == F.col("user_with_few_events"),
              how="left_outer")
# Rows that found no match in users_with_few_events belong to users with enough events
df1 = (ndf
       .where(F.col("user_with_few_events").isNull())
       .drop("user_with_few_events"))
#df = df1

In [None]:
# Check new number of records
df1.count()

In [None]:
# Uncomment to continue the analysis on df1 (users with at least min_events events) instead of df
#df = df1

In [None]:
def get_split_date(df, train_ratio=0.8, split_event=None):
    """Calculates the split date.

    Calculates the moment of time used to split the data into the train set
    (events before the moment) and the test set (events at or after it).
    Only `split_event` events are ranked by "Date"; the split moment is the
    date of the first such event whose 0-based position exceeds
    `total * train_ratio`.

    Args:
        df: Spark DataFrame with "event" and "Date" columns
        train_ratio: approximate fraction of split events that fall into the train set
        split_event: name of the event used for the split; defaults to
            SPLIT_EVENT_NAME from the configuration cell

    Returns:
        A datetime object

    Raises:
        ValueError: if train_ratio is outside [0, 1] or there are no split events
    """
    if split_event is None:
        split_event = SPLIT_EVENT_NAME
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError("train_ratio must be in [0, 1], got %r" % (train_ratio,))

    # A Column comparison avoids building SQL text from the event name.
    date_rdd = (df
                .filter(F.col("event") == split_event)
                .select("Date")
                .sort("Date", ascending=True)
                .rdd)
    total_split_events = date_rdd.count()
    if total_split_events == 0:
        raise ValueError("No '%s' events found, cannot compute the split date" % split_event)

    # First position strictly greater than total * train_ratio, clamped to the last
    # event so a ratio close to 1 still leaves a test set instead of failing on an
    # empty RDD.
    split_index = min(int(total_split_events * train_ratio) + 1, total_split_events - 1)
    split_date = (date_rdd
                  .zipWithIndex()
                  .filter(lambda row_and_index: row_and_index[1] == split_index)
                  .first()[0][0])
    return split_date

split_date = get_split_date(df)

In [None]:
# Display the computed split moment; after a kernel restart paste it into the
# `split_date = datetime(...)` cell further down
split_date

In [None]:
# "Date" is only a helper column derived from eventTime, so it is not saved.
# The saved data is used for training and then for the subsequent test.
#WRIGHTING_MODE = "error" # or use mode="overwrite"
WRIGHTING_MODE = "overwrite"

# Test records are newer than train records, so there is no information leakage from the future
(df.filter(F.col("Date") >= split_date)
   .drop("Date")
   .write
   .mode(WRIGHTING_MODE)
   .json(path_to_test_data))
(df.filter(F.col("Date") < split_date)
   .drop("Date")
   .write
   .mode(WRIGHTING_MODE)
   .json(path_to_train_data))

In [None]:
# Now you may want to use your Spark cluster to train the model:
# pio import --appid <APPID> --input <path_to_train_data>
# 
# After a kernel restart you can continue the analysis from the next cell:
# rerun the first three cells (imports, configuration, data loading)
# and paste the split_date value displayed above into the next cell.

In [None]:
# Restores split_date after a kernel restart: paste the value displayed by the
# `split_date` cell above. A split_date already computed in this session is kept,
# so Restart & Run All does not replace it with a value recorded from an earlier run.
if "split_date" not in globals():
    split_date = datetime(2015, 12, 8, 1, 53, 35)

In [None]:
#
# Only a subset of columns is needed for the following steps
#

train_df = (df
            .where(F.col("Date") < split_date)
            .select("entityId", "event", "targetEntityId")
            .cache())
test_df = (df
           .where(F.col("Date") >= split_date)
           .select("entityId", "event", "targetEntityId")
           .cache())

In [None]:
#
# Calculation of different stat metrics of datasets
#

def _count_per_event(frame, label, distinct_col=None):
    """Counts records per event type of a Spark DataFrame.

    Args:
        frame: Spark DataFrame with an "event" column
        label: name of the resulting count column
        distinct_col: if given, count distinct values of this column per event
            (e.g. unique users or items) instead of raw records

    Returns:
        pandas DataFrame indexed by "event" with the single column `label`
    """
    if distinct_col is not None:
        frame = frame.select(F.col(distinct_col), F.col("event")).distinct()
    return (frame
            .groupBy("event")
            .count()
            .select(F.col("event"), F.col("count").alias(label))
            .toPandas()
            .set_index("event"))


def _count_distinct(frame, column):
    """Returns the number of distinct values of `column` in a Spark DataFrame."""
    return frame.select(F.col(column)).distinct().count()


# events_by_type keeps "event" as a plain column (not the index): the report cell
# joins the other per-event tables onto it with on="event".
events_by_type = _count_per_event(df, "count_total").reset_index()
events_by_type_test = _count_per_event(test_df, "count_test")
events_by_type_train = _count_per_event(train_df, "count_train")

# --- 
unique_users_by_event = _count_per_event(df, "unique_users_total", distinct_col="entityId")
unique_users_by_event_train = _count_per_event(train_df, "unique_users_train", distinct_col="entityId")
unique_users_by_event_test = _count_per_event(test_df, "unique_users_test", distinct_col="entityId")

# --- 
unique_items_by_event = _count_per_event(df, "unique_items_total", distinct_col="targetEntityId")
unique_items_by_event_train = _count_per_event(train_df, "unique_items_train", distinct_col="targetEntityId")
unique_items_by_event_test = _count_per_event(test_df, "unique_items_test", distinct_col="targetEntityId")

# totals
events = df.count()
events_train = train_df.count()
events_test = test_df.count()

unique_users = _count_distinct(df, "entityId")
unique_users_train = _count_distinct(train_df, "entityId")
unique_users_test = _count_distinct(test_df, "entityId")

unique_items = _count_distinct(df, "targetEntityId")
unique_items_train = _count_distinct(train_df, "targetEntityId")
unique_items_test = _count_distinct(test_df, "targetEntityId")

In [None]:
#
# Assemble the split report: one row per event type plus an 'ANY EVENT' totals row
#
info_df = events_by_type
dfs = [events_by_type_train, events_by_type_test, 
       unique_users_by_event, unique_users_by_event_train, unique_users_by_event_test, 
       unique_items_by_event, unique_items_by_event_train, unique_items_by_event_test]

for data_frame in dfs:
    # info_df's "event" column is matched against each table's "event" index (left join)
    info_df = info_df.join(data_frame, on="event")

# An event missing from a split (e.g. never seen in the test period) comes back as NaN,
# which also turns its column into floats; it really means zero occurrences.
count_columns = [col for col in info_df.columns if col != "event"]
info_df[count_columns] = info_df[count_columns].fillna(0).astype(int)

n_rows, n_cols = info_df.shape

# totals (info_df has a 0..n_rows-1 RangeIndex, so n_rows appends a new row)
info_df.loc[n_rows] = ['ANY EVENT', events, events_train, events_test, 
                  unique_users, unique_users_train, unique_users_test, 
                  unique_items, unique_items_train, unique_items_test]

In [None]:
# Save the split report (one row per event type plus the 'ANY EVENT' totals row)
info_df.to_csv(report_file_prefix + "_split_info.csv")

In [None]:
def mk_intersection_matrix(by_rows, columns_for_matrix, 
                           horizontal_suffix="", vertical_suffix=""):
    """Makes a pandas DataFrame of intersection counts out of a list of rows.

    Args:
        by_rows: iterable of rows (Spark Row objects or dicts) with keys
            'event_left', 'event_right' and 'count'
        columns_for_matrix: event names, in reporting order, used for both axes
        horizontal_suffix: suffix appended to column labels (event_left side)
        vertical_suffix: suffix appended to row labels (event_right side)

    Returns:
        Integer DataFrame whose cell [event_right + vertical_suffix,
        event_left + horizontal_suffix] holds the count. Pairs absent from
        `by_rows` are 0. Events outside `columns_for_matrix` are appended as
        extra rows/columns.
    """
    col_labels = [col + horizontal_suffix for col in columns_for_matrix]
    row_labels = [col + vertical_suffix for col in columns_for_matrix]
    result = pd.DataFrame(0, index=row_labels, columns=col_labels, dtype=int)
    for r in by_rows:
        row = r.asDict() if hasattr(r, "asDict") else dict(r)
        result.loc[row['event_right'] + vertical_suffix,
                   row['event_left'] + horizontal_suffix] = row['count']
    # Enlargement by an unexpected event leaves NaN gaps (and float columns);
    # those pairs simply have no intersection.
    return result.fillna(0).astype(int)

In [None]:
# Rows/columns of the intersection matrices follow the reporting order of eventsList
columns_for_matrix = eventsList

In [None]:
# For every (train event, train event) pair: number of users who have both
train_train_users = (
    train_df
    .select(F.col("entityId").alias("user"), F.col("event").alias("event_left"))
    .distinct()
    .join(
        train_df
        .select(F.col("entityId").alias("user"), F.col("event").alias("event_right"))
        .distinct(),
        "user",
        "inner",
    )
    .groupBy("event_left", "event_right")
    .count()
    .collect()
)

trtru = mk_intersection_matrix(train_train_users, columns_for_matrix)
trtru.to_csv(report_file_prefix + "_train_train_user_intersection.csv")

In [None]:
# For every (train event, test event) pair: number of users present in both
train_test_users = (
    train_df
    .select(F.col("entityId").alias("user"), F.col("event").alias("event_left"))
    .distinct()
    .join(
        test_df
        .select(F.col("entityId").alias("user"), F.col("event").alias("event_right"))
        .distinct(),
        "user",
        "inner",
    )
    .groupBy("event_left", "event_right")
    .count()
    .collect()
)

trtsu = mk_intersection_matrix(train_test_users, columns_for_matrix,
                               horizontal_suffix=" train", vertical_suffix=" test")
trtsu.to_csv(report_file_prefix + "_train_test_user_intersection.csv")

In [None]:
# For every (train event, train event) pair: number of items that have both
train_train_items = (
    train_df
    .select(F.col("targetEntityId").alias("item"), F.col("event").alias("event_left"))
    .distinct()
    .join(
        train_df
        .select(F.col("targetEntityId").alias("item"), F.col("event").alias("event_right"))
        .distinct(),
        "item",
        "inner",
    )
    .groupBy("event_left", "event_right")
    .count()
    .collect()
)

trtri = mk_intersection_matrix(train_train_items, columns_for_matrix)
trtri.to_csv(report_file_prefix + "_train_train_item_intersection.csv")

In [None]:
# For every (train event, test event) pair: number of items present in both
train_test_items = (
    train_df
    .select(F.col("targetEntityId").alias("item"), F.col("event").alias("event_left"))
    .distinct()
    .join(
        test_df
        .select(F.col("targetEntityId").alias("item"), F.col("event").alias("event_right"))
        .distinct(),
        "item",
        "inner",
    )
    .groupBy("event_left", "event_right")
    .count()
    .collect()
)

trtsi = mk_intersection_matrix(train_test_items, columns_for_matrix,
                               horizontal_suffix=" train", vertical_suffix=" test")
trtsi.to_csv(report_file_prefix + "_train_test_item_intersection.csv")

In [None]:
# 
# Now we perform a "dummy test":
# we evaluate the performance of "naive" predictors and use them as a baseline.
#
# Three baseline MAP @ k experiments:
# 1. Random sampling from items (uniform), i.e. a naive predictor that knows the list
# of items and, being naive, just predicts random items from that list
#
# 2. Random sampling from items (according to their distribution in training data)
# In this case the chance of choosing an item is proportional to the item's popularity
#
# 3. Top-N items from training data
# This predictor always uses the most popular items as its predictions
#

In [None]:
#
# Item popularity: number of primary events per item in the train set
#
counts = (train_df
          .where(F.col("event") == PRIMARY_EVENT_NAME)
          .groupBy("targetEntityId")
          .count()
          .collect())

In [None]:
# (count, item) pairs, most popular first; ties are ordered by descending item id
sorted_rating = sorted(
    ((rec['count'], rec['targetEntityId']) for rec in (row.asDict() for row in counts)),
    reverse=True)
elements = np.array([item for cnt, item in sorted_rating])
probs = np.array([cnt for cnt, item in sorted_rating])
# Normalize counts into a probability distribution over `elements`
probs = probs / float(probs.sum())

In [None]:
def run_map_test_dummy(data, items=None, probs=None, uniform=True, top=True,
                       users=None, primaryEvent=PRIMARY_EVENT_NAME, K=10, no_progress=False,
                       seed=None):
    """Performs dummy test

    Evaluates a naive, non-personalized predictor as a MAP@k baseline.

    Args:
        data: list of event rows (with `event`, `entityId`, `targetEntityId` attributes)
        items: np.array or list of items sorted in descending popularity order
        probs: np.array or list of corresponding probabilities (needed for experiment #2)
        uniform: Boolean flag to use uniform sampling (ignored when `top` is True)
        top: Boolean flag to use top items
        users: set of users to consider; None means every user found in `data`
        primaryEvent: str name of primary event
        K: int for MAP @ K
        no_progress: Boolean flag not to show the progress bar during calculations
        seed: optional int seed for the random sampling of experiments #1 and #2,
            making the baseline reproducible; None uses numpy's global random state

    Returns:
        list of [MAP@1, MAP@2, ... MAP@K] evaluations
    """
    # Ground truth: items each user interacted with through the primary event
    d = {}
    for rec in data:
        if rec.event == primaryEvent:
            user = rec.entityId
            if (users is None) or (user in users):
                d.setdefault(user, []).append(rec.targetEntityId)

    holdoutUsers = list(d)
    rng = np.random if seed is None else np.random.RandomState(seed)

    prediction = []
    ground_truth = []
    for user in (holdoutUsers if no_progress else tqdm(holdoutUsers)):
        if top:
            test_items = items[0:K]
        else:
            # A recommendation list should not repeat an item (ml_metrics' apk counts a
            # repeated item only once, so duplicates would just waste slots): sample
            # without replacement, and never more items than are available.
            sample_size = min(K, len(items))
            if uniform:
                test_items = rng.choice(items, size=sample_size, replace=False)
            else:
                test_items = rng.choice(items, size=sample_size, replace=False, p=probs)
        prediction.append(test_items)
        ground_truth.append(d[user])
    return [metrics.mapk(ground_truth, prediction, k) for k in range(1, K + 1)]

In [None]:
# Primary events of the test period, collected to the driver
test_data = (test_df
             .where(F.col("event") == PRIMARY_EVENT_NAME)
             .collect())

In [None]:
# case 1. Random sampling from items (uniform)
run_map_test_dummy(test_data, items=elements, probs=probs, uniform=True, top=False)

In [None]:
# case 2. Random sampling from items (according to their distribution in training data)
run_map_test_dummy(test_data, items=elements, probs=probs, uniform=False, top=False)

In [None]:
# case 3. Top-N items from training data (probs is not used when top=True)
run_map_test_dummy(test_data, items=elements, probs=probs, uniform=True, top=True)

In [None]:
#
# Additional test to check, for each of the top-20 most popular items,
# what MAP@1 score recommending that single item gives
#

for i in range(20):
    # elements is sorted by descending popularity, so elements[i:] starts at the (i+1)-th most popular item
    r = run_map_test_dummy(test_data, items=elements[i:], uniform=True, top=True, K=1, no_progress=True)[0]
    print(r)

In [None]:
#
# MAP test itself
# To run the MAP test of the model you need a trained model
# and a running `pio deploy` engine server
#

In [None]:
# Reload the saved test split from disk (the MAP test below only needs the test events)
sqlContext = SQLContext(sc)
test_df = sqlContext.read.json(path_to_test_data)

In [None]:
test_df.count()

In [None]:
# Primary events of the test split, with only the columns run_map_test reads
test_data = (test_df
             .filter("event = '%s'" % (PRIMARY_EVENT_NAME))
             .select("entityId", "event", "targetEntityId")
             .collect())

In [None]:
len(test_data)

In [None]:
def run_map_test(data, eventNames, users=None, primaryEvent=PRIMARY_EVENT_NAME, 
                 consider_non_zero_scores=True, num=100, K=10,
                 test=False, predictionio_url="http://0.0.0.0:8000", n_test=2000):
    """Evaluates MAP@1..MAP@K of a deployed PredictionIO engine on held-out events.

    Args:
        data: list of event rows (with `event`, `entityId`, `targetEntityId` attributes)
        eventNames: list of event names sent to the engine in every query
        users: optional set of users to evaluate; None means every user found in `data`
        primaryEvent: name of the event whose items form the ground truth
        consider_non_zero_scores: if True, only users whose top-ranked score is
            non-zero contribute to MAP and to the items-per-user average
        num: number of items requested from the engine per query
        K: largest k for MAP@k
        test: if True, query only the first `n_test` users (quick smoke run)
        predictionio_url: URL of the running engine server
        n_test: number of users queried when `test` is True

    Returns:
        tuple of
          - list of [MAP@1, ..., MAP@K]
          - dict user -> {"items": [...], "scores": [...]} with the engine answers
          - average number of ground-truth items per evaluated user
            (nan if no user was evaluated)
    """
    engine_client = predictionio.EngineClient(url=predictionio_url)

    # Ground truth: items each user interacted with through the primary event
    d = {}
    for rec in data:
        if rec.event == primaryEvent:
            user = rec.entityId
            if (users is None) or (user in users):
                d.setdefault(user, []).append(rec.targetEntityId)

    # dict views are not sliceable in Python 3, so materialize the users first
    holdoutUsers = list(d)
    if test:
        holdoutUsers = holdoutUsers[:n_test]

    res_data = {}
    prediction = []
    ground_truth = []
    user_items_cnt = 0.0
    users_cnt = 0
    for user in tqdm(holdoutUsers):
        q = {
            "user": user,
            "eventNames": eventNames,
            "num": num,
        }
        try:
            res = engine_client.send_query(q)
        except predictionio.NotFoundError:
            print("Error with user: %s" % user)
            continue

        # Sort by descending score; ties are broken by descending item id
        tuples = sorted([(r["score"], r["item"]) for r in res["itemScores"]], reverse=True)
        scores = [score for score, item in tuples]
        items = [item for score, item in tuples]
        res_data[user] = {
            "items": items,
            "scores": scores,
        }

        # Optionally skip users for whom the engine has no signal (empty or zero top score)
        if consider_non_zero_scores and not (scores and scores[0] != 0.0):
            continue
        prediction.append(items)
        ground_truth.append(d[user])
        user_items_cnt += len(d[user])
        users_cnt += 1

    map_scores = [metrics.mapk(ground_truth, prediction, k) for k in range(1, K + 1)]
    items_per_user = user_items_cnt / users_cnt if users_cnt else float("nan")
    return (map_scores, res_data, items_per_user)

# Helper for selecting the users the engine could actually score
def get_nonzero(r_data):
    """Returns the users whose first (top-ranked) score is non-zero.

    Args:
        r_data: dict user -> {"items": [...], "scores": [...]}, as returned by run_map_test

    Returns:
        list of users in r_data iteration order; users with an empty score list are skipped
    """
    return [user for user, res in r_data.items()
            if res['scores'] and res['scores'][0] != 0.0]

In [None]:
#
# Primary event test
# This test is mostly needed to find users with non-zero item scores
# (only the top 2 items are requested per user: num=2)
#
(map_res, res_data, items_per_user) = run_map_test(test_data, [PRIMARY_EVENT_NAME], test=False, num=2)
map_res

In [None]:
#
# Average number of ground-truth items per evaluated user;
# use this number rounded as the parameter K in run_map_test
#

items_per_user

In [None]:
# Users whose top-ranked item got a non-zero score
non_zero_users = get_nonzero(res_data)
len(non_zero_users)

In [None]:
# File used to persist non_zero_users between kernel sessions
non_zero_users_csv = "non_zero_users." + DATASET_NAME + "." + VERSION + ".csv"

In [None]:
pd.DataFrame(data=non_zero_users, 
             columns=['user']).to_csv(non_zero_users_csv)

In [None]:
# restore non zero users if rerun (np.object was removed in NumPy 1.24; use the builtin object)
#non_zero_users = set(pd.read_csv(non_zero_users_csv)['user'].astype(object))

In [None]:
#
# Test every event separately
# 

# Built once instead of on every run_map_test call
non_zero_user_set = set(non_zero_users)
runResult = {}
for ev in eventsList:
    (r_scores, r_data, ipu) = run_map_test(test_data, [ev], users=non_zero_user_set, K=3, test=False)
    runResult[ev] = {
        "r_scores": r_scores,
        #"r_data": r_data
    }
    print(ev)
    print(r_scores)
    print(len(get_nonzero(r_data)))

In [None]:
#
# All events: query the engine with every event type from eventsList at once
#
evl = eventsList
r_scores, r_data, ipu = run_map_test(test_data, evl, users=set(non_zero_users), K=3, test=False)
print(r_scores)
print(len(get_nonzero(r_data)))

In [None]:
#
# All but one: leave each event out in turn
#
non_zero_user_set = set(non_zero_users)
# Kept separate from runResult so the single-event results are not overwritten
runResultAllBut = {}
for ev in eventsList:
    evs = list(eventsList)
    evs.remove(ev)
    (r_scores, r_data, ipu) = run_map_test(test_data, evs, users=non_zero_user_set, K=3, test=False)
    runResultAllBut[ev] = {
        "r_scores": r_scores,
        #"r_data": r_data
    }
    print(ev)
    print(r_scores)
    print(len(get_nonzero(r_data)))

In [None]:
#
# Pairs of events: every unordered pair from eventsList, in list order
#
non_zero_user_set = set(non_zero_users)  # built once, reused by every query
for first_event, second_event in combinations(eventsList, 2):
    event_pair = [first_event, second_event]
    (r_scores, r_data, ipu) = run_map_test(test_data, event_pair,
                                           users=non_zero_user_set, K=3, test=False)
    print(event_pair)
    print(r_scores)
    print(len(get_nonzero(r_data)))