In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
# Driver: build (or reuse) the notebook's SparkSession on a local master.
spark = (
    SparkSession.builder
    .master('local')
    .appName('SparkProj')
    # Memory settings.
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.4")
    # Shuffle spill settings.
    .config("spark.shuffle.spill", "true")
    .config("spark.shuffle.spill.compress", "true")
    .getOrCreate()
)

# Dataset file locations, relative to the notebook's working directory.
business_path = 'dataset/yelp_academic_dataset_business.json'
checkin_path = 'dataset/yelp_academic_dataset_checkin.json'
review_path = 'dataset/yelp_academic_dataset_review.json'
tip_path = 'dataset/yelp_academic_dataset_tip.json'
user_path = 'dataset/yelp_academic_dataset_user.json'

# Each frame is capped at 5000 rows, so the join in the next cell only sees
# matches between these two capped samples.
business = spark.read.json(business_path).limit(5000)
checkin = spark.read.json(checkin_path).limit(5000)

In [20]:
# Join each business row with the check-in rows that share its business_id
# (no join type is given, so PySpark's default inner join applies).
# Joining on an expression keeps both inputs' `business_id` columns, so the
# result carries two columns with that name.
join_business = business.join(
    other=checkin,
    on=business['business_id'] == checkin['business_id'],
)
join_business.show()


+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+--------------------+--------------------+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|         business_id|                date|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+--------------------+--------------------+
|       2575 E Bay Dr|[,, u'none', {'ro...|0bPLkL0QhhPO5kt1_...|Food, Delis, Ital...|        Largo|[10:0-20:0, 10:0-...|      0|   27.9161159|   -82.7604608|Zio's Italian Market|      33771|         100|  4.5|   FL|0bPLkL0QhhP

In [21]:
from pyspark.sql.dataframe import DataFrame
# Cuisines to keep; each name is matched as a substring of the `categories` column.
target_cuisines_task9to12 = [
    'Chinese',
    'American',
    'Mexican',
]


def epic1_filter_by_target_cuisines(business_df: DataFrame, target_cuisines: list):
    """ Given a DataFrame containing business data and a list of target cuisines,
        return a DataFrame containing only restaurants that serve the target cuisines.

    A row is kept when its `categories` string contains "Restaurant" and the name of
    at least one target cuisine (plain substring match). Kept rows get an extra
    `target_cuisines` column holding the matched cuisine name. When a row matches
    several cuisines, the one that appears LAST in `target_cuisines` is used.

    Parameters:
    business_df: DataFrame - a DataFrame containing business data; must have a
        string column named `categories`
    target_cuisines: list - a list of target cuisine names; may be empty
    Returns:
    DataFrame - `business_df` plus a `target_cuisines` column, restricted to the rows
        that matched a target cuisine. An empty `target_cuisines` list yields an empty
        DataFrame with the same columns (instead of raising IndexError)."""
    categories = business_df["categories"]
    # The "Restaurant" test does not depend on the cuisine, so build it once.
    is_restaurant = categories.contains("Restaurant")

    # Start from a typed NULL ("no cuisine matched") and wrap it once per cuisine.
    # Each later cuisine wraps the earlier expression, so later entries take priority.
    label = F.lit(None).cast("string")
    for cuisine in target_cuisines:
        label = F.when(is_restaurant & categories.contains(cuisine), cuisine).otherwise(label)

    labelled_df = business_df.withColumn("target_cuisines", label)
    # Only matched rows have a non-NULL label.
    return labelled_df.filter(labelled_df["target_cuisines"].isNotNull())

In [25]:
# Keep only the joined rows labelled with one of the target cuisines.
filtered_business_df = epic1_filter_by_target_cuisines(
    business_df=join_business,
    target_cuisines=target_cuisines_task9to12,
)
filtered_business_df.show()

+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+--------------------+--------------------+---------------+
|             address|          attributes|         business_id|          categories|            city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|         business_id|                date|target_cuisines|
+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+--------------------+--------------------+---------------+
|680 E 56th St, Ste A|[,,, {u'divey': F...|0DbIUh_Xuj6H5B_SF...|Restaurants, Mexican|      Brownsburg|[10:30-22:30, 10:...|      1|39.8518617214| -86.3847577572|Iguan

In [26]:
# Replace the `date` column with a `year` column derived from it.
# NOTE: this rebinds `filtered_business_df` and drops `date`, so running this cell
# a second time without re-running the filtering cell will no longer find `date`.
filtered_business_df = (
    filtered_business_df
    .withColumn('year', year('date'))
    .drop('date')
)
filtered_business_df.show()

+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+--------------------+---------------+----+
|             address|          attributes|         business_id|          categories|            city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|         business_id|target_cuisines|year|
+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+--------------------+---------------+----+
|680 E 56th St, Ste A|[,,, {u'divey': F...|0DbIUh_Xuj6H5B_SF...|Restaurants, Mexican|      Brownsburg|[10:30-22:30, 10:...|      1|39.8518617214| -86.3847577572|Iguana's Mexican ...|      46112|          74|  4.0| 

In [27]:
# Build one "rows per year" frame for each cuisine label, ordered by year.
# The same pipeline is applied to each label and unpacked in the listed order.
American_count_by_year, Mexican_count_by_year, Chinese_count_by_year = (
    filtered_business_df
    .filter(filtered_business_df["target_cuisines"] == cuisine)
    .groupBy("year")
    .count()
    .orderBy("year")
    for cuisine in ('American', 'Mexican', 'Chinese')
)

# American_pdf = American_count_by_year.toPandas()
# Mexican_pdf = Mexican_count_by_year.toPandas()
# Chinese_pdf = Chinese_count_by_year.toPandas()
American_count_by_year.show()

In [28]:
# Display the per-year row counts for the 'American' label
# (same call as the last line of the previous cell).
American_count_by_year.show()

+----+-----+
|year|count|
+----+-----+
|2010|    3|
|2011|    4|
|2012|    3|
|2014|    2|
|2015|    1|
|2016|    1|
|2019|    1|
|2021|    1|
+----+-----+



In [None]:
def epic1_task12(business_df: DataFrame, checkin_df: DataFrame, target_cuisines: list):
    """ Join businesses with check-ins and count the joined rows per year for the
        'American', 'Mexican' and 'Chinese' cuisine labels.

    Parameters:
    business_df: DataFrame - business data with `business_id` and `categories` columns
    checkin_df: DataFrame - check-in data with `business_id` and `date` columns
    target_cuisines: list - cuisine names used to label and filter the joined rows
        (passed to epic1_filter_by_target_cuisines); a cuisine that is not in this
        list produces an empty count frame
    Returns:
    tuple - three DataFrames with columns `year` and `count`, ordered by year, in the
        fixed order (American, Mexican, Chinese)"""
    # Joining on the column name keeps a single `business_id` column in the result.
    joined_df = business_df.join(checkin_df, on="business_id", how="inner")

    # Use the caller's cuisine list rather than the notebook-level global.
    labelled_df = epic1_filter_by_target_cuisines(joined_df, target_cuisines)

    # NOTE(review): if `date` holds several comma-separated timestamps per row (as the
    # Yelp check-in file appears to), year() presumably reflects only the first one,
    # so this counts rows rather than individual check-ins - confirm this is intended.
    yearly_df = labelled_df.withColumn("year", F.year("date")).drop("date")

    def count_by_year(cuisine):
        # Rows per year for a single cuisine label, sorted chronologically.
        return (
            yearly_df
            .filter(yearly_df["target_cuisines"] == cuisine)
            .groupBy("year")
            .count()
            .orderBy("year")
        )

    return count_by_year("American"), count_by_year("Mexican"), count_by_year("Chinese")