# Building an ETL Pipeline with PySpark
Since the initial data is stored in text files of JSON records, we need an ETL pipeline that reads them, cleans them, and then loads them into a database.

In [1]:
import os
import random

import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun
from pyspark.sql.functions import col
from pyspark.sql.utils import AnalysisException
from pyspark.sql.window import Window

# Create a Spark session

In [2]:
# Start (or reuse) the Spark session. The PostgreSQL JDBC driver jar is added to
# the classpath so the JDBC writes later in the notebook can load org.postgresql.Driver.
# NOTE(review): the jar path is relative — presumably resolved against the kernel's
# working directory; confirm it is started from the folder containing the jar.
spark = (
    SparkSession.builder
    .appName("Recs")
    .config("spark.jars", "postgresql-42.2.26.jar")
    .getOrCreate()
)

KeyboardInterrupt: 

# Extract data
First, we extract the data and load it into PySpark DataFrames.

In [None]:
# File paths of the two raw inputs, both stored under the local `data` folder.
# (The `_dir` suffix is historical: these are file paths, not directories.)
data_folder = 'data'
offering_dir, review_dir = (
    os.path.join(data_folder, file_name)
    for file_name in ('offering.txt', 'review.txt')
)

In [None]:
# Read only the first raw line of the review file to inspect the record format
# before handing the file to Spark. This temporary string is replaced by the
# review DataFrame when the files are loaded with Spark further down.
# The encoding is pinned to UTF-8 so the preview does not depend on the OS locale
# (open() otherwise uses the locale's preferred encoding) and decodes the file
# the same way Spark's JSON reader does by default.
with open(review_dir, encoding='utf-8') as f:
    review = f.readline()

In [None]:
# Show the raw first line of the review file to inspect its JSON layout before parsing.
print(f"{review}")

In [None]:
# Load both raw files into DataFrames with Spark's JSON reader. Without the
# `multiLine` option the reader expects one JSON record per line (JSON Lines).
# PERMISSIVE mode (Spark's default, kept explicit) does not abort the load on a
# malformed record; that record's fields are set to null instead.
# Note: Spark has no "wholeFile" option (the multi-line switch is "multiLine"),
# so passing it would be silently ignored.
offering = spark.read.option("mode", "PERMISSIVE").json(offering_dir)
review = spark.read.option("mode", "PERMISSIVE").json(review_dir)

# Cleaning the data

The first step is to drop all duplicate rows in each DataFrame.

In [None]:
# Remove exact duplicate rows (every column compared) from both raw tables.
offering, review = offering.dropDuplicates(), review.dropDuplicates()

Let's check whether the files contain nested JSON.

In [None]:
# Print both schemas (offering first, then review) to spot struct fields that need flattening.
for frame in (offering, review):
    frame.printSchema()

As the schemas above show, the data is nested: `offering` has an `address` struct, and `review` has `author` and `ratings` structs. We need to flatten these nested fields into top-level columns before loading the data into a SQL database.

In [23]:
# Promote nested struct fields to top-level columns so both tables are flat and
# can be written to SQL. Each pair is (new column name, struct field path); the
# new columns are appended in list order, then the original struct is dropped.
# NOTE(review): "ratings_bussiness_service" (typo) and "author_helpful_votes"
# (source field is num_helpful_votes) are kept verbatim because they become the
# database column names — rename only together with downstream consumers.
offering_fields = [
    ('locality', 'address.locality'),
    ('postal_code', 'address.postal-code'),
    ('region', 'address.region'),
    ('street_address', 'address.street-address'),
]
offering_flat = offering
for target_name, source_path in offering_fields:
    offering_flat = offering_flat.withColumn(target_name, fun.col(source_path))
offering = offering_flat.drop('address')

review_fields = [
    ('author_id', 'author.id'),
    ('author_num_cities', 'author.num_cities'),
    ('author_helpful_votes', 'author.num_helpful_votes'),
    ('author_num_reviews', 'author.num_reviews'),
    ('author_num_type_reviews', 'author.num_type_reviews'),
    ('author_username', 'author.username'),
    ('ratings_bussiness_service', 'ratings.business_service_(e_g_internet_access)'),
    ('ratings_check_in_front_desk', 'ratings.check_in_front_desk'),
    ('ratings_cleanliness', 'ratings.cleanliness'),
    ('ratings_location', 'ratings.location'),
    ('ratings_overall', 'ratings.overall'),
    ('ratings_rooms', 'ratings.rooms'),
    ('ratings_service', 'ratings.service'),
    ('ratings_sleep_quality', 'ratings.sleep_quality'),
    ('ratings_value', 'ratings.value'),
]
review_flat = review
for target_name, source_path in review_fields:
    review_flat = review_flat.withColumn(target_name, fun.col(source_path))
review = review_flat.drop('author', 'ratings')

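The next step is to parse the dates, which are stored as strings, into Spark `date` values: `date` is converted in place, and `date_stayed` is parsed into a new `period_stayed` column.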

In [24]:
# Parse the review date (pattern "MMMM d, yyyy") in place, and add the month of
# stay (pattern "MMMM yyyy") as a new DateType column next to the original
# `date_stayed` string, which is kept unchanged.
review_date = fun.to_date(fun.col("date"), "MMMM d, yyyy")
stay_month = fun.to_date(fun.col("date_stayed"), "MMMM yyyy")
review = review.withColumn("date", review_date).withColumn("period_stayed", stay_month)
review.printSchema()

root
 |-- date: date (nullable = true)
 |-- date_stayed: string (nullable = true)
 |-- id: long (nullable = true)
 |-- num_helpful_votes: long (nullable = true)
 |-- offering_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- via_mobile: boolean (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_num_cities: long (nullable = true)
 |-- author_helpful_votes: long (nullable = true)
 |-- author_num_reviews: long (nullable = true)
 |-- author_num_type_reviews: long (nullable = true)
 |-- author_username: string (nullable = true)
 |-- ratings_bussiness_service: double (nullable = true)
 |-- ratings_check_in_front_desk: double (nullable = true)
 |-- ratings_cleanliness: double (nullable = true)
 |-- ratings_location: double (nullable = true)
 |-- ratings_overall: double (nullable = true)
 |-- ratings_rooms: double (nullable = true)
 |-- ratings_service: double (nullable = true)
 |-- ratings_sleep_quality: double (nullab

Then encode `author_id` as a numerical value (`author_num_id`).

In [25]:
# Map each distinct author_id to an integer via dense_rank over the sorted ids:
# numbering starts at 1 without gaps, and equal ids share the same number.
# NOTE(review): the window has no partitionBy, so Spark evaluates it on a single
# partition; rows with a null author_id (if any) sort first and share one rank.
author_order = Window.orderBy("author_id")
author_rank = fun.dense_rank().over(author_order)
review = review.withColumn("author_num_id", author_rank)

# Load to a database
To make the data easy to query and reuse, we load it into a PostgreSQL database.

In [26]:
# Write `offering` to PostgreSQL only if the table does not exist yet
# (SaveMode ErrorIfExists, made explicit here; it is also Spark's default).
# Only the "table already exists" AnalysisException is treated as expected;
# connection, driver or schema errors are re-raised instead of being reported
# as an existing table.
# The password can be supplied via the POSTGRES_PASSWORD environment variable.
# NOTE(review): the fallback literal is a hardcoded credential; remove it once the
# environment variable is configured wherever this notebook runs.
print('Saving offering...')
try:
    (offering.write.mode("errorifexists").format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/postgres")
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", "offering")
        .option("user", "postgres")
        .option("password", os.environ.get("POSTGRES_PASSWORD", "adm!@#"))
        .save())
    print("Save complete !")
except AnalysisException as exc:
    if "already exists" not in str(exc):
        raise
    print("Table offering already exist")

Saving offering...
Table offering already exist


In [27]:
# Write `review` to PostgreSQL, overwriting the existing table contents.
# The password can be supplied via the POSTGRES_PASSWORD environment variable.
# NOTE(review): the fallback literal is a hardcoded credential; remove it once the
# environment variable is configured wherever this notebook runs.
(review.write.mode("overwrite").format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/postgres")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "review")
    .option("user", "postgres")
    .option("password", os.environ.get("POSTGRES_PASSWORD", "adm!@#"))
    .save())
print("Save complete !")

Save complete !


In [28]:
# Shut down the Spark session now that both tables have been written.
spark.stop()