### Imports

In [15]:
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, avg, count, array, array_contains, row_number, size
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, FloatType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import RankingMetrics

## Setup

### Load spark data

In [16]:
try:
    spark.stop()
except:
    print('No spark is running')

warehouse = "/user/team20/project/hive/warehouse"
team = "team20"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()
spark.sql("USE team20_projectdb")

DataFrame[]

## Train

In [17]:
# read synopsis embeddings
schema = StructType([
    StructField('anime_id',
                IntegerType(), False),
    StructField('synopsis_emb_str',
                StringType(), False)
])
synopsis_df = spark.read.csv(
    "/user/team20/project/data/synopsis_embs.csv", 
    header=True, 
    mode="DROPMALFORMED",
    schema=schema
)


def read_embedding(emb: str):
    emb = [float(i) for i in emb[1:-1].split(', ')]
    return emb


read_embedding_udf = udf(read_embedding, ArrayType(FloatType()))

synopsis_df = synopsis_df.withColumn('synopsis_emb', read_embedding_udf(synopsis_df['synopsis_emb_str']))
synopsis_df = synopsis_df.drop('synopsis_emb_str')
synopsis_df.show(5)

+--------+--------------------+
|anime_id|        synopsis_emb|
+--------+--------------------+
|   54252|[0.030607548, 0.0...|
|   52976|[-0.06079528, 0.0...|
|   51595|[0.04106808, 0.06...|
|   51048|[-0.07514876, 0.0...|
|   50953|[-0.07117992, 0.0...|
+--------+--------------------+
only showing top 5 rows



In [18]:
# read train user_scores
scores_df = spark.read.json("/user/team20/project/data/train.json")
scores_df = scores_df.select(
    scores_df["user_id"].cast(IntegerType()),
    scores_df["anime_id"].cast(IntegerType()),
    scores_df["rating"].cast(IntegerType())
)
scores_df.show(5)

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|     576|     6|
|      1|     570|     8|
|      1|     565|     8|
|      1|     558|     8|
|      1|     523|     8|
+-------+--------+------+
only showing top 5 rows



### Calculate average embedding per user

In [8]:
# join tables
user_df = synopsis_df.join(scores_df, "anime_id")

# split array
emb_size = user_df.selectExpr("max(size(synopsis_emb))").collect()[0][0]
col_names = []
for i in range(emb_size):
    col_name = f"synopsis_emb_{i}"
    col_names.append(col_name)
    user_df = user_df.withColumn(col_name, col("synopsis_emb").getItem(i))
user_df.drop('synopsis_emb')

# average features
user_df = user_df.groupBy("user_id").agg(*[avg(col_name).alias(col_name) for col_name in col_names])

# collect array
assembler = VectorAssembler(inputCols=col_names, outputCol="synopsis_emb_agr")
user_df = assembler.transform(user_df)
user_df = user_df.drop(*col_names)

user_df.show(5)

+-------+--------------------+
|user_id|    synopsis_emb_agr|
+-------+--------------------+
|   1342|[-0.0619223169785...|
|   1580|[-0.0579523185087...|
|   1591|[-0.0421952999553...|
|   1645|[-0.0551495003070...|
|   1829|[-0.0548918120061...|
+-------+--------------------+
only showing top 5 rows



### Save user embeddings as json

In [None]:
user_df.write.parquet("/user/team20/project/models/model2.parquet")

In [30]:
user_df.count()

214409

## Test

### Load model

In [19]:
user_df = spark.read.parquet("/user/team20/project/models/model2.parquet")
user_df.show(5)

+-------+--------------------+
|user_id|    synopsis_emb_agr|
+-------+--------------------+
|    595|[-0.0563016329524...|
|   1188|[-0.0589005859801...|
|   1524|[-0.0545205731571...|
|   1792|[-0.0568325522336...|
|   2285|[-0.0613093246374...|
+-------+--------------------+
only showing top 5 rows



### Load test data

In [20]:
test_scores_df = spark.read.json("/user/team20/project/data/test.json")
test_scores_df = test_scores_df.select(
    test_scores_df["user_id"].cast(IntegerType()),
    test_scores_df["anime_id"].cast(IntegerType()),
    test_scores_df["rating"].cast(IntegerType())
)
# test_scores_df.groupBy("user_id").agg(count('rating')).agg({'count(rating)': 'avg'}).show()
test_scores_df.show(5)

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|   32281|     9|
|      1|   30276|     8|
|      1|   25777|     8|
|      1|   16870|     7|
|      1|   16498|     9|
+-------+--------+------+
only showing top 5 rows



In [21]:
test_rec_list = test_scores_df\
    .select("user_id", "anime_id", "rating")\
    .orderBy("user_id", "rating", ascending=False)\
    .groupBy("user_id")\
    .agg(F.collect_list("anime_id").alias("gt"))
test_rec_list.show()

+-------+--------------------+
|user_id|                  gt|
+-------+--------------------+
|      1|[1698, 849, 32281...|
|      9|[317, 304, 392, 3...|
|     20|[11061, 9253, 364...|
|     47|[6351, 4059, 2963...|
|     53|[48411, 42847, 41...|
|     91|[995, 4053, 2752,...|
|    108|[4177, 3572, 3588...|
|    111|[529, 433, 390, 2...|
|    112|[38524, 38000, 35...|
|    120|[34096, 16498, 11...|
|    133|[10408, 32729, 12...|
|    157|[30276, 29803, 28...|
|    163|[22199, 29809, 27...|
|    169|[5060, 2685, 883,...|
|    185|[8841, 667, 37435...|
|    212|[2236, 1575, 1142...|
|    222|[2025, 1818, 1726...|
|    223|[1133, 1361, 1290...|
|    224|[5258, 5114, 1818...|
|    225|[19291, 17873, 17...|
+-------+--------------------+
only showing top 20 rows



### Predict 100 anime titles for each user

In [22]:
train_rec_list = scores_df\
    .select("user_id", "anime_id", "rating")\
    .orderBy("user_id", "rating", ascending=False)\
    .groupBy("user_id")\
    .agg(F.collect_list("anime_id").alias("watched"))
train_rec_list = train_rec_list.join(
    user_df,
    train_rec_list['user_id'] == user_df['user_id'],
    'inner'
).drop(user_df.user_id)
train_rec_list = train_rec_list\
    .withColumn('watched_count', size('watched'))\
    .orderBy('watched_count', ascending=False)
# train_rec_list.show(5)

In [23]:
# pick top n users based on number of scores to optimize test
n = 1000
top_n_user_embs = train_rec_list.limit(n)
top_n_user_embs.show()

+-------+--------------------+--------------------+-------------+
|user_id|             watched|    synopsis_emb_agr|watched_count|
+-------+--------------------+--------------------+-------------+
|   6140|[28297, 24277, 23...|[-0.0573037852871...|         2091|
|  97561|[16706, 12069, 11...|[-0.0554656165558...|         2023|
|  65697|[32281, 32188, 32...|[-0.0558930915724...|         2014|
|   2276|[13409, 11239, 11...|[-0.0552982030775...|         2008|
| 520489|[11741, 4181, 290...|[-0.0525939000144...|         2005|
| 114065|[24893, 19775, 16...|[-0.0540419392271...|         1993|
|   6486|[7311, 4181, 4155...|[-0.0527306803572...|         1963|
| 470233|[30746, 30355, 29...|[-0.0492090633745...|         1956|
| 317041|[19647, 9741, 525...|[-0.0590964738393...|         1952|
| 405721|[849, 8675, 6547,...|[-0.0581922416902...|         1930|
|1130069|[27821, 25313, 15...|[-0.0606217867904...|         1887|
| 312369|[7785, 820, 578, ...|[-0.0500940134263...|         1882|
|   6264|[

In [27]:
def cosine_similarity(v1, v2):
    # assert False, f"{type(v1)} {type(v2)}"
    a, b = np.array(v1), np.array(v2)
    # check for nulls
    if not np.any(a) or not np.any(b):
        return -1.0

    cos_sim = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    return float(cos_sim)


pred_df = top_n_user_embs.crossJoin(synopsis_df)
# filter watched animes
pred_df = pred_df.filter(~array_contains(pred_df.watched, pred_df.anime_id))
pred_df.show(5)

+-------+--------------------+--------------------+-------------+--------+--------------------+
|user_id|             watched|    synopsis_emb_agr|watched_count|anime_id|        synopsis_emb|
+-------+--------------------+--------------------+-------------+--------+--------------------+
|   6140|[28297, 24277, 23...|[-0.0573037852871...|         2091|   54252|[0.030607548, 0.0...|
|  97561|[16706, 12069, 11...|[-0.0554656165558...|         2023|   54252|[0.030607548, 0.0...|
|  65697|[32281, 32188, 32...|[-0.0558930915724...|         2014|   54252|[0.030607548, 0.0...|
|   2276|[13409, 11239, 11...|[-0.0552982030775...|         2008|   54252|[0.030607548, 0.0...|
| 520489|[11741, 4181, 290...|[-0.0525939000144...|         2005|   54252|[0.030607548, 0.0...|
+-------+--------------------+--------------------+-------------+--------+--------------------+
only showing top 5 rows



### Pipeline

In [30]:
# find similarity
cosine_similarity_udf = udf(cosine_similarity, FloatType())
pred_df = pred_df.withColumn(
    "similarity",
    cosine_similarity_udf(pred_df.synopsis_emb_agr, pred_df.synopsis_emb)
).select("user_id", "anime_id", "similarity")
pred_df.show(5)

+-------+--------+----------+
|user_id|anime_id|similarity|
+-------+--------+----------+
|   6140|   54252| 0.5956438|
|  97561|   54252| 0.6026298|
|  65697|   54252| 0.6132957|
|   2276|   54252|0.60624796|
| 520489|   54252|0.62517965|
+-------+--------+----------+
only showing top 5 rows



In [31]:
top_n_user_embs.count(), synopsis_df.count(), pred_df.count()

(100, 19892, 1829101)

In [32]:
# pick top 50 anime for each user
k = 50

window_spec = Window.partitionBy("user_id").orderBy(col("similarity").desc())
# Add a row number column to each partition
pred_df_ranked = pred_df.withColumn("rank", row_number().over(window_spec))
# Filter out rows with row_number <= 10 to get the top 10 values for each user_id
pred_df_top_k = pred_df_ranked.filter(col("rank") <= k)
pred_df_top_k = pred_df_top_k\
    .groupby('user_id')\
    .agg(F.collect_list("anime_id").alias("recommendations"))
pred_df_top_k.show(10)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   6140|[51219, 1272, 345...|
|  13101|[16245, 17539, 48...|
|  20550|[51219, 34561, 24...|
|  21941|[51219, 1272, 345...|
|  25018|[51219, 1272, 240...|
|  33547|[51219, 1272, 317...|
|  50470|[51219, 1272, 317...|
|  56107|[51219, 1272, 497...|
|  58125|[51219, 1272, 982...|
|  70886|[17539, 16245, 33...|
+-------+--------------------+
only showing top 10 rows



### Evaluate

In [33]:
recommendations = test_rec_list.join(
    pred_df_top_k,
    test_rec_list.user_id == pred_df_top_k.user_id,
    'inner'
).drop(pred_df_top_k.user_id)
recommendations.show(10)

+-------+--------------------+--------------------+
|user_id|                  gt|     recommendations|
+-------+--------------------+--------------------+
|   6140|[40028, 38524, 36...|[51219, 1272, 345...|
|  13101|[43608, 37965, 32...|[16245, 17539, 48...|
|  20550|[50265, 48583, 47...|[51219, 34561, 24...|
|  21941|[51064, 50265, 49...|[51219, 1272, 345...|
|  25018|[44087, 40716, 28...|[51219, 1272, 240...|
|  33547|[33985, 32664, 31...|[51219, 1272, 317...|
|  50470|[42203, 40815, 40...|[51219, 1272, 317...|
|  56107|[34822, 52034, 50...|[51219, 1272, 497...|
|  58125|[36720, 35330, 32...|[51219, 1272, 982...|
|  70886|[32281, 28957, 24...|[17539, 16245, 33...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [35]:
# save recommendations
recommendations.write.parquet("/user/team20/project/output/model2_predictions")

In [37]:
# Define the schema for the DataFrame
schema = StructType([
    StructField("model", StringType(), True),
    StructField("precision@10", FloatType(), True),
    StructField("recall@10", FloatType(), True),
    StructField("ndcg@10", FloatType(), True),
    StructField("precision@5", FloatType(), True),
    StructField("recall@5", FloatType(), True),
    StructField("ndcg@5", FloatType(), True)
])

metrics = RankingMetrics(recommendations.select("gt", "recommendations").rdd)
# Create a list of tuples with sample data
data = [(
    "avg_synopsis_emb",
    metrics.precisionAt(10),
    metrics.recallAt(10),
    metrics.ndcgAt(10),
    metrics.precisionAt(5),
    metrics.recallAt(5),
    metrics.ndcgAt(5)
)]

# Create a DataFrame with the defined schema and data
df = spark.createDataFrame(data, schema)
df.show()



+------------+---------+-----------+-----------+--------+-----------+
|precision@10|recall@10|    ndcg@10|precision@5|recall@5|     ndcg@5|
+------------+---------+-----------+-----------+--------+-----------+
|       0.011|   0.0022|0.010751176|      0.014|  0.0014|0.012328672|
+------------+---------+-----------+-----------+--------+-----------+



In [40]:
df.write.parquet("/user/team20/project/output/evaluation", mode="append")

In [50]:
with open("/home/team20/team20/bigdata-final-project-iu-2024.git/output/evaluation.csv", "a") as f:
    f.write(','.join([str(i) for i in data[0]]))
    f.write("\n")