In [0]:
%sh
# Provides the `graphframes` Python module imported in the setup cell; the JVM
# package itself is requested separately via spark.jars.packages there.
# NOTE(review): unpinned — TODO pin a version matching graphframes 0.8.3-spark3.1.
pip install graphframes

In [None]:
%pyspark

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
# Explicit names instead of `import *`: the wildcard shadows Python builtins
# such as sum/max/min/round/filter. `col` and `count` are used unqualified by
# the shared-business cell further down.
from pyspark.sql.functions import col, count
import os

# Initialize Spark with GraphFrames and OOM protection.
# NOTE(review): if the interpreter (e.g. Zeppelin %pyspark) has already started
# a SparkSession, getOrCreate() returns that session and core settings such as
# spark.jars.packages and driver/executor memory will not take effect — set
# them in the interpreter configuration instead. TODO confirm for this cluster.
spark = SparkSession.builder \
    .appName("UnifiedRecSys") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.1-s_2.12") \
    .enableHiveSupport() \
    .getOrCreate()

# Import GraphFrame AFTER Spark session is initialized
from graphframes import GraphFrame

# Configure the OpenAI-compatible chat client.
from openai import OpenAI

# The API key is read from the environment (the same variable the explanation
# UDF uses) instead of being hardcoded in the notebook. The key that was
# previously committed in this cell is exposed and should be revoked.
client = OpenAI(
    base_url='https://xiaoai.plus/v1',
    api_key=os.environ['OPENAI_API_KEY']
)


# By RIDA


In [None]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel  # Critical import

# Reuse the session created in the setup cell. getOrCreate() returns the
# already-running SparkSession, so per-cell appName / driver / executor memory
# settings (which previously conflicted with the setup cell's 4g) cannot take
# effect here; tune memory in the interpreter configuration instead.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

# Load data with proper persistence: the large tables go to disk only,
# the smaller users table may stay in memory.
business = spark.table("business").persist(StorageLevel.DISK_ONLY)
review = spark.table("review").persist(StorageLevel.DISK_ONLY) 
users = spark.table("users").persist(StorageLevel.MEMORY_AND_DISK)

# count() also materialises the persisted frames.
print(f"""
Loaded Data:
- Businesses: {business.count():,}
- Reviews: {review.count():,}
- Users: {users.count():,}
""")
#By Rida

In [None]:
%pyspark
# Show a few records to check if everything is fine
# Show a few records of each loaded table as a sanity check.
# NOTE: no `checkin` table is ever loaded in this notebook (only business,
# review and users are), so referencing it raised NameError on a fresh kernel;
# load a check-in table in the loading cell first if it is needed here.
business.show(5)
users.show(5)
review.show(5)


#By Rida

In [None]:
%pyspark
# Row counts of the tables loaded earlier (`checkin` is never loaded in this
# notebook, so the users table is reported instead). If run after the
# indexing/sampling cells, `review` reflects that transformed frame.
print(f"Business Data Count: {business.count()}")
print(f"Users Data Count: {users.count()}")
print(f"Review Data Count: {review.count()}")

#By Rida

In [None]:
%pyspark
from pyspark.ml.feature import StringIndexer

# 1. Start fresh from the source table rather than the current `review`
#    variable: after one run `review` only has the *_index columns, so
#    selecting user_id/business_id from it again would fail on re-run.
review_raw = spark.table("review").select("user_id", "business_id", "rating")

# 2. Create new indexers (string ids -> numeric indices for ALS)
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index")
business_indexer = StringIndexer(inputCol="business_id", outputCol="business_index")

# 3. Fit models (using distinct values)
user_index_model = user_indexer.fit(review_raw.select("user_id").distinct())
business_index_model = business_indexer.fit(review_raw.select("business_id").distinct())

# 4. Apply both transformations
review_indexed = business_index_model.transform(user_index_model.transform(review_raw))

# 5. Keep only the columns ALS needs; downstream cells read `review`.
review = review_indexed.select("user_index", "business_index", "rating").persist()

# 6. Verify
print("Final Schema After Fix:")
review.printSchema()



#By Rida

In [None]:
%pyspark
# Down-sample the indexed reviews to keep ALS and the LSH self-join tractable.
# The fixed seed makes the sample (and everything trained on it) repeatable.
# NOTE: this rebinds `review`; re-running this cell on its own samples the
# already-sampled frame again, so re-run the indexing cell first.
REVIEW_SAMPLE_FRACTION = 0.05  # 5% of reviews
SAMPLE_SEED = 42
review = review.sample(fraction=REVIEW_SAMPLE_FRACTION, seed=SAMPLE_SEED).persist()
review.count()

#By Rida

In [None]:
%pyspark
from pyspark.ml.recommendation import ALS

# Train ALS with memory optimization (small rank, few iterations).
# A fixed seed pins the factor initialisation so user_factors, and the
# similarity results derived from them, are repeatable across runs.
als = ALS(
    rank=8,
    maxIter=3,
    regParam=0.1,
    userCol="user_index",
    itemCol="business_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

als_model = als.fit(review)
user_factors = als_model.userFactors.cache()

#By khadija


In [None]:

%pyspark
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import BucketedRandomProjectionLSH

# ------------------------------------
# 1. Convert & Optimize User Factors
# ------------------------------------
# ALS returns each factor as an array; the LSH estimator needs an ml Vector column.
list_to_vector_udf = F.udf(lambda lst: Vectors.dense(lst), VectorUDT())

user_factors = als_model.userFactors.select(
    F.col("id").cast("integer"),
    list_to_vector_udf(F.col("features")).alias("features")
).repartition(100, "id")

user_factors.cache().count()

# Tunables for the similarity search below.
MIN_SIMILARITY = 0.2          # keep pairs whose similarity (1 - distance) exceeds this
TOP_N = 5                     # recommendations kept per user
MAX_CANDIDATE_PAIRS = 500000  # safety cap on joined pairs; None disables the cap

# ------------------------------------
# 2. Optimized LSH Configuration
# ------------------------------------
brp = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",
    bucketLength=4.0,
    numHashTables=2,
    seed=42,  # fixed random projections so reruns produce the same candidates
)

lsh_model = brp.fit(user_factors)

# ------------------------------------
# 3. Limited Similarity Search
# ------------------------------------
# similarity > MIN_SIMILARITY  <=>  distance < 1 - MIN_SIMILARITY, so that is the
# tightest join threshold that still keeps every pair the step-4 filter accepts.
# Self-pairs are removed with `!=` instead of keeping only `A.id < B.id`: the top-N
# window below is per user1, so a one-directional filter would only ever recommend
# users with a larger id (and nobody at all to the user with the largest id).
similar_users = lsh_model.approxSimilarityJoin(
    user_factors,
    user_factors,
    threshold=1 - MIN_SIMILARITY,
    distCol="distance"
).filter("datasetA.id != datasetB.id")

if MAX_CANDIDATE_PAIRS is not None:
    # limit() keeps an arbitrary subset, so per-user top-N is approximate
    # whenever the cap is actually hit.
    similar_users = similar_users.limit(MAX_CANDIDATE_PAIRS)

# ------------------------------------
# 4. Similarity Calculation
# ------------------------------------
friend_recs = similar_users.select(
    F.col("datasetA.id").alias("user1"),
    F.col("datasetB.id").alias("user2"),
    (1 - F.col("distance")).alias("similarity")
).filter(F.col("similarity") > MIN_SIMILARITY)

# ------------------------------------
# 5. Top-N Recommendations
# ------------------------------------
window = Window.partitionBy("user1").orderBy(F.desc("similarity"))

top_similar_users = (friend_recs
    .withColumn("rank", F.row_number().over(window))
    .filter(F.col("rank") <= TOP_N)
    .select("user1", "user2", "similarity")
)

# ------------------------------------
# 6. Execute and Monitor
# ------------------------------------
print(f"Processing {user_factors.count()} users")
top_similar_users.show(20, truncate=False)

#By khadija

In [None]:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import BucketedRandomProjectionLSH

# -----------------------------------------------------
# 1. Generate Similar Users (LSH)
# -----------------------------------------------------
# Assuming you've already created user_factors DataFrame
MIN_SIMILARITY = 0.2  # keep pairs whose similarity (1 - distance) exceeds this
TOP_N = 5             # recommendations kept per user

brp = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",
    bucketLength=2.0,
    numHashTables=3,
    seed=42,  # fixed random projections so reruns produce the same candidates
)
lsh_model = brp.fit(user_factors)

# Calculate similar users (THIS CREATES similar_users).
# Only pairs with distance < 1 - MIN_SIMILARITY can pass the similarity filter
# below, so that is used directly as the join threshold.
similar_users = lsh_model.approxSimilarityJoin(
    user_factors,
    user_factors,
    threshold=1 - MIN_SIMILARITY,
    distCol="distance"
)

# -----------------------------------------------------
# 2. Create User Pairs
# -----------------------------------------------------
# A self-join pairs every user with itself at distance 0 (similarity 1), which
# would otherwise be each user's top "friend" recommendation.
user_pairs = similar_users.select(
    F.col("datasetA.id").alias("user1"),
    F.col("datasetB.id").alias("user2"),
    (1 - F.col("distance")).alias("similarity")
).filter(
    (F.col("user1") != F.col("user2"))
    & (F.col("similarity") > MIN_SIMILARITY)
)

# -----------------------------------------------------
# 3. Get Top Recommendations
# -----------------------------------------------------
window = Window.partitionBy("user1").orderBy(F.desc("similarity"))

top_friend_recs = user_pairs \
    .withColumn("rank", F.row_number().over(window)) \
    .filter(F.col("rank") <= TOP_N) \
    .drop("rank")

top_friend_recs.show(10)
#By khadija

In [None]:

%pyspark
# Get users and their reviewed businesses. distinct() so a user who reviewed the
# same business more than once contributes it only once; otherwise the join
# below multiplies duplicate rows and inflates shared_businesses.
user_reviews = review.select(
    col("user_index").alias("user"),
    col("business_index").alias("business")
).distinct()

# Find common businesses between recommended pairs: attach user1's businesses,
# then keep only the user2 reviews of those same businesses.
common_businesses = top_friend_recs.join(
    user_reviews.alias("u1"),
    col("user1") == col("u1.user")
).join(
    user_reviews.alias("u2"),
    (col("user2") == col("u2.user")) &
    (col("u1.business") == col("u2.business"))
).groupBy("user1", "user2").agg(
    count("*").alias("shared_businesses")
)


# Show users with the most shared interests
common_businesses.orderBy(col("shared_businesses").desc()).show()

#BY AYA 


In [11]:
%sh
# Pin pyarrow to one version. NOTE(review): the reason for 14.0.0 is not stated
# here — presumably compatibility with the cluster's Spark/pandas; TODO document.
pip install pyarrow==14.0.0  # Install specific version

In [None]:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import os

def generate_explanation(user1_id, user2_id):
    """Ask the chat endpoint why two users should connect.

    This function is wrapped in a Spark UDF, so it runs on executors and must
    not reference driver-side objects such as DataFrames or the SparkSession.

    Args:
        user1_id: Identifier of the first user (as produced by the LSH cells).
        user2_id: Identifier of the second user.

    Returns:
        The model's reply text. If the ``openai`` package is unavailable,
        ``OPENAI_API_KEY`` is not set, or the API call fails, a string starting
        with ``"Could not generate explanation: "`` is returned instead of raising,
        so one bad row does not fail the whole Spark task.
    """
    # Build prompt
    prompt = f"Explain why user {user1_id} and {user2_id} should connect based on shared interests."

    try:
        # Imported and constructed inside the function (once per call/row) so
        # executors build their own client instead of relying on driver state.
        from openai import OpenAI
        client = OpenAI(
            base_url="https://xiaoai.plus/v1",
            api_key=os.environ["OPENAI_API_KEY"]  # must be set where executors run
        )
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            timeout=10  # Prevent hanging
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Could not generate explanation: {str(e)}"

# Register UDF
# Expose generate_explanation to DataFrame expressions as a string-returning UDF.
explanation_udf = F.udf(f=generate_explanation, returnType=StringType())


#by AYA 

In [13]:
%sh
# Install networkx and matplotlib in the Spark/Python environment; they are
# used by the graph-drawing cell at the end of the notebook.
# NOTE(review): unpinned — consider pinning versions for reproducibility.
pip install networkx matplotlib

In [None]:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Generate recommendations (if not already done).
# Self-pairs are dropped before ranking: a user matched with themself has
# distance 0 (similarity 1) and would otherwise typically rank first.
TOP_N = 5
window = Window.partitionBy("user1").orderBy(F.desc("similarity"))
top_friend_recs = (user_pairs
    .filter(F.col("user1") != F.col("user2"))
    .withColumn("rank", F.row_number().over(window))
    .filter(F.col("rank") <= TOP_N)
    .drop("rank"))

# 2. Create final_recs
final_recs = top_friend_recs  # Or add any additional transformations
final_recs.persist().count()  # Force materialization 
#BY Mohamed 

In [None]:
%pyspark
import networkx as nx
import matplotlib.pyplot as plt

# Convert only the strongest recommendations to pandas: toPandas() collects
# every row onto the driver, and a node-labelled drawing becomes unreadable
# long before the full recommendation set would fit in memory.
MAX_PLOT_EDGES = 500
pandas_recs = (final_recs
    .orderBy("similarity", ascending=False)
    .limit(MAX_PLOT_EDGES)
    .toPandas())

# Create graph: one undirected edge per recommended pair, weighted by similarity.
G = nx.from_pandas_edgelist(
    pandas_recs.rename(columns={"similarity": "weight"}),
    source="user1",
    target="user2",
    edge_attr="weight",
)

# Draw
fig, ax = plt.subplots(figsize=(12, 8))
nx.draw(G, ax=ax, with_labels=True, node_color='skyblue', edge_color='gray', width=1.5)
ax.set_title("Friend Recommendations Based on ALS Similarity")
plt.show()
#BY Mohamed