In [None]:
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import col, year, to_date, pandas_udf, StringType, size, split
import spacy
import matplotlib.pyplot as plt
import numpy as np
from tqdm import trange
from collections import Counter
from pyspark.ml.feature import NGram, CountVectorizer, Word2Vec
from pyspark.sql.functions import split, explode, desc
import seaborn as sns
import pandas as pd
import os, shutil

In [None]:
# Create (or reuse) the Spark session for this notebook.
# The driver memory is raised to 8g via the `spark.driver.memory` setting.
spark = (
    SparkSession.builder
    .appName('ReviewsProcessing')
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [None]:
# Load the reviews dataset from its Parquet source into a Spark DataFrame
reviews_df = spark.read.format("parquet").load("yelp_reviews.parquet")

In [None]:
# Preview the first 20 rows, with long cell values truncated (the `show` defaults)
reviews_df.show(n=20, truncate=True)

In [None]:
# Text processing can be slow and resource intensive, so start by keeping
# only the reviews written in one particular year.
year_to_filter = 2022

# Parse the `date` column into a date and store its year in a `year` column
reviews_df = reviews_df.withColumn("year", year(to_date(col("date"))))

# Keep only the rows whose year matches the chosen one
filtered_df = reviews_df.where(col('year') == year_to_filter)

In [None]:
# Convert the year-filtered Spark DataFrame into a pandas DataFrame.
# toPandas() collects every selected row into the driver's memory.
reviews_pd_df = filtered_df.toPandas()

In [None]:
# Sanity check: (rows, columns) of the pandas frame
reviews_pd_df.shape

In [None]:
# Load the business table; it is merged with the reviews later to find how many
# reviews belong to businesses that closed, and to attach business categories.
dataset_path = 'yelp_dataset'
businesses_df = pd.read_json(
    f'{dataset_path}/yelp_academic_dataset_business.json',
    lines=True,  # the file is read as line-delimited JSON, one record per line
)
businesses_df.shape

In [None]:
# Peek at the first five business records
businesses_df.head(n=5)

In [None]:
# Attach each review's business categories and open/closed flag.
# A left join keeps every review, even if its business_id has no match in the
# business table (those rows get NaN for `categories` / `is_open`).
# validate="many_to_one" makes pandas raise a MergeError if business_id is ever
# duplicated in the business table, instead of silently multiplying review rows.
merged_df = pd.merge(
    reviews_pd_df,
    businesses_df[["business_id", "categories", "is_open"]],
    on="business_id",
    how='left',
    validate="many_to_one",
)

# (rows, columns) of the reviews that belong to businesses flagged is_open == 0
merged_df[merged_df.is_open == 0].shape

In [None]:
# Give the flag a clearer name: `is_open` becomes `business_open`
merged_df = merged_df.rename({'is_open': 'business_open'}, axis='columns')

In [None]:
# Count of reviews per star rating, one panel per value of `business_open`
g = sns.catplot(x='stars', data=merged_df, kind='count', col='business_open')
# y > 1 lifts the figure-level title above the panel titles
g.fig.suptitle("Distribution of star rating for closed and open businesses", y=1.06)
# Render the figure. The previous plt.plot() call had no arguments, so it drew
# nothing and only echoed an empty list as the cell output.
plt.show()

In [None]:
# Keep only reviews of businesses that are still open (business_open == 1).
# .copy() detaches the result from merged_df.
open_reviews_df = merged_df.loc[merged_df['business_open'] == 1].copy()

In [None]:
# (rows, columns) left after dropping the reviews of closed businesses
open_reviews_df.shape

In [None]:
# Count reviews per business (value_counts sorts most-reviewed first).
# The row limit is lifted only inside this block, so the unlimited setting does
# not leak into every later DataFrame/Series display in the kernel.
with pd.option_context('display.max_rows', None):
    # Nothing is auto-displayed from inside a `with` block, so display explicitly
    display(open_reviews_df.business_id.value_counts())

In [None]:
# Output location for the filtered reviews (to_parquet overwrites an existing file by default)
path = f'filtered/yelp_reviews_{year_to_filter}.parquet'

# pandas does not create missing parent folders, so make sure `filtered/` exists
# before writing; exist_ok=True makes this a no-op on re-runs.
os.makedirs(os.path.dirname(path), exist_ok=True)

# remove folder if it exists (from previous PySpark save, which writes a
# directory of part files rather than a single file)
if os.path.isdir(path):
    shutil.rmtree(path)

# now safely save as a single parquet file, without the pandas index column
open_reviews_df.to_parquet(path, index=False)

In [None]:
# Release Spark resources at the end of the notebook.
spark.catalog.clearCache()  # drops everything cached in this Spark session
spark.stop()               # shuts down the Spark session