# Data model

- This notebook implements the data model.

- It first creates the dimension and fact tables from the cleaned parquet files.

- The following diagram shows the data model, which comprises four tables:
    + sdf_business
    + sdf_reviews
    + sdf_tip
    + sdf_checkin

In [69]:
# Import libraries
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import (
    split, col, explode, to_date, unix_timestamp,
    to_timestamp, year, month, dayofmonth, dayofweek, 
    hour, minute
)

In [46]:
# Create spark environment
from pyspark.sql import SparkSession
spark = (
    SparkSession
        .builder
        .appName("eda")
        .config("spark.driver.memory", "8g")
        .getOrCreate()
)

In [47]:
# Read data
sdf_business = spark.read.parquet("data/output/yelp_academic_dataset_business.parquet")
sdf_reviews = spark.read.parquet("data/output/yelp_academic_dataset_review.parquet")
sdf_tip = spark.read.parquet("data/output/yelp_academic_dataset_tip.parquet")
sdf_checkin = spark.read.parquet("data/output/yelp_academic_dataset_checkin.parquet")

In [48]:
def fun_preprocessing_summary(sdf):
    '''Prints a summary table with the number of observations and the range of years covered.
    Args:
        sdf: A Spark dataframe with `business_id` and `year` columns.
    Returns:
        None. The table (number of observations, minimum and maximum year) is printed via `show()`.
    '''
    return (
        sdf.agg(
            F.count('business_id').alias('num_obs'), 
            F.min(sdf.year).alias('min_year'), 
            F.max(sdf.year).alias('max_year'))
        .show())

In [49]:
sdf_business.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- attributes_AcceptsInsurance: string (nullable = true)
 |-- attributes_AgesAllowed: string (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_Ambience: string (nullable = true)
 |-- attributes_BYOB: string (nullable = true)
 |-- attributes_BYOBCorkage: string (nullable = true)
 |-- attributes_BestNights: string (nullable = true)
 |-- attributes_BikeParking: string (nullable = true)
 |-- attributes_BusinessAcceptsBitcoin: string (nullable = true)
 |-- attributes_BusinessAcceptsCred

## Create number of business checkins table

* `sdf_checkings_by_year`: Number of checkins per business in 2020.

In [50]:
# Business aggregated checkins by year
sdf_checkings_by_year = (
    sdf_checkin
        .groupBy(['cluster','business_id','year'])
        .agg(F.count('business_id').alias('num_checkins'))
        .filter(F.col("year") == 2020)
)
sdf_checkings_by_year.show(10)

+-------+--------------------+----+------------+
|cluster|         business_id|year|num_checkins|
+-------+--------------------+----+------------+
|      3|cMd8wyA2Ihx8ox31b...|2020|          12|
|      3|eE5fG3KsygPx4NG8P...|2020|          22|
|      3|eHZu7KW5cDt4OR1p3...|2020|          14|
|      3|euGJ66qkVIt8NLGWq...|2020|           3|
|      3|f8cfLeoNlcyEdWNUq...|2020|           8|
|      3|fsMX2yJrYuJODzhyG...|2020|          13|
|      3|hAyaNDmsu09dV1jY9...|2020|           3|
|      3|iBzGe-WgOXItmouJL...|2020|          28|
|      3|-41JZd1tneiARMgls...|2020|           2|
|      3|003O_Z6sbCd1rUJG3...|2020|           5|
+-------+--------------------+----+------------+
only showing top 10 rows



- Number of observations in 2020.

In [51]:
fun_preprocessing_summary(sdf_checkings_by_year)

+-------+--------+--------+
|num_obs|min_year|max_year|
+-------+--------+--------+
|  55425|    2020|    2020|
+-------+--------+--------+



## Create number of business tips table

* `sdf_tips_by_year`: Number of tips per business in 2020

In [52]:
# Business aggregated tips by year
sdf_tips_by_year = (
    sdf_tip
        .groupBy(['cluster','business_id','year'])
        .agg(F.count('business_id').alias('num_tips'))
        .filter(F.col("year") == 2020)
)
sdf_tips_by_year.show(10)

+-------+--------------------+----+--------+
|cluster|         business_id|year|num_tips|
+-------+--------------------+----+--------+
|      7|NFb4zqgY-P2A_ISUn...|2020|       3|
|      7|J9W4fPHfZlhwwtDmJ...|2020|       1|
|      7|GtVsVznFezaZFSdLC...|2020|       1|
|      7|QxwMKvWZ9flt1l_Jb...|2020|       2|
|      7|vM5VMgMGZIwPgT8C_...|2020|       2|
|      7|WeMAq5qE6qr5aAUoA...|2020|       1|
|      7|cCVXr-vJp59gkrsng...|2020|       1|
|      3|o3EOOa-Rt--zHmC9R...|2020|      16|
|      3|M3yxURl7qDk9qhStH...|2020|       3|
|      3|eE5fG3KsygPx4NG8P...|2020|       5|
+-------+--------------------+----+--------+
only showing top 10 rows



* Number of observations in 2020.

In [53]:
fun_preprocessing_summary(sdf_tips_by_year)

+-------+--------+--------+
|num_obs|min_year|max_year|
+-------+--------+--------+
|  18604|    2020|    2020|
+-------+--------+--------+



## Create number of business reviews table

* `sdf_reviews_by_year`: Number of reviews and mean star rating per business in 2020.

In [54]:
# Business reviews by year 
sdf_reviews_by_year = (
    sdf_reviews
        .groupBy(['cluster','business_id','year'])
        .agg(
            F.count('business_id').alias('num_reviews'), 
            F.round( F.mean('stars'), 1).alias('mean_stars_reviews'))
        .filter(F.col("year") == 2020)
)
sdf_reviews_by_year.show(10)

+-------+--------------------+----+-----------+------------------+
|cluster|         business_id|year|num_reviews|mean_stars_reviews|
+-------+--------------------+----+-----------+------------------+
|      3|eE5fG3KsygPx4NG8P...|2020|         13|               4.4|
|      3|f8cfLeoNlcyEdWNUq...|2020|          3|               4.7|
|      3|LULoJmcf6d4mALHnE...|2020|          7|               2.4|
|      3|aTgkJk5JWx6Zix_Ij...|2020|          7|               2.0|
|      3|s0tFGT3NAY0bCUsAu...|2020|          2|               4.0|
|      3|HAN_o7lxUIYKFZgsN...|2020|          4|               3.8|
|      3|UaPgLbq5KCliW3zWk...|2020|          1|               1.0|
|      4|1wkxaLRz3cIN5HmLU...|2020|          9|               4.6|
|      4|5xxa-QGFs3B_JXkv3...|2020|         38|               3.8|
|      4|fQNQCT_gdKMYZ8J-D...|2020|          6|               5.0|
+-------+--------------------+----+-----------+------------------+
only showing top 10 rows



* Number of businesses with reviews in 2020.

In [55]:
fun_preprocessing_summary(sdf_reviews_by_year)

+-------+--------+--------+
|num_obs|min_year|max_year|
+-------+--------+--------+
|  89770|    2020|    2020|
+-------+--------+--------+



In [56]:
sdf_business.select(['cluster', 'metropolitan_area', 'business_id', 'name', 'review_count', 'stars']).count()

160585

- `sdf_business_union`: Left-join the following tables to `sdf_business` on `cluster` and `business_id`:
    + `sdf_checkings_by_year`
    + `sdf_tips_by_year`
    + `sdf_reviews_by_year`
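
Because these are left joins, every row of `sdf_business` is kept, and a business with no 2020 activity gets nulls in the joined columns. A minimal plain-Python sketch of that behavior follows. The ids and counts are made up, and the `left_join` helper is hypothetical: it only mimics the semantics, while the notebook itself relies on Spark's `DataFrame.join`.

```python
# Toy illustration of left-join semantics; ids and counts are made up.
businesses = [
    {"cluster": 0, "business_id": "a"},
    {"cluster": 0, "business_id": "b"},
    {"cluster": 3, "business_id": "c"},
]
# Aggregated table: at most one row per (cluster, business_id).
checkins_2020 = {(0, "a"): 2, (3, "c"): 12}


def left_join(left_rows, right_lookup, new_column):
    """Keep every left row; use None where the right side has no match."""
    joined = []
    for row in left_rows:
        key = (row["cluster"], row["business_id"])
        joined.append({**row, new_column: right_lookup.get(key)})
    return joined


joined = left_join(businesses, checkins_2020, "num_checkins")
for row in joined:
    print(row)
```

The row count stays the same only because the right side has at most one row per key, which the `groupBy` followed by the 2020 filter guarantees for each aggregated table.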

## Create business union table

In [57]:
sdf_business_union = (
    # Business list
    sdf_business
        .select([
            'cluster', 
            'metropolitan_area', 
            'categories', 
            'business_id', 
            'name',
            'latitude',
            'longitude',
            'review_count', 
            'stars'])
        # Join number of business checkins
        .join(sdf_checkings_by_year, on = ['cluster','business_id'], how = 'left')
        # Join number of business tips
        .join(sdf_tips_by_year, on = ['cluster','business_id'], how = 'left')
        # Join the number of business reviews
        .join(sdf_reviews_by_year, on = ['cluster','business_id'], how = 'left')
)
sdf_business_union.show(10)

+-------+--------------------+--------------------+--------------------+--------------------+-------------+--------------+------------+-----+----+------------+----+--------+----+-----------+------------------+
|cluster|         business_id|   metropolitan_area|          categories|                name|     latitude|     longitude|review_count|stars|year|num_checkins|year|num_tips|year|num_reviews|mean_stars_reviews|
+-------+--------------------+--------------------+--------------------+--------------------+-------------+--------------+------------+-----+----+------------+----+--------+----+-----------+------------------+
|      0|-5faLoQMJW3422kbY...|Cambridge, Masach...|piercing, tattoo,...|The Boston Tattoo...|   42.3956419|   -71.1226264|         180|  4.0|2020|           2|null|    null|2020|         10|               4.3|
|      0|-VDPtLMUt7yUN_If4...|Cambridge, Masach...|psychologists, he...|Square Medical Gr...|42.3626930968|-71.1909291521|          16|  2.0|null|        null|n

In [58]:
sdf_business_union.printSchema()

root
 |-- cluster: long (nullable = true)
 |-- business_id: string (nullable = true)
 |-- metropolitan_area: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_checkins: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_tips: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- mean_stars_reviews: double (nullable = true)
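
The schema lists `year` three times, once per joined table, so a later reference to `year` by name would be ambiguous. A small plain-Python sketch for spotting repeated names, assuming the column list is copied from the schema above (in the notebook it would come from `sdf_business_union.columns`):

```python
from collections import Counter

# Column names as printed in the schema above.
columns = [
    "cluster", "business_id", "metropolitan_area", "categories", "name",
    "latitude", "longitude", "review_count", "stars",
    "year", "num_checkins", "year", "num_tips",
    "year", "num_reviews", "mean_stars_reviews",
]

# Names that occur more than once, with their number of occurrences.
duplicates = {name: n for name, n in Counter(columns).items() if n > 1}
print(duplicates)  # → {'year': 3}
```

One way to avoid the repetition, offered as a suggestion rather than what the notebook does, is to drop `year` from each aggregated table before joining, since all three are already filtered to 2020.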



* Number of rows in `sdf_business_union`

In [59]:
sdf_business_union.count()

160585

* Show the top 1000 business categories.

## Create a Restaurants Catalog

Let's create a catalog that classifies business categories as restaurants or others.

Steps:
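1. From `sdf_business_union`, extract the business `categories` field, count the rows per value, and sort in descending order.
2. Save the table to a CSV file.
3. In a spreadsheet application, manually label `is_restaurant` for each category (note that the catalog read below, `grouped_categories_catalog.csv`, spells this column `is_restrautrant`).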
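
The labeling step is manual, but a keyword pass can pre-fill the label so that the spreadsheet review only needs to correct mistakes. The following is a sketch under assumptions: the keyword list, the `suggest_label` helper, and the sample rows are all hypothetical and not part of the notebook.

```python
import csv
import io

# Hypothetical keywords; extend or replace after inspecting the real categories.
RESTAURANT_KEYWORDS = ("restaurants", "food", "pizza", "cafes", "bars")


def suggest_label(categories):
    """Return 1 if any keyword appears in the category string, else 0."""
    text = (categories or "").lower()
    return int(any(keyword in text for keyword in RESTAURANT_KEYWORDS))


# Made-up rows in the same shape as grouped_categories.csv (categories, count).
rows = [
    {"categories": "restaurants, pizza", "count": 120},
    {"categories": "piercing, tattoo", "count": 15},
    {"categories": None, "count": 3},
]

# Write the suggestions to an in-memory CSV for manual review.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["categories", "count", "is_restaurant"])
writer.writeheader()
for row in rows:
    writer.writerow({**row, "is_restaurant": suggest_label(row["categories"])})

print(buffer.getvalue())
```

Substring matching is crude and will mislabel some categories, so the suggestions are a starting point for the manual pass, not a replacement for it.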

In [60]:
# Save business categories to csv.
df_business_categories = (
    sdf_business_union
        .groupBy('categories')
        .count()
        .sort('count', ascending = False)
        .toPandas()
)
df_business_categories.to_csv('data/preprocessing/grouped_categories.csv', index=False)

## Create restaurants table

Read the manually classified catalog of restaurants.

In [61]:
sdf_business_categories_catalog = spark.read.csv("data/preprocessing/grouped_categories_catalog.csv", header = True)

In [62]:
# Filter restaurant businesses.
# Note: `is_restrautrant` is spelled as in the catalog CSV header and is read as a string.
sdf_restaurants = (
    sdf_business_union
        .join(sdf_business_categories_catalog.select(['categories', 'is_restrautrant']), on = ['categories'], how = 'left')
        .filter(col('is_restrautrant') == 1)
)

In [63]:
sdf_restaurants.count()

66013

In [64]:
sdf_restaurants.printSchema()

root
 |-- categories: string (nullable = true)
 |-- cluster: long (nullable = true)
 |-- business_id: string (nullable = true)
 |-- metropolitan_area: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_checkins: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_tips: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- mean_stars_reviews: double (nullable = true)
 |-- is_restrautrant: string (nullable = true)



In [65]:
# Save restaurants
sdf_restaurants.toPandas().to_csv("data/output/restaurants.csv", index = False)

In [66]:
spark.stop()