# Airbnb in Rio de Janeiro
 

## **Part I**: ETL
Extract the reviews data from an AWS S3 bucket, aggregate it with Spark, and load it into a PostgreSQL database.

###  Setup

###### Colab

In [1]:
# Find the latest version of spark from http://www-us.apache.org/dist/spark/ 
spark_version = 'spark-3.0.1'

# Set Environment Variables
import os
os.environ['SPARK_VERSION'] = spark_version
os.environ['BASE_URL'] = 'http://www-us.apache.org/dist/spark'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Install spark, java, and findspark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q $BASE_URL/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Initialize spark session
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 14.2 kB/88.7                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 63.4 kB/88.70% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Wa                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:4 https://dev

###### Postgres

In [2]:
# Postgres version
pg_version = 'postgresql-42.2.16.jar'

# Download java database connector for postgres
os.environ['PG_VERSION'] = pg_version
!wget https://jdbc.postgresql.org/download/$PG_VERSION

--2021-01-17 23:14:40--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2021-01-17 23:14:40 (6.61 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



###### Spark session

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. The downloaded Postgres JDBC jar in
# /content goes on the driver classpath so the JDBC writes below can find
# the driver.
spark = (
    SparkSession.builder
    .appName("reviewsETL")
    .config('spark.driver.extraClassPath', f'/content/{pg_version}')
    .getOrCreate()
)


### Extract, transform, load

###### Extract from S3

In [4]:
from config import s3_url
from pyspark import SparkFiles
import pandas as pd  # NOTE(review): not used in this cell

# Name of the compressed reviews file in the bucket. `s3_url` (from config)
# is the bucket's base URL and is prefixed to it verbatim, so it presumably
# ends with '/'. Confirm in config.
file = "reviews.csv.gz"

# Register the remote file with the Spark context so it can be resolved
# locally through SparkFiles.get()
spark.sparkContext.addFile(s3_url + file)

# Review comments contain line breaks and quotes. Allow records to span
# several lines, and treat a doubled quote ("") inside a field as a literal quote.
df = spark.read.csv(
    SparkFiles.get(file),
    escape='"',
    multiLine=True,
    header=True,
)

df.show(5)

+----------+------+----------+-----------+-------------+--------------------+
|listing_id|    id|      date|reviewer_id|reviewer_name|            comments|
+----------+------+----------+-----------+-------------+--------------------+
|     17878| 64852|2010-07-15|     135370|          Tia|This apartment is...|
|     17878| 76744|2010-08-11|      10206|         Mimi|we had a really g...|
|     17878| 91074|2010-09-06|      80253|          Jan|Staying in Max ap...|
|     17878|137528|2010-11-12|     230449|        Orene|In general very g...|
|     17878|147594|2010-12-01|     219338|        David|The apt was nice ...|
+----------+------+----------+-----------+-------------+--------------------+
only showing top 5 rows



##### Aggregate, collect, and load into RDS

###### Reviews


In [5]:
# The `reviews` table only needs the review id and its text
reviews = df.select('id', 'comments')
reviews.show(3)

+-----+--------------------+
|   id|            comments|
+-----+--------------------+
|64852|This apartment is...|
|76744|we had a really g...|
|91074|Staying in Max ap...|
+-----+--------------------+
only showing top 3 rows



In [6]:
# The CSV was read without schema inference, so every column is a string here
reviews.printSchema()

root
 |-- id: string (nullable = true)
 |-- comments: string (nullable = true)



In [7]:
from pyspark.sql.types import IntegerType

# `id` arrives from the CSV as a string; store it as a 32-bit integer.
# NOTE(review): Spark's default (non-ANSI) cast turns non-numeric or
# out-of-int32-range values into null instead of failing. Confirm all
# review ids fit in IntegerType.
reviews = reviews.withColumn('id', reviews.id.cast(IntegerType()))

reviews.show(3)

+-----+--------------------+
|   id|            comments|
+-----+--------------------+
|64852|This apartment is...|
|76744|we had a really g...|
|91074|Staying in Max ap...|
+-----+--------------------+
only showing top 3 rows



In [8]:
from config import properties, jdbc_url

# Append the review rows to the `reviews` table.
# NOTE(review): 'append' is not idempotent. Re-running this cell (or the
# whole notebook) inserts the same rows again, which either duplicates them
# or fails on a unique key if the table defines one. Confirm that is intended.
reviews.write.jdbc(
    url=jdbc_url,
    table='reviews',
    mode='append',
    properties=properties,
)

###### Listings

In [9]:
# Group by `listing_id` to get number of reviews for each listing
listings = df.groupBy('listing_id')\
    .agg({'id':'count'})\
    .withColumnRenamed('count(id)','number_of_reviews')\
    .withColumnRenamed('listing_id','id')

listings.show(5)

+-------+-----------------+
|     id|number_of_reviews|
+-------+-----------------+
| 190204|               46|
| 324679|               13|
| 409183|               18|
| 913832|               15|
|1412255|               33|
+-------+-----------------+
only showing top 5 rows



In [10]:
# `id` is still the string grouping key here; the count column is a long
listings.printSchema()

root
 |-- id: string (nullable = true)
 |-- number_of_reviews: long (nullable = false)



In [11]:
# The grouping key is still a string. Cast it to an integer so it matches
# the id type of the current database schema.
listings = listings.withColumn('id', listings.id.cast(IntegerType()))
listings.printSchema()

root
 |-- id: integer (nullable = true)
 |-- number_of_reviews: long (nullable = false)



In [12]:
from config import properties, jdbc_url

# Change mode if needed (also reused by the reviewers load)
mode = 'overwrite'

# Write to database.
# With mode='overwrite', Spark by default DROPs the table and re-CREATEs it
# from the DataFrame schema. That discards keys and constraints defined in
# the database, which the id cast was meant to match. `truncate='true'`
# makes Spark empty the existing table and keep its definition instead.
# If the table does not exist yet, it is created as before. The option
# has no effect in other modes.
(
    listings.write
    .option('truncate', 'true')
    .jdbc(
        table='listings',
        properties=properties,
        url=jdbc_url,
        mode=mode,
    )
)

###### Reviewers

In [13]:
# Group by individual reviewers
reviewers = df.groupBy('reviewer_id').agg({'id':'count'})
reviewers.show(3)

+-----------+---------+
|reviewer_id|count(id)|
+-----------+---------+
|      21452|        1|
|   13159335|        1|
|    8742267|        1|
+-----------+---------+
only showing top 3 rows



In [14]:
# Get total number of reviews left by each unique reviewer
reviewers = reviewers\
    .withColumnRenamed('count(id)','number_of_reviews')\
    .withColumnRenamed('reviewer_id','id')
    
reviewers.show(3)

+--------+-----------------+
|      id|number_of_reviews|
+--------+-----------------+
|   21452|                1|
|13159335|                1|
| 8742267|                1|
+--------+-----------------+
only showing top 3 rows



In [15]:
# `id` is still the string reviewer key here; the count column is a long
reviewers.printSchema()

root
 |-- id: string (nullable = true)
 |-- number_of_reviews: long (nullable = false)



In [16]:
# Cast the string reviewer key to an integer so it matches the current schema
reviewers = reviewers.withColumn('id', reviewers.id.cast(IntegerType()))
reviewers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- number_of_reviews: long (nullable = false)



In [17]:
# Write the per-reviewer counts to the `reviewers` table, using the shared `mode`.
# With mode='overwrite', Spark by default DROPs and re-CREATEs the table from
# the DataFrame schema, losing keys and constraints defined in the database.
# `truncate='true'` empties the existing table and keeps its definition
# instead. A missing table is still created, and the option has no effect
# in other modes.
(
    reviewers.write
    .option('truncate', 'true')
    .jdbc(
        table='reviewers',
        properties=properties,
        url=jdbc_url,
        mode=mode,
    )
)