In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, udf, collect_list, expr, avg
from pyspark.sql.functions import row_number
from pyspark.ml.evaluation   import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql.types import ArrayType, StringType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.stat import Summarizer
from pyspark import SparkContext
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import expr, col, arrays_zip, sum as _sum, count as _count
import numpy as np

[Stage 27:=>                                                       (1 + 8) / 35]

#### Introduction

Below, I attempt to recreate my Project 4 TF-IDF content-based recommendation system on Goodreads data. This time, rather than limiting myself to one genre (comics / graphic novels), I leverage the distributed computing power of PySpark to bring in all available data across genres.

First, I start a Spark session (my computer has 8 GB of RAM):

In [3]:
# Build (or reuse) the Spark session for this notebook; all settings are
# passed as string-valued Spark configuration entries.
spark = (
    SparkSession.builder
    .appName("DistributedTFIDFRecs")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.sql.ansi.enabled", "false")
    .getOrCreate()
)

# Handle to the active SparkContext (created on demand if none exists).
sc = SparkContext.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/15 02:00:16 WARN Utils: Your hostname, Jacobs-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.7 instead (on interface en0)
25/07/15 02:00:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/15 02:00:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 52328)
Traceback (most recent call last):
  File "/Users/jacobsilver/miniconda3/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/U

Next, I load my parquet files. These were created from the JSON and CSV files of [this dataset](https://cseweb.ucsd.edu/~jmcauley/datasets/goodreads.html) in the notebook `build_parquets.ipynb`, which lives in the same folder. The interactions data required a bit of column fixing:

In [4]:
# Book metadata, read as stored.
df_b = spark.read.parquet("data/goodreads_books.parquet")

# Interactions: the stored column headers are problematic, so the five
# columns are renamed positionally right after reading.
df_i = (
    spark.read
    .parquet("data/goodreads_interactions.parquet")
    .toDF("user_id", "book_id", "is_read", "rating", "is_reviewed")
)

As with Project 4, I'll trim the data for noise management and operability, though it remains far larger than anything I've worked with previously:

In [5]:
# Tunable knobs for the trimming step (kept together so they are easy to find).
MIN_BOOK_INTERACTIONS = 50        # a book needs at least this many interactions
SAMPLE_FRACTION = 0.01            # share of popular-book interactions to keep
TRAIN_TEST_WEIGHTS = [0.8, 0.2]   # train / test proportions
SAMPLE_SEED = 905                 # seed shared by the sample and the split

#identify popular items
popular = (
    df_i.groupBy("book_id")
    .count()
    .filter(col("count") >= MIN_BOOK_INTERACTIONS)
    .select("book_id")
)

#restrict interactions to those popular items
df_i_pop = df_i.join(popular, on="book_id")

#down-sample the interactions
# The sample feeds three separate lineages below (train, test, sampled_books).
# Rows leave the join in no guaranteed order, so a seeded sample can still
# differ each time Spark recomputes it; persisting it lets all three
# consumers work from the same materialised rows.
df_i2 = df_i_pop.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED).persist()

#split that sample into train/test
train, test = df_i2.randomSplit(TRAIN_TEST_WEIGHTS, seed=SAMPLE_SEED)

#now restrict your book metadata to exactly those sampled books
sampled_books = df_i2.select("book_id").distinct()
df_b = df_b.join(sampled_books, on="book_id")

Now I can build a TF-IDF vectorization pipeline:

In [6]:
# TF-IDF stages: tokenize the description column, hash the tokens into a
# 2048-wide term-frequency vector, then apply IDF weighting.
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=2048,
)
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, hashTF, idf])

# Fit on the book metadata and keep only each book's id and feature vector.
model = pipeline.fit(df_b)
df_b_feat = (
    model
    .transform(df_b)
    .select("book_id", "features")
)

# Attach every training interaction to the feature vector of its book.
df_ui = (
    train
    .join(df_b_feat, on="book_id")
    .select("user_id", "book_id", "rating", "features")
)

def weighted(v, r):
    """Scale a feature vector by a rating.

    ``v`` must expose ``toArray()``; ``r`` is anything ``float()`` accepts.
    Returns the scaled values wrapped with ``Vectors.dense``.
    """
    dense_values = v.toArray()
    rating = float(r)
    return Vectors.dense(dense_values * rating)

# Wrap the rating-scaling helper as a UDF that returns an ML vector, and add
# the scaled vector as a new column on every interaction row.
weighted_udf = udf(weighted, VectorUDT())
df_w = df_ui.withColumn(
    "weighted",
    weighted_udf(col("features"), col("rating")),
)

# User profile = sum of that user's rating-weighted item vectors,
# computed with the DataFrame-API Summarizer.
summ = Summarizer.metrics("sum")
agg = df_w.groupBy("user_id").agg(
    summ.summary(df_w["weighted"]).alias("stats")
)
user_profiles = agg.select(
    "user_id",
    col("stats")["sum"].alias("user_profile"),
).persist()

# Bring every item vector to the driver as a NumPy array keyed by book id,
# then broadcast the lookup table.
items = df_b_feat.collect()
item_map = dict((r.book_id, r.features.toArray()) for r in items)
item_bcast = sc.broadcast(item_map)

#robust recommendation UDF
def recommend_np(profile, k=10):
    """Return the ids of the ``k`` items most cosine-similar to a user profile.

    Parameters
    ----------
    profile : vector-like
        The user's profile; must expose ``toArray()`` returning a 1-D numeric
        sequence with the same length as the item vectors in ``item_bcast``.
    k : int, default 10
        Maximum number of recommendations to return.

    Returns
    -------
    list of str
        Item ids as strings, ordered by decreasing cosine similarity. Ties
        keep the iteration order of ``item_bcast.value``. The list is empty
        when the profile has zero norm or ``k`` is not positive.
    """
    # Function-scope import: the only stdlib name this block adds.
    import heapq

    user_vec = profile.toArray()
    user_norm = np.linalg.norm(user_vec)
    if user_norm == 0 or k <= 0:
        return []                  # nothing to recommend

    scored = []
    for bid, item_vec in item_bcast.value.items():
        item_norm = np.linalg.norm(item_vec)
        if item_norm == 0:
            continue              # skip zero-vector items (cosine undefined)
        similarity = float(np.dot(user_vec, item_vec) / (user_norm * item_norm))
        scored.append((bid, similarity))

    # nlargest is documented as equivalent to sorted(..., reverse=True)[:k],
    # but avoids sorting the whole catalogue when only k entries are needed.
    best = heapq.nlargest(k, scored, key=lambda pair: pair[1])
    return [str(bid) for bid, _ in best]

# Expose the recommender as a UDF returning an array of book-id strings,
# then attach a recommendation list to every user-profile row.
rec_udf = udf(recommend_np, returnType=ArrayType(StringType()))
recs = user_profiles.withColumn(
    "recommendations",
    rec_udf(col("user_profile")),
)

                                                                                

#### Conclusion

In Project 4, it was not really even possible for me to load more than one genre's worth of data, and it happened to be one of the smallest datasets available. I actually abandoned subject areas I had greater interest in (such as history/biography) due to an inability to process any results. In that sense, once I'm dealing with a few million ratings and tens of thousands of items/users, I believe it's essentially a necessity to move toward a distributed environment, even if I then intend to filter the data down.

That said, I wish that PySpark on a local Jupyter notebook allowed for easier usage of traditional pandas syntax, which offers a great deal of analytical flexibility. In the future, I would try to achieve the best of both worlds by borrowing computational power from the cloud, such as via Microsoft Azure or another online ML platform, while still retaining the ability to use any Python package of my choosing. While I could ultimately build a recommendation system in this format, evaluation — and even something as relatively simple as EDA — became challenging and laborious.