# Project: Capstone on EMR ( Spark & Co. @ AWS )
### Data Engineering Capstone Project


#### Project Summary
Running own Capstone project with public data based on "Inside Airbnb", in specific with the data of Mexico City. 
(See chapter "Describe and Gather Data" for more details).

The intention of this project is to provide to an analytics team access to the data model that allows easily to query and extract insights. It means at the end, have combined tables as a efficient and fast source for a 'Self Service Report'-Dashboard to be done with Microsoft Power BI (my daily work) or maybe featured on [AWS QuickSight](https://aws.amazon.com/de/quicksight/). 

**Tipp:** Microsoft is bringing in Mai '23 a library which does allow to compell [Microsoft Power BI reports DIRECTLY at the Jupyter Notebook](https://powerbi.microsoft.com/en-us/blog/create-power-bi-reports-in-jupyter-notebooks/s) (_Whou!, absolutely a game changer_).

_(more at the @ README.md at GitHub repo)_  


#### Overview and Steps
The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

> <font color="red">Clear fields at [</font>aws-access.cfg<font color="red">] before submitting project for review!  
>     KEY = 'YOUR_AWS_KEY'  
>     SECRET='YOUR_AWS_SECRET'</font>

Version 1 **(LOC = Udacity Workspace)** - Revision 06 - 2023/04/28 - Mr Morphy - GitHub Profile (https://github.com/MrMorphy)  
GitHub Project - udacity-course-proj-final-capstone (https://github.com/mrmorphy/udacita-course-proj-final-capstone)

### Step 0: Create Base Config
* IAM-Role from AWS at (aws-access.cfg) workspace file required. 

Load following libraries to execute code

In [1]:
!python --version

Python 3.6.3


In [2]:
# Do all imports and installs here
# required for reading out config-file [*.cfg]
import configparser
from datetime import datetime
# required for setting environment variables, like IAM-Authentification
import os

from pyspark.sql.functions import udf, col   #, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

from pyspark.sql.types import StructType  as R,   StructField as Fld, \
                              DoubleType  as Dbl, StringType  as Str, \
                              IntegerType as Int, DateType    as Date, \
                              BooleanType as Bol, DecimalType as Dec, \
                              FloatType   as Flt, TimestampType as TS, \
                              LongType    as Lng

import pandas as pd

In [3]:
# DEBUG
# os.environ # show all environment settings at background

In [3]:
# Using S3 Bucket from AWS, following settings required (?)
# https://knowledge.udacity.com/questions/946931
# 
# On Errormessage ... while trying to write parquet() files to S3
# 
#   Py4JJavaError: An error occurred while calling o1220.parquet.
#    : java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StorageStatistics
# 

os.environ["JAVA_HOME"]   = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"]        = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:" \
                            "/opt/conda/bin:/usr/local/sbin:/usr/local/bin:" \
                            "/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"

os.environ["SPARK_HOME"]  = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [4]:
# Optional
import numpy as np

# Pyplot - Bar Graph, Line Graph, Others
#import matplotlib.pyplot as plt

The credentials to access the AWS Services will be readout from the config file.  
This access rights are required to access the S3 Bucket as a "Data Lake" (see IAM Role with AmazonS3FullAccess).  

Then the Spark Builder will be initiated and verify that the Spark job is running.   

> **Optional:**   
> It could be additional calculating power to the Spark Build requested, since these files are relatively heavy (~4 Mio rows) to upload and download them into the S3 Bucket.  
>         `.config("spark.executor.instances", 10) \`  
>         `.config("spark.executor.memory", "8g") \`  


In [5]:
# Access to AWS

config = configparser.ConfigParser()
config.read('aws-access.cfg')
print(">> Read Out Config-Infos from [aws-access.cfg]")

os.environ['AWS_ACCESS_KEY_ID']     = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

# AWS_ACCESS_KEY_ID     = os.environ['AWS_ACCESS_KEY_ID']
# AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

AWS_ACCESS_KEY_ID     = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']


>> Read Out Config-Infos from [aws-access.cfg]


In [6]:
# TIPP - code to resolve missing write access to S3:
#    https://knowledge.udacity.com/questions/825459

# Starting SparkSession
from pyspark.sql import SparkSession

AWS_ACCESS_KEY_ID     = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']

spark = SparkSession.builder \
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
                    .config("spark.executor.instances", 10) \
                    .config("spark.executor.memory", "8g") \
                    .enableHiveSupport() \
                    .getOrCreate()

sc    = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)


#return Spark

# @ EMR (AWS): "emr.5.20.0" > "hadoop 2.7.0"
# @ EMR (AWS): "emr-5.31.0" > "hadoop 2.10.0"

print(">> Spark session WITH Context created")

>> Spark session WITH Context created


In [7]:
print(">> Spark information details")
spark

>> Spark information details


### Step 1: Scope the Project and Gather Data

#### Scope 
At the end of the project there should have a analytics group access to the final database. The steps will be to download the requiered CSV and compress files, process them with python and Spark. Results upload to a S3 Bucket in Parquet format. This Parquet files can be queried like tables, reading while in S3, making easy to perform different type of analysis. 

Which kind of analysis? This could be: 
1. Time Series based Analysis: Examine trends and patterns based on time.
2. Calculations: Throug SQL Queries compute sums, counts, and so on.   


Summary
* For project plan see README.md on GitHub repo.
* Endsolution: Dimension and Fact table for _"Self Service Dashboard"_ (Microsoft Power BI or AWS QuickSight).
* Selected Tools: Spark (EMR on EC2 at AWS) and S3 Bucket.

#### Describe and Gather Data 
Public data is coming from "Inside Airbnb". The data contains informations about properties of Airbnb, the Area (Neighbourhoods) as well reviews left by visitors. My interest is based on the data of Mexico City. _(Amazing!)_   
All the files required are downloadable from [https://insideairbnb.com/get-the-data/](https://insideairbnb.com/get-the-data/). 


| FILENAME | SHORT DESCRIPTION |
| ----- | ----- |
| calendar.csv.gz | Detailed Calendar Data |
| listings.csv | Summary information and metrics for listings in Mexico City (good for visualisations) |
| listings.csv.gz | Detailed Listings Data |
| neighbourhoods.csv | Neighbourhood list for geo filter. Sourced from city or open source GIS files. |
| reviews.csv | Summary Review data and Listing ID (to facilitate time based analytics and visualitaion linekd to a listing) |
| reviews.csv.gz | Detailed Review Data |   


**Hint:** _The compressed files_ `(*.gz)` _could be uploaded directly into workspace.  
On the exploration Jupyter Notebook there are a set of comands to uncompress the files directly on the (Udacity) Jupyter Workspace._  
It is not fully necessary to uncompress the files. Pandas can through `.read_csv()` read compressed files. But as we need to work with Spark, then the CSV files are required.

In [9]:
print(">> List of all compressed files (*.gz)")
!pwd
!ls -all *.gz

>> List of all compressed files (*.gz)
/workspace/home
-rw-r--r-- 1 root root 26455179 Apr 25 22:56 calendar.csv.gz
-rw-r--r-- 1 root root 15445405 Apr 25 22:55 listings.csv.gz
-rw-r--r-- 1 root root 86708818 Apr 25 22:52 reviews.csv.gz


In [8]:
# Read in the data here
# Local data (Udacity Workspace)
input_data      = "./"             # "data_airbnb/"
output_data     = "data_output/"


# CSV's / GZ filesname: Read in the data here
nhoods_csv_fn   = "neighbourhoods.csv"
list_csv_fn     = "listings.csv"
list_det_csv_fn = "listings_detailed.csv"  # < listings.csv.gz
calendar_csv_fn = "calendar.csv"           # < calendar.csv.gz
rev_det_csv_fn  = "reviews_detailed.csv"   # < reviews.csv.gz

hoods_csv       = input_data + nhoods_csv_fn
list_csv        = input_data + list_csv_fn
list_det_csv    = input_data + list_det_csv_fn
calendar_csv    = input_data + calendar_csv_fn
rev_det_csv     = input_data + rev_det_csv_fn


# Pandas (pd) ("_df") for data exploration (Details see separated Jupyter Notebook)
hoods_df        = pd.read_csv(hoods_csv)
list_df         = pd.read_csv(list_csv)
list_det_df     = pd.read_csv(list_det_csv)
calendar_df     = pd.read_csv(calendar_csv)
reviews_df      = pd.read_csv(rev_det_csv)


In [11]:
print(">> List of all CSV files (*.csv)")
!ls -all *.csv

>> List of all CSV files (*.csv)
-rw-r--r-- 1 root root 462719658 Apr 26 00:27 calendar.csv
-rw-r--r-- 1 root root   3747993 Apr 26 01:50 listings.csv
-rw-r--r-- 1 root root  60742007 Apr 26 00:23 listings_detailed.csv
-rw-r--r-- 1 root root       275 Apr 24 23:03 neighbourhoods.csv
-rw-r--r-- 1 root root  18548579 Apr 25 22:17 reviews.csv
-rw-r--r-- 1 root root 226117945 Apr 26 00:35 reviews_detailed.csv


After some "experiments", decided to assign csv's to Pandas dataframes to explore it and display the data in a nice format (i.e `data_df.limit(5).toPandas()`

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.  
_**This happens partwise on the "Exploring" Jupyter Notebook.**_

#### Cleaning Steps
Typically other Tools does alredy give hints about quality of data, like how good columns are filled, if there are errors on the columns based on the assigned data type.   
Applying this knowledge on python is really thrughly different. But here we go.  

* Data Type and Content: Visualize the data in Pandas dataframe to assess quality looking into it. 
* NULL and NaN values: Check for "Null" values, possibly delete Nulls or replace them with '' (empty) or 0 value depending of the column type!
* Duplicates: Drop duplicates to handle with unique identifiers, delete rows if necessary.

In [12]:
print(">> [" + str(hoods_df.count()) + "] rows read out from CSV file")

print("\n>> Overview NEIGHBOURHOODS df...")
hoods_df.head(2)

>> [neighbourhood_group     0
neighbourhood          16
dtype: int64] rows read out from CSV file

>> Overview NEIGHBOURHOODS df...


Unnamed: 0,neighbourhood_group,neighbourhood
0,,Álvaro Obregón
1,,Azcapotzalco


In [13]:
print(">> [" + str(list_df.count()) + "] rows read out from CSV file")

print("\n>> Overview LISTING (summary) df ...")
list_df.head(2)

>> [id                                24224
name                              24223
host_id                           24224
host_name                         24224
neighbourhood_group                   0
neighbourhood                     24224
latitude                          24224
longitude                         24224
room_type                         24224
price                             24224
minimum_nights                    24224
number_of_reviews                 24224
last_review                       20628
reviews_per_month                 20628
calculated_host_listings_count    24224
availability_365                  24224
number_of_reviews_ltm             24224
license                               0
dtype: int64] rows read out from CSV file

>> Overview LISTING (summary) df ...


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,number_of_reviews_ltm,license
0,35797,Villa Dante,153786,Dici,,Cuajimalpa de Morelos,19.38283,-99.27178,Entire home/apt,3658,1,0,,,1,363,0,
1,696037,"3 Bedrooms, 2 blocks from Polanco - ALL RENOVA...",3531879,Gonzalo & Sandra,,Miguel Hidalgo,19.4418,-99.18402,Entire home/apt,1469,24,39,2022-12-21,0.31,3,87,2,


In [14]:
print(">> [" + str(calendar_df.count()) + "] rows read out from CSV file")

print("\n>> Overview CALENDAR (original/before) ...")
calendar_df.head(2)

>> [listing_id        8841401
date              8841401
available         8841401
price             8841401
adjusted_price    8841401
minimum_nights    8841397
maximum_nights    8841397
dtype: int64] rows read out from CSV file

>> Overview CALENDAR (original/before) ...


Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
0,2056638,2023-03-30,f,"$1,434.00","$1,434.00",1.0,1125.0
1,2056638,2023-03-31,f,"$1,434.00","$1,434.00",1.0,1125.0


In [15]:
print(">> [" + str(reviews_df.count()) + "] rows read out from CSV file")

print("\n>> Overview REVIEW df...")
reviews_df.head(2)

>> [listing_id       886240
id               886240
date             886240
reviewer_id      886240
reviewer_name    886238
comments         886145
dtype: int64] rows read out from CSV file

>> Overview REVIEW df...


Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,44616,706908,2011-11-09,634733,Lindsay,Forget staying in a hotel. Stay at condesa hau...
1,2056638,9623913,2014-01-03,6743067,Nora Carolina,"El sitio es precioso, está muy bien ubicado al..."


In [16]:
# Performing cleaning tasks here

calendar_df['price']          = calendar_df['price'].str.replace('$', '')
calendar_df['adjusted_price'] = calendar_df['adjusted_price'].str.replace('$', '')
# rename ['price'] because later on it will be related with ['requested_price']
calendar_df = calendar_df.rename(columns={'price': 'requested_price'}) 

print(">> Overview CALENDAR (after) df ...")
calendar_df.head(2)

>> Overview CALENDAR (after) df ...


Unnamed: 0,listing_id,date,available,requested_price,adjusted_price,minimum_nights,maximum_nights
0,2056638,2023-03-30,f,1434.0,1434.0,1.0,1125.0
1,2056638,2023-03-31,f,1434.0,1434.0,1.0,1125.0


In [17]:
# Performing cleaning tasks here

# drop min and max nights as they appear in another future table, view listing detailed df
drop_columns = ['maximum_nights','minimum_nights','price']
list_det_df  = list_det_df.drop(drop_columns, axis=1)

print(">> Overview LISTINGS (detailed) df ...")
list_det_df.head(2)

>> Overview LISTINGS (detailed) df ...


Unnamed: 0,id,listing_url,scrape_id,last_scraped,source,name,description,neighborhood_overview,picture_url,host_id,...,review_scores_communication,review_scores_location,review_scores_value,license,instant_bookable,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
0,2056638,https://www.airbnb.com/rooms/2056638,20230329041210,2023-03-30,city scrape,Amplio y luminoso loft en Coyoacán,Cómodo loft de dos pisos magníficamente ubicad...,,https://a0.muscache.com/pictures/28353712/4379...,10531228,...,4.95,5.0,4.86,,f,3,3,0,0,0.19
1,2072354,https://www.airbnb.com/rooms/2072354,20230329041210,2023-03-30,city scrape,Coyoacan Historic Studio Apartment,This studio flat is adjacent to the owner's ho...,"Located in Coyoacan, in a quiet neighborhood w...",https://a0.muscache.com/pictures/369f3371-593a...,16840050,...,5.0,4.95,4.84,,f,2,2,0,0,0.54


Verification for missing values

In [18]:
dfs = [hoods_df, list_det_df, calendar_df, reviews_df]

# % of missing values per column
for df in dfs:
    for col in df.columns:
        pct_missing = np.mean(df[col].isnull())
      # print('{} - {}%'.format(col, round(pct_missing*100)))
        print(f"'{col} - {round(pct_missing*100)}%'")


'neighbourhood_group - 100.0%'
'neighbourhood - 0.0%'
'id - 0.0%'
'listing_url - 0.0%'
'scrape_id - 0.0%'
'last_scraped - 0.0%'
'source - 0.0%'
'name - 0.0%'
'description - 4.0%'
'neighborhood_overview - 37.0%'
'picture_url - 0.0%'
'host_id - 0.0%'
'host_url - 0.0%'
'host_name - 0.0%'
'host_since - 0.0%'
'host_location - 21.0%'
'host_about - 44.0%'
'host_response_time - 13.0%'
'host_response_rate - 13.0%'
'host_acceptance_rate - 8.0%'
'host_is_superhost - 0.0%'
'host_thumbnail_url - 0.0%'
'host_picture_url - 0.0%'
'host_neighbourhood - 51.0%'
'host_listings_count - 0.0%'
'host_total_listings_count - 0.0%'
'host_verifications - 0.0%'
'host_has_profile_pic - 0.0%'
'host_identity_verified - 0.0%'
'neighbourhood - 37.0%'
'neighbourhood_cleansed - 0.0%'
'neighbourhood_group_cleansed - 100.0%'
'latitude - 0.0%'
'longitude - 0.0%'
'property_type - 0.0%'
'room_type - 0.0%'
'accommodates - 0.0%'
'bathrooms - 100.0%'
'bathrooms_text - 0.0%'
'bedrooms - 2.0%'
'beds - 1.0%'
'amenities - 0.0%'
'min

Do we have duplicates?

In [19]:
duplicate_neighbourhood_group = hoods_df[hoods_df.duplicated(['neighbourhood'])]['neighbourhood'].count()
print(">> Number of duplicate (neighbourhood_group) in hoods_df: ", duplicate_neighbourhood_group)

duplicate_listing_id = list_df[list_df.duplicated(['id'])]['id'].count()
print(">> Number of duplicate (listing_ids) in listing_df: ", duplicate_listing_id)

# for CALENDAR should not make sense to check this
# duplicate_calendar_listing_id = calendar_df[calendar_df.duplicated(['listing_id'])]['listing_id'].count()
# print(">> Number of duplicate listing_ids in calendar_df: ", duplicate_calendar_listing_id)

duplicate_review_id = reviews_df[reviews_df.duplicated(['id'])]['id'].count()
print(">> Number of duplicate (ids) in review_df: ", duplicate_review_id)


>> Number of duplicate (neighbourhood_group) in hoods_df:  0
>> Number of duplicate (listing_ids) in listing_df:  0
>> Number of duplicate (ids) in review_df:  0


<font color="red">**STOP!**</font> - Results? None: Go...

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
A star schema as database design decision does allow a simple and effective way to organize data for querying and reporting. Because of the CSV-files there is a organisation in a way that favoured this.  

The star schema does consist of 1 *Fact* table (booking) and 4 *Dimension* tables (calendar, listing, neighbourhoods and reviews). For the *Dimension* tables they are 4 source files in CSV format of **calendar**, **listings**, **neighbourhooods** as well of **reviews** are on the (local:) Udacity Workspace or (online:) at the S3 on AWS uploaded.

#### 3.2 Mapping Out Data Pipelines
I did select to setup my project with `Spark` (AWS Services) and querying the data on the fly in the `JUPYTER Notebook` without building tables and filling them (typically CREATE TABLE + INSERT SQL statements). This is a great advantage of working with Spark to do this job! After beeing clear which fields to keep for the dimension tables as well the at least minimum necessary structure of the fact table is clear, then the data will be uploaded to an S3 Bucket as Parquet files. From the s3 bucket the Parque files could be redownloaded/readed directly into Spark dataframes and finaly query them directly for verification and demo purposes.  

Next are the Data Schemas definitions indipendently diceded, so this can be applied into the CSV's files when reading it.  

> **Note:**   
> As seen on the "Exploration" Jupyter Notebook, the reading of the CSV's could end on an errornous kind, because of the data formating.  
> Therefor two actions:  
> * Some DataTypes did I (temporary, because of deadline of course time) set to a general DataType like StringType()
> * Also I reduced the amount of columns to have an MVP runing program, means only from first interest. 

In [9]:
# Definition of Schemas into dataframes, 
# ..as Spark read all fields per default as StringType()

hoodSchema = R([
    Fld("neighbourhood_group",    Str()),
    Fld("neighbourhood",          Str()),
])

list_detSchema = R([
    Fld("id",                     Int()),
    Fld("name",                   Str()),
    Fld("description",            Str()),
    Fld("host_id",                Int()),
    Fld("host_name",              Str()),
    Fld("host_since",             Date()),
    Fld("source",                 Str()),
    Fld("latitude",               Flt()),  # Dbl()
    Fld("longitude",              Flt()),  # Dbl()
    #Fld("price",                 Flt()),  # Dbl()
    Fld("review_scores_rating",   Int()),
    Fld("reviews_per_month",      Int()),  # 
    Fld("room_type",              Str()),
    Fld("accommodates",           Int()),
    Fld("bathrooms",              Int()),
    Fld("bedrooms",               Int()),
    Fld("beds",                   Int()),
    Fld("neighbourhood_cleansed", Str()),
])

# Alredy here cutting (cleaning up) columns ['price','m*']!
calendarSchema = R([
    Fld("listing_id",             Int()),
    Fld("date",                   Date()),
    Fld("available",              Str()),  # Bol()
    Fld("adjusted_price",         Str()),  # Dec()
])

rev_detSchema = R([
    Fld("listing_id",             Int()),
    Fld("id",                     Int()),
    Fld("date",                   Date()),
    Fld("reviewer_id",            Int()),
    Fld("reviewer_name",          Str()),
    Fld("comments",               Str()),
])

print(">> DataFrame Schemas are declared")

>> DataFrame Schemas are declared


In [10]:
# Created Schemas passing them to import defined data types
# read out and create spark dataframes ("_df"), NOT Data Feed ("df_")
df_hoods    = spark.read.csv(hoods_csv, hoodSchema)

#df_list     = spark.read.option("header","true") \
                   #.format("csv").schema(listSchema) \
                   #.load(list_csv)

df_listing  = spark.read.option("header","true").format("csv") \
                   .schema(list_detSchema) \
                   .load(list_det_csv)

df_calendar = spark.read.option('header','true') \
                   .option('sep',',') \
                   .format('csv') \
                   .schema(calendarSchema) \
                   .load(calendar_csv)

df_reviews  = spark.read.format("csv") \
                   .option("header",    True) \
                   .option('quotes',    '"') \
                   .option("delimiter", ',') \
                   .schema(rev_detSchema) \
                   .load(rev_det_csv)

#df_reviews  = df_reviews.withColumnRenamed("id",   "review_id") \
#                        .withColumnRenamed("date", "review_date")


In [22]:
# Cleaning actions on columns
# df_calendar = df_calendar.drop('price')  
# This column drop, could be applied already using the Schemas. 

df_calendar.printSchema()  # should not more display column ['price']

root
 |-- listing_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- adjusted_price: string (nullable = true)



In [23]:
df_calendar.limit(2).toPandas()

Unnamed: 0,listing_id,date,available,adjusted_price
0,2056638,2023-03-30,f,"$1,434.00"
1,2056638,2023-03-31,f,"$1,434.00"


In [24]:
df_reviews.limit(2).toPandas()

Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,44616,706908,2011-11-09,634733,Lindsay,Forget staying in a hotel. Stay at condesa hau...
1,2056638,9623913,2014-01-03,6743067,Nora Carolina,"El sitio es precioso, está muy bien ubicado al..."


In [25]:
df_listing.limit(2).toPandas()

Unnamed: 0,id,name,description,host_id,host_name,host_since,source,latitude,longitude,review_scores_rating,reviews_per_month,room_type,accommodates,bathrooms,bedrooms,beds,neighbourhood_cleansed
0,,,,,,,,,,,,,,,,,
1,,,,,,,,,,,,,,,,,


In [None]:
###########################

In [26]:
list_detSchema = R([
    Fld("id",                     Int()),
    Fld("name",                   Str()),
    Fld("description",            Str()),
    Fld("host_id",                Str()), # Int()),
    Fld("host_name",              Str()),
    Fld("host_since",             Str()), # Date()),
    Fld("source",                 Str()),
    Fld("latitude",               Str()), # Flt()),  # Dbl()
    Fld("longitude",              Str()), # Flt()),  # Dbl()
    #Fld("price",                 Flt()),  # Dbl()
    Fld("review_scores_rating",   Str()), # Int()),
    Fld("reviews_per_month",      Str()), # Int()),  # 
    Fld("room_type",              Str()), # Str()),
    Fld("accommodates",           Str()), # Int()),
    Fld("bathrooms",              Str()), # Int()),
    Fld("bedrooms",               Int()),
    Fld("beds",                   Str()), # Int()),
    Fld("neighbourhood_cleansed", Str()),
])

df_listing  = spark.read.option("header","true").format("csv") \
                   .schema(list_detSchema) \
                   .load(list_det_csv)

df_listing.limit(2).toPandas()

Unnamed: 0,id,name,description,host_id,host_name,host_since,source,latitude,longitude,review_scores_rating,reviews_per_month,room_type,accommodates,bathrooms,bedrooms,beds,neighbourhood_cleansed
0,2056638,https://www.airbnb.com/rooms/2056638,20230329041210,2023-03-30,city scrape,Amplio y luminoso loft en Coyoacán,Cómodo loft de dos pisos magníficamente ubicad...,,https://a0.muscache.com/pictures/28353712/4379...,10531228,https://www.airbnb.com/users/show/10531228,Maria,2013-12-09,"Mexico City, Mexico",,within a day,100%
1,2072354,https://www.airbnb.com/rooms/2072354,20230329041210,2023-03-30,city scrape,Coyoacan Historic Studio Apartment,This studio flat is adjacent to the owner's ho...,"Located in Coyoacan, in a quiet neighborhood w...",https://a0.muscache.com/pictures/369f3371-593a...,16840050,https://www.airbnb.com/users/show/16840050,Mónica,2014-06-16,"Mexico City, Mexico",,,


> <font color="red">reading less columns, then there is a missmatching, so that read out results are incorrect! [TODO]</font>   
i.e. content of columnd ['name'] is the content of ['listing_url']   
> 
> This means, the `spark.read.option()` requires an option to read out better the content!   
See `Exploring Jupyter Notebook` for more details.

In [11]:
list_detSchema = R([
    Fld('id',                                           Int()),
    Fld('listing_url',                                  Str()),
    Fld('scrape_id',                                    Str()), # Int()),
    Fld('last_scraped',                                 Str()), # Date()),
    Fld('source',                                       Str()),
    Fld('name',                                         Str()),
    Fld('description',                                  Str()),
    Fld('neighborhood_overview',                        Str()),
    Fld('picture_url',                                  Str()),
    Fld('host_id',                                      Str()), # Int()),
    Fld('host_url',                                     Str()),
    Fld('host_name',                                    Str()),
    Fld('host_since',                                   Str()), # Date()),
    Fld('host_location',                                Str()),
    Fld('host_about',                                   Str()),
    Fld('host_response_time',                           Str()), #  Int()),
    Fld('host_response_rate',                           Str()), # Int()),
    Fld('host_acceptance_rate',                         Str()), # Int()),
    Fld('host_is_superhost',                            Str()),
    Fld('host_thumbnail_url',                           Str()),
    Fld('host_picture_url',                             Str()),
    Fld('host_neighbourhood',                           Str()),
    Fld('host_listings_count',                          Str()), # Int()),
    Fld('host_total_listings_count',                    Str()), # Int()),
    Fld('host_verifications',                           Str()),
    Fld('host_has_profile_pic',                         Str()),
    Fld('host_identity_verified',                       Str()),
    Fld('neighbourhood',                                Str()),
    Fld('neighbourhood_cleansed',                       Str()),
    Fld('neighbourhood_group_cleansed',                 Str()),
    Fld('latitude',                                     Str()), # Dbl()),
    Fld('longitude',                                    Str()), # Dbl()),
    Fld('property_type',                                Str()),
    Fld('room_type',                                    Str()),
    Fld('accommodates',                                 Str()),
    Fld('bathrooms',                                    Str()),
    Fld('bathrooms_text',                               Str()),
    Fld('bedrooms',                                     Str()), # Int()),
    Fld('beds',                                         Str()), # Int()),
    Fld('amenities',                                    Str()),
    Fld('price',                                        Str()), # Dbl()),
    Fld('minimum_nights',                               Str()), # Int()),
    Fld('maximum_nights',                               Str()), # Int()),
    Fld('minimum_minimum_nights',                       Str()), # Int()),
    Fld('maximum_minimum_nights',                       Str()), # Int()),
    Fld('minimum_maximum_nights',                       Str()), # Int()),
    Fld('maximum_maximum_nights',                       Str()), # Int()),
    Fld('minimum_nights_avg_ntm',                       Str()), # Int()),
    Fld('maximum_nights_avg_ntm',                       Str()), # Int()),
    Fld('calendar_updated',                             Str()), # Date()),
    Fld('has_availability',                             Str()),
    Fld('availability_30',                              Str()), # Int()),
    Fld('availability_60',                              Str()), # Int()),
    Fld('availability_90',                              Str()), # Int()),
    Fld('availability_365',                             Str()), # Int()),
    Fld('calendar_last_scraped',                        Str()),
    Fld('number_of_reviews',                            Str()), # Int()),
    Fld('number_of_reviews_ltm',                        Str()), # Int()),
    Fld('number_of_reviews_l30d',                       Str()), # Int()),
    Fld('first_review',                                 Str()), # Date()),
    Fld('last_review',                                  Str()), # Date()),
    Fld('review_scores_rating',                         Str()), # Int()),
    Fld('review_scores_accuracy',                       Str()), # Int()),
    Fld('review_scores_cleanliness',                    Str()), # Int()),
    Fld('review_scores_checkin',                        Str()), # Int()),
    Fld('review_scores_communication',                  Str()), # Int()),
    Fld('review_scores_location',                       Str()), # Int()),
    Fld('review_scores_value',                          Str()), # Int()),
    Fld('license',                                      Str()),
    Fld('instant_bookable',                             Str()),
    Fld('calculated_host_listings_count',               Str()), # Int()),
    Fld('calculated_host_listings_count_entire_homes',  Str()), # Int()),
    Fld('calculated_host_listings_count_private_rooms', Str()), # Int()),
    Fld('calculated_host_listings_count_shared_rooms',  Str()), # Int()),
    Fld('reviews_per_month',                            Str()), # Int())
])

df_listing  = spark.read.option("header","true").format("csv") \
                   .option("inferSchema", "true") \
                   .load(list_det_csv)

                 # .schema(list_detSchema) \

df_listing.limit(2).toPandas()

Unnamed: 0,id,listing_url,scrape_id,last_scraped,source,name,description,neighborhood_overview,picture_url,host_id,...,review_scores_communication,review_scores_location,review_scores_value,license,instant_bookable,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
0,2056638,https://www.airbnb.com/rooms/2056638,20230329041210,2023-03-30,city scrape,Amplio y luminoso loft en Coyoacán,Cómodo loft de dos pisos magníficamente ubicad...,,https://a0.muscache.com/pictures/28353712/4379...,10531228,...,2014-01-03,2018-06-24,4.95,4.71,4.95,5.0,4.95,5.0,4.86,
1,2072354,https://www.airbnb.com/rooms/2072354,20230329041210,2023-03-30,city scrape,Coyoacan Historic Studio Apartment,This studio flat is adjacent to the owner's ho...,"Located in Coyoacan, in a quiet neighborhood w...",https://a0.muscache.com/pictures/369f3371-593a...,16840050,...,"""""Clothing storage: closet""""","""""Body soap""""]""",$830.00,3.0,1125.0,3.0,3.0,1125.0,1125.0,3.0


#### "reviews" is a representation of one of the four dimension tables

In [28]:
# Spark can run SQL Queries on the fly without CREATE TABLE or INSERT INTO statements (advantage!)
df_reviews.createOrReplaceTempView("reviews")
spark.sql("""
    SELECT listing_id, count(*) as reviews
      FROM reviews
     WHERE comments is not null
     GROUP by 1
     ORDER by reviews desc
     LIMIT 5
""").show()

+----------+-------+
|listing_id|reviews|
+----------+-------+
|  47718598|   1092|
|  48465948|    908|
|  45175157|    770|
|  45739063|    768|
|  48488345|    715|
+----------+-------+



#### Following, the Spark Query based on the four dimension table representations, does shown the fact tabel later on labeled "Booking"
Massive advantage of this is the early verification of the database design as well data fitting. 

In [None]:
df_reviews.createOrReplaceTempView("reviews")
df_hoods.createOrReplaceTempView("hoods")
df_listing.createOrReplaceTempView("listing")
df_calendar.createOrReplaceTempView("calendar")

spark.sql("""
    SELECT *
      FROM calendar 
      LEFT JOIN listing 
             ON (listing.id = calendar.listing_id)
      LEFT JOIN reviews 
           using (listing_id)
     LIMIT 1
""").show()

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model:   
* Define destination S3 Bucket on AWS
* Definitions of the dimension table representations ready to upload
* Definition of the fact table representation "Booking" by joining
* Writing all the defined table representations to S3 Bucket
* Reading from S3 Bucket and creating again the table objects to make quality checks
* made a quality check

#### Define S3 Bucket destination

In [12]:
output_data = "s3a://capstone-project-out/"

In [13]:
# NEIGHBOURHOODS - extract columns to create Hoods table
hoods_table = df_hoods.select('neighbourhood_group', 'neighbourhood')

In [32]:
print(">> NEIGHBOURHOODS Table ...")
hoods_table.show(3)

>> NEIGHBOURHOODS Table ...
+-------------------+--------------+
|neighbourhood_group| neighbourhood|
+-------------------+--------------+
|neighbourhood_group| neighbourhood|
|               null|Álvaro Obregón|
|               null|  Azcapotzalco|
+-------------------+--------------+
only showing top 3 rows



In [33]:
print(">> NEIGHBOURHOODS Table ...")
hoods_table.limit(3).toPandas()

>> NEIGHBOURHOODS Table ...


Unnamed: 0,neighbourhood_group,neighbourhood
0,neighbourhood_group,neighbourhood
1,,Álvaro Obregón
2,,Azcapotzalco


In [14]:
# LISTING - extract columns to create LISTING table
df_listing       = df_listing.withColumn('month', month('host_since'))

# Only using columns of interest (demo). Including all will slow down execution for review
# 'adjusted_price', > 'price'
listing_columns  = ['id','name','description','host_id','host_name','host_since', \
                   'month','source','latitude','longitude', 'price',\
                   'review_scores_rating','reviews_per_month','room_type', \
                   'accommodates','bathrooms','bedrooms','beds','neighbourhood_cleansed']
listing_table    = df_listing.selectExpr(listing_columns).dropDuplicates()

In [35]:
print(">> LISTING Table ...")
listing_table.head(1)

>> LISTING Table ...


[Row(id='2444573', name='SPEED WiFi Apartment -Olloqui Park near Coyoacan', description='Beautiful furnished apartment for temporary rent in Mexico City, located in Acacias neighborhood. Quiet and safe residential zone, very close to Metro (subway) Coyoacán station; bike service, EcoBici,  is available at the park. The place is very close to Coyoacán downtown, one of the most beautiful neighborhoods in Mexico City. It has a big desk and confortable office chair where you can work comfortably.<br /><br /><b>The space</b><br />This beautiful apartment is perfect to accommodate up to two people; it has one bedroom with a queen size bed; two full bathrooms. <br />It features excellent natural light, bamboo floors, plants and books, fully equipped kitchen (no microwave but gas oven), and laundry room with dryer and and washing machine. It includes one parking space. Working desk and chair.<br /><br /><b>Guest access</b><br />Enjoy the full apartment. The rent includes WiFi internet with exc

In [36]:
print(">> LISTING Table ...")
listing_table.show(3)

>> LISTING Table ...
+--------------------+--------------------+--------------------+--------+---------+----------+-----+---------------+--------+---------+--------------------+--------------------+--------------------+---------------+------------+---------+--------+----+----------------------+
|                  id|                name|         description| host_id|host_name|host_since|month|         source|latitude|longitude|               price|review_scores_rating|   reviews_per_month|      room_type|accommodates|bathrooms|bedrooms|beds|neighbourhood_cleansed|
+--------------------+--------------------+--------------------+--------+---------+----------+-----+---------------+--------+---------+--------------------+--------------------+--------------------+---------------+------------+---------+--------+----+----------------------+
|             2444573|SPEED WiFi Apartm...|Beautiful furnish...|12497740|  Joaquin|2014-02-23|    2|    city scrape|19.36229|-99.17355| ""Luggage dropof..

In [37]:
print(">> LISTING Table ...")
listing_table.limit(3).toPandas()

>> LISTING Table ...


Unnamed: 0,id,name,description,host_id,host_name,host_since,month,source,latitude,longitude,price,review_scores_rating,reviews_per_month,room_type,accommodates,bathrooms,bedrooms,beds,neighbourhood_cleansed
0,2444573,SPEED WiFi Apartment -Olloqui Park near Coyoacan,Beautiful furnished apartment for temporary re...,12497740.0,Joaquin,2014-02-23,2.0,city scrape,19.36229,-99.17355,"""""Luggage dropoff allowed""""","""""Hangers""""","""""Free washer \u2013 In unit""""",Entire home/apt,2.0,,1.0,1.0,Benito Juárez
1,1555819,New Suite in Vintage Home in Roma/Condesa,The mansion is a pleasant 15 minute walk from ...,8282570.0,Thomas,2013-08-20,8.0,city scrape,19.41709,-99.16603,"""""Luggage dropoff allowed""""","""""Dryer""""",,Entire home/apt,2.0,,1.0,1.0,Cuauhtémoc
2,Architecte mexicaine,,,,,,,faire du vélo.,,,,,,,,,,,


In [15]:
# CALENDAR - extract columns to create CALENDAR table
df_calendar      = df_calendar.withColumn('month', month('date'))

calendar_columns = ['listing_id', 'date', 'available', 'adjusted_price', 'month']
calendar_table   = df_calendar.selectExpr(calendar_columns).dropDuplicates()

In [39]:
print(">> CALENDAR Table ...")
calendar_table.head(3)

>> CALENDAR Table ...


[Row(listing_id=35797, date=datetime.date(2023, 6, 14), available='t', adjusted_price='$3,658.00', month=6),
 Row(listing_id=44616, date=datetime.date(2023, 6, 6), available='t', adjusted_price='$18,000.00', month=6),
 Row(listing_id=44616, date=datetime.date(2023, 8, 1), available='t', adjusted_price='$18,000.00', month=8)]

In [40]:
print(">> CALENDAR Table ...")
calendar_table.show(3)

>> CALENDAR Table ...
+----------+----------+---------+--------------+-----+
|listing_id|      date|available|adjusted_price|month|
+----------+----------+---------+--------------+-----+
|     35797|2023-06-14|        t|     $3,658.00|    6|
|     44616|2023-06-06|        t|    $18,000.00|    6|
|     44616|2023-08-01|        t|    $18,000.00|    8|
+----------+----------+---------+--------------+-----+
only showing top 3 rows



In [41]:
print(">> CALENDAR Table ...")
calendar_table.limit(3).toPandas()

>> CALENDAR Table ...


Unnamed: 0,listing_id,date,available,adjusted_price,month
0,35797,2023-06-14,t,"$3,658.00",6
1,44616,2023-06-06,t,"$18,000.00",6
2,44616,2023-08-01,t,"$18,000.00",8


In [42]:
# ... Checking
df_reviews.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



> Be careful, when merging two tables, same column names is ambigous!   
So, decide which column should be renamed, to have unique column names!

In [16]:
# Required, otherwise it is ambigous when merging booking_stage_table with reviews_table as booking_table!
df_reviews = df_reviews.withColumnRenamed("id",   "review_id") \
                       .withColumnRenamed("date", "review_date")

# ... Checking
df_reviews.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- review_id: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [17]:
# REVIEWS TABLE - extract columns to create REVIEWS table

df_reviews       = df_reviews.na.drop()
df_reviews       = df_reviews.withColumn('month', month('review_date'))

reviews_columns  = ['listing_id', 'review_id', 'review_date', \
                    'reviewer_id', 'reviewer_name', 'comments', 'month']
reviews_table    = df_reviews.selectExpr(reviews_columns).dropDuplicates()

In [45]:
print(">> REVIEWS Table ...")
reviews_table.head(3)

>> REVIEWS Table ...


[Row(listing_id=2072354, review_id=9530203, review_date=datetime.date(2014, 1, 1), reviewer_id=4727516, reviewer_name='Ashley', comments='This studio apartment is located in a great neighborhood (Coyoacan), near Viveros and near markets, restaurants, museums and churches.  The apartment is very comfortable, and our hosts, Hilda, Monica, and family, were all very friendly and helpful during our stay!  Monica gave us great advice and maps for exploring the surrounding areas.  We would definitely stay here again! ', month=1),
 Row(listing_id=696037, review_id=8901175, review_date=datetime.date(2013, 11, 25), reviewer_id=990894, reviewer_name='Tim', comments="Comfortable apartment, relatively convenient location. It's not that hard to get there from the airport (even though it's across the city) and public transit is accessible. Gonzalo is very helpful and accomodating. There's a really nice grocery store a block away. There is a lot of construction in the neighborhood but it is not a both

In [46]:
print(">> REVIEWS Table ...")
reviews_table.show(3)

>> REVIEWS Table ...
+----------+---------+-----------+-----------+-------------+--------------------+-----+
|listing_id|review_id|review_date|reviewer_id|reviewer_name|            comments|month|
+----------+---------+-----------+-----------+-------------+--------------------+-----+
|   2072354|  9530203| 2014-01-01|    4727516|       Ashley|This studio apart...|    1|
|    696037|  8901175| 2013-11-25|     990894|          Tim|Comfortable apart...|   11|
|    704779| 11645946| 2014-04-10|   13495018|      Michael|This was our firs...|    4|
+----------+---------+-----------+-----------+-------------+--------------------+-----+
only showing top 3 rows



In [47]:
print(">> REVIEWS Table ...")
reviews_table.limit(3).toPandas()

>> REVIEWS Table ...


Unnamed: 0,listing_id,review_id,review_date,reviewer_id,reviewer_name,comments,month
0,2072354,9530203,2014-01-01,4727516,Ashley,This studio apartment is located in a great ne...,1
1,696037,8901175,2013-11-25,990894,Tim,"Comfortable apartment, relatively convenient l...",11
2,704779,11645946,2014-04-10,13495018,Michael,This was our first trip to Mexico and our firs...,4


In [48]:
print('>> DIMENSION Tables data is prepared and ready to be uploaded to S3')

>> DIMENSION Tables data is prepared and ready to be uploaded to S3


In [18]:
df_bookings_stage = df_calendar.join(listing_table, \
                                     on=[calendar_table.listing_id == \
                                         listing_table.id], \
                                     how='left')

In [50]:
df_bookings_stage.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- price: string (nullable = true)
 |-- review_scores_rating: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)



In [19]:
df_bookings    = df_bookings_stage.join(reviews_table, \
                                        on=[df_bookings_stage.listing_id == \
                                            reviews_table.listing_id], \
                                        how='left')

In [53]:
print(df_bookings.printSchema())
print(df_bookings.show(3))

root
 |-- listing_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- available: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- price: string (nullable = true)
 |-- review_scores_rating: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- li

In [20]:
# preparing BOOKINGS before uploading to the S3 as parquet

df_bookings      = df_bookings.withColumn('month_review', month('review_date'))

# Only using columns of interest (demo). NOTE: Including ALL will slow down execution for review
booking_table    = df_bookings.select('id','name','description','host_id','host_name', \
                     'host_since','source','latitude','longitude','adjusted_price', \
                     'review_id','review_date','reviewer_id','review_scores_rating', \
                     'reviews_per_month','room_type','accommodates','bathrooms','bedrooms','beds','month_review')

In [55]:
print(booking_table.show(3))

+-------+--------------------+--------------------+-------+---------+----------+-----------+--------+---------+--------------+---------+-----------+-----------+--------------------+-----------------+---------+------------+---------+--------+----+------------+
|     id|                name|         description|host_id|host_name|host_since|     source|latitude|longitude|adjusted_price|review_id|review_date|reviewer_id|review_scores_rating|reviews_per_month|room_type|accommodates|bathrooms|bedrooms|beds|month_review|
+-------+--------------------+--------------------+-------+---------+----------+-----------+--------+---------+--------------+---------+-----------+-----------+--------------------+-----------------+---------+------------+---------+--------+----+------------+
|1656382|Habitación indivi...|Habitación privad...| 796221|    Karla|2011-07-09|city scrape|    null|     null|     $1,171.00|     null|       null|       null|                null|             null|     null|        nul

In [56]:
print(">> BOOKING Table ...")
booking_table.limit(3).toPandas()

>> BOOKING Table ...


Unnamed: 0,id,name,description,host_id,host_name,host_since,source,latitude,longitude,adjusted_price,...,review_date,reviewer_id,review_scores_rating,reviews_per_month,room_type,accommodates,bathrooms,bedrooms,beds,month_review
0,1656382,Habitación individual c baño propio,"Habitación privado con baño propio, amplia, co...",796221,Karla,2011-07-09,city scrape,,,"$1,171.00",...,,,,,,,,,,
1,1656382,Habitación individual c baño propio,"Habitación privado con baño propio, amplia, co...",796221,Karla,2011-07-09,city scrape,,,"$1,171.00",...,,,,,,,,,,
2,1656382,Habitación individual c baño propio,"Habitación privado con baño propio, amplia, co...",796221,Karla,2011-07-09,city scrape,,,"$1,171.00",...,,,,,,,,,,


In [57]:
print('>> FACT Table data "BOOKING" is prepared and ready to be uploaded to S3')

>> FACT Table data "BOOKING" is prepared and ready to be uploaded to S3


Furthermore following data quality checks could be included before writing up to S3 Bucket to make sure the data is uploaded in the right format...

In [None]:
# from pyspark.sql.functions import col, isnull

# def apply_constraints(df, constraints):
#     '''Help for procedure APPLY_CONSTRAINTS
# 
#     this function takes the spark dataframe and list of contraints
#     and applies them to the df
# 
#     Parameter: 
#     - df: dataframe
#     - constraints
#     '''
# 
#     for constraint in constraints:
#         df = df.filter(constraint)
#     return df

In [None]:
# List of various constraints for the tables
# from pyspark.sql.functions import col

# Dimensions Table
# 
# hood_constraints = [
#     col("neighbourhood").isUnique(),
#     col("neighbourhood_group").isNotNull()]

# calendar_constraints = [
#     (col("listing_id") + col("date")).isUnique(),
#     col("available").isNotNull(),
#     col("price").isNotNull()]

# review_constraints = [
#     (col('listing_id') + col('review_date')).isUnique(),
#     col('comments').isNotNull()]

# listing_constraints = [
#     col("host_id").isUnique(),
#     col("host_since").isNotNull()]

# FACT Table
# 
# booking_constraints = [
#     col("review_id").isUnique(),
#     col("review_date").isNotNull(),
#     col("reviewer_id").isNotNull()]

In [None]:
# Afterwards, use the function to pass the contraints to the dfs
# 
# hoods_table    = apply_constraints(reviews_table, hood_constraints)
# calendar_table = apply_constraints(reviews_table, calendar_constraints)
# reviews_table  = apply_constraints(reviews_table, review_constraints)
# listing_table  = apply_constraints(reviews_table, listing_constraints)
# 
# booking_table  = apply_constraints(reviews_table, booking_constraints)

Quality checks done. Tables are ready to be uploaded into the S3 "Data Lake" on AWS.  

> <font color="red">**REMINDER:**  
> S3 target directory at [S3 Bucket @ AWS] (https://us-west-2.console.aws.amazon.com/console/home?region=us-west-2#) created? ;-) </font>   

> **Note:**   
> It can timeout! - For demo purposes only used the "lightweight" tables ;-)   
> `listing_table`, `reviews_table` as well the fact table `booking_table` are too heavy, therefore it could be take **VERY LONG!**

In [58]:
hoods_table.printSchema()

root
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)



In [59]:
# WRITING TABLES AS PARQUET TO S3

# NEIGHBOURHOODS table - write to parquet files partitioned by neighbourhood_group
hoods_table.write.mode('overwrite').partitionBy('neighbourhood_group').parquet(output_data + 'neighbourhood')
print('>> NEIGHBOURHOODS Table is created in the S3 bucket!')

>> NEIGHBOURHOODS Table is created in the S3 bucket!


In [None]:
# write CALENDAR table to parquet files partitioned by month
calendar_table.write.partitionBy('month') \
              .parquet(output_data + 'calendar', 'overwrite')
print('>> CALENDAR Table is created on the S3 bucket!')

- --  
> Both last tables would be successfull uploaded to S3 @ AWS.  
The other tables require a better environment as the Udacity Workplace are, otherwise it will be eigher a time out or a memory insufficience. 


![S3 Bucket](./IMAGE-S3-Bucket-Parquet-Overview.png?raw=treu)

In [None]:
# LISTING table - write to parquet files partitioned by month and room type
# listing_table.write.partitionBy('month', 'room_type') \
#              .parquet(output_data + 'listing', 'overwrite')
# print('>> LISTING Table is created on the S3 bucket!')


# REVIEW table - write to parquet files partitioned by month 
# reviews_table.write.partitionBy('month') \
#              .parquet(output_bucket + 'reviews', 'overwrite')
# print('>> REVIEW Table is created on the S3 bucket!')


# BOOKINGS (fact) table - write to parquet files partitioned month
# booking_table.write.partitionBy('month') \
#              .parquet(output_bucket + 'booking', 'overwrite')
# print('>> BOOKING Table is created on the S3 bucket!')

The uploaded tables should be now on the S3 Bucket available to be parsed by accessing the target S3 Buecket.  

**Because of above hint**, we check briefly if it works, leaving the big tables out _(they are correct!)._

In [13]:
# From S3 read tables in order to parse and do analysis ...

neighbourhoods = spark.read.parquet(output_data + 'neighbourhood')
#listing        = spark.read.parquet(output_data + 'listing')
#calendar       = spark.read.parquet(output_data + 'calendar')
#reviews        = spark.read.parquet(output_data + 'reviews')

#booking        = spark.read.option('mergeSchema', True).parquet(output_data + 'booking')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

# Only this exemplary, to save time: 
uploaded_tables   = ['hoods_table']
downloaded_tables = ['neighbourhoods']


# ALL: Get the count of rows in the dataframe
#uploaded_tables= ['booking_table','listing_table', \
#                     'reviews_table','hoods_table','calendar_table']
#
#downloaded_tables = ['booking','listing','reviews', \
#                     'neighbourhoods','calendar']


expected_count = []
row_count      = []

for i in uploaded_tables:
    expected_count.append(i)

for i in downloaded_tables:
    row_count.append(i)

# Compare the counts
for uploaded, downloaded in zip(uploaded_tables, downloaded_tables):
    if downloaded.count() != uploaded.count():
        raise ValueError(">> Data is incomplete! ** Expected {} rows in {}, but found {}." \
                         .format(uploaded.count(), uploaded, downloaded.count()) \
                        )


**Note:** Above quality check does only show an error when there is none matching between both row counts!

Last verification that the tables read out from S3 Bucket can be used here some Spark SQL Queries

In [24]:
hoods.createOrReplaceTempView("neighbourhoods")
listing.createOrReplaceTempView("listing")
calendar.createOrReplaceTempView("calendar")

In [25]:
# NEIGHBOURHOODS dimension Table
spark.sql("""
    SELECT * 
      FROM neighbourhoods 
     LIMIT 5
""").show()

+-------------------+--------------+
|neighbourhood_group| neighbourhood|
+-------------------+--------------+
|neighbourhood_group| neighbourhood|
|               null|Álvaro Obregón|
|               null|  Azcapotzalco|
|               null| Benito Juárez|
|               null|      Coyoacán|
+-------------------+--------------+



In [26]:
# LISTING dimension Table
spark.sql("""
    SELECT * 
      FROM listing 
     LIMIT 5
""").show()

+------------------+--------------------+--------------------+--------------+-----------+--------------------+--------------------+---------------------+--------------------+----------------+--------------------+----------------+------------------+-------------------+--------------------+--------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------+---------+---------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-----------

In [27]:
# CALENDAR dimension Table
spark.sql("""
    SELECT * 
      FROM calendar 
     LIMIT 10
""").show()

+----------+----------+---------+--------------+-----+
|listing_id|      date|available|adjusted_price|month|
+----------+----------+---------+--------------+-----+
|   2056638|2023-03-30|        f|     $1,434.00|    3|
|   2056638|2023-03-31|        f|     $1,434.00|    3|
|   2056638|2023-04-01|        f|     $1,434.00|    4|
|   2056638|2023-04-02|        f|     $1,434.00|    4|
|   2056638|2023-04-03|        f|     $1,434.00|    4|
|   2056638|2023-04-04|        f|     $1,434.00|    4|
|   2056638|2023-04-05|        f|     $1,434.00|    4|
|   2056638|2023-04-06|        f|     $1,434.00|    4|
|   2056638|2023-04-07|        f|     $1,434.00|    4|
|   2056638|2023-04-08|        f|     $1,434.00|    4|
+----------+----------+---------+--------------+-----+



In [30]:
# Calculation & Analysis of Room Type
spark.sql("""
    SELECT DISTINCT(room_type),
           COUNT(*) AS num_listing 
      FROM listing
     WHERE room_type IN ("Entire home/apt", "Private room", "Shared room", "Hotel room")
     GROUP BY 1 
     ORDER BY 2 DESC 
     LIMIT 10
""").show()

+---------------+-----------+
|      room_type|num_listing|
+---------------+-----------+
|Entire home/apt|      10583|
|   Private room|       5996|
|    Shared room|        203|
|     Hotel room|         85|
+---------------+-----------+



![Bar Graph Airbnb Website](./IMAGE-Room-Type-BarGraph-Airbnb.png)
_Source: (http://insideairbnb.com/mexico-city/)_  

As comparison, made under Power BI:   
![Bar Graph PowerBI](./IMAGE-Room-Type-BarGraph-PowerBI.png)

In [None]:
spark.sql("""
    SELECT DATE_TRUNC('month',calendar.date) AS month,
           COUNT(DISTINCT(listing.id)) AS num_listing_month
      FROM calendar 
      LEFT JOIN listing 
           ON calendar.listing_id = listing.id
     WHERE DATE_TRUNC('year',calendar.date) = '2022'
       AND host_id LIKE '89%'
     GROUP BY 1 
     ORDER BY 1 asc 
     LIMIT 12
""").show()

+-------------------+-----------------+   
|              month|num_listing_month|   
+-------------------+-----------------+   
|2022-09-01 00:00:00|                9|   
|2022-10-01 00:00:00|                9|   
|2022-11-01 00:00:00|                9|   
|2022-12-01 00:00:00|                9|   
+-------------------+-----------------+   

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

I started with these Schemas - Detailed Data Dictionary see Document!   


`Neighbourhood  
++++++++++++++  
 |-- neighbourhood_group: string (nullable = true)  
 |-- neighbourhood: string (nullable = true)`

`reviews  
++++++++  
 |-- listing_id: integer (nullable = true)  
 |-- id: integer (nullable = true)  
 |-- date: date (nullable = true)  
 |-- reviewer_id: integer (nullable = true)  
 |-- reviewer_name: string (nullable = true)  
 |-- comments: string (nullable = true)`

`calendar  
++++++++++  
 |-- listing_id: integer (nullable = true)  
 |-- date: date (nullable = true)  
 |-- available: boolean (nullable = true)  
 |-- adjusted_price: decimal(10,0) (nullable = true)`

`listings  
+++++++++++  
 |-- id: integer (nullable = true)  
 |-- name: string (nullable = true)  
 |-- description: string (nullable = true)  
 |-- host_id: integer (nullable = true)  
 |-- host_name: string (nullable = true)  
 |-- host_since: date (nullable = true)  
 |-- source: string (nullable = true)  
 |-- latitude: float (nullable = true)  
 |-- longitude: float (nullable = true)  
 |-- price: float (nullable = true)  
 |-- review_scores_rating: integer (nullable = true)  
 |-- reviews_per_month: float (nullable = true)  
 |-- room_type: string (nullable = true)  
 |-- accommodates: integer (nullable = true)  
 |-- bathrooms: integer (nullable = true)  
 |-- bedrooms: integer (nullable = true)  
 |-- beds: integer (nullable = true)`


![Database Model](IMAGE-Database-Model.png?raw=true)

#### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**

Mainly the constraints of the project does give the direction of the choice. Apache Spark is known for his advantage of handle large data volumes and perform distributed processing, as well his well integration to S3 AWS technologie.  
A star schema does give a simple and effective way to organize data for querying and finaly the CSV sources did help out to favoured this. 

* **Propose how often the data should be updated and why.**

As the data will be updated once a month, then the data refresh could be also be monthly. In the meanwhile there are none changes. 
- --

* **Write a description of how you would approach the problem differently under the following scenarios:**


 * - **The data was increased by 100x.**

 In this case the current setup does no be more the good one. A distributed database such as Hadhoop or Apache Cassandra could be potentialy look into, because they are designed to handle very large data volumes. On the other hand a cloud-based data warehouse like Amazon's Redshift would be optimized for fast querying and analysis of large datasets.  
 
 
 * - **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
 
 An automation for diferent jobs is required. This could be handled setting up a scheduled job or workflow to load the data into the database on a daily basis. Data loading, require also an automated data loading process, some hours before to ensure it is finished on the requied time window.  
 * - - _Workflow orchestration tools:_ Apache Airflow can used to define and schedule data pipelines that include task such as data loadgin, transformation and Visualization.   
 * - - _Cloud-based schedulers:_ For an use on a regular basis there are different Cloud providers, like AWS and it Schedular for EC2 instances.
 
 
 * - **The database needed to be accessed by 100+ people.**
 
 This stress test on the infrastructre does end on the need of scaling up the existing infrastructure. Very typically in the industry! Normally the scale is on the wide to support more concurrent users. Of sure the security settings needs also to be checked to ensure protection of the database. 
 
 
 Some tools for scaling up:   
 * - - _Cloud Data Warehouses:_ Google's BigQuery, Amazon Redshift or Snowflake
 * - - _Relational databases:_ MySQL or PostgreSQL
 * - - _NoSQL Databases:_ Google Cloud Bigtable, Apache Casandra, MongoDB
 * - - _Data Lakes:_ Amazon S3 or Azure Data Lake Storage (Microsoft)
 * - - _Big Data processing frameworks:_ Apache Spark, Apache Hadoop
 
 

- --

#### Images

In [31]:
print(">> List of Images uploaded to (Udacity) Workspace")
!ls *.png

>> List of Images uploaded to (Udacity) Workspace
IMAGE-Database-Model.png	      IMAGE-S3-Bucket-Parquet-Overview.png
IMAGE-Room-Type-BarGraph-Airbnb.png   listings_detailed.csv-viewing-data.png
IMAGE-Room-Type-BarGraph-PowerBI.png  reviews_detailed.csv-viewing-data.png


In [32]:
print(">> List of Documents [*.pdf] ")
!ls *.pdf

>> List of Documents [*.pdf] 
Data-Dictionary-Airbnb-(Mexico-City).pdf


- --

END OF DOCUMENT - [Capstone-B-Project(LOC)-Notebook.ipynb]