**Environment Setup**

- **Step 0:** Set up a Spark cluster. In this tutorial I use [GCP Dataproc](https://cloud.google.com/dataproc).

    > Setup Guide: [CSEE 4121 2022S HW2 programming](https://csee-4121-2022.github.io/homeworks/hw2.html)

- **Step 1:** Download the dataset from [yelp dataset](https://www.yelp.com/dataset/documentation/main) and upload all JSON files except `photos` to a GCP bucket. In this case the bucket is named `coms4111`, and I placed the files in a directory that JupyterLab can access directly through its `GCS` folder.

    ---

    The steps below need to be repeated every time you create a new cluster

    ---

- **Step 2:** Clone the repository to the cluster's local disk

In [None]:
!git clone https://github.com/Jace-Yang/yelp_db_clone

- **Step 3:** Download the external package needed to parse XML files: `spark-xml_2.12-0.14.0` (spark-xml 0.14.0 built for Scala 2.12), which works with Spark 3.1.2.

In [1]:
!sudo hdfs dfs -get gs://csee4121/homework2/spark-xml_2.12-0.14.0.jar /usr/lib/spark/jars/
    # Reference: https://csee-4121-2022.github.io/homeworks/hw2.html

> Note: if you are using multiple GCP Dataproc nodes, SSH into every worker VM and run `sudo hdfs dfs -get gs://csee4121/homework2/spark-xml_2.12-0.14.0.jar /usr/lib/spark/jars/` there as well.

- **Step 4:** Copy the data from GCS into an HDFS directory every time you create a new cluster. We do this by copying the data to the local disk first, then to HDFS!

In [2]:
# Gs -> Local
!mkdir yelp_db_clone/data/
!gsutil cp gs://coms4111/notebooks/jupyter/data/*.json file:///yelp_db_clone/data/

Copying gs://coms4111/notebooks/jupyter/data/yelp_academic_dataset_business.json...
Copying gs://coms4111/notebooks/jupyter/data/yelp_academic_dataset_checkin.json...
Copying gs://coms4111/notebooks/jupyter/data/yelp_academic_dataset_review.json...
Copying gs://coms4111/notebooks/jupyter/data/yelp_academic_dataset_tip.json...  
/ [4 files][  5.5 GiB/  5.5 GiB]   71.7 MiB/s                                   
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying gs://coms4111/notebooks/jupyter/data/yelp_academic_dataset_user.json...
- [5 files][  8.6 GiB/  8.6 GiB]  188.8 MiB/s                                   
Operation completed over 5 objects/8.6 GiB.                                      


In [3]:
# Local -> HDFS
!hdfs dfs -cp -f file:///yelp_db_clone/data/* hdfs:///user/dataproc/

In [4]:
# Check whether data is now in HDFS!
!hdfs dfs -ls hdfs:///user/dataproc/

Found 5 items
-rw-r--r--   1 root hadoop  118863795 2022-05-04 21:39 hdfs:///user/dataproc/yelp_academic_dataset_business.json
-rw-r--r--   1 root hadoop  286958945 2022-05-04 21:39 hdfs:///user/dataproc/yelp_academic_dataset_checkin.json
-rw-r--r--   1 root hadoop 5341868833 2022-05-04 21:40 hdfs:///user/dataproc/yelp_academic_dataset_review.json
-rw-r--r--   1 root hadoop  180604475 2022-05-04 21:40 hdfs:///user/dataproc/yelp_academic_dataset_tip.json
-rw-r--r--   1 root hadoop 3363329011 2022-05-04 21:40 hdfs:///user/dataproc/yelp_academic_dataset_user.json


## Examples

In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col, split, explode, udf

### Get a spark session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/05 04:26:13 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/05/05 04:26:13 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/05/05 04:26:13 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/05/05 04:26:13 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


### Read Data & Print Schema

In [3]:
review = spark.read.json('hdfs:///user/dataproc/yelp_academic_dataset_review.json')
review.printSchema()

                                                                                

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [4]:
business = spark.read.json('hdfs:///user/dataproc/yelp_academic_dataset_business.json')
business.printSchema()



root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

22/05/05 04:26:56 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


- Here we see that Spark supports semi-structured data! Data like
    ```
    "hours": {
        "Monday": "10:00-21:00",
        "Tuesday": "10:00-21:00",
        "Friday": "10:00-21:00",
        "Wednesday": "10:00-21:00",
        "Thursday": "10:00-21:00",
        "Sunday": "11:00-18:00",
        "Saturday": "10:00-21:00"
    }
    ```
    has now been converted to a struct type automatically!
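To make the idea of schema inference a bit more concrete, here is a minimal pure-Python sketch. It is only an illustration under my own assumptions: `infer_schema` and the sample record are hypothetical, and Spark's real inference looks at all records and merges their types, while this toy version looks at a single one.

```python
import json

def infer_schema(value):
    """Describe a JSON value: dicts become nested structs, leaves become type names."""
    if isinstance(value, dict):
        return {key: infer_schema(child) for key, child in value.items()}
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"

# Hypothetical record, shaped like one line of the business file
record = json.loads('{"name": "Example Cafe", "stars": 4.5, "hours": {"Monday": "10:00-21:00"}}')
schema = infer_schema(record)
print(schema)
```

The nested `hours` dictionary ends up as a nested description, which is roughly what the indented `struct` entries in `printSchema()` show.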

In [5]:
user = spark.read.json('hdfs:///user/dataproc/yelp_academic_dataset_user.json')
user.printSchema()



root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



                                                                                

### Filtering / Group By


We only care about businesses that are still open, so let's filter first.

In [6]:
business = business.filter(col('is_open')==1)
business.count()

                                                                                

119698

In [7]:
business.groupby('state').count().show()

                                                                                

+-----+-----+
|state|count|
+-----+-----+
|   AZ| 8108|
|   LA| 7676|
|   NJ| 7031|
|   MI|    1|
|   NV| 6277|
|   ID| 3783|
|   CA| 4065|
|   VT|    1|
|   DE| 1894|
|   MO| 8363|
|   IL| 1765|
|   WA|    2|
|  XMS|    1|
|   IN| 8946|
|   TN| 9600|
|   PA|26289|
|   SD|    1|
|   AB| 4346|
|   MA|    2|
|   TX|    4|
+-----+-----+
only showing top 20 rows



In [8]:
business = business \
    .groupby('state') \
    .count() \
    .filter(col('count') > 10).drop('count') \
    .join(business, on='state') 
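The cell above keeps only businesses in states that have more than 10 businesses, by counting per state and joining the surviving states back. A rough pure-Python sketch of the same idea, with hypothetical rows and a lowered threshold so the toy data shows an effect:

```python
from collections import Counter

# Hypothetical (business_id, state) rows standing in for the business table
rows = [("b1", "PA"), ("b2", "PA"), ("b3", "PA"), ("b4", "MI"), ("b5", "AZ"), ("b6", "AZ")]
MIN_BUSINESSES = 2  # the notebook uses 10; lowered here for the toy data

# groupby('state').count()
per_state = Counter(state for _, state in rows)
# filter(col('count') > threshold)
kept_states = {state for state, n in per_state.items() if n > MIN_BUSINESSES}
# join back: keep only rows whose state survived the filter
filtered = [row for row in rows if row[1] in kept_states]
print(filtered)
```

This is why states such as `MI` or `VT`, which show a single business in the table above, drop out of the later results that use this filtered `business` table.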

### Dealing with semi-structured data

Selecting nested columns that have been inferred automatically is easy: just use a dotted path!

In [9]:
business.select('name', 'attributes.RestaurantsDelivery', 'attributes.Wifi', 'attributes.BusinessAcceptsBitcoin').show(10)

                                                                                

+--------------------+-------------------+-------+----------------------+
|                name|RestaurantsDelivery|   Wifi|BusinessAcceptsBitcoin|
+--------------------+-------------------+-------+----------------------+
|       The UPS Store|               null|   null|                  null|
|  St Honore Pastries|              False|u'free'|                  null|
|Perkiomen Valley ...|               null|   null|                  null|
|      Sonic Drive-In|               True|  u'no'|                  null|
|     Famous Footwear|               null|   null|                  null|
|      Temple Beth-El|               null|   null|                  null|
|      Sonic Drive-In|               True|  u'no'|                  null|
|           Marshalls|               null|   null|                  null|
|Vietnamese Food T...|               null|   null|                  null|
|             Denny's|               True|  u'no'|                  null|
+--------------------+----------------

- Hey, what's wrong with bitcoin!

However, we also have a column that is not parsed automatically: `categories` is a plain comma-separated string!

In [10]:
business.select('business_id', 'categories').show(truncate = 100)

                                                                                

+----------------------+------------------------------------------------------------------------------+
|           business_id|                                                                    categories|
+----------------------+------------------------------------------------------------------------------+
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services|
|MTSW4McQd7CbVtyjqoe9mw|                         Restaurants, Food, Bubble Tea, Coffee & Tea, Bakeries|
|mWMc6_wTdE0EUBKIGXDVfA|                                                     Brewpubs, Breweries, Food|
|CF33F8-E6oudUQ46HnavjQ|  Burgers, Fast Food, Sandwiches, Food, Ice Cream & Frozen Yogurt, Restaurants|
|n_0UpQx1hsNbnPUSlodU8w|      Sporting Goods, Fashion, Shoe Stores, Shopping, Sports Wear, Accessories|
|qkRM_2X51Yqxk3btlwAQIg|                                           Synagogues, Religious Organizations|
|bBDDEgkFA1Otx9Lfe7BZUQ|              Ice Cream & Frozen Yogurt,

In [11]:
def split_trim(categories_strings):
    '''
    Examples
    --------
    >>> split_trim('Ice Cream & Frozen Yogurt, Fast Food, Burgers, Restaurants, Food')
    ['Ice Cream & Frozen Yogurt', 'Fast Food', 'Burgers', 'Restaurants', 'Food']
    '''
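    # Split on commas and trim surrounding whitespace from each category
    categories = [category.strip() for category in categories_strings.split(',')]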
    return categories

In [12]:
business.select('business_id', 'categories') \
    .withColumn('category', explode(split(col("categories"), ", "))) \
    .show(10, truncate = 200)

+----------------------+------------------------------------------------------------------------------+-----------------+
|           business_id|                                                                    categories|         category|
+----------------------+------------------------------------------------------------------------------+-----------------+
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services| Shipping Centers|
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services|   Local Services|
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services|         Notaries|
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services|  Mailbox Centers|
|mpf3x-BjTdTEA3yCZrAYPw|Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services|Printing Services|
|MTSW4McQd7CbVtyjqoe9mw|

In [8]:
# Keep a mapping table
business_category = business.select('business_id', 'categories') \
    .withColumn('category', explode(split(col("categories"), ", "))) \
    .drop('categories')
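For intuition, `explode(split(...))` turns one row holding a comma-separated string into one row per item. A small pure-Python sketch of that behaviour (the function name and the ids are hypothetical):

```python
def explode_categories(rows):
    """Turn (business_id, 'A, B, C') rows into one (business_id, category) row per category."""
    exploded = []
    for business_id, categories in rows:
        if categories is None:  # a missing category string produces no rows
            continue
        for category in categories.split(", "):
            exploded.append((business_id, category))
    return exploded

rows = [("b1", "Brewpubs, Breweries, Food"), ("b2", None)]
mapping = explode_categories(rows)
print(mapping)
```

The result is a many-to-many mapping table between businesses and categories, which is what `business_category` holds.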

In [9]:
business_category.groupby('category') \
    .count() \
    .sort(col('count').desc()) \
    .show(20, truncate = 50)



+-------------------------+-----+
|                 category|count|
+-------------------------+-----+
|              Restaurants|52268|
|                     Food|27781|
|                 Shopping|24395|
|            Home Services|14356|
|            Beauty & Spas|14292|
|                Nightlife|12281|
|         Health & Medical|11890|
|           Local Services|11198|
|                     Bars|11065|
|               Automotive|10773|
|Event Planning & Services| 9895|
|               Sandwiches| 8366|
|   American (Traditional)| 8139|
|              Active Life| 7687|
|                    Pizza| 7093|
|             Coffee & Tea| 6703|
|                Fast Food| 6472|
|       Breakfast & Brunch| 6239|
|           American (New)| 6097|
|          Hotels & Travel| 5857|
+-------------------------+-----+
only showing top 20 rows



                                                                                

### Join

Functional dependency

In [15]:
%%time 
review_wide = review.join(business.select('business_id', 
                                          col('name').alias('biz_name'), 
                                          'attributes.RestaurantsTakeOut', 
                                          'categories',
                                          'is_open'),
                          on='business_id',
                          how='inner') \
                     .join(user.select('user_id', 
                                      col('name').alias('user_name'), 
                                      'fans', 
                                      'yelping_since'),
                          on='user_id',
                          how='inner')

CPU times: user 3.85 ms, sys: 1.09 ms, total: 4.95 ms
Wall time: 59.4 ms


- Wait! That fast for this huge join??

In [16]:
%%time 
review_wide.count()



CPU times: user 19.3 ms, sys: 7.86 ms, total: 27.2 ms
Wall time: 17 s


                                                                                

5790989

- But why is counting so slow? Aha, this is because Spark evaluates lazily: the count also has to execute the delayed join!

In [17]:
%%time
# But we can explicitly tell Spark to store it in memory
review_wide = review_wide.persist(pyspark.StorageLevel.MEMORY_ONLY)

CPU times: user 0 ns, sys: 2.19 ms, total: 2.19 ms
Wall time: 92.3 ms


- But again, Spark is doing nothing yet: `persist` is lazy too.

In [18]:
%%time 
# Lets count!
review_wide.count()



CPU times: user 78.9 ms, sys: 6.72 ms, total: 85.6 ms
Wall time: 42.6 s


                                                                                

5790989

- This command takes even longer, because this time it runs the join and also keeps the result in memory.

In [19]:
%%time 
# Now, this command is much faster!
review_wide.count()



CPU times: user 3.82 ms, sys: 0 ns, total: 3.82 ms
Wall time: 1.44 s


                                                                                

5790989

- And finally!! Counting ~5.8M rows in about 1.4 seconds! Totally fine to me!
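The timings above follow a pattern: transformations return instantly, `persist` returns instantly, and only actions such as `count` do real work. Below is a toy model of that behaviour in plain Python. `LazyTable` is a hypothetical class I made up for illustration; it is not how Spark is implemented, it only mimics the order in which work happens.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated table: transformations are recorded, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source      # callable that produces the base rows
        self._steps = list(steps)  # recorded transformations, not yet executed
        self._persist = False
        self._cache = None
        self.runs = 0              # how many times the plan was actually executed

    def filter(self, predicate):
        # Transformation: only extends the plan, so it returns immediately
        return LazyTable(self._source, self._steps + [predicate])

    def persist(self):
        # Only sets a flag; nothing is computed until the next action
        self._persist = True
        return self

    def count(self):
        # Action: execute the plan unless a cached result is available
        if self._cache is not None:
            return len(self._cache)
        self.runs += 1
        rows = list(self._source())
        for predicate in self._steps:
            rows = [row for row in rows if predicate(row)]
        if self._persist:
            self._cache = rows
        return len(rows)


table = LazyTable(lambda: range(10)).filter(lambda x: x % 2 == 0)
table.count()    # runs the plan
table.persist()  # returns at once, nothing is cached yet
table.count()    # runs the plan again and fills the cache
table.count()    # served from the cache
print(table.runs)
```

The plan is executed twice in total, which mirrors the two slow counts followed by one fast count in the cells above.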

In [20]:
review_wide.show(5)

+--------------------+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+------------------+--------------------+-------+---------+----+-------------------+
|             user_id|         business_id|cool|               date|funny|           review_id|stars|                text|useful|            biz_name|RestaurantsTakeOut|          categories|is_open|user_name|fans|      yelping_since|
+--------------------+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+------------------+--------------------+-------+---------+----+-------------------+
|--RJK834fiQXm21Vp...|aIoUwpy5ZFQXUDxWM...|   0|2019-08-25 23:17:52|    0|QPF7spAqCc-D81GeX...|  1.0|There are new own...|     0|     Pete & Shorty's|              True|Seafood, Diners, ...|      1|    Renee|   0|2018-02-04 20:34:16|
|--UhENQdbuWEh0mU5...|K_s-9Wd6vXSfnxYFz...|   1|2017-08-06 02:42

## Tutorials

To help you better understand how Spark works, I intentionally came up with some fairly complex queries of my own (hope they make sense to you guys)!

### Top 5 specific categories in each state?

In [6]:
from pyspark.sql.window import Window

In [10]:
temp = business_category \
    .join(business.select('business_id', 'state'), on='business_id')  \
    .join(review.select('review_id', 'business_id'), on='business_id')  \
    .groupby(['state', 'category']) \
    .agg(F.count('review_id').alias('# of reviews')).rdd.toDF()

                                                                                

In [11]:
temp.show(10)

[Stage 18:>                                                         (0 + 1) / 1]

+-----+--------------------+------------+
|state|            category|# of reviews|
+-----+--------------------+------------+
|   IN|        Pet Groomers|        2440|
|   IN|       Home & Garden|        9245|
|   NJ|        Gas Stations|        1199|
|   NJ|            Car Wash|        1626|
|   IN|              Diners|       11110|
|   PA|  Wheel & Rim Repair|        1016|
|   MO|  Convenience Stores|        1517|
|   IL|             Fashion|         891|
|   AZ|Junk Removal & Ha...|         430|
|   CA|   Party Bus Rentals|         753|
+-----+--------------------+------------+
only showing top 10 rows



                                                                                

In [12]:
temp.withColumn('cate_rank_in_state', 
                F.row_number().over(Window.partitionBy("state").orderBy(col("# of reviews").desc()))) \
    .filter(col('cate_rank_in_state') <= 5) \
    .withColumn('category_info', F.concat(col("category"), F.lit(' ('), col("# of reviews"), F.lit(')'))) \
    .withColumn('cate_rank_in_state', F.concat(F.lit('NO.'), col("cate_rank_in_state"), F.lit(' reviewed category'))) \
    .groupby('state') \
    .pivot("cate_rank_in_state").agg(F.first("category_info")).show(50, truncate = 100)

                                                                                

+-----+------------------------------+------------------------------+----------------------+------------------------------+-------------------------------+
|state|        NO.1 reviewed category|        NO.2 reviewed category|NO.3 reviewed category|        NO.4 reviewed category|         NO.5 reviewed category|
+-----+------------------------------+------------------------------+----------------------+------------------------------+-------------------------------+
|   AZ|          Restaurants (267036)|                  Food (98763)|     Nightlife (77897)|                  Bars (75331)|                Mexican (56284)|
|   LA|          Restaurants (558340)|         Cajun/Creole (201824)|         Food (195245)|            Nightlife (187057)|                  Bars (175300)|
|   NJ|          Restaurants (171186)|                  Food (61877)|     Nightlife (35190)|American (Traditional) (34537)|                   Bars (33005)|
|   MI|       Wheel & Rim Repair (11)|              Auto Repair 
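The query above ranks categories inside each state with `row_number()` over a window and keeps ranks 1 to 5. A pure-Python sketch of that "top N per group" pattern, using a few of the counts shown in the output (the function name is hypothetical):

```python
from collections import defaultdict

def top_n_per_state(rows, n):
    """rows: (state, category, n_reviews). Returns {state: [(category, n_reviews), ...]}, largest first."""
    by_state = defaultdict(list)
    for state, category, n_reviews in rows:
        by_state[state].append((category, n_reviews))            # like partitionBy("state")
    ranked = {}
    for state, entries in by_state.items():
        entries.sort(key=lambda entry: entry[1], reverse=True)   # like orderBy(count desc)
        ranked[state] = entries[:n]                              # like row_number <= n
    return ranked

rows = [("AZ", "Food", 98763), ("AZ", "Restaurants", 267036), ("AZ", "Nightlife", 77897),
        ("LA", "Cajun/Creole", 201824), ("LA", "Restaurants", 558340)]
top2 = top_n_per_state(rows, 2)
print(top2)
```

The `pivot` step in the notebook then only reshapes these ranked rows so that each rank becomes its own column.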

### Top 5 longest streaks of consecutive review days, for users and for businesses

In [None]:
# For each user and day, keep one review text (F.last after the sort, so nominally the day's latest)
user_review = review \
    .sort('date') \
    .select('review_id', 'user_id', 'text', F.to_date(F.date_format(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("date")) \
    .groupby('user_id', 'date').agg(
        F.last('text').alias('latest_review')) 

result = user_review \
    .withColumn('date_id', 
                F.row_number().over(Window.partitionBy("user_id").orderBy(col("date")))) \
    .withColumn('consecutive_id', col('date') - col('date_id')) \
    .groupby('user_id', 'consecutive_id') \
    .agg(F.count('date_id').alias('# of consecutive days'),
         F.min('date').alias('start'),
         F.max('date').alias('end')
        ) \
    .drop('consecutive_id') \
    .sort(col('# of consecutive days').desc()) \
    .limit(5).rdd.toDF()
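The trick in the cell above is that subtracting the row number from the date yields the same value for every day of one unbroken run, so that value can serve as a group key (`consecutive_id`). A minimal pure-Python sketch of the same idea, with a hypothetical `longest_streak` helper and made-up dates:

```python
from datetime import date, timedelta
from itertools import groupby

def longest_streak(days):
    """Return (length, start, end) of the longest run of consecutive calendar days."""
    ordered = sorted(set(days))
    # date minus row number is constant within one unbroken run of days
    keyed = [(day - timedelta(days=i), day) for i, day in enumerate(ordered, start=1)]
    best = (0, None, None)
    for _, run in groupby(keyed, key=lambda pair: pair[0]):
        run_days = [day for _, day in run]
        if len(run_days) > best[0]:
            best = (len(run_days), run_days[0], run_days[-1])
    return best

days = [date(2022, 1, 1), date(2022, 1, 2), date(2022, 1, 3), date(2022, 1, 10), date(2022, 1, 11)]
streak = longest_streak(days)
print(streak)
```

Here the first three days share one key and the last two share another, so the longest streak is the three-day run at the start of January.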

In [None]:
user.select('user_id', 'name', 'yelping_since') \
    .join(result, on='user_id') \
    .join(user_review.withColumnRenamed('date', 'end'), on=['user_id', 'end']) \
    .sort(col('# of consecutive days').desc()) \
    .select('name', 'yelping_since', '# of consecutive days', 'start', 'end', 'latest_review') \
    .show(truncate = 60)

- Reference: https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

In [None]:
# For each business and day, keep one review text (F.last after the sort, so nominally the day's latest)
business_review = review \
    .sort('date') \
    .select('review_id', 'business_id', 'text', F.to_date(F.date_format(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("date")) \
    .groupby('business_id', 'date').agg(
        F.last('text').alias('latest_review')) 

In [None]:
result_biz = business_review \
    .withColumn('date_id', 
                F.row_number().over(Window.partitionBy("business_id").orderBy(col("date")))) \
    .withColumn('consecutive_id', col('date') - col('date_id')) \
    .groupby('business_id', 'consecutive_id') \
    .agg(F.count('date_id').alias('# of consecutive days'),
         F.min('date').alias('start'),
         F.max('date').alias('end')
        ) \
    .drop('consecutive_id') \
    .sort(col('# of consecutive days').desc()) \
    .limit(5).rdd.toDF()

In [None]:
business.select('business_id', 'name', 'address', 'city', 'state') \
    .join(result_biz, on='business_id') \
    .join(business_review.withColumnRenamed('date', 'end'), on=['business_id', 'end']) \
    .sort(col('# of consecutive days').desc()) \
    .select('name', 'address', 'city', 'state', '# of consecutive days', 'start', 'end', 'latest_review') \
    .show(truncate = 50)

### Popular VS Unpopular!

Inspired by question 5 of our [Project 2](https://github.com/w4111/project2-s22/blob/main/project2.ipynb), let's define 4 categories of businesses! For a given business B, we use the number of valid reviews on B as the first metric and the average stars those reviews give B as the second metric. Then we can classify each business as follows:

- High stars, high number of reviews   (**popular businesses**)
- High stars, low number of reviews
- Low stars, high number of reviews
- Low stars, low number of reviews  (**unpopular businesses**)

We define the stars and the number of reviews to be high or low relative to the state the business is in:

- 1) If `avg(stars of valid reviews on B) < avg(stars of all valid reviews in B's state)`, the stars are said to be low for the business; otherwise they are considered high.

    - A `valid` review is a review sent by an `active` user. An `active` user is a user who (1) registered in 2021 or later, or (2) registered before 2021 but sent at least 1 review every year since registering.

    - An `active` business is a business with at least 1 valid review.

- 2) If `# of valid reviews on B < avg(# of valid reviews per active business in B's state)`, the number of reviews is said to be low for the business; otherwise it is considered high.

Now, compute the following for each state: among the businesses that are either popular or unpopular, what percentage are popular?

The steps below use intermediate DataFrames rather than a single query. The final output contains one row per state with the columns `n_pop_biz`, `n_unpop_biz`, and `Ratio (%)`; for instance, if half of those businesses are popular, `Ratio (%)` is 50.00.
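As a rough sketch of the four-way split that the cells below compute for businesses, here is the comparison against a state benchmark in plain Python. The `classify` helper and the numbers are hypothetical; ties count as high, matching the `>=` filters used further down.

```python
def classify(n_reviews, avg_stars, bench_reviews, bench_stars):
    """Label a business relative to its state benchmark; ties count as high."""
    high_reviews = n_reviews >= bench_reviews
    high_stars = avg_stars >= bench_stars
    if high_stars and high_reviews:
        return "popular"
    if not high_stars and not high_reviews:
        return "unpopular"
    return "mixed"  # high on one metric, low on the other

print(classify(n_reviews=50, avg_stars=4.5, bench_reviews=20.0, bench_stars=3.8))
print(classify(n_reviews=5, avg_stars=2.0, bench_reviews=20.0, bench_stars=3.8))
```

Only the `popular` and `unpopular` groups are counted later; the two mixed groups are left out of the ratio.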

In [None]:
review.select(F.year(F.date_format(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("year")) \
    .groupby('year') \
    .count() \
    .sort('year') \
    .show()

In [None]:
# Calculate 'age' of users
user_age = user.select('user_id', (2021 - F.year(F.date_format(col("yelping_since"), "yyyy-MM-dd HH:mm:ss"))).alias("user_age"))
user_age.show()

- For those registered in 2021 and 2022, their `age` will be 0 and -1! Therefore, if they have written any review since then, they are active users, because 1 >= 0 and 1 >= -1!
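The `active` rule can be sketched in plain Python as follows. This is my reading of the cells around it, not an official definition: `is_active` is a hypothetical helper, the reference year 2021 comes from the `user_age` cell, and a user with no reviews at all never shows up in the review table, so they are treated as inactive.

```python
REFERENCE_YEAR = 2021  # same constant as in the user_age cell

def is_active(registration_year, review_years):
    """Active if the number of distinct review years is at least the account 'age'."""
    user_age = REFERENCE_YEAR - registration_year
    distinct_years = set(review_years)
    # Users without any review are absent from the review table, hence never active
    return bool(distinct_years) and len(distinct_years) >= user_age

print(is_active(2021, [2021]))        # age 0, one review year
print(is_active(2018, [2018, 2019]))  # age 3, only two review years
```

With this rule a 2022 sign-up has age -1, so a single review is enough to qualify.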

In [None]:
# Fetch active users!
active_user = review.select('user_id', F.year(F.date_format(col("date"), "yyyy-MM-dd HH:mm:ss")).alias("year")) \
    .groupby('user_id') \
    .agg(F.countDistinct('year').alias('n_reviewed_year')) \
    .join(user_age, on='user_id') \
    .filter(col('n_reviewed_year') >= col('user_age')).rdd.toDF()
active_user.show()

In [None]:
valid_reviews = review.join(active_user, on='user_id').join(business.select('business_id', 'state'), on='business_id').rdd.toDF()
state_benchmark = valid_reviews \
    .groupby('state') \
    .agg((F.count('review_id') / F.countDistinct('business_id')).alias("benchmark # of valid reviews"),
          F.mean('stars').alias("benchmark average Stars")).rdd.toDF()
state_benchmark.show()

In [None]:
business_scored = valid_reviews.groupby('business_id', 'state') \
    .agg((F.count('review_id')).alias("# of valid reviews"),
          F.mean('stars').alias("average Stars")) \
    .join(state_benchmark, on='state').rdd.toDF()

In [None]:
pop_biz_by_state = business_scored \
    .filter((col('# of valid reviews') >= col('benchmark # of valid reviews')) &
            (col('average Stars') >= col('benchmark average Stars'))) \
    .groupby('state') \
    .agg(F.count('business_id').alias('n_pop_biz')).rdd.toDF()

In [None]:
unpop_biz_by_state = business_scored \
    .filter((col('# of valid reviews') < col('benchmark # of valid reviews')) &
            (col('average Stars') < col('benchmark average Stars'))) \
    .groupby('state') \
    .agg(F.count('business_id').alias('n_unpop_biz')).rdd.toDF()

In [None]:
# Sort on the numeric ratio first: format_number returns a string, which would sort lexicographically
pop_biz_by_state \
    .join(unpop_biz_by_state, on='state') \
    .withColumn('ratio', F.lit(100) * col('n_pop_biz') / (col('n_pop_biz') + col('n_unpop_biz'))) \
    .sort(col('ratio').desc()) \
    .withColumn('Ratio (%)', F.format_number(col('ratio'), 2)) \
    .drop('ratio') \
    .show()
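One thing to watch with a formatted ratio column: `format_number` returns a string, so ordering by it compares text, not numbers. A tiny plain-Python illustration with made-up ratios:

```python
ratios = [9.5, 10.25, 100.0, 48.0]  # hypothetical percentages

# Format first, then sort: the comparison is character by character
as_text = sorted(["{:.2f}".format(r) for r in ratios], reverse=True)

# Sort the numbers first, then format for display
as_numbers = ["{:.2f}".format(r) for r in sorted(ratios, reverse=True)]

print(as_text)
print(as_numbers)
```

In the text ordering `9.50` lands on top because `'9'` is greater than `'4'` and `'1'`, so sorting on the numeric value and formatting afterwards is the safer order.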

### What do they mean by elite?

In this part, we focus on how reviews by elite users differ from reviews by non-elite users!

The Yelp [elite](https://www.yelp.com/elite) squad is described as a diverse community of passionate writers, photographers, and adventurers. So how do they differ from normal users in terms of word choice or # of photos used per review? Let's find out!

In [13]:
user.select('user_id', 'elite').show(8)

+--------------------+--------------------+
|             user_id|               elite|
+--------------------+--------------------+
|qVc8ODYU5SZjKXVBg...|                2007|
|j14WgRoU_-2ZE1aw1...|2009,2010,2011,20...|
|2WnXYQFK0hXEoTxPt...|2009,2010,2011,20...|
|SZDeASXq7o05mMNLs...|      2009,2010,2011|
|hA5lMy-EnncsH4JoR...|                    |
|q_QQ5kBBwlCcbL1s4...|2006,2007,2008,20...|
|cxuxXkcihfCbqt5By...|                    |
|E9kcWJdJUHuTKfQur...|                    |
+--------------------+--------------------+
only showing top 8 rows



**Step 1: Tag each review with whether it was sent by an elite user in his/her elite year!**

In [14]:
# The regexp_replace below repairs elite strings where the year 2020 shows up as '20,20'
user_elite_year = user.select('user_id', 'elite') \
    .withColumn('elite', F.regexp_replace('elite', ',20,20,', ',2020,')) \
    .withColumn('elite_year', explode(split(col("elite"), ","))) \
    .filter(col('elite_year')!='')

In [15]:
user_elite_year.show(4, truncate=100)

+----------------------+----------------------------------------------------------------+----------+
|               user_id|                                                           elite|elite_year|
+----------------------+----------------------------------------------------------------+----------+
|qVc8ODYU5SZjKXVBgXdI7w|                                                            2007|      2007|
|j14WgRoU_-2ZE1aw1dXrJg|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021|      2009|
|j14WgRoU_-2ZE1aw1dXrJg|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021|      2010|
|j14WgRoU_-2ZE1aw1dXrJg|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021|      2011|
+----------------------+----------------------------------------------------------------+----------+
only showing top 4 rows



In [16]:
review_temp = review.withColumn('year', F.year(F.date_format(col("date"), "yyyy-MM-dd HH:mm:ss")))
# Match on both user and year: matching on the year alone pairs each review with every elite user of that year
elite_keys = user_elite_year.select('user_id', col('elite_year').cast('int').alias('year'), 'elite_year')
review_elite_tagged = review_temp \
    .join(elite_keys, on=['user_id', 'year'], how='left') \
    .withColumn('is_elite_year', F.when(col("elite_year").isNull(), 0).otherwise(1)).rdd.toDF()
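What identifies an elite-year review is the pair (user, year), not the year by itself. A small pure-Python sketch of the tagging logic, with hypothetical users and reviews:

```python
def tag_elite_reviews(reviews, elite_by_user):
    """reviews: (review_id, user_id, year); elite_by_user: {user_id: 'y1,y2,...'}."""
    elite_pairs = {
        (user_id, int(year))
        for user_id, years in elite_by_user.items()
        for year in years.split(",")
        if year != ""  # users who were never elite have an empty string
    }
    return [(review_id, user_id, year, int((user_id, year) in elite_pairs))
            for review_id, user_id, year in reviews]

elite_by_user = {"u1": "2009,2010", "u2": ""}
reviews = [("r1", "u1", 2009), ("r2", "u1", 2015), ("r3", "u2", 2009)]
tagged = tag_elite_reviews(reviews, elite_by_user)
print(tagged)
```

Review `r3` was written in 2009, a year in which `u1` was elite, but its author `u2` never was, so it must be tagged 0. Matching on the year alone would get this case wrong and would also multiply the number of rows.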

                                                                                

**Step 2: Count the words used in reviews, grouped by whether the review was sent in the user's elite year!**

In [18]:
review_elite_tagged.count()#.show()

[Stage 61:>               (2 + 31) / 54][Stage 62:>                (0 + 1) / 54]

KeyboardInterrupt: 

In [None]:
document_frequency = review_elite_tagged \
    .limit(50000) \
    .withColumn('word', F.explode(F.split("text", " "))) \
    .withColumn('word', F.lower('word')) \
    .groupby('word') \
    .count().rdd.toDF()

In [None]:
document_frequency.show()

In [None]:
term_frequency = review_elite_tagged \
    .limit(50000) \
    .withColumn('word', F.explode(F.split("text", " "))) \
    .withColumn('word', F.lower('word')) \
    .groupby('is_elite_year', 'stars', 'word') \
    .count().rdd.toDF()

In [None]:
term_frequency.show()
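For reference, the grouping done by `term_frequency` can be sketched in plain Python like this. The helper name and the sample reviews are hypothetical; words are lower-cased and split on single spaces, as in the cells above, so punctuation stays attached to the words.

```python
from collections import Counter

def count_terms(reviews):
    """reviews: (is_elite_year, stars, text). Counts words per (is_elite_year, stars, word)."""
    counts = Counter()
    for is_elite_year, stars, text in reviews:
        for word in text.lower().split(" "):
            counts[(is_elite_year, stars, word)] += 1
    return counts

reviews = [(1, 5.0, "Great food great service"), (0, 1.0, "Cold food")]
tf = count_terms(reviews)
print(tf.most_common(3))
```

Comparing these per-group counts with the overall counts in `document_frequency` is one possible way to spot words that elite reviewers use unusually often.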