In [34]:
import json
import pandas as pd
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit
from pyspark.sql.functions import lit, col
from pyspark.sql.types import IntegerType

# CSV of interactions that load_ds() reads and the ALS model is trained on.
TRAIN_FILE = "data/full-interactions.csv"

# Column names shared by the loader and the ALS estimator: playlists act as
# the "users" and tracks as the "items" of the recommender.
USER_COL = 'playlist'
ITEM_COL = 'track'

def load_ds():
    """Load the training interactions as a cached Spark DataFrame.

    Reads ``TRAIN_FILE`` as a headered CSV in ``DROPMALFORMED`` mode, casts
    the ``ITEM_COL`` and ``USER_COL`` columns to integers, adds a constant
    ``rating`` column equal to 1, replaces nulls with -1 and caches the
    result.

    Relies on the notebook-global ``spark`` session.

    Returns:
        The prepared, cached DataFrame.
    """
    interactions = spark.read.csv(TRAIN_FILE, header=True, mode="DROPMALFORMED")

    # Without schema inference the CSV columns arrive as strings; the id
    # columns are cast to integers before being handed to ALS.
    for id_col in (ITEM_COL, USER_COL):
        interactions = interactions.withColumn(
            id_col, interactions[id_col].cast(IntegerType())
        )

    # Every observed (playlist, track) row gets the same rating of 1.
    rated = interactions.withColumn("rating", lit(1))

    # NOTE(review): ids that are null at this point end up as id -1 rather
    # than being dropped — confirm that a synthetic "-1" playlist/track is
    # acceptable in the training data.
    return rated.na.fill(-1).cache()

In [2]:
# Build the cached training DataFrame; `ds` is what the ALS fit cell consumes.
ds = load_ds()

In [3]:
%%time
als = ALS(rank=30, userCol=USER_COL, itemCol=ITEM_COL, ratingCol="rating", implicitPrefs=True).fit(ds)

CPU times: user 162 ms, sys: 32.4 ms, total: 194 ms
Wall time: 20min 2s


In [5]:
# Persist the fitted model so the long training run does not have to be
# repeated. `write().save(path)` is the explicit form of `save(path)`.
# NOTE(review): no overwrite is requested, so re-running this cell with the
# directory already present is expected to fail — confirm that is intended.
als.write().save("models/30-factors-als")

In [8]:
# JSON file holding the challenge playlists; later cells index the loaded
# value as eval_playlists[i]['tracks'].
MAPPED_CHALLENGE_FILE = "data/challenge_set_mapped.json"
# NOTE(review): not referenced by any cell visible here — presumably used
# further down to map track ids back to their original identifiers; confirm.
REVERSED_TRACK_MAP_FILE = 'data/mpd/reduced/reversed_track_map.json'

# Use a context manager so the file handle is closed as soon as parsing is
# done (the bare json.load(open(...)) form left it open), and read as UTF-8
# explicitly instead of depending on the platform's default encoding.
with open(MAPPED_CHALLENGE_FILE, encoding="utf-8") as challenge_fh:
    eval_playlists = json.load(challenge_fh)

[9056, 2624, 1120, 1125, 2585]

In [36]:
# Turn the track ids of one challenge playlist (index 1001) into a
# single-column Spark DataFrame whose column is named like the ALS item
# column, with the ids cast to integers.
tracks = spark.createDataFrame(
    pd.DataFrame(eval_playlists[1001]['tracks'], columns=[ITEM_COL])
).withColumn(ITEM_COL, col(ITEM_COL).cast(IntegerType()))

# NOTE(review): recommendForItemSubset returns, for each input *item*, the
# top-10 *users* (here: playlists) with their scores — not similar tracks.
# Confirm this is the intended query.
res = als.recommendForItemSubset(tracks, 10)

In [37]:
# Print a preview of the per-track recommendations (one row per input track;
# long cell values are truncated in the printed table).
res.show()

+-----+--------------------+
|track|     recommendations|
+-----+--------------------+
| 1125|[[355618, 0.75653...|
| 9056|[[193008, 0.69078...|
| 2585|[[355618, 0.71151...|
| 1120|[[293601, 0.84426...|
| 2624|[[269486, 0.30876...|
+-----+--------------------+

