In [4]:
%load_ext autoreload
%autoreload 2

In [2]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import col
import json
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, ArrayType, IntegerType
import os
import numpy as np
import pandas as pd

In [3]:
import sys
sys.path.append("../")

In [6]:
from src.utility.sys_utils import get_spark

In [7]:
spark = get_spark(cores=4)

In [8]:
data_home = "/media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/"

# Filter whole dataset using thresholds

In [9]:
reviews = spark.read.csv(os.path.join(data_home, "ratings.csv"), header=True)

In [21]:
reviews.select(F.countDistinct("user_id")).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                1637138|
+-----------------------+



In [22]:
reviews.select(F.countDistinct("business_id")).show()

+---------------------------+
|count(DISTINCT business_id)|
+---------------------------+
|                     192606|
+---------------------------+



In [23]:
reviews.count()

6685900

In [10]:
user_thresh = 10
item_thresh = 1
dataset_name = "user_{}_item_{}".format(user_thresh, item_thresh)

In [11]:
active_users = (reviews.select("user_id", "business_id")
                .groupBy("user_id")
                .agg(F.count("*").alias("count_business"))
                .filter(col("count_business") >= user_thresh))

In [16]:
active_users.select(F.countDistinct("user_id")).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                 122824|
+-----------------------+



In [13]:
active_items = (reviews.select("user_id", "business_id")
                .groupBy("business_id")
                .agg(F.count("*").alias("count_user"))
                .filter(col("count_user") >= item_thresh))

In [17]:
active_items.select(F.countDistinct("business_id")).show()

+---------------------------+
|count(DISTINCT business_id)|
+---------------------------+
|                     192606|
+---------------------------+



In [18]:
filtered_reviews = (reviews
                    .join(active_users, on="user_id", how="inner")
                    .join(active_items, on="business_id", how="inner")
                    .drop("count_business", "count_user"))

In [19]:
filtered_reviews.show(5)

+--------------------+--------------------+----+-------------------+-----+--------------------+-----+------+
|         business_id|             user_id|cool|               date|funny|           review_id|stars|useful|
+--------------------+--------------------+----+-------------------+-----+--------------------+-----+------+
|--9e1ONYQuAa-CB_R...|3qz_dfwbFwTQeDRzy...|   0|2018-04-26 03:08:43|    0|iBzBKf0EnBxNNfCe9...|  5.0|     0|
|--9e1ONYQuAa-CB_R...|H0tfWQsGjEBuhXD4W...|   0|2014-01-06 19:26:01|    0|UFBG39zEiwJqmos2p...|  5.0|     0|
|--9e1ONYQuAa-CB_R...|R0KVWeN9xR-F6j4z5...|   1|2006-07-15 16:49:37|    0|d3-sC4eUvIdDzz6Kg...|  4.0|     2|
|--9e1ONYQuAa-CB_R...|n9DJHwgYflQ_ms8gB...|   0|2011-07-13 07:26:47|    0|KVacH9suT8a_b5UCk...|  3.0|     0|
|--9e1ONYQuAa-CB_R...|1rlB-SWvDU5TnDnym...|   2|2013-06-13 13:04:34|    3|qO-giyrBOUhY9lzYo...|  5.0|     2|
+--------------------+--------------------+----+-------------------+-----+--------------------+-----+------+
only showing top 5 

In [20]:
filtered_reviews.count()

3486292

In [26]:
windowSpec = Window \
    .partitionBy("user_id") \
    .orderBy(F.desc("date"))

In [27]:
test_reviews = filtered_reviews.withColumn("rank", F.dense_rank().over(windowSpec)).filter(col("rank") == 1).drop("rank")

In [28]:
train_reviews = filtered_reviews.withColumn("rank", F.dense_rank().over(windowSpec)).filter(col("rank") > 1).drop("rank")

In [125]:
train_file_dir, test_file_dir = [os.path.join(data_home, 
                                              "{}/yelp.{}".format(dataset_name, name))
                                for name in ["train", "test"]]

In [29]:
train_reviews.coalesce(1).write.csv(
    train_file_dir, 
    header=True, 
    mode="overwrite")

In [30]:
test_reviews.coalesce(1).write.csv(
    test_file_dir, 
    header=True, 
    mode="overwrite")

In [33]:
import subprocess

In [129]:
def collapse_dir_to_single_file(dir_, file_type="csv"):
    output = subprocess.check_output("ls {}/*.{}".format(dir_, file_type), shell=True).decode("utf-8").strip()
    file_name = dir_.split("/")[-1] + ".{}".format(file_type)
    file_name = os.path.abspath(os.path.join(dir_, "../", file_name))
    print("MOVING {} \nTO {}".format(output, file_name))
    os.system("cat {} > {}".format(output, file_name))
    os.system("rm -r {}".format(dir_))
    print("Done!\n")

In [70]:
for dir_ in [train_file_dir, test_file_dir]:
    collapse_dir_to_single_file(dir_)

MOVING /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/user_10_item_1/yelp.train/part-00000-b5367686-6102-4cd1-89d4-09ceed6f3658-c000.csv 
TO /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/user_10_item_1/yelp.train.csv
Done!

MOVING /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/user_10_item_1/yelp.test/part-00000-6d5a8172-b730-45b1-9484-0d56cbb626d2-c000.csv 
TO /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/user_10_item_1/yelp.test.csv
Done!



# Generate business side info

In [75]:
business_df = spark.read.json(os.path.join(data_home, "business.json")).filter(~F.isnull(col("categories")))

with open(os.path.join(data_home, "top_categories.json")) as f:
    top_categories = json.loads(f.read())

In [65]:
def find_top_category(str_cats):
    not_found = "NotFound"
    if not str_cats: return not_found
    
    cats = set(map(lambda x: x.strip(), str_cats.split(",")))
    intersections = cats.intersection(set(top_categories))
    
    return list(intersections)[0] if intersections else not_found

In [66]:
find_top_cat_udf = udf(find_top_category, StringType())

business_with_top = business_df.withColumn("top_category", find_top_cat_udf(col("categories")))

In [67]:
business_with_top.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [170]:
business_features = business_with_top.select("business_id", "top_category", "review_count", "stars", "state", "city")

In [171]:
business_side_info_dir = os.path.join(data_home, "side_info/business_features")
business_features.coalesce(1).write.csv(
    business_side_info_dir, 
    header=True,
    mode="overwrite")

In [172]:
collapse_dir_to_single_file(business_side_info_dir)

MOVING /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/side_info/business_features/part-00000-1e222c16-9f41-4e1c-af85-a4c74a1268ec-c000.csv 
TO /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/side_info/business_features.csv
Done!



In [174]:
business_name_table = business_with_top.select("business_id", "name")
business_name_dir = os.path.join(data_home, "side_info/business_name_table")
business_name_table.coalesce(1).write.csv(
    business_name_dir,
    header=True,
    mode="overwrite")
collapse_dir_to_single_file(business_name_dir)

MOVING /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/side_info/business_name_table/part-00000-b7674344-4372-4959-939d-f9e3b7656b9a-c000.csv 
TO /media/ExtHDD01/recsys_data/yelp_dataset/tidy_data/side_info/business_name_table.csv
Done!



# Generate user side information

In [84]:
user_info = spark.read.json(os.path.join(data_home, "user.json"))

In [85]:
selected_cols = user_info.columns
selected_cols.remove("friends")

In [86]:
user_side_info = user_info.select(*selected_cols)

In [87]:
user_side_info.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [160]:
user_side_info = user_side_info.toPandas()

In [161]:
user_side_info_comp_related = user_side_info[[col for col in user_side_info.columns if "compliment_" in col]]

cnt_compliment = user_side_info_comp_related.sum(axis=1)

user_side_info = user_side_info[[col for col in user_side_info.columns if "compliment_" not in col]]

user_side_info["cnt_compliment"] = cnt_compliment

del user_side_info["name"]

In [162]:
def vec_time_delta(time_series, anchor_time):
    time_delta = np.array(anchor_time, dtype='datetime64[D]') - np.array(yelping_time, dtype='datetime64[D]') 
    return pd.Series(time_delta.astype(int))

In [165]:
yelping_time = pd.to_datetime(user_side_info.yelping_since)

user_side_info["yelp_days"] = vec_time_delta(yelping_time, "2019-12-31")

del user_side_info["yelping_since"]

user_side_info["elite_years"] = user_side_info.elite.fillna("").apply(lambda s: len(str(s).split(",")) if s else 0)

del user_side_info["elite"]

In [166]:
user_side_info.head()

Unnamed: 0,average_stars,cool,fans,funny,review_count,useful,user_id,cnt_compliment,yelp_days,elite_years
0,4.03,25,5,17,95,84,l6BmjZMeQD3rDxWUbiAiow,8,2275,3
1,3.63,16,4,22,33,48,4XChL029mKr5hydo79Ljxg,3,2504,0
2,3.71,10,0,8,16,28,bc8C_eETBWL0olvFSJJd0w,1,2279,0
3,4.85,14,5,4,17,30,dD0gZpBctWGdWo9WlGuhlA,4,2049,0
4,4.08,665,39,279,361,1114,MM4RJAeH6yuaN8oZDSt0RA,293,2260,4


In [168]:
user_side_info_path = os.path.join(data_home, "side_info/user.del_friends.csv")
user_side_info.to_csv(user_side_info_path, index=False)

# Sample negative test candidates

In [117]:
candidate_num = 200

In [92]:
candidates_group = (business_with_top.groupBy("top_category", "state")
                    .agg(F.collect_list("business_id").alias("candidates")))

In [103]:
test_join_candidates = (business_with_top.select("business_id", "top_category", "state")
.join(test_reviews.select("business_id", "user_id"), on="business_id")
.join(candidates_group, on=["top_category", "state"])
.drop("top_category", "state"))

In [108]:
from pyspark.sql.functions import explode, rand

In [115]:
test_candidates = (test_join_candidates
                    .select( "user_id", "business_id", explode(test_join_candidates.candidates).alias("neg_cand"))
                    .filter(col("business_id") != col("neg_cand"))
                    .withColumn("random_num", rand()))

In [116]:
window = Window.partitionBy("user_id", "business_id").orderBy(F.desc("random_num"))

In [119]:
test_candidates = (test_candidates.withColumn("rank", F.dense_rank().over(window))
                   .filter(col("rank") <= candidate_num))

In [120]:
test_candidates.show()

+--------------------+--------------------+--------------------+------------------+----+
|             user_id|         business_id|            neg_cand|        random_num|rank|
+--------------------+--------------------+--------------------+------------------+----+
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|6kZ5oQALI1NsP-y6H...| 0.999779010587621|   1|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|GnZ0VSACXC0J4Pg2m...|0.9997078350764168|   2|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|qLWzcf5m-ynm-JNpM...|0.9996312372452258|   3|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|wrWdywCI7uLbDb6v0...|0.9995906019245152|   4|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|gJj_X9tpxs038IpYu...|0.9994173900082054|   5|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|laAQAiNJs-Tq844Wi...|0.9993643159139213|   6|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|vOK_dCDl39hd1IhNH...|0.9992927394741269|   7|
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|4A_KR6E3ziZvuNVru...|0.9990651730506802|   8|
|-3i9bhfvrM3F1wsC9...

In [122]:
test_ui_with_nested_candidates = (test_candidates.groupBy("user_id", "business_id")
                                                 .agg(F.collect_list("neg_cand").alias("candidates")))

In [123]:
test_ui_with_nested_candidates.show()

+--------------------+--------------------+--------------------+
|             user_id|         business_id|          candidates|
+--------------------+--------------------+--------------------+
|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|[6kZ5oQALI1NsP-y6...|
|-55DgUo52I3zW9Rxk...|M_XPxdozcz7SI0APL...|[CrrgNQdhZH_ZTsju...|
|-9da1xk7zgnnfO1uT...|ZL15NFVdN7hG0NUXp...|[bo0pXRc-eTsXMw1P...|
|-GKEFg_92pp0q842c...|caHv7LRXKNTYLK_Ay...|[Vg4fqLpVxISU9Erc...|
|-LFUxu48rLMWmOVN2...|mNj3O80xDcT9Ltz8o...|[0DIt3Q2UzWXfMSfP...|
|-NnADpikTs6IWM9tb...|2oVJN6hH5OC2gyWAe...|[E2Thv67Hf-l-08Wq...|
|-dErbI4sHSkRz6oxj...|zKDuRdTHXYpRO2VjT...|[quYRn_b9q_0LyOLj...|
|-hnBzgVoRoqLrGVSx...|In8gjAPQ1eE2F2OMY...|[CsbNmQqu9dKFrgcI...|
|-inp099-1gsJF3KPa...|ud3CUNbFOPYgDbxsf...|[uG14cj5EUGssWv6A...|
|-vSUDEQB9j2DaP8_4...|_5inlOVvlrCOKcI77...|[Pg0s3KdDqhKHYva0...|
|0-UxxxWLz1muOzPx2...|Fhd14aofUI-fMBR43...|[1zIEkahfChNyOSu8...|
|00PJZDRvUeg2ZgQQ1...|gnV8-S02xSBHhWFv4...|[dGG9FSznfmefT7tr...|
|01HpNrCSLNWbQG9Fv...|yPg

In [145]:
test_ui_with_candidates = test_candidates.select("user_id","business_id",col("neg_cand").alias("candidate_id"))

In [146]:
test_ui_with_candidates.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- candidate_id: string (nullable = true)



In [11]:
train = pd.read_csv(os.path.join(data_home, dataset_name, "yelp.train.csv"))[["user_id", "business_id", "stars"]]

In [12]:
# remove samples within the train set
join_table = test_ui_with_candidates.merge(train, on=("user_id", "business_id"), how="left")

In [19]:
test_ui_with_candidates

Unnamed: 0,user_id,business_id,candidate_id
0,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,6kZ5oQALI1NsP-y6H3cKWQ
1,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,GnZ0VSACXC0J4Pg2mxTT1A
2,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,qLWzcf5m-ynm-JNpM4E7KQ
3,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,wrWdywCI7uLbDb6v07KpKg
4,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,gJj_X9tpxs038IpYuCdDvw
...,...,...,...
24394706,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,ArI4L2dLFa_U4ecu6UIBMA
24394707,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,3nzK3K-Dfo1L6mpB8FTKkw
24394708,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,V0GryzKJfygQOK-n8Hw5Mw
24394709,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,PWjaIjXAXXfs_I4TnAfvtQ


In [20]:
filtered_candidates = join_table[pd.isna(join_table["stars"])][["user_id", "business_id", "candidate_id"]]

In [24]:
filtered_candidates

Unnamed: 0,user_id,business_id,candidate_id
0,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,6kZ5oQALI1NsP-y6H3cKWQ
1,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,GnZ0VSACXC0J4Pg2mxTT1A
2,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,qLWzcf5m-ynm-JNpM4E7KQ
3,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,wrWdywCI7uLbDb6v07KpKg
4,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw,gJj_X9tpxs038IpYuCdDvw
...,...,...,...
24742719,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,ArI4L2dLFa_U4ecu6UIBMA
24742720,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,3nzK3K-Dfo1L6mpB8FTKkw
24742721,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,V0GryzKJfygQOK-n8Hw5Mw
24742722,zzRq3Ykz8yfr2mcsPhYnEg,zUgDrRtGvK5ZTFlHCsTHwA,PWjaIjXAXXfs_I4TnAfvtQ


In [11]:
test_ground_truth = filtered_candidates[["user_id", "business_id"]]

In [12]:
test_ground_truth.drop_duplicates(inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [17]:
test_ground_truth = test_ground_truth.reset_index(drop=True)

In [19]:
test_ground_truth.rename(columns={"business_id":"candidate_id"}, inplace=True)

In [23]:
test_ground_truth

Unnamed: 0,user_id,candidate_id
0,-3i9bhfvrM3F1wsC9XIB8g,ghpFh6XpH1TYZhjAGdx-xw
1,-9da1xk7zgnnfO1uTVYGkA,ZL15NFVdN7hG0NUXppkU9Q
2,-GKEFg_92pp0q842clS_Jw,caHv7LRXKNTYLK_Ay7RB2w
3,-LFUxu48rLMWmOVN2ANEdQ,mNj3O80xDcT9Ltz8oyzt9g
4,-NnADpikTs6IWM9tbNO3Zg,2oVJN6hH5OC2gyWAe_Taug
...,...,...
114772,zb3IKmIEIaBqMevAiP3o_g,ezh06vIIRSmCAkpLsu8SOw
114773,zgFpxIm2_I86lhtHOdmjsQ,udGrKu1eJ_Qdd6O1PeUrmg
114774,zmHpLrDO7pgOQ6_cnea_Qw,j8yiyt6iTlP_fq8ZS1oR4A
114775,zpypx3tz7Y6kB1Yn64ytuA,zXAH-mQyIj9ErATOQwcaIQ


In [25]:
# combine test ground truth back
candidates = pd.concat([test_ground_truth, filtered_candidates[["user_id", "candidate_id"]]]).reset_index(drop=True)

In [26]:
candidates.to_parquet(os.path.join(data_home, dataset_name, "test_candidates.parquet"))