# Yelp: Make Test Data

In [2]:
# Print the Spark version banner (Spark, Scala and JVM versions) for this environment
!spark-shell --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
                        
Using Scala version 2.12.13, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user  on 2021-06-23T22:10:53Z
Revision e9c1449899ac58a10ea28ca6f1e8ed6c7c254d54
Url https://bigdataoss-internal.googlesource.com/third_party/apache/spark
Type --help for more information.


<details>
<summary><b>Table of Contents</b> (click to open)</summary>
<!-- MarkdownTOC -->

1. [Load Full Dataset](#data)
1. [Make Input Test Data](#Source-Test-Data)
1. [Make Expected Test Data](#Make-Expected-Test-Data)
    1. [Testcase: Top Businesses](#Testcase:-Top-Businesses)
    1. [Testcase: Top Restaurants](#Testcase:-Top-Restaurants)
    1. [Testcase: Business Categories](#Testcase:-Business-Categories)
1. [Copy Test Data to Project Folder](#Copy-Test-Data-to-Project-Folder)
    1. [Input Test Data](#Input-Test-Data)
    1. [Expected Test Data](#Expected-Test-Data)
    
<!-- /MarkdownTOC -->
</details>

In [11]:
# Import Required Libraries
import pyspark.sql.functions as F

### Load Data  <a class="anchor" id="data"></a>

Load the full Yelp dataset from Google Cloud Storage into Spark DataFrames on Dataproc.

In [3]:
# GCS locations of the raw Yelp dataset files (one JSON file per entity type)
file_review = "gs://proj-spark/yelp-dataset/yelp_academic_dataset_review.json"
file_business = "gs://proj-spark/yelp-dataset/yelp_academic_dataset_business.json"
file_users = "gs://proj-spark/yelp-dataset/yelp_academic_dataset_user.json"
file_checkin = "gs://proj-spark/yelp-dataset/yelp_academic_dataset_checkin.json"
file_tip = "gs://proj-spark/yelp-dataset/yelp_academic_dataset_tip.json"

In [4]:
# Read each raw Yelp JSON file into a Spark DataFrame and register it as a
# temp view so it can also be queried through Spark SQL.
df_review = spark.read.format('json').load(file_review)
df_review.createOrReplaceTempView('reviewTable')

df_business = spark.read.format('json').load(file_business)
df_business.createOrReplaceTempView('businessTable')

df_users = spark.read.format('json').load(file_users)
df_users.createOrReplaceTempView('usersTable')

df_checkin = spark.read.format('json').load(file_checkin)
df_checkin.createOrReplaceTempView('checkinTable')

df_tip = spark.read.format('json').load(file_tip)
# Tips get their own view name: registering them as 'checkinTable' would
# replace the check-in view created just above.
df_tip.createOrReplaceTempView('tipTable')

### Source Test Data

In [12]:
# Destination folders inside the project for the generated test files.
# NOTE(review): these are absolute paths on one machine and already end with '/',
# while the shell cells further down append '/<file name>' to them — consider
# making them configurable / relative.
expected_data_path = '/home/bilal/study_private/tests/test_data/expected_data/'
source_data_path = '/home/bilal/study_private/tests/test_data/source_data/'

In [55]:
def make_checkin_data(df_checkin, fraction=0.05, seed=50, max_rows=200,
                      output_path='source_data/yelp_academic_dataset_checkin.json'):
    """Write a small sample of the check-in data as JSON test input.

    Parameters
    ----------
    df_checkin : DataFrame
        Full check-in DataFrame to sample from.
    fraction : float, optional
        Sampling fraction passed to ``DataFrame.sample`` (without replacement).
    seed : int, optional
        Seed passed to ``DataFrame.sample``.
    max_rows : int, optional
        Upper bound on the number of sampled rows that are kept.
    output_path : str, optional
        Destination path of the JSON output.

    The defaults reproduce the values that were previously hard-coded.
    """
    df_checkin_test = df_checkin.sample(withReplacement=False, fraction=fraction, seed=seed)\
                                .limit(max_rows)\
                                .coalesce(1)  # one partition -> a single part file to move later
    # Overwrite so the cell can be re-run; the default save mode raises when
    # the output path already exists.
    df_checkin_test.write.format('json').mode('overwrite').save(output_path)

In [59]:
def make_business_data(df_business):
    """Write the businesses that appear in the sampled check-in data as JSON test input.

    Reads the check-in sample from 'source_data/yelp_academic_dataset_checkin.json'
    (so that file must have been written first), keeps the rows of
    ``df_business`` whose ``business_id`` occurs in it, and saves them to
    'source_data/yelp_academic_dataset_business.json'.

    Parameters
    ----------
    df_business : DataFrame
        Full business DataFrame to filter.
    """
    df_checkin_test = spark.read.format('json').load('source_data/yelp_academic_dataset_checkin.json')
    joinExpr = df_business.business_id == df_checkin_test.business_id
    # A left-semi join only filters df_business: the output keeps the business
    # columns alone. An inner join would also carry the check-in columns
    # (e.g. `date`) into the business test file.
    df_business_test = df_business.join(F.broadcast(df_checkin_test), joinExpr, 'left_semi')\
                                  .coalesce(1)  # one partition -> a single part file to move later
    # Overwrite so the cell can be re-run; the default save mode raises when
    # the output path already exists.
    df_business_test.write.format('json').mode('overwrite').save('source_data/yelp_academic_dataset_business.json')

In [60]:
def verify(df1, df2, key):
    """Return True when both DataFrames hold the same per-`key` row counts.

    Each frame is grouped by `key` and counted; the two count tables are then
    subtracted from each other in both directions. The result is True only if
    neither difference contains any row.
    """
    counts_first = df1.groupby(key).agg(F.count(key))
    counts_second = df2.groupby(key).agg(F.count(key))

    # Rows present in the first count table but not in the second
    if not counts_first.subtract(counts_second).rdd.isEmpty():
        return False

    # Rows present in the second count table but not in the first
    missing_from_first = counts_second.subtract(counts_first)
    return missing_from_first.rdd.isEmpty()

In [61]:
# Order matters: make_business_data reads the check-in sample that
# make_checkin_data writes.
make_checkin_data(df_checkin)
make_business_data(df_business)

In [62]:
# Read the freshly written test files back so they can be checked
df_checkin_test = spark.read.format('json').load('source_data/yelp_academic_dataset_checkin.json')
df_business_test = spark.read.format('json').load('source_data/yelp_academic_dataset_business.json')

In [127]:
# verify that both dataframes have the same business_ids
# (verify also compares the number of rows per business_id)
verify(df_checkin_test, df_business_test, 'business_id')

True

###  Make Expected Test Data

In [34]:
# Read the check-in and business test files from GCS into Spark DataFrames
df_checkin_test = (
    spark.read
         .format('json')
         .load('gs://proj-spark/yelp-test-dataset/yelp_academic_dataset_checkin.json')
)
df_business_test = (
    spark.read
         .format('json')
         .load('gs://proj-spark/yelp-test-dataset/yelp_academic_dataset_business.json')
)

#### Testcase: Top Businesses

In [2]:
# Bring both test frames into pandas
pdf_checkin_test = df_checkin_test.toPandas()
pdf_business_test = df_business_test.toPandas()

# Number of check-ins per business = number of comma-separated entries in `date`
pdf_checkin_test['checkin_count'] = pdf_checkin_test['date'].str.split(',').map(len)

# Attach the business attributes, then order by check-in count and
# business_id, both descending
pdf_checkin_bs = pdf_checkin_test.merge(pdf_business_test, on='business_id')
pdf_checkin_bs = pdf_checkin_bs.sort_values(by=['checkin_count', 'business_id'], ascending=False)

# Columns of the expected output for this test case
pdf_expected_data_tc2 = pdf_checkin_bs[[
    'business_id',
    'name',
    'city',
    'state',
    'stars',
    'checkin_count',
    'review_count',
]]

In [3]:
# Write the expected result to the working directory (without the pandas index)
pdf_expected_data_tc2.to_csv('top_businesses.csv', index=False)

#### Testcase: Top Restaurants

In [4]:
# Keep businesses whose categories contain 'res' (case-insensitive).
# na=False treats a missing `categories` value as "no match"; without it the
# mask contains NaN for such rows and the boolean indexing below raises.
# NOTE(review): 'res' is a loose substring and also matches categories other
# than "Restaurants" — confirm it mirrors the filter used by the code under test.
is_restaurant = pdf_checkin_bs.categories.str.contains('res', case=False, na=False)
pdf_checkin_rest = pdf_checkin_bs[is_restaurant]

# Restrict to businesses in Massachusetts
pdf_checkin_rest = pdf_checkin_rest[pdf_checkin_rest.state=='MA']

# Order by check-in count and business_id, both descending
pdf_checkin_rest = pdf_checkin_rest.sort_values(by=['checkin_count','business_id'], ascending=False)

# Columns of the expected output for this test case
pdf_expected_data_tc3 = pdf_checkin_rest[['business_id', 'name', 'city', 'state', 'stars', 'checkin_count', 'review_count']]

In [6]:
# Write the expected result to the working directory (without the pandas index)
pdf_expected_data_tc3.to_csv('top_restaurants.csv', index=False)

#### Testcase: Business Categories

In [35]:
# One row per (business, category): split the `categories` string on ', '
# into a list column and explode it
pdf_business_test = (
    df_business_test.toPandas()
    .assign(category=lambda pdf: pdf.categories.str.split(', '))
    .explode('category')
)

# Count businesses per category
pdf_cat_grp = (
    pdf_business_test.groupby('category')[['business_id']]
    .count()
    .rename(columns={'business_id': 'business_count'})
    .reset_index()
)

# Order by business count and category name, both descending
pdf_expected_data_tc4 = pdf_cat_grp.sort_values(
    by=['business_count', 'category'],
    ascending=False,
)

In [53]:
# Write the expected result to the working directory (without the pandas index)
pdf_expected_data_tc4.to_csv('business_categories.csv', index=False)

In [37]:
# Split the categories string and explode it to one row per (business, category).
# The result gets its own name so df_business_test stays untouched: reassigning
# it would make this cell non-idempotent (a re-run explodes the already
# exploded rows again) and would change what earlier cells see on a re-run.
df_business_categories = df_business_test.withColumn('categories_splitted', F.split(F.col('categories'),', '))\
    .withColumn('category', F.explode('categories_splitted'))

# Count businesses per category, ordered by count and category name (both descending)
cat_grp = df_business_categories.groupBy('category')\
    .agg(F.count('business_id').alias('business_count'))\
    .orderBy(['business_count','category'], ascending=False)

In [43]:
# Display the expected categories table with a fresh 0..n-1 index
# (the result is only displayed; pdf_expected_data_tc4 itself is not modified)
pdf_expected_data_tc4.reset_index(drop=True)

Unnamed: 0,category,business_count
0,Restaurants,71
1,Food,46
2,Shopping,29
3,Beauty & Spas,26
4,Nightlife,18
...,...,...
275,Airport Lounges,1
276,Air Duct Cleaning,1
277,Adult Education,1
278,Acupuncture,1


### Copy Test Data to Project Folder

#### Input Test Data

Copy the source test data to the project directory and to Google Cloud Storage.

In [None]:
# List the generated source data in HDFS
!hdfs dfs -ls  /user/root/source_data

# Copy source_data from HDFS to the master node (local working directory)
!hdfs dfs -copyToLocal /user/root/source_data source_data

# Move the part file of each output directory into the project folder under
# the dataset's file name ($source_data_path is the Python variable defined
# earlier in the notebook)
!mv source_data/yelp_academic_dataset_checkin.json/part-*.json \
    $source_data_path/yelp_academic_dataset_checkin.json
!mv source_data/yelp_academic_dataset_business.json/part-*.json \
    $source_data_path/yelp_academic_dataset_business.json

# Copy the two test files to Google Cloud Storage; the "Make Expected Test
# Data" section reads them from gs://proj-spark/yelp-test-dataset
!gsutil cp $source_data_path/yelp_academic_dataset_checkin.json \
           gs://proj-spark/yelp-test-dataset
!gsutil cp $source_data_path/yelp_academic_dataset_business.json \
           gs://proj-spark/yelp-test-dataset

# Delete the data from HDFS
!hdfs dfs -rm -r  /user/root/source_data

# Delete the local copy from the master node
!rm -r source_data

#### Expected Test Data

Move the expected data files to the project folder.

In [54]:
# Move the CSV files written by the test-case cells into the project's
# expected-data folder ($expected_data_path is the Python variable defined
# earlier in the notebook)
!mv top_businesses.csv $expected_data_path/top_businesses.csv
!mv top_restaurants.csv $expected_data_path/top_restaurants.csv
!mv business_categories.csv $expected_data_path/business_categories.csv