In [1]:
from google.colab import drive
drive.mount('/content/drive')

!pip install pyspark

Mounted at /content/drive


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, explode, split, length, when, countDistinct, lower
from pyspark.sql.types import *
import os
import time
import json

# Spark session for this notebook:
#   - driver heap of 12g; up to 4g of results may be collected to the driver;
#   - shuffle and default parallelism both pinned to 10 partitions
#     (Spark's default for spark.sql.shuffle.partitions is 200).
# NOTE(review): getOrCreate() returns an already-running session unchanged, so
# driver settings only take effect on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("Yelp JSON Analysis")
    .config("spark.driver.memory", "12g")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "10")
    .config("spark.default.parallelism", "10")
    .getOrCreate()
)

def load_and_filter_restaurants(business_path, review_path):
    """Load Yelp businesses and reviews and inner-join reviews onto restaurants.

    Args:
        business_path: Path to the Yelp business JSON file.
        review_path: Path to the Yelp review JSON file.

    Returns:
        A DataFrame with one row per review of a restaurant. It holds the
        selected business columns (``stars`` renamed to ``business_stars``)
        plus the selected review columns (``stars`` renamed to
        ``review_stars``). Reviews of non-restaurant businesses are dropped
        by the inner join.

    Uses the module-level ``spark`` session.
    """
    from pyspark.sql.types import (
        DoubleType,
        LongType,
        StringType,
        StructField,
        StructType,
    )

    business_df = spark.read.json(business_path)

    # Keep businesses whose categories string contains "restaurant"
    # (case-insensitive). A null categories value makes the predicate null, so
    # those rows are dropped.
    # NOTE(review): substring match -- any category that merely contains
    # "restaurant" also qualifies; tighten if that matters.
    restaurant_df = business_df.filter(
        lower(col("categories")).contains("restaurant")
    ).select(
        "business_id",
        "name",
        "city",
        "state",
        col("stars").alias("business_stars"),
        "review_count",
        "attributes",
        "categories",
        "hours",
        "is_open"
    )

    # Without a schema, spark.read.json first scans the whole review file just
    # to infer column types, then scans it again for the query. Declaring the
    # fields we use skips that inference pass; the JSON reader ignores other keys.
    # Types are assumed to match what inference produced for this file (stars as
    # floating point, vote counts as integers, date as a string) -- re-check
    # with printSchema() if the dataset version changes.
    review_schema = StructType([
        StructField("review_id", StringType()),
        StructField("user_id", StringType()),
        StructField("business_id", StringType()),
        StructField("stars", DoubleType()),
        StructField("useful", LongType()),
        StructField("funny", LongType()),
        StructField("cool", LongType()),
        StructField("text", StringType()),
        StructField("date", StringType()),
    ])

    review_df = spark.read.json(review_path, schema=review_schema).select(
        "review_id",
        "business_id",
        "user_id",
        col("stars").alias("review_stars"),
        "useful",
        "funny",
        "cool",
        "text",
        "date"
    )

    combined_df = restaurant_df.join(
        review_df,
        on="business_id",
        how="inner"
    )

    return combined_df

def analyze_success_factors(df):
    """Print descriptive tables about restaurant ratings, reviews and engagement.

    Args:
        df: Review-level DataFrame as returned by ``load_and_filter_restaurants``.
            It has one row per review, and the restaurant's business columns
            (``business_stars``, ``review_count``, ``state``, ``is_open``) are
            repeated on every review row of that restaurant.

    Returns:
        None. Each table is printed with ``DataFrame.show()`` (at most 20 rows).

    Restaurant-level statistics are computed on a one-row-per-business frame.
    These are the average business rating, the average ``review_count`` and the
    restaurant counts. Averaging them directly over ``df`` would weight each
    restaurant by how many of its reviews appear in ``df``. For example, a state
    with one 1000-review and one 10-review restaurant would report ~990
    "reviews per restaurant" instead of 505.
    """
    from pyspark.sql import functions as F

    # One row per restaurant (assuming business_id is unique in the business
    # file; all grouping columns are business-level). The per-business sum and
    # count of review stars are kept so review-weighted means can be rebuilt.
    per_business = df.groupBy(
        "business_id", "state", "business_stars", "review_count", "is_open"
    ).agg(
        F.sum("review_stars").alias("review_stars_sum"),
        count("review_stars").alias("review_stars_n"),
    ).cache()

    # Same value as avg("review_stars") over the underlying review rows.
    # It is null when a group has no non-null review stars.
    stars_total = F.sum("review_stars_sum")
    stars_n = F.sum("review_stars_n")
    avg_review_rating = when(stars_n > 0, stars_total / stars_n).alias("avg_review_rating")

    try:
        # 1. Business Rating Distribution -- counts restaurants, not reviews.
        print("\nBusiness Rating Distribution:")
        per_business.groupBy("business_stars").count().orderBy("business_stars").show()

        # 2. Review Rating Distribution -- counts reviews.
        print("\nReview Rating Distribution:")
        df.groupBy("review_stars").count().orderBy("review_stars").show()

        # 3. State Performance: restaurant-level means plus the review-weighted
        #    review rating.
        print("\nPerformance by State:")
        state_perf = per_business.groupBy("state").agg(
            avg("business_stars").alias("avg_business_rating"),
            avg_review_rating,
            countDistinct("business_id").alias("num_restaurants"),
            avg("review_count").alias("avg_reviews_per_restaurant"),
        )
        state_perf.orderBy(desc("avg_business_rating")).show()

        # 4. Review Engagement: per-restaurant average votes over its reviews.
        print("\nReview Engagement Impact:")
        engagement = df.groupBy("business_id", "business_stars").agg(
            avg("useful").alias("avg_useful_votes"),
            avg("funny").alias("avg_funny_votes"),
            avg("cool").alias("avg_cool_votes"),
            count("review_id").alias("total_reviews"),
        )
        # Sorting by business_stars alone leaves many ties, so show() would pick
        # arbitrary restaurants. The tie-breakers make the displayed rows
        # deterministic.
        engagement.orderBy(
            desc("business_stars"), desc("total_reviews"), "business_id"
        ).show()

        # 5. Open vs Closed Restaurant Performance
        print("\nOpen vs Closed Restaurant Performance:")
        open_vs_closed = per_business.groupBy("is_open").agg(
            avg("business_stars").alias("avg_business_rating"),
            avg_review_rating,
            countDistinct("business_id").alias("num_restaurants"),
        )
        open_vs_closed.orderBy("is_open").show()

        # 6. Review Length Analysis -- review-level, so computed on df directly.
        print("\nReview Length vs Ratings:")
        length_analysis = df.select(
            "review_stars",
            length("text").alias("review_length")
        ).groupBy("review_stars").agg(
            avg("review_length").alias("avg_review_length"),
            count("*").alias("num_reviews")
        )
        length_analysis.orderBy("review_stars").show()
    finally:
        per_business.unpersist()

def main(
    business_path='/content/drive/MyDrive/YelpDataset/yelp_academic_dataset_business.json',
    review_path='/content/drive/MyDrive/YelpDataset/yelp_academic_dataset_review.json',
):
    """Load the Yelp data, run the restaurant analysis, then stop Spark.

    Args:
        business_path: Yelp business JSON file. Defaults to the Google Drive
            location used by this notebook.
        review_path: Yelp review JSON file. Same default location.

    Exceptions raised while loading or analysing do not propagate. The error
    message and full traceback are printed instead. The module-level ``spark``
    session is always stopped on exit, so re-run the session cell before
    calling this again.
    """
    import traceback

    try:
        print("Loading and combining data...")
        combined_df = load_and_filter_restaurants(business_path, review_path)

        # Cached because analyze_success_factors runs many actions over it.
        combined_df.cache()
        try:
            print("Analyzing success factors...")
            analyze_success_factors(combined_df)
        finally:
            combined_df.unpersist()

    except Exception as e:
        # Keep the cell from erroring out, but still show where it failed.
        print(f"Error during analysis: {str(e)}")
        traceback.print_exc()
    finally:
        spark.stop()

# Inside a Jupyter kernel __name__ is "__main__", so this runs with the cell.
if __name__ == "__main__":
    main()

Loading and combining data...
Analyzing success factors...

Business Rating Distribution:
+--------------+-----+
|business_stars|count|
+--------------+-----+
|           1.0|  247|
|           1.5| 1481|
|           2.0| 2911|
|           2.5| 4850|
|           3.0| 7641|
|           3.5|11798|
|           4.0|13444|
|           4.5| 8360|
|           5.0| 1554|
+--------------+-----+


Review Rating Distribution:
+------------+-------+
|review_stars|  count|
+------------+-------+
|         1.0| 567227|
|         2.0| 404495|
|         3.0| 543126|
|         4.0|1130305|
|         5.0|2079531|
+------------+-------+


Performance by State:
+-----+-------------------+------------------+---------------+--------------------------+
|state|avg_business_rating| avg_review_rating|num_restaurants|avg_reviews_per_restaurant|
+-----+-------------------+------------------+---------------+--------------------------+
|   MT|                5.0|               5.0|              1|                  