In [1]:
from itertools import product

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1606873502644_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Both files are tab-separated and are given the same three column names.
# Every field is still a string at this point.
triplet_columns = ["user_id", "song_id", "count"]

visible_lines = sc.textFile("s3://millionsongs10605/EvalDataYear1MSDWebsite/year1_test_triplets_visible.txt")
train_df = visible_lines.map(lambda line: line.split("\t")).toDF(triplet_columns)

hidden_lines = sc.textFile("s3://millionsongs10605/EvalDataYear1MSDWebsite/year1_test_triplets_hidden.txt")
test_df = hidden_lines.map(lambda line: line.split("\t")).toDF(triplet_columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Turn the string user/song identifiers into numeric index columns.
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_idx", handleInvalid="skip")
song_indexer = StringIndexer(inputCol="song_id", outputCol="song_idx", handleInvalid="skip")

pipeline = Pipeline(stages=[user_indexer, song_indexer])

# The mapping is fitted on the visible split only and then applied to both
# splits, so train and test share one index space.
idx_model = pipeline.fit(train_df)
train_df, test_df = (idx_model.transform(split) for split in (train_df, test_df))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def processForALS(df):
    """Project ``df`` onto the (user, song, count) triple used for ALS.

    Returns a DataFrame containing only ``user_idx``, ``song_idx`` and
    ``count`` (in that order), each cast to ``IntegerType``.
    """
    als_fields = ("user_idx", "song_idx", "count")
    as_int = [F.col(name).cast(IntegerType()).alias(name) for name in als_fields]
    return df.select(*as_int)

# Cache both integer-typed splits; later cells read them repeatedly.
train_cnts, test_cnts = (processForALS(split).cache() for split in (train_df, test_df))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# One row per test user holding the list of held-out song indices.
# NOTE(review): the orderBy before groupBy does not guarantee the order inside
# collect_list (Spark documents collect_list as order-dependent after a
# shuffle). Confirm whether any consumer relies on the list order.
ground_truths = (test_cnts
                .orderBy("count", ascending=False)
                .groupBy("user_idx")
                .agg(F.collect_list("song_idx").alias("ground_truths"))
                .cache())

# show() prints the preview itself and returns None, so it must not be wrapped
# in print() (that emitted a stray "None" after the table).
ground_truths.show(3)

def evaluate_params(train_cnts, ground_truths, rank, reg_param, iters=5, k=10, seed=None):
    """Train an ALS model and build ranking metrics against held-out songs.

    Args:
        train_cnts: (user_idx, song_idx, count) rows handed to ``ALS.train``
            as the ratings.
        ground_truths: DataFrame with a ``user_idx`` column and a
            ``ground_truths`` column (list of song indices per user).
        rank: Number of latent factors.
        reg_param: Regularization strength, forwarded as ``lambda_``.
        iters: Number of ALS iterations.
        k: How many songs to recommend per user. Defaults to 10, the value
            that was previously hard-coded.
        seed: Optional seed forwarded to ``ALS.train`` so a run can be
            reproduced. ``None`` keeps the library's default behavior.

    Returns:
        Tuple ``(rec_model, metrics)``: the trained model and a
        ``RankingMetrics`` built from (predicted songs, ground-truth songs)
        pairs for every user present on both sides of the join.
    """
    # NOTE(review): counts are treated as explicit ratings here; if they are
    # implicit play counts, ALS.trainImplicit may be the better fit -- confirm.
    rec_model = ALS.train(
        train_cnts, rank, lambda_=reg_param,
        iterations=iters, nonnegative=True, seed=seed)

    # NOTE(review): nothing here removes songs a user already has in
    # train_cnts from the recommendations. If the held-out songs never overlap
    # a user's training songs, that would depress precision -- verify.
    preds_df = (rec_model
             .recommendProductsForUsers(k)
             .mapValues(lambda recs: [rec.product for rec in recs])
             .toDF(["user_idx", "preds"]))

    # Inner join: only users that have both predictions and ground truth are scored.
    compare = (ground_truths
           .join(preds_df, on="user_idx", how="inner")
           .select(["preds", "ground_truths"])
           .rdd
           .map(lambda r: (r.preds, r.ground_truths)))

    metrics = RankingMetrics(compare)
    return rec_model, metrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+
|user_idx|       ground_truths|
+--------+--------------------+
|     148|[7815, 1369, 1154...|
|     463|[20243, 30903, 13...|
|     471|[15, 70, 2142, 45...|
+--------+--------------------+
only showing top 3 rows

None

In [17]:
# Sweep the latent rank at a fixed regularization strength and report
# precision@10 for each setting. `product` comes from the notebook's import cell.
ranks = [5, 10, 15, 20]
regs = [0.1]

for (rank, reg_param) in product(ranks, regs):
    print(f"Training Rank {rank}, Reg {reg_param}")
    rec_model, metrics = evaluate_params(train_cnts, ground_truths, rank, reg_param)
    print(f"Rank {rank}, Reg {reg_param}: {metrics.precisionAt(10)}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Training Rank 5, Reg 0.1
Rank 5, Reg 0.1: 2.4002640290432015e-05
Training Rank 10, Reg 0.1
Rank 10, Reg 0.1: 3.300363039934402e-05
Training Rank 15, Reg 0.1
Rank 15, Reg 0.1: 2.2002420266229282e-05
Training Rank 20, Reg 0.1
Rank 20, Reg 0.1: 2.7002970326736017e-05

In [18]:
# Same sweep as the previous cell, extended to larger ranks.
ranks = [25, 30]
regs = [0.1]

# Nested loops visit the (rank, reg_param) pairs in the same order as a
# cartesian product of the two lists.
for rank in ranks:
    for reg_param in regs:
        print(f"Training Rank {rank}, Reg {reg_param}")
        rec_model, metrics = evaluate_params(train_cnts, ground_truths, rank, reg_param)
        print(f"Rank {rank}, Reg {reg_param}: {metrics.precisionAt(10)}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Training Rank 25, Reg 0.1
Rank 25, Reg 0.1: 2.5002750302533345e-05
Training Rank 30, Reg 0.1
Rank 30, Reg 0.1: 2.800308033883736e-05

In [8]:
# Collect the distinct song indices of each split to the driver and report
# (train songs, test songs, songs present in both).
train_songs = train_cnts.select("song_idx").distinct().collect()
test_songs = test_cnts.select("song_idx").distinct().collect()

shared_songs = set(test_songs).intersection(train_songs)
(len(train_songs), len(test_songs), len(shared_songs))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(157251, 111578, 111578)

In [9]:
# True when every distinct test song also occurs in the training split.
set(test_songs).issubset(set(train_songs))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True

In [12]:
# Preview the test rows with the highest counts.
test_cnts.sort(F.desc("count")).show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------+-----+
|user_idx|song_idx|count|
+--------+--------+-----+
|   46282|   11221|    1|
|   24906|    1805|    1|
|   46282|  146926|    1|
|   46282|    3313|    1|
|   24906|       0|    1|
|   60210|   35191|    1|
|   60210|    1661|    1|
|   46282|      17|    1|
|   60210|     415|    1|
|   24906|    3848|    1|
+--------+--------+-----+
only showing top 10 rows