## Import and spark session

In [1]:
# Package imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, col, to_timestamp, datediff, when, lit, udf, array, struct, explode, collect_list, concat_ws, countDistinct, split, trim, desc, array_contains, when, array_contains, col
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType, DoubleType
from math import radians, sin, cos, sqrt, atan2
import pandas as pd

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [2]:
# Spark session
spark = SparkSession.builder \
    .appName("YelpReviewsClearProcess") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()

## Filter and create data

In [8]:
# This process should be iteraded for each year of the dataset by changing the input files and the output file name

# 1. Load business data
business_df = spark.read.json('yelp_dataset15/yelp_academic_dataset_business.json')

# Check the schema to determine category type
category_type = business_df.schema['categories'].dataType

# Create appropriate filter based on data type
if str(category_type).startswith('ArrayType'):
    # Array type categories
    category_filter = "array_contains(categories, 'Restaurants')"
else:
    # String type categories
    category_filter = "categories like '%Restaurants%'"

# Apply the appropriate filter
restaurant_business_df = (business_df
    .select('business_id', 'categories', 'latitude', 'longitude')
    .where('categories is not null')
    .where(category_filter)
    .select('business_id', 'latitude', 'longitude')
)

# Cache the filtered business IDs since we'll reuse them
restaurant_business_df.cache()

# 2. Read and filter the reviews
reviews_df = (spark.read.json('yelp_dataset15/yelp_academic_dataset_review.json')
    .select('business_id', 'review_id', 'date', 'text', 'stars')
)

# 3. Join the data and create final dataset
restaurant_data_df = (reviews_df
    .join(restaurant_business_df, 'business_id')
    .select(
        'business_id',
        'review_id',
        'latitude',
        'longitude',
        'date',
        'text',
        'stars'
    )
)

# 4. Write to parquet with optimization
restaurant_data_df.coalesce(1).write.mode('overwrite').parquet('restaurant_data_15')

# 5. Print some statistics
print(f"Number of restaurant-related businesses found: {restaurant_business_df.count()}")
print(f"Number of reviews in final dataset: {restaurant_data_df.count()}")

Number of restaurant-related businesses found: 52268
Number of reviews in final dataset: 4724471


In [9]:
from pyspark.sql.functions import count, when, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the schema explicitly
schema = StructType([
    StructField("business_id", StringType(), True),
    StructField("review_id", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("date", TimestampType(), True),
    StructField("text", StringType(), True),
    StructField("stars", DoubleType(), True)  # Explicitly set as DoubleType
])

# Read files individually and union them with schema conversion
dfs = []
for file in [
    "restaurant_data_10.snappy.parquet",
    "restaurant_data_11.snappy.parquet",
    "restaurant_data_12.snappy.parquet",
    "restaurant_data_13.snappy.parquet",
    "restaurant_data_14.snappy.parquet",
    "restaurant_data_15.snappy.parquet"
]:
    # Read each file and cast stars to double
    df = spark.read.parquet(file)
    df = df.withColumn("stars", col("stars").cast("double"))
    dfs.append(df)

# Union all dataframes
merged_df = dfs[0]
for df in dfs[1:]:
    merged_df = merged_df.union(df)

# Initial count
initial_count = merged_df.count()
print(f"Initial row count for restaurant reviews: {initial_count}")

# Remove duplicates
deduplicated_df = merged_df.dropDuplicates()
after_dedup_count = deduplicated_df.count()
print(f"Row count after removing duplicates: {after_dedup_count}")
print(f"Removed {initial_count - after_dedup_count} duplicate rows")

# Drop rows with null values in critical columns
cleaned_df = (deduplicated_df
    .dropna(subset=['review_id', 'text', 'latitude', 'longitude', 'date'])
)

final_count = cleaned_df.count()
print(f"Final row count after removing null values: {final_count}")
print(f"Removed {after_dedup_count - final_count} rows with null values")

# Get detailed null counts per column for reporting
null_counts = cleaned_df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in cleaned_df.columns
]).show()

Initial row count for restaurant reviews: 25582834
Row count after removing duplicates: 14898440
Removed 10684394 duplicate rows
Final row count after removing null values: 14898440
Removed 0 rows with null values
+-----------+---------+--------+---------+----+----+-----+
|business_id|review_id|latitude|longitude|date|text|stars|
+-----------+---------+--------+---------+----+----+-----+
|          0|        0|       0|        0|   0|   0|    0|
+-----------+---------+--------+---------+----+----+-----+



In [10]:
# Save the combined data to a new parquet file
cleaned_df.coalesce(1).write.mode('overwrite').parquet('combined_restaurant_data_no_duplicates')

## Proximity and incidents

In [3]:
# Additional cleaning
whole_df = spark.read.parquet('combined_restaurant_data_no_duplicates.snappy.parquet')
whole_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: double (nullable = true)



In [13]:
duplicate_count = cleaned_df.groupBy('business_id', 'review_id', 'text').count().filter('count > 1').count()
print(f"Number of duplicates based on business_id, review_id, and text: {duplicate_count}")

Number of duplicates based on business_id, review_id, and text: 3732283


In [4]:
deduplicated_df = whole_df.dropDuplicates(['business_id', 'review_id', 'text'])
print(deduplicated_df.count())

10023390


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, when, explode, to_timestamp, collect_list, concat_ws
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType, ArrayType, IntegerType
)
from math import radians, sin, cos, sqrt, atan2

# User-configurable parameters
VICTIMS_CUTOFF = 3
PROXIMITY_DISTANCE_KM = 5
REVIEW_TIMELINE_DAYS = 365

# Function to calculate distance using the Haversine formula
def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371  # Earth's radius in kilometers
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return R * c

# Register UDF for Haversine distance calculation
spark.udf.register("haversine_distance", haversine_distance, DoubleType())

# Load and preprocess reviews data, starting from deduplicated_df
reviews_df = deduplicated_df.withColumn("date", to_timestamp("date"))

# Load and preprocess shootings data
shootings_df = spark.read.csv("/Users/okovacsgabor/local_documents/thesis_data/updated_shootings_with_coordinates.csv", header=True, inferSchema=True) \
    .withColumn("Incident Date", to_timestamp("Incident Date", "MMMM d, yyyy")) \
    .filter(col("Victims Killed") >= VICTIMS_CUTOFF)

# Broadcast shootings data to all nodes
broadcast_shootings = spark.sparkContext.broadcast(shootings_df.collect())

# Define UDF to check proximity and time conditions
def check_proximity_and_time_with_incidents(review_lat, review_lon, review_date):
    if review_lat is None or review_lon is None or review_date is None:
        return []
    
    incident_data = []
    for shooting in broadcast_shootings.value:
        if shooting["Latitude"] is None or shooting["Longitude"] is None:
            continue
        
        distance = haversine_distance(
            float(review_lat), float(review_lon),
            float(shooting["Latitude"]), float(shooting["Longitude"])
        )
        
        if distance <= PROXIMITY_DISTANCE_KM:
            days_diff = (review_date - shooting["Incident Date"]).days
            if abs(days_diff) <= REVIEW_TIMELINE_DAYS:
                weeks_diff = days_diff // 7
                incident_data.append({
                    "incident_id": str(shooting["Incident ID"]),
                    "weeks_diff": weeks_diff
                })
    
    return incident_data

# Register UDF
from pyspark.sql.functions import udf
check_proximity_and_time_with_incidents_udf = udf(
    check_proximity_and_time_with_incidents,
    ArrayType(StructType([
        StructField("incident_id", StringType()),
        StructField("weeks_diff", IntegerType())
    ]))
)

# Apply UDF to create incident data column
result_df = reviews_df.withColumn(
    "incident_data",
    check_proximity_and_time_with_incidents_udf(col("latitude"), col("longitude"), col("date"))
)

# Explode and process incident data
exploded_df = result_df.withColumn("incident_info", explode("incident_data")) \
    .withColumn("incident_id", col("incident_info.incident_id")) \
    .withColumn("weeks_diff", col("incident_info.weeks_diff")) \
    .drop("incident_data", "incident_info")

# Aggregate incident IDs and week differences
grouped_df = exploded_df.groupBy("business_id", "review_id", "latitude", "longitude", "date", "text", "stars") \
    .agg(collect_list("incident_id").alias("incident_ids"),
         collect_list("weeks_diff").alias("weeks_diffs"))

# Convert lists to strings
final_df_with_string = grouped_df.withColumn("incident_ids_str", concat_ws(",", col("incident_ids"))) \
                                 .withColumn("weeks_diffs_str", concat_ws(",", col("weeks_diffs").cast("array<string>")))

print(final_df_with_string.count())

# Write to Parquet
final_df_with_string.select(
    "business_id", "review_id", "latitude", "longitude", "date", "text", "stars", "incident_ids_str", "weeks_diffs_str"
).coalesce(1).write.parquet("proximity_5.parquet", mode="overwrite")


925738


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49857)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 271,

## Exploding and filtering dataframe for week number

In [35]:
test_df = spark.read.parquet("/Users/okovacsgabor/local_documents/thesis_data/clear_process/proximity_5.snappy.parquet")

In [36]:
from pyspark.sql.functions import split, posexplode, col

# Split the incident_ids_str and weeks_diffs_str columns into arrays
test_df = test_df.withColumn("incident_ids_array", split(test_df["incident_ids_str"], ",")) \
       .withColumn("weeks_diffs_array", split(test_df["weeks_diffs_str"], ","))

# Explode the incident_ids_array and weeks_diffs_array columns
exploded_df = test_df.selectExpr("*", "posexplode(incident_ids_array) as (pos, incident_id)") \
                .selectExpr("*", "posexplode(weeks_diffs_array) as (pos2, weeks_diff)") \
                .filter(col("pos") == col("pos2")) \
                .drop("pos", "pos2", "incident_ids_array", "weeks_diffs_array", "incident_ids_str", "weeks_diffs_str")

# Change weeks diff to double
exploded_df = exploded_df.withColumn("weeks_diff", col("weeks_diff").cast("double"))

# Drop duplicates by business_id, review_id, and incident_id -- additional cleaning
exploded_df = exploded_df.dropDuplicates(["business_id", "review_id", "incident_id"])

# Show the result
exploded_df.show()

+--------------------+--------------------+---------+----------+-------------------+--------------------+-----+-----------+----------+
|         business_id|           review_id| latitude| longitude|               date|                text|stars|incident_id|weeks_diff|
+--------------------+--------------------+---------+----------+-------------------+--------------------+-----+-----------+----------+
|--9e1ONYQuAa-CB_R...|-h-J6d7TyoKtWMyuP...|36.123183|-115.16919|2018-01-30 20:03:45|Top notch service...|  5.0|     946496|      17.0|
|--9e1ONYQuAa-CB_R...|2Sk-2jlmv0jj4RjGi...|36.123183|-115.16919|2017-04-23 00:00:00|Ask ten local 'fo...|  4.0|     946496|     -23.0|
|--9e1ONYQuAa-CB_R...|6D-KwWsfVd25r4EYa...|36.123183|-115.16919|2017-12-29 21:49:50|TexasTripletMom's...|  4.0|     946496|      12.0|
|--9e1ONYQuAa-CB_R...|C-bI45l6hz8fvX-Nc...|36.123183|-115.16919|2016-12-31 00:00:00|It was above aver...|  4.0|     946496|     -40.0|
|--9e1ONYQuAa-CB_R...|F1NfXHeVrS2i9y17y...|36.123183|-1

In [37]:
# Filter the rows where the weeks_diff is within -8 and 8
eight_df = exploded_df.filter((col("weeks_diff") >= -8) & (col("weeks_diff") <= 7))
print(eight_df.count())

169668


In [38]:
distinct_incident_ids = eight_df.select(countDistinct("incident_id").alias("distinct_count")).collect()[0]["distinct_count"]
distinct_business_ids = eight_df.select(countDistinct("business_id").alias("distinct_count")).collect()[0]["distinct_count"]
print("Distinct shooting incidents in the data:")
print(distinct_incident_ids)
print("--------------------")
print("Distinct businesses in the data:")
print(distinct_business_ids)

Distinct shooting incidents in the data:
50
--------------------
Distinct businesses in the data:
10652


In [39]:
duplicate_count = eight_df.groupBy('review_id', 'business_id').count().filter('count > 1').count()
print(f"Number of duplicates based on review_id and business_id: {duplicate_count}")

Number of duplicates based on review_id and business_id: 1800


In [41]:
# There are still duplicates based on 'business_id' and 'review_id' caused by multiple incidents
# Step 1: Identify duplicates based on 'business_id' and 'review_id'
duplicates = (eight_df
              .groupBy("business_id", "review_id")
              .count()
              .filter(F.col("count") > 1)
              .select("business_id", "review_id"))

# Step 2: Join back with the original dataframe to get all columns for duplicate rows
duplicate_rows = (eight_df
                  .join(duplicates, on=["business_id", "review_id"], how="inner")
                  .orderBy("business_id", "review_id"))

# Show the result
duplicate_rows.show(truncate=False)

+----------------------+----------------------+----------+-----------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def handle_duplicates(eight_df, incidents_path):
    # First, load the incidents data
    incidents_df = spark.read.csv(incidents_path, header=True)
    
    # Convert necessary columns in incidents_df
    incidents_df = incidents_df.withColumn(
        "Incident Date", 
        F.to_timestamp("Incident Date", "MM/dd/yyyy")
    )
    
    # Create a window spec for finding the closest positive weeks_diff to 0
    window_spec = Window.partitionBy("business_id", "review_id")
    
    # Add row numbers based on our primary criterion:
    # 1. Only positive weeks_diff
    # 2. Closest to 0
    with_ranks = eight_df.withColumn(
        "abs_weeks_diff",
        F.abs("weeks_diff")
    ).withColumn(
        "rank",
        F.row_number().over(
            window_spec.orderBy(
                # First ensure weeks_diff is positive
                F.when(F.col("weeks_diff") < 0, float('inf'))
                .otherwise(F.col("abs_weeks_diff"))
            )
        )
    )
    
    # Get cases where rank = 1 (these are our clear winners)
    clear_winners = with_ranks.filter(F.col("rank") == 1)
    
    # Find cases where we have ties (same abs_weeks_diff for same business_id, review_id)
    ties = with_ranks.withColumn(
        "min_abs_diff",
        F.min("abs_weeks_diff").over(window_spec)
    ).filter(
        (F.col("abs_weeks_diff") == F.col("min_abs_diff")) &
        (F.col("weeks_diff") > 0)
    ).withColumn(
        "tie_count",
        F.count("*").over(window_spec)
    ).filter(F.col("tie_count") > 1)
    
    if ties.count() > 0:
        # For ties, we need to consider incident dates
        ties_with_dates = ties.join(
            incidents_df.select(
                F.col("Incident ID").alias("incident_id"),
                "Incident Date"
            ),
            on="incident_id"
        )
        
        # Calculate time difference between review and incident
        ties_with_dates = ties_with_dates.withColumn(
            "time_diff",
            F.abs(F.unix_timestamp("date") - F.unix_timestamp("Incident Date"))
        )
        
        # Select the row with minimum time difference for each tie group
        tie_winners = ties_with_dates.withColumn(
            "tie_rank",
            F.row_number().over(
                window_spec.orderBy("time_diff")
            )
        ).filter(F.col("tie_rank") == 1)
        
        # Select only the original columns
        tie_winners = tie_winners.select(eight_df.columns)
        
        # Combine clear winners with tie winners
        final_df = clear_winners.filter(
            ~F.concat(F.col("business_id"), F.col("review_id")).isin(
                ties.select(
                    F.concat(F.col("business_id"), F.col("review_id"))
                ).distinct().rdd.flatMap(lambda x: x).collect()
            )
        ).union(tie_winners)
    else:
        final_df = clear_winners
    
    # Return final dataframe with only the original columns
    return final_df.select(eight_df.columns)

# Usage:
incidents_path = "/Users/okovacsgabor/local_documents/thesis_data/updated_shootings_with_coordinates.csv"
result_df = handle_duplicates(eight_df, incidents_path)

In [46]:
# Check for any remaining duplicates
remaining_duplicates = (result_df
    .groupBy("business_id", "review_id")
    .count()
    .filter(F.col("count") > 1))

remaining_duplicates.show()  # Should show 0 results

duplicate_count = result_df.groupBy('review_id', 'business_id').count().filter('count > 1').count()
print(f"Number of duplicates based on review_id and business_id: {duplicate_count}")
print(result_df.count())



+-----------+---------+-----+
|business_id|review_id|count|
+-----------+---------+-----+
+-----------+---------+-----+

Number of duplicates based on review_id and business_id: 0
167866


In [47]:
from pyspark.sql import functions as F

# Step 1: Group by business_id and incident_id to count distinct weeks_diff for each combination
incident_weeks_review_count = (
    result_df
    .groupBy("business_id", "incident_id")  # Group by both business_id and incident_id
    .agg(F.countDistinct("weeks_diff").alias("distinct_weeks"))  # Count distinct weeks for each combination
)

# Step 2: Group by distinct_weeks and count how many restaurant-incident pairs fall into each category
incident_weeks_summary = (
    incident_weeks_review_count
    .groupBy("distinct_weeks")
    .agg(F.count("*").alias("num_restaurant_incident_pairs"))  # Count the number of restaurant-incident pairs for each distinct_weeks
    .orderBy("distinct_weeks", ascending=False)  # Sort by number of weeks in descending order
)

# Step 3: Show the result
incident_weeks_summary.show()

+--------------+-----------------------------+
|distinct_weeks|num_restaurant_incident_pairs|
+--------------+-----------------------------+
|            16|                          509|
|            15|                          336|
|            14|                          309|
|            13|                          367|
|            12|                          465|
|            11|                          474|
|            10|                          525|
|             9|                          637|
|             8|                          616|
|             7|                          746|
|             6|                          927|
|             5|                          966|
|             4|                         1180|
|             3|                         1565|
|             2|                         2048|
|             1|                         3297|
+--------------+-----------------------------+



In [48]:
result_df.coalesce(1).write.mode('overwrite').parquet('minus_8_plus_7')

## Joining scores and filtering for englsih

In [49]:
pre_df = spark.read.parquet("/Users/okovacsgabor/local_documents/thesis_data/clear_process/minus_8_plus_7.snappy.parquet")

In [50]:
# Drop non english rows

from pyspark.sql import functions as F
from langdetect import detect
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Step 1: Define a UDF for language detection
def detect_language(text):
    try:
        return detect(text)
    except:
        return None  # Return None if detection fails

detect_language_udf = udf(detect_language, StringType())

# Step 2: Apply the UDF to add a `language` column
pre_df = pre_df.withColumn("language", detect_language_udf(F.col("text")))

# Step 3: Filter only English reviews (where language is 'en')
english_reviews_df = pre_df.filter(F.col("language") == "en")

# Step 4: Calculate the percentage of reviews dropped
initial_count = pre_df.count()
filtered_count = english_reviews_df.count()
percentage_dropped = ((initial_count - filtered_count) / initial_count) * 100

print(f"Percentage of reviews dropped: {percentage_dropped:.2f}%")

# Step 5: Drop the `language` column from the final DataFrame
english_reviews_df = english_reviews_df.drop("language")


Percentage of reviews dropped: 0.19%


In [51]:
# Read reviews_with_sentiment.csv
reviews_with_sentiment_df = spark.read.csv("/Users/okovacsgabor/local_documents/thesis_data/reviews_with_sentiment.csv", header=True, inferSchema=True)

In [52]:
# Remove duplicates based on review_id in reviews_with_sentiment_df
reviews_with_sentiment_df = reviews_with_sentiment_df.dropDuplicates(["review_id"])

# Now perform the left join as before
merged_df = english_reviews_df.join(
    reviews_with_sentiment_df.select("review_id", "sentiment_score"),
    on="review_id",
    how="left"
)

# Check the row count to confirm
print(f"Row count of menglish_reviews_df after removing duplicates: {english_reviews_df.count()}")
print(f"Row count of merged_df after removing duplicates: {merged_df.count()}")

# Check the schema to confirm sentiment_score has been added
merged_df.printSchema()

# Count the number of rows where sentiment_score is missing (i.e., null)
missing_sentiment_count = merged_df.filter(merged_df.sentiment_score.isNull()).count()
print(f"Number of missing sentiment_score values: {missing_sentiment_count}")

Row count of menglish_reviews_df after removing duplicates: 167541
Row count of merged_df after removing duplicates: 167536
root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- incident_id: string (nullable = true)
 |-- weeks_diff: double (nullable = true)
 |-- sentiment_score: double (nullable = true)

Number of missing sentiment_score values: 11


In [53]:
merged_df.coalesce(1).write.mode('overwrite').parquet('final_data_8_7')

## Filtering for only complete incidents

In [3]:
all_df = spark.read.parquet("/Users/okovacsgabor/local_documents/thesis_data/clear_process/final_data_8_7.snappy.parquet")

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Assign a unique ID to each row in all_df
all_df_with_id = all_df.withColumn("unique_id", F.monotonically_increasing_id())

# Step 2: Group by business_id and incident_id to count distinct weeks_diff for each combination
incident_weeks_review_count = (
    all_df_with_id
    .groupBy("business_id", "incident_id")  # Group by both business_id and incident_id
    .agg(F.countDistinct("weeks_diff").alias("distinct_weeks"))  # Count distinct weeks for each combination
)

# Step 3: Filter to include only those pairs that have reviews in all 16 distinct weeks
incident_weeks_review_count_filtered = incident_weeks_review_count.filter(F.col("distinct_weeks") == 16)

# Step 4: Join the filtered pairs back to all_df_with_id to keep only the relevant rows
filtered_all_df = all_df_with_id.join(
    incident_weeks_review_count_filtered.select("business_id", "incident_id"),
    on=["business_id", "incident_id"],
    how="inner"
)

# Step 5: Select the original columns and the unique_id
filtered_all_df = filtered_all_df.select("unique_id")

# Step 6: Filter the original all_df using the unique_id
final_filtered_df = all_df_with_id.join(filtered_all_df, on="unique_id", how="inner").drop("unique_id")

# Step 7: Show the result
final_filtered_df.show()

+--------------------+--------------------+-------------+---------------+-------------------+--------------------+-----+-----------+----------+---------------+
|           review_id|         business_id|     latitude|      longitude|               date|                text|stars|incident_id|weeks_diff|sentiment_score|
+--------------------+--------------------+-------------+---------------+-------------------+--------------------+-----+-----------+----------+---------------+
|-3rPQVk5g5PPMSa7h...|qY-BUQY-SFBaSrFHo...|39.9550836837| -75.1552917273|2017-02-15 01:01:02|Very good food an...|  5.0|     810756|      -7.0|         0.7574|
|-43FFWuLN1g7Gut9r...|XI99CG-lx3OH7oVeR...|36.1517081701| -86.8198794742|2018-02-03 03:28:49|Went for lunch an...|  5.0|    1025673|       3.0|         0.9628|
|-4mBqyDxSOkjZ1gib...|yz0KWVamNhqiZGz7X...|29.9545040131| -90.0664367676|2017-05-15 23:14:48|This place was am...|  5.0|     858719|      -3.0|         0.9059|
|-50XTL8wXcyfjPK6U...|9PZxjhTIU7OgPIzuG.

In [7]:
from pyspark.sql import functions as F

# Step 1: Group by business_id and incident_id to count distinct weeks_diff for each combination
incident_weeks_review_count = (
    final_filtered_df
    .groupBy("business_id", "incident_id")  # Group by both business_id and incident_id
    .agg(F.countDistinct("weeks_diff").alias("distinct_weeks"))  # Count distinct weeks for each combination
)

# Step 2: Group by distinct_weeks and count how many restaurant-incident pairs fall into each category
incident_weeks_summary = (
    incident_weeks_review_count
    .groupBy("distinct_weeks")
    .agg(F.count("*").alias("num_restaurant_incident_pairs"))  # Count the number of restaurant-incident pairs for each distinct_weeks
    .orderBy("distinct_weeks", ascending=False)  # Sort by number of weeks in descending order
)

# Step 3: Show the result
incident_weeks_summary.show()

+--------------+-----------------------------+
|distinct_weeks|num_restaurant_incident_pairs|
+--------------+-----------------------------+
|            16|                          508|
+--------------+-----------------------------+



In [9]:
distinct_incident_ids = final_filtered_df.select(countDistinct("incident_id").alias("distinct_count")).collect()[0]["distinct_count"]
distinct_business_ids = final_filtered_df.select(countDistinct("business_id").alias("distinct_count")).collect()[0]["distinct_count"]
print("Distinct shooting incidents in the data:")
print(distinct_incident_ids)
print("--------------------")
print("Distinct businesses in the data:")
print(distinct_business_ids)

Distinct shooting incidents in the data:
30
--------------------
Distinct businesses in the data:
406


In [12]:
# Select distinct weeks_diff and sort them
unique_week_diffs = final_filtered_df.select("weeks_diff").distinct().orderBy("weeks_diff")

# Show the result
unique_week_diffs.show()

+----------+
|weeks_diff|
+----------+
|      -8.0|
|      -7.0|
|      -6.0|
|      -5.0|
|      -4.0|
|      -3.0|
|      -2.0|
|      -1.0|
|       0.0|
|       1.0|
|       2.0|
|       3.0|
|       4.0|
|       5.0|
|       6.0|
|       7.0|
+----------+



In [10]:
print(final_filtered_df.count())

45713


In [8]:
final_filtered_df.coalesce(1).write.mode('overwrite').parquet('final_data_8_7_comp_weeks')

In [54]:
spark.stop()