# Avec spark

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ALSMatrixFactorisation").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/17 08:07:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Load the data
item_daily_features = spark.read.csv("data/item_daily_features.csv", header=True, inferSchema=True)
big_matrix = spark.read.csv("data/big_matrix.csv", header=True, inferSchema=True)
small_matrix = spark.read.csv("data/small_matrix.csv", header=True, inferSchema=True)

pop = item_daily_features.groupBy("video_id").agg(
    F.sum("show_cnt").alias("show_cnt"),
    F.sum("play_cnt").alias("play_cnt"),
    F.sum("like_user_num").alias("like_user_num"),
    F.sum("share_cnt").alias("share_cnt"),
    F.sum("comment_cnt").alias("comment_cnt")
)

# Compute the average watch_ratio for each video
video_watch_ratio = big_matrix.groupBy("video_id").agg(F.avg("watch_ratio").alias("watch_ratio"))

pop = pop.join(video_watch_ratio, on="video_id", how="right")
pop = pop.fillna(0)

feature_cols = ["show_cnt", "play_cnt", "like_user_num", "share_cnt", "comment_cnt"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
pop = assembler.transform(pop)
lr = LinearRegression(featuresCol="features", labelCol="watch_ratio")
lr_model = lr.fit(pop)
coeffs = lr_model.coefficients
print("Learned weights:")
for feature, coeff in zip(feature_cols, coeffs):
    print(f"{feature}: {coeff}")

pop = lr_model.transform(pop).withColumnRenamed("prediction", "popularity_score")
pop_df = pop.select("video_id", "popularity_score")
pop_df.describe("popularity_score").show()

                                                                                

25/05/17 12:23:42 WARN Instrumentation: [0d459fca] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

Learned weights:
show_cnt: -5.4461591470311636e-08
play_cnt: 3.653622809495512e-08
like_user_num: -9.252525067534613e-08
share_cnt: -2.2890577091292535e-06
comment_cnt: -1.097557712348296e-07


                                                                                

+-------+-------------------+
|summary|   popularity_score|
+-------+-------------------+
|  count|              10728|
|   mean| 1.2037158926302574|
| stddev|0.17547826921399848|
|    min| -2.678406587847766|
|    max|  1.276259216263778|
+-------+-------------------+



In [None]:
big_matrix.show(5)

+-------+--------+-------------+--------------+--------------------+--------+----------------+------------------+
|user_id|video_id|play_duration|video_duration|                time|    date|       timestamp|       watch_ratio|
+-------+--------+-------------+--------------+--------------------+--------+----------------+------------------+
|      0|    3649|        13838|         10867|2020-07-05 00:08:...|20200705|1.593878903438E9|1.2733965215790926|
|      0|    9598|        13665|         10984|2020-07-05 00:13:...|20200705|1.593879221297E9|1.2440823015294975|
|      0|    5262|          851|          7908|2020-07-05 00:16:...|20200705|1.593879366687E9|0.1076125442589782|
|      0|    1963|          862|          9590|2020-07-05 00:20:...|20200705|1.593879626792E9|0.0898852971845672|
|      0|    8234|          858|         11000|2020-07-05 00:43:...|20200705|1.593880985128E9|             0.078|
+-------+--------+-------------+--------------+--------------------+--------+-----------

In [4]:
min_watch_ratio = big_matrix.agg({"watch_ratio": "min"}).collect()[0][0]
max_watch_ratio = big_matrix.agg({"watch_ratio": "max"}).collect()[0][0]
mean_watch_ratio = big_matrix.agg({"watch_ratio": "avg"}).collect()[0][0]
std_watch_ratio = big_matrix.agg({"watch_ratio": "stddev"}).collect()[0][0]

print(f"Min watch ratio: {min_watch_ratio}")
print(f"Max watch ratio: {max_watch_ratio}")
print(f"Mean watch ratio: {mean_watch_ratio}")
print(f"Std watch ratio: {std_watch_ratio}")

[Stage 14:>                                                         (0 + 8) / 9]

Min watch ratio: 0.0
Max watch ratio: 573.4571428571429
Mean watch ratio: 0.944505920574192
Std watch ratio: 1.6746010308958716


                                                                                

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import log1p

interactions = big_matrix.select(
    col("user_id").cast("int"),
    col("video_id").cast("int"),
    col("watch_ratio").cast("double")
).na.fill(0)

In [None]:
from pyspark.ml.feature import StringIndexer
user_indexer = StringIndexer(inputCol="user_id", outputCol="userIndex").fit(interactions)
interactions = user_indexer.transform(interactions)
item_indexer = StringIndexer(inputCol="video_id", outputCol="videoIndex").fit(interactions)
interactions = item_indexer.transform(interactions)

                                                                                

In [7]:
train = interactions


In [None]:
# Data cleaning and filtering
from pyspark.sql.functions import col, count
from pyspark.sql.window import Window
user_inter_counts = (
    train.groupBy("user_id")
    .agg(count("video_id").alias("user_interactions"))
)
video_inter_counts = (
    train.groupBy("video_id")
    .agg(count("user_id").alias("video_interactions"))
)
quantile_75 = (
    train.approxQuantile("watch_ratio", [0.75], 0.01)[0]
)

filtered_train = (
    train
    .join(
        user_inter_counts.filter(col("user_interactions") >= 248),
        on="user_id",
        how="inner"
    )
    .join(
        video_inter_counts.filter(col("video_interactions") >= 1),
        on="video_id",
        how="inner"
    )
    .filter(
        (col("watch_ratio") > 0) & 
        (col("watch_ratio") <= quantile_75)
    )
    .drop("user_interactions", "video_interactions")
)

print(f"Nb lignes original: {train.count()}")
print(f"Nb lignes filtré: {filtered_train.count()}")
filtered_train.show(5)

                                                                                

Nb lignes original: 12530806


                                                                                

Nb lignes filtré: 9101961


                                                                                

+--------+-------+------------------+---------+----------+
|video_id|user_id|       watch_ratio|userIndex|videoIndex|
+--------+-------+------------------+---------+----------+
|    9900|    148|1.1622798529127154|   1218.0|    1121.0|
|     471|    148|1.0172131147540984|   1218.0|     886.0|
|    6357|    148|0.1304439390110353|   1218.0|     656.0|
|    7880|    148|0.1615575396825397|   1218.0|     469.0|
|    7880|    148|0.1615575396825397|   1218.0|     469.0|
+--------+-------+------------------+---------+----------+
only showing top 5 rows



In [9]:
train = train.withColumn("watch_ratio", log1p(col("watch_ratio")))

In [10]:
test = small_matrix.select(
    col("user_id").cast("int"),
    col("video_id").cast("int"),
    col("watch_ratio").cast("double")
).na.fill(0) 
test = test.withColumn("watch_ratio", log1p(col("watch_ratio")))
user_indexer = StringIndexer(inputCol="user_id", outputCol="userIndex").fit(test)
test = user_indexer.transform(test)
item_indexer = StringIndexer(inputCol="video_id", outputCol="videoIndex").fit(test)
test = item_indexer.transform(test)

                                                                                

In [None]:
train = train.join(pop_df, on="video_id", how="left")
test = test.join(pop_df, on="video_id", how="left")
train = train.fillna({"popularity_score": 0})
test = test.fillna({"popularity_score": 0})

In [None]:
from pyspark.ml.recommendation import ALS
als_log = ALS(
    maxIter=20,
    regParam=0.01,
    rank=10,
    userCol="userIndex",
    itemCol="videoIndex",
    ratingCol="watch_ratio",
    implicitPrefs=False,
    coldStartStrategy="drop"
)
model_log = als_log.fit(filtered_train)

                                                                                

25/05/17 08:08:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/17 08:08:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
25/05/17 08:08:06 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/17 08:08:07 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [12]:
predictions = model_log.transform(test)
predictions = predictions.na.drop()

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import expm1
from pyspark.sql.window import Window
from sklearn.metrics import ndcg_score
import numpy as np
import math

def evaluate_model_with_continuous_precision_spark(model, df_test, df_train, user_feat, k=10):
    item_popularity_df = df_train.groupBy("video_id").count().withColumnRenamed("count", "popularity")
    total_items = df_train.select("video_id").distinct().count()

    df_test = df_test.join(user_feat, on="user_id", how="left")

    pred_df = model.transform(test)
    pred_df = predictions.na.drop()
    pred_df = predictions.withColumn("prediction", expm1(col("prediction")))

    # Ranking
    windowSpec = Window.partitionBy("user_id").orderBy(F.desc("prediction"))
    pred_df = pred_df.withColumn("rank", F.row_number().over(windowSpec))
    pred_top_k = pred_df.filter(F.col("rank") <= k)

    # MAE & RMSE
    pred_top_k = pred_top_k.withColumn("abs_error", F.abs(F.col("watch_ratio") - F.col("prediction")))
    pred_top_k = pred_top_k.withColumn("sq_error", (F.col("watch_ratio") - F.col("prediction")) ** 2)

    mae = pred_top_k.groupBy().agg(F.avg("abs_error")).collect()[0][0]
    rmse = math.sqrt(pred_top_k.groupBy().agg(F.avg("sq_error")).collect()[0][0])

    # NDCG
    top_k_collected = pred_top_k.select("user_id", "video_id", "prediction", "watch_ratio").groupBy("user_id").agg(
        F.collect_list("video_id").alias("videos"),
        F.collect_list("prediction").alias("preds"),
        F.collect_list("watch_ratio").alias("truths")
    ).collect()

    ndcg_list = []
    for row in top_k_collected:
        ndcg_list.append(ndcg_score([row["truths"]], [row["preds"]]))

    avg_ndcg = np.mean(ndcg_list)

    # Novelty
    pred_top_k = pred_top_k.join(item_popularity_df, on="video_id", how="left")
    pred_top_k = pred_top_k.withColumn("novelty", -F.log2(F.col("popularity") / total_items + 1e-10))
    novelty = pred_top_k.groupBy().agg(F.avg("novelty")).collect()[0][0]

    # Popularity@k
    pred_top_k = pred_top_k.join(pop_df, on="video_id", how="left")
    avg_popularity = pred_top_k.groupBy().agg(F.avg("popularity_score")).collect()[0][0]
    
    # Résultats
    print(f'MAE@{k}: {mae:.4f}')
    print(f'RMSE@{k}: {rmse:.4f}')
    print(f'NDCG@{k}: {avg_ndcg:.4f}')
    print(f'Novelty@{k}: {novelty:.4f}')
    print(f'Average Popularity@{k}: {avg_popularity:.4f}')

user_feat = spark.read.csv("data/user_features.csv", header=True, inferSchema=True)
evaluate_model_with_continuous_precision_spark(
    model_log, test, train, user_feat, k=10
)

                                                                                

MAE@10: 0.8481
RMSE@10: 0.9492
NDCG@10: 0.8603
Novelty@10: 1.9056
Average Popularity@10: 1.0628


                                                                                

In [None]:
# Get top 10 recommendations for each user
user_recommendations = model_log.recommendForAllUsers(10)
user_recommendations = user_recommendations.withColumn(
    "recommendations",
    F.explode("recommendations")
).select(
    col("userIndex"),
    col("recommendations.videoIndex").alias("videoIndex"),
    col("recommendations.rating").alias("watch_ratio")
)
user_converter = interactions.select("userIndex", "user_id").distinct().withColumnRenamed("user_id", "original_user_id")
item_converter = interactions.select("videoIndex", "video_id").distinct().withColumnRenamed("video_id", "original_item_id")
user_recommendations = user_recommendations.join(
    user_converter.select(col("userIndex"), col("original_user_id")),
    on="userIndex",
    how="inner"
).join(
    item_converter.select(col("videoIndex"), col("original_item_id")),
    on="videoIndex",
    how="inner"
)
user_recommendations.show(10)

                                                                                

+----------+---------+-----------+----------------+----------------+
|videoIndex|userIndex|watch_ratio|original_user_id|original_item_id|
+----------+---------+-----------+----------------+----------------+
|      9719|      496|  1.3478788|            3389|            6366|
|     10328|      496|   1.284718|            3389|            4003|
|      9985|      496|   1.244511|            3389|            1754|
|      9081|      496|  1.2115295|            3389|           10355|
|      9738|      496|  1.1401483|            3389|            9081|
|      9341|      496|  1.1370603|            3389|            1451|
|      8007|      496|  1.1298721|            3389|            6097|
|      8371|      496|  1.1215873|            3389|            9948|
|      9371|      496|  1.1193432|            3389|            6200|
|      7998|      496|  1.1104711|            3389|            3480|
+----------+---------+-----------+----------------+----------------+
only showing top 10 rows

