In [None]:
# spark_cleaning
# -----------------------------------------------
# Yelp Dataset Cleaning & Preparation using PySpark
# -----------------------------------------------

from pyspark.sql.functions import col, length, to_timestamp

# -------------------------------
# Load Raw Yelp Datasets from GCS
# -------------------------------
# Both inputs are read with Spark's JSON data source from the
# "yelpbigdata" bucket. `spark` is not defined in this file; presumably it
# is the session supplied by the notebook environment.
business_df = spark.read.format("json").load("gs://yelpbigdata/business.json")
review_df = spark.read.format("json").load("gs://yelpbigdata/review.json")

# ------------------------------------------
# Step 1: Preprocessing - Column Renaming
# ------------------------------------------
# The review-level "stars" column gets its own name before the join so it
# cannot be mistaken for the "stars" column coming from business_df.
review_renamed_df = review_df.withColumnRenamed("stars", "review_stars")

# ------------------------------------------
# Step 2: Join Datasets on business_id
# ------------------------------------------
# Inner join: a review is kept only when a business row shares its
# business_id.
joined_df = review_renamed_df.join(
    business_df,
    on=["business_id"],
    how="inner",
)

# Remove the "stars" column that came from business_df; "review_stars"
# stays as the rating column.
business_stars = business_df["stars"]
cleaned_joined_df = joined_df.drop(business_stars)

# ------------------------------------------
# Step 3: Basic Cleaning
# ------------------------------------------
# Cast date to a proper timestamp BEFORE the null screen below. A value that
# cannot be parsed may come back from to_timestamp as null (NOTE(review):
# depends on the session's ANSI setting - confirm); casting first lets the
# null screen remove such rows instead of letting them slip through.
cleaned_joined_df = cleaned_joined_df.withColumn("date", to_timestamp(col("date")))

# Remove rows with nulls in essential columns
cleaned_joined_df = cleaned_joined_df.dropna(
    subset=["business_id", "text", "review_stars", "date", "city"]
)

# Filter out very short reviews (text length < 50)
cleaned_joined_df = cleaned_joined_df.filter(length(col("text")) >= 50)

# Filter for valid rating values (1 to 5, both bounds inclusive)
cleaned_joined_df = cleaned_joined_df.filter(col("review_stars").between(1, 5))

# ------------------------------------------
# Step 4: Business Category Filtering
# ------------------------------------------
# Restaurants only: "categories" must contain the substring "Restaurants".
is_restaurant = col("categories").like("%Restaurants%")
cleaned_joined_df = cleaned_joined_df.where(is_restaurant)

# Both nested attributes must be non-null for a row to be kept.
has_price_range = col("attributes.RestaurantsPriceRange2").isNotNull()
has_wifi_info = col("attributes.WiFi").isNotNull()
cleaned_joined_df = cleaned_joined_df.where(has_price_range & has_wifi_info)

# ------------------------------------------
# Step 5: Select Final Columns for Analysis
# ------------------------------------------
# Top-level columns are carried over unchanged.
passthrough_columns = [
    "business_id", "text", "review_stars", "date", "city", "state",
    "categories", "name", "review_count",
]
# The two nested attributes are lifted to top-level columns of the same name.
flattened_attributes = [
    col("attributes.RestaurantsPriceRange2").alias("RestaurantsPriceRange2"),
    col("attributes.WiFi").alias("WiFi"),
]
final_df = cleaned_joined_df.select(*passthrough_columns, *flattened_attributes)

# Preview: five untruncated rows, then summary statistics for review_stars.
final_df.show(n=5, truncate=False)
review_star_stats = final_df.describe("review_stars")
review_star_stats.show()

# ------------------------------------------
# Step 6: Save Cleaned Data as Parquet to GCS
# ------------------------------------------
# "overwrite" replaces whatever is already stored at the target path.
final_df.write.parquet("gs://yelpbigdata/final_df.parquet", mode="overwrite")

In [None]:
# spark_analysis
# -----------------------------------------------
# Sentiment Aggregation & Exploratory Analysis using PySpark
# -----------------------------------------------

from pyspark.sql.functions import when, col, avg, count
import time

# -----------------------------------------------
# Step 1: Add Sentiment Column Based on Rating
# -----------------------------------------------
# Mapping from review_stars to a sentiment label:
#   <= 2 -> "negative", == 3 -> "neutral", anything else -> "positive"
# The branches are evaluated in this order.
stars = col("review_stars")
sentiment_label = (
    when(stars <= 2, "negative")
    .when(stars == 3, "neutral")
    .otherwise("positive")
)
sentiment_df = final_df.withColumn("sentiment", sentiment_label)

# Expose the labelled reviews to Spark SQL under the name "sentiment_reviews".
sentiment_df.createOrReplaceTempView("sentiment_reviews")


# -----------------------------------------------
# Step 2: Sentiment Distribution Query
# -----------------------------------------------
# Number of reviews per sentiment label, largest group first.
sentiment_count_query = """
    SELECT sentiment, COUNT(*) AS total_reviews
    FROM sentiment_reviews
    GROUP BY sentiment
    ORDER BY total_reviews DESC
"""

start_time = time.time()

sentiment_counts = spark.sql(sentiment_count_query)
sentiment_counts.show()

# Wall-clock time from query definition through the end of show().
elapsed_seconds = time.time() - start_time
print(f"Sentiment count query execution time: {elapsed_seconds:.2f} seconds")


# -----------------------------------------------
# Step 3: Average Review Stars per City
# -----------------------------------------------
start_time = time.time()

# One row per city: mean review_stars and the number of rows in the group.
city_stats = final_df.groupBy("city").agg(
    avg("review_stars").alias("avg_review_stars"),
    count("*").alias("num_reviews"),
)
# Highest average rating first.
avg_stars_by_city = city_stats.orderBy("avg_review_stars", ascending=False)

avg_stars_by_city.show(20)

# Wall-clock time from query definition through the end of show().
elapsed_seconds = time.time() - start_time
print(f"City-level analysis execution time: {elapsed_seconds:.2f} seconds")


# -----------------------------------------------
# Step 4: Review Count Distribution per Business
# -----------------------------------------------
start_time = time.time()

# final_df is built from the review/business join, so it holds one row per
# review and a business appears once for every one of its retained reviews.
# Grouping final_df directly would therefore count reviews, not businesses.
# Collapse to a single row per business_id first so that each business
# contributes exactly once to the distribution.
# NOTE(review): assumes review_count is a business-level value that is the
# same on every row of a given business_id - confirm against the schema.
business_review_counts = (
    final_df
    .select("business_id", "review_count")
    .dropDuplicates(["business_id"])
)

# Number of businesses for each review_count value, ascending by review_count.
review_count_distribution = business_review_counts.groupBy("review_count") \
    .count() \
    .orderBy("review_count")

review_count_distribution.show(20)
print(f"Review count distribution execution time: {time.time() - start_time:.2f} seconds")

In [None]:
# machine-learning
# -------------------------
# Step 1: Import Libraries
# -------------------------
from pyspark.sql.functions import when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# -------------------------
# Step 2: Create Sentiment Label
# -------------------------
# Mapping from review_stars to the target class:
#   >= 4 -> "positive", == 3 -> "neutral", anything else -> "negative"
# The branches are evaluated in this order.
stars = final_df.review_stars
sentiment_label = (
    when(stars >= 4, "positive")
    .when(stars == 3, "neutral")
    .otherwise("negative")
)
df_labeled = final_df.withColumn("sentiment", sentiment_label)

# -------------------------
# Step 3: Filter required columns and drop nulls
# -------------------------
# Only the review text and its label are needed for modelling; rows with a
# null in either column are removed.
labeled_text = df_labeled.select("text", "sentiment")
df_model = labeled_text.na.drop()

# -------------------------
# Step 4: Define Pipeline Stages
# -------------------------
# sentiment (string) -> label (numeric index)
indexer = StringIndexer(inputCol="sentiment", outputCol="label")

# text -> words -> filtered (stop words removed)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# filtered -> rawFeatures (hashed term frequencies, 5000 buckets) -> features
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=5000,
)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Classifier trained on the "features" vector against the indexed label.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=30,
    maxDepth=7,
    maxBins=32,
)

# -------------------------
# Step 5: Build Pipeline
# -------------------------
# Stages run in list order; the classifier is last.
pipeline_stages = [indexer, tokenizer, remover, hashingTF, idf, rf]
pipeline = Pipeline(stages=pipeline_stages)

# -------------------------
# Step 6: Train-Test Split
# -------------------------
# 80/20 random split with a fixed seed.
split_weights = [0.8, 0.2]
splits = df_model.randomSplit(split_weights, seed=42)
train_data, test_data = splits

# -------------------------
# Step 7: Fit Model
# -------------------------
# Fits every pipeline stage on the training portion only.
rf_model = pipeline.fit(train_data)

# -------------------------
# Step 8: Evaluate Performance
# -------------------------
predictions = rf_model.transform(test_data)

# Each evaluate() call below is a separate pass over its input. The
# evaluators only read the "prediction" and "label" columns, so cache just
# that projection: the pipeline transform is then computed once and the
# remaining metrics are served from the cached two-column result.
prediction_and_label = predictions.select("prediction", "label").cache()

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

f1_score = evaluator_f1.evaluate(prediction_and_label)
accuracy = evaluator_acc.evaluate(prediction_and_label)
precision = evaluator_precision.evaluate(prediction_and_label)
recall = evaluator_recall.evaluate(prediction_and_label)

print(f"F1 Score: {f1_score:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# -------------------------
# Step 9: Feature Importances
# -------------------------
rf_stage = rf_model.stages[-1]  # Extract RandomForestClassifier from pipeline
importances = rf_stage.featureImportances
importance_list = sorted([(idx, importance) for idx, importance in enumerate(importances)], key=lambda x: x[1], reverse=True)

# Save to file
with open("/tmp/model_metrics_and_importance.txt", "w") as f:
    f.write("Top 20 Feature Importances (by index):\n")
    for idx, score in importance_list[:20]:
        f.write(f"Feature {idx}: Importance {score:.6f}\n")

# Upload to GCS
!gsutil cp /tmp/model_metrics_and_importance.txt gs://yelpbigdata/results

# -------------------------
# Step 10: Confusion Matrix
# -------------------------
preds_and_labels = predictions.select("prediction", "label").toPandas()
cm = confusion_matrix(preds_and_labels["label"], preds_and_labels["prediction"])

plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.savefig("/tmp/confusion_matrix.png")
plt.show()
plt.close()

!gsutil cp /tmp/confusion_matrix.png gs://yelpbigdata/images