# 🍽️ Hybrid Restaurant Recommendation System using PySpark


This notebook implements a **hybrid recommendation system** using the Yelp dataset filtered for Philadelphia.  
We combine collaborative filtering (ALS model) with content-based filtering (TF-IDF on business categories), enhanced by cosine similarity for hybrid re-ranking.



| Step              | Purpose                                                     |
|-------------------|-------------------------------------------------------------|
| **ALS Model**     | Learns collaborative signals (user–business preferences)    |
| **TF-IDF**        | Captures business similarity based on categories            |
| **Cosine Similarity** | Matches recommended businesses with user's taste       |
| **Hybrid Scoring**| Combines both to boost recommendation relevance             |


In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, collect_list, concat_ws, avg
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, Normalizer, StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
import utils.config as config
import os


os.environ['SPARK_HOME'] = config.APP
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python3'

# Spark config 
spark = SparkSession.builder \
    .appName("HybridRecommender") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.network.timeout", "800s") \
    .getOrCreate()


# Load data

review_df = spark.read.json(config.PHILADELPHIA, multiLine=True)
user_df = spark.read.json(config.USER, multiLine=True)


In [2]:

# Extract business metadata
business_df = review_df.select("business_id", "categories", "business_stars").dropna(subset=["categories"])
business_df = business_df.withColumn("category", explode(split(col("categories"), ",\\s*")))

business_grouped = business_df.groupBy("business_id", "business_stars") \
    .agg(concat_ws(" ", collect_list("category")).alias("category_text"))

# TF-IDF pipeline
tokenizer = Tokenizer(inputCol="category_text", outputCol="words")
vectorizer = CountVectorizer(inputCol="words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
normalizer = Normalizer(inputCol="tfidf_features", outputCol="norm_features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf, normalizer])
tfidf_model = pipeline.fit(business_grouped)
tfidf_business = tfidf_model.transform(business_grouped).select("business_id", "business_stars", "norm_features")


In [3]:

# Index users and businesses
indexer_user = StringIndexer(inputCol="user_id", outputCol="user_index")
indexer_business = StringIndexer(inputCol="business_id", outputCol="business_index")
indexed_model = Pipeline(stages=[indexer_user, indexer_business]).fit(review_df)

indexed_data = indexed_model.transform(review_df).select("user_index", "business_index", "business_stars")

# Train ALS model
als = ALS(userCol="user_index", itemCol="business_index", ratingCol="business_stars",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False,
          rank=10, maxIter=10, regParam=0.1)
als_model = als.fit(indexed_data)


In [4]:

# Top-N ALS
user_recs = als_model.recommendForAllUsers(10)
recs_exploded = user_recs.withColumn("rec", explode("recommendations")) \
    .select("user_index", col("rec.business_index").alias("business_index"), col("rec.rating").alias("als_score"))

# User history (positive feedback)
user_history = review_df.select("user_id", "business_id", "business_stars").filter("business_stars >= 4")
user_history = indexed_model.transform(user_history).select("user_index", "business_index", "business_id")

# Join TF-IDF
als_with_tfidf = recs_exploded.join(tfidf_business, recs_exploded.business_index == tfidf_business.business_id)
user_history_with_tfidf = user_history.join(tfidf_business, "business_id")


In [5]:

from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import udf

# Convert VectorUDT to array
vector_to_array_udf = udf(lambda v: v.toArray().tolist() if v is not None else [], ArrayType(DoubleType()))
als_with_tfidf = als_with_tfidf.withColumn("als_array", vector_to_array_udf("norm_features"))
user_history_with_tfidf = user_history_with_tfidf.withColumn("uh_array", vector_to_array_udf("norm_features"))

# Cross join user history with recommendations
user_cross = user_history_with_tfidf.alias("uh").join(
    als_with_tfidf.alias("als"),
    col("uh.user_index") == col("als.user_index")
).select(
    col("als.user_index").alias("user_index"),
    col("als.business_id").alias("rec_business"),
    col("als.als_score"),
    col("uh.business_id").alias("hist_business"),
    col("uh.uh_array"),
    col("als.als_array")
)

# Compute cosine similarity (dot product since vectors are normalized)
user_cross = user_cross.withColumn(
    "similarity",
    F.expr("aggregate(zip_with(uh_array, als_array, (x, y) -> x * y), 0D, (acc, x) -> acc + x)")
)


In [7]:

# Average similarity score
avg_sim = user_cross.groupBy("user_index", "rec_business", "als_score") \
    .agg(avg("similarity").alias("avg_similarity"))

# Combine ALS score and similarity
final_recs = avg_sim.withColumn(
    "final_score", 0.7 * col("als_score") + 0.3 * col("avg_similarity")
).orderBy("user_index", col("final_score").desc())

final_recs.show()


Py4JJavaError: An error occurred while calling o561.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133.0 failed 1 times, most recent failure: Lost task 0.0 in stage 133.0 (TID 465) (ENIAC executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	... 3 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 27 more
