In [1]:
!pip install pyspark



In [9]:
# S3 bucket holding the project data, and the partition label of the raw dump
# (presumably a date partition — confirm against the bucket layout).
BUCKET = "steam-reco-team-yw1204"
DT = "dt=1205"
# s3a:// URI of the raw Steam Kaggle partition, assembled from the values above.
RAW_BASE = f"s3a://{BUCKET}/raw/steam_kaggle/{DT}"

# Echo the resolved path so it is visible in the cell output.
print(RAW_BASE)

s3a://steam-reco-team-yw1204/raw/steam_kaggle/dt=1205


In [10]:
import os

from pyspark.sql import SparkSession

# S3 credentials are read from the environment so that the notebook runs from a
# fresh kernel and no secret is ever stored in a cell or its output.
_missing_env = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.environ.get(name)
]
if _missing_env:
    raise RuntimeError(
        "Set these environment variables before starting Spark: "
        + ", ".join(_missing_env)
    )
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]

# Spark session with the S3A connector on the classpath and static-key
# authentication against the public S3 endpoint.
spark = (
    SparkSession.builder
    .appName("SteamReco")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262"
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.sql.shuffle.partitions", "64")
    .getOrCreate()
)

spark

In [None]:
# Build the curated paths from BUCKET (defined in the configuration cell)
# instead of repeating the bucket name, so a bucket change happens in one place.
CURATED_BASE = f"s3a://{BUCKET}/curated"

reviews = spark.read.parquet(f"{CURATED_BASE}/reviews_clean/v=1")
games   = spark.read.parquet(f"{CURATED_BASE}/games_clean/v=1")
users   = spark.read.parquet(f"{CURATED_BASE}/users_clean/v=1")

# Print the three schemas to confirm the expected columns are present.
reviews.printSchema()
games.printSchema()
users.printSchema()

# Collaborative Filtering (CF) with ALS

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer

# Feedback strength per review: the recommend flag (cast to float) plus half of
# log1p(playtime_hours).
ratings_als = reviews.select(
    "user_id",
    "app_id",
    "is_recommended",
    "playtime_hours",
    (
        F.col("is_recommended").cast("float") * 1.0
        + F.log1p(F.col("playtime_hours")) * 0.5
    ).alias("rating"),
)

# ALS is fed numeric index columns, so both id columns get an indexed twin.
user_indexer = StringIndexer(outputCol="user_idx", inputCol="user_id")
item_indexer = StringIndexer(outputCol="item_idx", inputCol="app_id")

# Fit and apply the indexers in sequence: users first, then items.
ratings_indexed = ratings_als
for indexer in (user_indexer, item_indexer):
    ratings_indexed = indexer.fit(ratings_indexed).transform(ratings_indexed)

# Keep only the id/index pairs and the rating.
ratings_indexed = ratings_indexed.select(
    "user_id", "app_id", "user_idx", "item_idx", "rating"
)

ratings_indexed.cache()


In [None]:
from pyspark.ml.recommendation import ALS

# Implicit-feedback ALS on the indexed ratings. A fixed seed is supplied so the
# factor initialisation, and therefore the fitted model, is repeatable between
# runs on the same data.
als = ALS(
    userCol="user_idx",
    itemCol="item_idx",
    ratingCol="rating",
    implicitPrefs=True,
    rank=64,
    maxIter=10,
    regParam=0.05,
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)

als_model = als.fit(ratings_indexed)


In [None]:
# For every user, the 200 highest-scoring items according to the fitted ALS model.
cf_recs = als_model.recommendForAllUsers(numItems=200)


In [None]:
def get_cf_for_user(user_id, num_items=200):
    """Return the ALS recommendations for one user as (app_id, cf_score) rows.

    Parameters
    ----------
    user_id : value of ``ratings_indexed.user_id`` identifying the user.
    num_items : number of items to request from the model (default 200).

    Returns
    -------
    A DataFrame with columns ``app_id`` and ``cf_score``, or ``None`` when the
    user does not appear in ``ratings_indexed``.
    """
    user_rows = (
        ratings_indexed
        .filter(F.col("user_id") == user_id)
        .select("user_idx")
    )
    if user_rows.first() is None:
        return None

    # Ask the model for this user only. Filtering a recommend-for-all-users
    # frame would make Spark score every user on every call.
    user_recs = als_model.recommendForUserSubset(user_rows.distinct(), num_items)

    cf_df = (
        user_recs
        .select(F.explode("recommendations").alias("rec"))
        .select(
            F.col("rec.item_idx").alias("item_idx"),
            F.col("rec.rating").alias("cf_score"),
        )
    )

    # Map item_idx back to app_id
    item_map = ratings_indexed.select("item_idx", "app_id").dropDuplicates()
    return cf_df.join(item_map, "item_idx", "left").select("app_id", "cf_score")


# Content-Based Filtering (CBF)

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import RegexTokenizer, HashingTF, IDF, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline

# Prepare the text inputs. Missing descriptions become "" and missing tag
# lists become an empty array, because CountVectorizer cannot process a null
# array (it requires an array<string> column).
games_clean2 = (
    games
    .withColumn("description_clean", F.lower(F.col("description")))
    .withColumn(
        "tags_clean",
        F.coalesce(F.col("tags"), F.array().cast("array<string>")),
    )
    .na.fill({"description_clean": ""})
)

# Description -> tokens -> hashed term frequencies -> TF-IDF; tags -> count vector.
tokenizer = RegexTokenizer(inputCol="description_clean", outputCol="desc_words", pattern="\\W+")
hashTF = HashingTF(inputCol="desc_words", outputCol="desc_tf", numFeatures=4096)
idf = IDF(inputCol="desc_tf", outputCol="desc_tfidf")
tag_cv = CountVectorizer(inputCol="tags_clean", outputCol="tags_vec")

pipeline = Pipeline(stages=[tokenizer, hashTF, idf, tag_cv])
model = pipeline.fit(games_clean2)
games_vec = model.transform(games_clean2)

# Concatenate both feature groups into one content vector per game.
assembler = VectorAssembler(
    inputCols=["desc_tfidf", "tags_vec"],
    outputCol="content_vec"
)

games_final = assembler.transform(games_vec).select("app_id","title","content_vec")
games_final.cache()


In [None]:
import numpy as np
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import SparseVector, DenseVector

def avg_vec(vs):
    """Element-wise mean of a list of ML vectors.

    Parameters
    ----------
    vs : list of SparseVector / DenseVector (or array-likes) of equal length.

    Returns
    -------
    A DenseVector holding the mean, or ``None`` when ``vs`` is empty or None.
    The mean of nothing is undefined and would otherwise surface as a NaN
    scalar wrapped in a vector.
    """
    if not vs:
        return None
    # Both vector classes expose toArray(); converting up front keeps np.mean on
    # plain ndarrays instead of the slow generic sequence protocol.
    arrs = [
        v.toArray() if hasattr(v, "toArray") else np.asarray(v, dtype=float)
        for v in vs
    ]
    return DenseVector(np.mean(arrs, axis=0))

from pyspark.ml.linalg import VectorUDT

# Declare the vector return type explicitly: without it F.udf defaults to
# StringType and the averaged profile comes back as a string, not a vector.
avg_udf = F.udf(avg_vec, VectorUDT())


In [None]:
def get_user_profile(user_id):
    """Build a content profile for a user by averaging their games' vectors.

    Parameters
    ----------
    user_id : value of ``ratings_indexed.user_id`` identifying the user.

    Returns
    -------
    The averaged profile produced by ``avg_udf`` over the ``content_vec`` of
    every game the user has a rating row for, or ``None`` when the user has no
    rating rows or none of their games has a row in ``games_final``.
    """
    played = ratings_indexed.filter(F.col("user_id") == user_id).select("app_id")
    game_vecs = played.join(games_final, "app_id").select("content_vec")

    # One emptiness probe covers both "unknown user" and "no game of this user
    # has a content vector"; it also avoids counting every row just to test
    # for existence and never hands an empty list to the averaging UDF.
    if game_vecs.first() is None:
        return None

    profile = game_vecs.agg(
        avg_udf(F.collect_list("content_vec")).alias("profile_vec")
    ).first()
    return profile["profile_vec"]


In [None]:
# The module is `pyspark.sql.functions` (plural); the singular spelling raises
# ModuleNotFoundError and stops the cell before `cosine` is defined.
from pyspark.sql.functions import udf

def cosine(a, b):
    """Cosine similarity between two vectors.

    Parameters
    ----------
    a, b : ML vectors (anything exposing ``toArray()``) or array-likes of the
        same length.

    Returns
    -------
    ``dot(a, b) / (|a| * |b| + 1e-9)`` as a Python float; the small constant
    keeps an all-zero vector from causing a division by zero. Returns ``None``
    when either input is None, so a null vector yields a null score instead of
    failing the whole job.
    """
    import numpy as np

    if a is None or b is None:
        return None
    # Sparse and dense ML vectors both provide toArray(); anything else is
    # treated as an array-like.
    a = np.asarray(a.toArray() if hasattr(a, "toArray") else a, dtype=float)
    b = np.asarray(b.toArray() if hasattr(b, "toArray") else b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-9))

# Column-level wrapper around `cosine`; the score is returned as a double.
cosine_udf = F.udf(cosine, returnType=DoubleType())


In [None]:
def compute_cb_scores(user_profile, candidate_df):
    """Attach a content-based similarity score to every candidate game.

    Parameters
    ----------
    user_profile : the user's profile vector (ML vector or array-like).
    candidate_df : DataFrame with an ``app_id`` column.

    Returns
    -------
    ``candidate_df`` inner-joined with ``games_final`` on ``app_id`` plus a
    ``cb_score`` column holding the cosine similarity between the profile and
    each game's ``content_vec``.
    """
    # An ML vector cannot be turned into a column literal with F.lit(), so the
    # profile is captured in the UDF closure as a plain ndarray instead.
    if hasattr(user_profile, "toArray"):
        profile_arr = np.asarray(user_profile.toArray(), dtype=float)
    else:
        profile_arr = np.asarray(user_profile, dtype=float)

    profile_cosine_udf = F.udf(lambda vec: cosine(profile_arr, vec), DoubleType())

    return (
        candidate_df
        .join(games_final, "app_id")
        .withColumn("cb_score", profile_cosine_udf(F.col("content_vec")))
    )


# Hybrid Recommender

In [None]:
from pyspark.sql import Window

def recommend_games(user_id, top_n=20, alpha_cf=0.4, alpha_cb=0.6):
    """Print and return the top hybrid (CF + content-based) recommendations.

    Parameters
    ----------
    user_id : value of ``ratings_indexed.user_id`` identifying the user.
    top_n : number of rows to keep after ranking.
    alpha_cf, alpha_cb : weights of the max-normalised CF and content scores.

    Returns
    -------
    A DataFrame of at most ``top_n`` rows ordered by ``hybrid_score``
    descending, or ``None`` (after printing a message) when the user has no CF
    candidates or no content profile.
    """
    cf_df = get_cf_for_user(user_id)
    # first() is enough to detect an empty frame; no need to count every row.
    if cf_df is None or cf_df.first() is None:
        print("No CF data for this user")
        return None

    user_profile = get_user_profile(user_id)
    if user_profile is None:
        print("User has no content profile")
        return None

    # Drop games the user already has a rating row for.
    played = (
        ratings_indexed
        .filter(F.col("user_id") == user_id)
        .select("app_id")
        .distinct()
    )
    cf_filtered = cf_df.join(played, "app_id", "leftanti")

    cb_scored = compute_cb_scores(user_profile, cf_filtered)

    # Single frame spanning all candidate rows, used to find each score's max.
    w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    def _scale_by_max(col_name):
        # Divide by the column maximum; when that maximum is not positive the
        # division would yield null, so fall back to 0.0.
        col_max = F.max(col_name).over(w)
        return F.when(col_max > 0, F.col(col_name) / col_max).otherwise(F.lit(0.0))

    hybrid = (
        cb_scored
        .withColumn("cf_norm", _scale_by_max("cf_score"))
        .withColumn("cb_norm", _scale_by_max("cb_score"))
        .withColumn(
            "hybrid_score",
            alpha_cf * F.col("cf_norm") + alpha_cb * F.col("cb_norm"),
        )
        # `title` already arrives through the games_final join inside
        # compute_cb_scores; joining it again would create a second,
        # ambiguous `title` column, so only `tags` is added here.
        .join(games.select("app_id", "tags"), "app_id", "left")
        .orderBy(F.col("hybrid_score").desc())
        .limit(top_n)
    )

    # The raw content vectors are unreadable in a printed table; hide them from
    # the display only, the returned frame still carries them.
    hybrid.drop("content_vec").show(truncate=False)
    return hybrid
