## Yelp Data Pipeline

### Source Data 

https://www.yelp.com/dataset

### Setup

1. Install pipenv

```pip install pipenv```

2. Create virtual environment

```pipenv install```

3. Start the shell

```pipenv shell```

4. Copy the virtual environment name from the activation path printed after launching the shell (here, `my-venv-name`)

```. /Users/user/.local/share/virtualenvs/my-venv-name/bin/activate```

5. Create a Jupyter kernel that uses the virtual environment

```
pipenv install ipykernel
python -m ipykernel install --user --name=my-venv-name
```

6. In the notebook, switch to the new kernel via `Kernel -> Change Kernel -> my-venv-name`

## Staging

First, I stage the raw JSON files as Parquet for better performance. Parquet is a columnar format built for processing complex, nested data in bulk, and it supports efficient compression and encoding schemes. It works best for queries that read only a few columns from a large table: because values are stored column by column, Parquet reads just the columns a query needs, which greatly reduces I/O.
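To make the column-pruning argument concrete, here is a toy model in plain Python. It is not Parquet itself; the records and the helper functions `read_row_store` and `read_column_store` are hypothetical. It stores the same records row-wise and column-wise and counts how many values each layout has to touch when a query asks only for `stars`:

```python
# Toy model (not Parquet): the same records in a row layout and a column layout.
rows = [
    {"business_id": "b1", "stars": 4.5, "text": "Great tacos"},
    {"business_id": "b2", "stars": 3.0, "text": "Slow service"},
    {"business_id": "b3", "stars": 5.0, "text": "Best coffee in town"},
]

# Columnar layout: one list of values per column.
columns = {name: [r[name] for r in rows] for name in rows[0]}


def read_row_store(records, wanted):
    """Row layout: every whole record is read, even if only one field is needed."""
    touched = 0
    out = []
    for r in records:
        touched += len(r)  # all fields of the record are read from storage
        out.append({c: r[c] for c in wanted})
    return out, touched


def read_column_store(store, wanted):
    """Column layout: only the requested column lists are read."""
    touched = sum(len(store[c]) for c in wanted)
    n = len(store[wanted[0]])
    out = [{c: store[c][i] for c in wanted} for i in range(n)]
    return out, touched


row_result, row_cost = read_row_store(rows, ["stars"])
col_result, col_cost = read_column_store(columns, ["stars"])
print(row_result == col_result, row_cost, col_cost)  # True 9 3
```

Both layouts return the same answer, but the columnar one touches only the `stars` values. Real Parquet adds row groups, per-column compression and statistics on top of this layout, but the I/O saving comes from the same idea.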

In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

23/05/22 13:10:59 WARN Utils: Your hostname, Gauthams-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.12 instead (on interface en0)
23/05/22 13:10:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/22 13:11:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()

datasets = ['business', 'review', 'checkin', 'tip', 'user']
# Read each JSON file into a DataFrame and write it back out as Parquet.
# The Yelp files contain one JSON object per line, which spark.read.json
# handles by default, inferring the schema from the data. The CSV-only
# options (header, delimiter, inferSchema) do not apply to JSON.
for name in datasets:
    df = spark.read.json(f"./data/yelp/{name}.json")
    df.write \
        .format("parquet") \
        .save(f"./data/yelp/processed/{name}/", mode="overwrite")



## Transformation

In [4]:
business = spark.read.parquet("./data/yelp/processed/business/")
review = spark.read.parquet("./data/yelp/processed/review/")
user = spark.read.parquet("./data/yelp/processed/user/")

In [5]:
business.printSchema()
review.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

### Simple Left Join

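As shown below, a left join without any optimization averages 9.23 ms per run. I then try to improve on this with partitioning and a broadcast join. Note that `join` is a lazy transformation, so this `%timeit` (and the ones below) measures how long Spark takes to build and analyze the query plan, not to execute the join; the join itself only runs when an action such as `count()` or a write is called.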

In [6]:
%timeit review.join(business, review.business_id==business.business_id, "left")

9.23 ms ± 1.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
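Spark DataFrame transformations such as `join` are lazy: they return immediately with a new query plan, and the actual work happens only when an action runs. The class below is a hypothetical toy stand-in, not Spark's API, that shows why timing a transformation alone measures almost nothing while timing an action measures the real work:

```python
import time


class LazyFrame:
    """Hypothetical toy class that mimics lazy evaluation; not part of Spark."""

    def __init__(self, rows, plan=()):
        self.rows = rows
        self.plan = plan  # recorded transformations, not yet executed

    def filter(self, predicate):
        # Transformation: only extends the plan and returns immediately.
        return LazyFrame(self.rows, self.plan + (predicate,))

    def count(self):
        # Action: runs every recorded step over the data.
        return sum(1 for row in self.rows if all(p(row) for p in self.plan))


frame = LazyFrame(range(300_000))

start = time.perf_counter()
multiples_of_three = frame.filter(lambda x: x % 3 == 0)  # builds the plan only
plan_seconds = time.perf_counter() - start

start = time.perf_counter()
n = multiples_of_three.count()  # scans all 300,000 rows
run_seconds = time.perf_counter() - start

print(n)  # 100000
print(f"plan: {plan_seconds:.6f}s, run: {run_seconds:.6f}s")
```

With a real DataFrame, timing an action on the joined result (for example a `count()` or a write) would include the cost of actually executing the join.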


In [7]:
business.count()

150346

In [8]:
review.count()

6990280

### Partition DataFrames by join key

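I repartition both datasets by `business_id`, the join key, into the same number of partitions. Rows with the same key then land in partitions with the same index, so the join can reuse this partitioning instead of adding another shuffle. `business_id` is a high-cardinality key present in both datasets, so hashing it spreads rows fairly evenly, although businesses with many reviews can still cause some skew.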

In [9]:
review_partitioned = review.repartition(1000, "business_id")
business_partitioned = business.repartition(1000, "business_id")

%timeit review_partitioned.join(business_partitioned, review_partitioned.business_id == business_partitioned.business_id, "left")

4.81 ms ± 251 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
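The idea behind co-partitioning can be sketched in plain Python. Both sides are hash-partitioned on `business_id` with the same function and partition count, so each partition of reviews only has to be joined with the business partition that has the same index. The data and helpers (`partition_of`, `hash_partition`) are hypothetical, and `crc32` stands in for the Murmur3-based hash that Spark's hash partitioning uses internally:

```python
from zlib import crc32

NUM_PARTITIONS = 4  # stands in for the 1000 partitions used above


def partition_of(key: str, n: int = NUM_PARTITIONS) -> int:
    # Deterministic hash partitioner (crc32 here, not Spark's Murmur3).
    return crc32(key.encode()) % n


def hash_partition(rows, key, n=NUM_PARTITIONS):
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[partition_of(row[key], n)].append(row)
    return parts


reviews = [
    {"review_id": "r1", "business_id": "b1"},
    {"review_id": "r2", "business_id": "b2"},
    {"review_id": "r3", "business_id": "b1"},
    {"review_id": "r4", "business_id": "b9"},  # no matching business
]
businesses = [
    {"business_id": "b1", "name": "Taco Stand"},
    {"business_id": "b2", "name": "Coffee Bar"},
]

review_parts = hash_partition(reviews, "business_id")
business_parts = hash_partition(businesses, "business_id")

# Same key, same partitioner, same partition count: matching rows share a
# partition index, so each pair of partitions is joined independently.
joined = []
for r_part, b_part in zip(review_parts, business_parts):
    lookup = {b["business_id"]: b for b in b_part}
    for r in r_part:
        match = lookup.get(r["business_id"], {})  # left join keeps unmatched reviews
        joined.append({**r, "name": match.get("name")})

joined.sort(key=lambda row: row["review_id"])
print([(row["review_id"], row["name"]) for row in joined])
# [('r1', 'Taco Stand'), ('r2', 'Coffee Bar'), ('r3', 'Taco Stand'), ('r4', None)]
```

No partition ever needs rows from another partition index, which is what lets the real join skip a second shuffle once both inputs are partitioned this way.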


### Further performance tuning by broadcasting the dimension table

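The business table (about 150K rows, versus roughly 7M reviews) is small enough to copy to every executor. In a broadcast hash join, each executor joins its partitions of `review` against a local copy of `business`, so the large review table does not need to be shuffled across the network for the join.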

In [10]:
from pyspark.sql.functions import broadcast
broadcast(business_partitioned)
%timeit review_partitioned.join(business_partitioned, review.business_id==business.business_id, "left")

business_and_review = review_partitioned.join(business_partitioned, review.business_id==business.business_id, "left")
business_and_review.printSchema()

4.4 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- 
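A broadcast hash join can be sketched the same way: the small side is built into a hash table once, a copy goes to every worker, and each partition of the large side probes its local copy without moving any review rows. This is a plain-Python illustration with hypothetical data and a hypothetical `join_partition` helper, not Spark's implementation:

```python
# Small dimension table: this is the side that gets broadcast.
businesses = [
    {"business_id": "b1", "name": "Taco Stand"},
    {"business_id": "b2", "name": "Coffee Bar"},
]

# Large fact table, already split across two hypothetical executors.
review_partitions = [
    [{"review_id": "r1", "business_id": "b1"}, {"review_id": "r2", "business_id": "b2"}],
    [{"review_id": "r3", "business_id": "b1"}, {"review_id": "r4", "business_id": "b9"}],
]

# Build phase: turn the small side into a hash table once.
business_lookup = {b["business_id"]: b for b in businesses}


def join_partition(partition, lookup):
    # Probe phase: each partition uses its own copy; no review row moves.
    out = []
    for review in partition:
        match = lookup.get(review["business_id"])
        out.append({**review, "name": match["name"] if match else None})  # left join
    return out


# dict(...) makes a separate copy per partition, mimicking the broadcast.
joined = [
    row
    for part in review_partitions
    for row in join_partition(part, dict(business_lookup))
]
print([(r["review_id"], r["name"]) for r in joined])
# [('r1', 'Taco Stand'), ('r2', 'Coffee Bar'), ('r3', 'Taco Stand'), ('r4', None)]
```

The trade-off is memory: every worker holds the whole small table, which is why this only pays off when one side is much smaller than the other.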

## Save transformed dataset

In [None]:
business_and_review.write.format("parquet") \
    .save("./data/yelp/transformed/transformed/", mode="overwrite")