In [3]:
import pyspark 
from pyspark.sql import SparkSession 
import time
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder.config('spark.driver.memory' , '8g').getOrCreate()

In [5]:
def top_stores_by_positive_reviews(business_df, review_df, 
                                       positive_threshold=4, top_n=10):
        """
        4. Cửa hàng nhận nhiều đánh giá tích cực nhất
        
        Optimizations:
        - Single-pass aggregation with conditional logic
        - Repartitioning and caching
        - Early filtering
        - Broadcast join
        
        Args:
            business_df: DataFrame chứa dữ liệu business
            review_df: DataFrame chứa dữ liệu review
            positive_threshold: ngưỡng sao tích cực (default: 4)
            top_n: số lượng top cửa hàng
        
        Returns:
            DataFrame với top cửa hàng có nhiều review tích cực nhất
        """
        print(f"\n{'='*60}")
        print(f"Analysis 4: Top {top_n} Stores by Positive Reviews (>= {positive_threshold} stars)")
        print(f"{'='*60}")
        start_time = time.time()
        
        # Repartition and cache
        review_partitioned = review_df \
            .select("business_id", "review_id", "stars", "useful") \
            .repartition(200, "business_id") \
            .cache()
        
        # Single-pass aggregation with conditional logic
        review_stats = review_partitioned.groupBy("business_id").agg(
            # Count positive reviews
            sum(when(col("stars") >= positive_threshold, 1).otherwise(0))
                .alias("positive_review_count"),
            
            # Total review count
            count("review_id").alias("total_review_count"),
            
            # Average stars of positive reviews
            avg(when(col("stars") >= positive_threshold, col("stars")))
                .alias("avg_positive_rating"),
            
            # Total useful votes from positive reviews
            sum(when(col("stars") >= positive_threshold, col("useful")).otherwise(0))
                .alias("total_useful_votes")
        )
        
        # Calculate positive ratio and filter
        review_stats_filtered = review_stats \
            .withColumn(
                "positive_ratio", 
                col("positive_review_count") / col("total_review_count")
            ) \
            .filter(col("positive_review_count") > 0)
        
        # Get top candidates
        top_candidates = review_stats_filtered \
            .orderBy(desc("positive_review_count"), desc("positive_ratio")) \
            .limit(top_n * 3)
        
        # Broadcast join
        result = top_candidates.join(
            broadcast(business_df.select(
                "business_id", "name", "city", "state", "categories"
            )),
            "business_id"
        ).select(
            "business_id",
            "name",
            "city",
            "state",
            "categories",
            "positive_review_count",
            "total_review_count",
            "positive_ratio",
            "avg_positive_rating",
            "total_useful_votes"
        ).orderBy(
            desc("positive_review_count"), 
            desc("positive_ratio")
        ).limit(top_n)
        result_count = result.count()
        
        # Cleanup
        review_partitioned.unpersist()
        
        elapsed = time.time() - start_time
        print(f"✓ Completed in {elapsed:.2f}s - Found {result_count} results")
        
        return result

In [6]:
review = spark.read.json(r'D:\VS-workspace\bigdata-2025-1\data\review.json')

In [7]:
business = spark.read.json(r'D:\VS-workspace\bigdata-2025-1\data\business.json')

In [26]:
user = spark.read.json(r'D:\VS-workspace\bigdata-2025-1\data\user.json')

In [28]:
user.show()


+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer| cool|               elite|fans|             friends|funny|     name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|         

In [8]:
review_partitioned = review \
            .select("business_id", "review_id", "stars", "useful") \
            .repartition(200, "business_id") \
            .cache()

In [9]:
review_stats = review_partitioned.groupBy("business_id").agg(
            # Count positive reviews
            sum(when(col("stars") >= 4, 1).otherwise(0))
                .alias("positive_review_count"),
            
            # Total review count
            count("review_id").alias("total_review_count"),
            
            # Average stars of positive reviews
            avg(when(col("stars") >= 4, col("stars")))
                .alias("avg_positive_rating"),
            
            # Total useful votes from positive reviews
            sum(when(col("stars") >= 4, col("useful")).otherwise(0))
                .alias("total_useful_votes")
        )

In [10]:
# Calculate positive ratio and filter
review_stats_filtered = review_stats \
    .withColumn(
        "positive_ratio", 
        col("positive_review_count") / col("total_review_count")
    ) \
    .filter(col("positive_review_count") > 0)

In [11]:
 # Get top candidates
top_candidates = review_stats_filtered \
    .orderBy(desc("positive_review_count"), desc("positive_ratio")) \
    .limit(10 * 3)

In [12]:
result = top_candidates.join(
            broadcast(business.select(
                "business_id", "name", "city", "state", "categories"
            )),
            "business_id"
        ).select(
            "business_id",
            "name",
            "city",
            "state",
            "categories",
            "positive_review_count",
            "total_review_count",
            "positive_ratio",
            "avg_positive_rating",
            "total_useful_votes"
        ).orderBy(
            desc("positive_review_count"), 
            desc("positive_ratio")
        ).limit(10)

In [13]:
result.show()

+--------------------+--------------------+------------+-----+--------------------+---------------------+------------------+------------------+-------------------+------------------+
|         business_id|                name|        city|state|          categories|positive_review_count|total_review_count|    positive_ratio|avg_positive_rating|total_useful_votes|
+--------------------+--------------------+------------+-----+--------------------+---------------------+------------------+------------------+-------------------+------------------+
|_ab50qdWOk0DdB6XO...|   Acme Oyster House| New Orleans|   LA|Live/Raw Food, Se...|                 5982|              7673|0.7796168382640427| 4.6093279839518555|              3414|
|ac1AeYqs8Z4_e2X5M...|        Oceana Grill| New Orleans|   LA|Restaurants, Seaf...|                 5865|              7516|0.7803352847259181|  4.684057971014493|              2322|
|GXFMD0Z4jEVZBCsbP...|Hattie B’s Hot Ch...|   Nashville|   TN|American (Traditi...|  

In [14]:
test = top_stores_by_positive_reviews(business , review ,positive_threshold=4, top_n=10)


Analysis 4: Top 10 Stores by Positive Reviews (>= 4 stars)
✓ Completed in 2.41s - Found 10 results


In [15]:
test.show()

+--------------------+--------------------+------------+-----+--------------------+---------------------+------------------+------------------+-------------------+------------------+
|         business_id|                name|        city|state|          categories|positive_review_count|total_review_count|    positive_ratio|avg_positive_rating|total_useful_votes|
+--------------------+--------------------+------------+-----+--------------------+---------------------+------------------+------------------+-------------------+------------------+
|_ab50qdWOk0DdB6XO...|   Acme Oyster House| New Orleans|   LA|Live/Raw Food, Se...|                 5982|              7673|0.7796168382640427| 4.6093279839518555|              3414|
|ac1AeYqs8Z4_e2X5M...|        Oceana Grill| New Orleans|   LA|Restaurants, Seaf...|                 5865|              7516|0.7803352847259181|  4.684057971014493|              2322|
|GXFMD0Z4jEVZBCsbP...|Hattie B’s Hot Ch...|   Nashville|   TN|American (Traditi...|  

In [18]:
def top_rated_products(business_df, review_df, min_reviews=50, top_n=10):
        """
        3. Sản phẩm (doanh nghiệp) đánh giá tích cực nhất
        
        Optimizations:
        - Partitioning by business_id
        - Strategic caching
        - Early filtering by min_reviews
        - Broadcast join
        
        Args:
            business_df: DataFrame chứa dữ liệu business
            review_df: DataFrame chứa dữ liệu review
            min_reviews: số lượng review tối thiểu
            top_n: số lượng top sản phẩm
        
        Returns:
            DataFrame với top sản phẩm có rating cao nhất
        """
        print(f"\n{'='*60}")
        print(f"Analysis 3: Top {top_n} Rated Products (Min {min_reviews} reviews)")
        print(f"{'='*60}")
        start_time = time.time()
        
        # Repartition and cache
        review_partitioned = review_df \
            .select("business_id", "review_id", "stars", "useful") \
            .repartition(200, "business_id") \
            .cache()
        
        # Aggregate review stats
        business_stats = review_partitioned \
            .filter(col("stars").isNotNull()) \
            .groupBy("business_id") \
            .agg(
                count("review_id").alias("total_reviews"),
                avg("stars").alias("avg_review_stars"),
                sum("useful").alias("total_useful")
            )
        
        # Filter by minimum reviews
        qualified = business_stats.filter(col("total_reviews") >= min_reviews)
        
        # Get top candidates
        top_candidates = qualified \
            .orderBy(desc("avg_review_stars"), desc("total_reviews")) \
            .limit(top_n * 5)
        
        # Broadcast join
        result = top_candidates.join(
            broadcast(business_df.select(
                "business_id", "name", "city", "state", "categories", "stars"
            )),
            "business_id"
        ).select(
            "business_id",
            "name",
            "city",
            "state",
            "categories",
            "total_reviews",
            "avg_review_stars",
            "total_useful",
            col("stars").alias("business_avg_stars")
        ).orderBy(
            desc("avg_review_stars"), 
            desc("total_reviews")
        ).limit(top_n)
        
        result_count = result.count()
        
        # Cleanup
        review_partitioned.unpersist()
        
        elapsed = time.time() - start_time
        print(f"✓ Completed in {elapsed:.2f}s - Found {result_count} results")
        
        return result

In [19]:
test_03 = top_rated_products(business, review, min_reviews=50, top_n=10)


Analysis 3: Top 10 Rated Products (Min 50 reviews)
✓ Completed in 28.34s - Found 10 results


In [20]:
test_03.show()

+--------------------+--------------------+-------------+-----+--------------------+-------------+----------------+------------+------------------+
|         business_id|                name|         city|state|          categories|total_reviews|avg_review_stars|total_useful|business_avg_stars|
+--------------------+--------------------+-------------+-----+--------------------+-------------+----------------+------------+------------------+
|1RqfozJoosHAsKZhc...|Walls Jewelry Rep...|    Nashville|   TN|Watch Repair, Loc...|          114|             5.0|          97|               5.0|
|-siOxQQcGKtb-04dX...|ella & louie flowers|Santa Barbara|   CA|Flowers & Gifts, ...|          104|             5.0|          65|               5.0|
|4-P4Bzqd01YvKX9tp...|       Drink & Learn|  New Orleans|   LA|Beer, Wine & Spir...|           90|             5.0|          47|               5.0|
|dhLARBhUnJloLa8xZ...|Steves iPhone Repair|  Cherry Hill|   NJ|Professional Serv...|           78|             5

In [21]:
test_05 = top_selling_products_recent(review , business , days=90, top_n=10)

NameError: name 'top_selling_products_recent' is not defined

In [None]:
def top_selling_products_recent(review_df, business_df, days=90, top_n=10):
        """
        1. Top sản phẩm (doanh nghiệp) bán chạy nhất trong khoảng thời gian gần
        
        Optimizations:
        - Salting to handle data skew
        - Two-stage aggregation
        - Broadcast join for business info
        - Early filtering and limiting
        
        Args:
            review_df: DataFrame chứa dữ liệu review (đã preprocess dates)
            business_df: DataFrame chứa dữ liệu business
            days: số ngày gần đây cần phân tích
            top_n: số lượng top sản phẩm
        
        Returns:
            DataFrame với top sản phẩm bán chạy nhất
        """
        print(f"\n{'='*60}")
        print(f"Analysis 1: Top {top_n} Selling Products (Last {days} days)")
        print(f"{'='*60}")
        start_time = time.time()
        
        # Add salt to handle skew
        review_with_salt = review_df.withColumn("salt", (rand() * 10).cast("int"))
        
        # Filter by date range
        cutoff_date = current_date() - lit(days)
        recent_reviews = review_with_salt.filter(col("review_date") >= cutoff_date)
        
        # Stage 1: Salted aggregation
        salted_agg = recent_reviews.groupBy("business_id", "salt").agg(
            count("review_id").alias("partial_count"),
            sum("stars").alias("partial_sum_stars"),
            count("stars").alias("partial_count_stars")
        )
        
        # Stage 2: Final aggregation
        business_stats = salted_agg.groupBy("business_id").agg(
            sum("partial_count").alias("recent_review_count"),
            (sum("partial_sum_stars") / sum("partial_count_stars")).alias("avg_rating")
        )
        
        # Get top candidates before join
        top_candidates = business_stats \
            .orderBy(desc("recent_review_count")) \
            .limit(top_n * 10)
        
        # Broadcast join with business info
        result = top_candidates.join(
            broadcast(business_df.select(
                "business_id", "name", "city", "state", "categories"
            )),
            "business_id"
        ).select(
            "business_id",
            "name",
            "city",
            "state",
            "categories",
            "recent_review_count",
            "avg_rating"
        ).orderBy(desc("recent_review_count")).limit(top_n)
        
        # Materialize result
        result_count = result.count()
        
        elapsed = time.time() - start_time
        print(f"✓ Completed in {elapsed:.2f}s - Found {result_count} results")
        
        return result
    