# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np

import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import monotonically_increasing_id, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, TimestampType, StructType, StructField, StringType, DateType, BooleanType, DecimalType, DoubleType

In [2]:
config = configparser.ConfigParser()
config.read_file(open('creds.cfg'))

KEY             = config.get('AWS','KEY')
SECRET          = config.get('AWS','SECRET')

pd.DataFrame({"Param":
                  ["KEY", "SECRET"],
              "Value":
                  [KEY, SECRET] })

config = configparser.ConfigParser()
config.read('creds.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET']

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.executor.instances", 10) \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to provide an analytics team with access to a data model that allows them to easily query and extract insights. To achieve this, I am downloading CSV files, processing them with Python and Spark, and uploading the resulting data to an S3 bucket in the form of Parquet files. These Parquet files can be queried like tables, reading from S3, making it easy for the analytics team to perform various types of analysis.

Some examples of specific analyses that can be performed with this data model include:

1. Aggregate calculations: Use SQL queries to compute sums, averages, counts, and other aggregate values for various dimensions of the data.

2. Time series analysis: Examine trends and patterns over time by querying the data using time-based filters.

3. Correlation analysis: Identify relationships between different variables in the data by computing correlations and performing regression analyses.

4. Segmentation: Divide the data into groups based on common characteristics and analyze each group separately to identify trends and patterns within each segment.

5. Predictive modeling: Use machine learning algorithms to build models that can predict future outcomes based on the data, e.g. classification algorithms.

Overall, this data model will enable the analytics BI team to gain a deeper understanding of the data and uncover valuable insights that will inform and drive business decisions. 

#### Describe and Gather Data 
The data set I am using has been gathered from this website (http://insideairbnb.com/get-the-data/) and contains various csv files with information about listing of the properties in airbnb, neighbourhood, and reviews left by visitors.

In [3]:
calendar_csv = 'airbnb_data/calendar.csv'
list_det_csv = 'airbnb_data/listings_detailed.csv'
list_csv = 'airbnb_data/listings.csv'
hoods_csv = 'airbnb_data/neighbourhoods.csv'
reviews_csv = 'airbnb_data/reviews_detailed.csv'

##pandas df for data exploration
calendar_df = pd.read_csv(calendar_csv)
list_det_df = pd.read_csv(list_det_csv)
list_df = pd.read_csv(list_csv)
hoods_df = pd.read_csv(hoods_csv)
reviews_df = pd.read_csv(reviews_csv)

I turned the csv into pd dataframes first to explore and display the data in a nice format

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
1. visualise the data in a pandas dataframe to assess quality by eye
2. check for nulls, possibly delete nulls or replace with '' or 0 depending on column type
3. check for duplciates on key column like unique identifiers, drop the rows if any

In [4]:
#rename price at it is also found in another future table, view calendar df
calendar_df = calendar_df.rename(columns={'price': 'requested_price'}) #doing this retroactively as I realised could be an issue later on
print(calendar_df.head(2))

#view review df
print(reviews_df.head(2))

#view listing df
print(list_df.head(2))

#drop min and max nights as they appear in another future table, view listing detailed df
to_drop = ['maximum_nights','minimum_nights','price']
list_det_df = list_det_df.drop(to_drop, axis=1)
print(list_det_df.head(2))

#view neighbourhoods df
print(hoods_df.head(2))

           listing_id        date available requested_price adjusted_price  \
0  652868795892201022  2022-09-15         f          $85.00         $85.00   
1  652868795892201022  2022-09-16         f          $85.00         $85.00   

   minimum_nights  maximum_nights  
0             1.0          1125.0  
1             1.0          1125.0  
   listing_id      id        date  reviewer_id reviewer_name  \
0        3176    4283  2009-06-20        21475         Milan   
1       22438  218181  2011-04-05       401483     Alexandre   

                                            comments  
0  excellent stay, i would highly recommend it. a...  
1  Javier gave us quite a fright when none of his...  
                   id                                               name  \
0  652868795892201022  Kleine Auszeit? Oder Business-Trip? Alles mögl...   
1            27080612  Apartment with Living/Sleeping Room & own Kitchen   

     host_id    host_name     neighbourhood_group neighbourhood   lati

At this point I already want to merge listing with listing detailed to create one df,
I do this below:

In [5]:
#merging the two listing df to create one, left join on the detailed
listing_df = pd.merge(list_det_df, list_df,  how='left', left_on=['id'], right_on = ['id'])

# Transpose the dataframe and drop duplicate columns
listing_df_transposed = listing_df.T
listing_df_unique = listing_df_transposed.drop_duplicates()

# Transpose the dataframe back to its original shape
listing_df = listing_df_unique.T

#remove the suffix _x generated by column duplication
suffix = '_x'
listing_df = listing_df.rename(columns={col: col.replace(suffix, '') for col in listing_df.columns})

In [6]:
#cleaning the columns further in order to cast correct data types in order to convert from pd to spark df
listing_df = listing_df.fillna('')

listing_df['price'] = listing_df['price'].replace('$', '')
null_to_zero = ['bedrooms','beds','review_scores_rating','review_scores_accuracy','review_scores_cleanliness',
                'review_scores_checkin','reviews_per_month','review_scores_communication','review_scores_location','review_scores_value']
for i in null_to_zero:
    listing_df[i] = listing_df[i].replace('', 0)

Check for missing values:

In [7]:
dfs = [listing_df,calendar_df,reviews_df,hoods_df]

# % of missing values per column
for df in dfs:
    for col in df.columns:
        pct_missing = np.mean(df[col].isnull())
        print('{} - {}%'.format(col, round(pct_missing*100)))

id - 0.0%
listing_url - 0.0%
scrape_id - 0.0%
last_scraped - 0.0%
source - 0.0%
name - 0.0%
description - 0.0%
neighborhood_overview - 0.0%
picture_url - 0.0%
host_id - 0.0%
host_url - 0.0%
host_name - 0.0%
host_since - 0.0%
host_location - 0.0%
host_about - 0.0%
host_response_time - 0.0%
host_response_rate - 0.0%
host_acceptance_rate - 0.0%
host_is_superhost - 0.0%
host_thumbnail_url - 0.0%
host_picture_url - 0.0%
host_neighbourhood - 0.0%
host_listings_count - 0.0%
host_total_listings_count - 0.0%
host_verifications - 0.0%
host_has_profile_pic - 0.0%
host_identity_verified - 0.0%
neighbourhood - 0.0%
neighbourhood_cleansed - 0.0%
neighbourhood_group_cleansed - 0.0%
latitude - 0.0%
longitude - 0.0%
property_type - 0.0%
room_type - 0.0%
accommodates - 0.0%
bathrooms - 0.0%
bathrooms_text - 0.0%
bedrooms - 0.0%
beds - 0.0%
amenities - 0.0%
minimum_minimum_nights - 0.0%
maximum_minimum_nights - 0.0%
minimum_maximum_nights - 0.0%
maximum_maximum_nights - 0.0%
minimum_nights_avg_ntm - 0.0%

In [8]:
#dropping bathroom as it is always empty
listing_df = listing_df.drop('bathrooms',axis=1)

Check for duplicates now:

In [9]:
duplicate_listing_id = listing_df[listing_df.duplicated(['id'])]['id'].count()
print("Number of duplicate listing_ids in listing: ", duplicate_listing_id)

#makes no sense to check this for calendar
# duplicate_calendar_listing_id = calendar_df[calendar_df.duplicated(['listing_id'])]['listing_id'].count()
# print("Number of duplicate listing_ids in calendar: ", duplicate_calendar_listing_id)

duplicate_review_id = reviews_df[reviews_df.duplicated(['id'])]['id'].count()
print("Number of duplicate ids in review: ", duplicate_review_id)

duplicate_neighbourhood_group = hoods_df[hoods_df.duplicated(['neighbourhood'])]['neighbourhood'].count()
print("Number of duplicate neighbourhood_group in hoods_df: ", duplicate_neighbourhood_group)

Number of duplicate listing_ids in listing:  0
Number of duplicate ids in review:  0
Number of duplicate neighbourhood_group in hoods_df:  0


No duplicates on key columns, moving on..

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Star Schema would be the model of choice as I believe for the way the data is presented and already available, it can very well be tranformed into dimension tables without the need to normalise any further. Additionally I am planning to build a central fact table by joining calendar, listing and reviews to create a source to retrieve events. 

#### 3.2 Mapping Out Data Pipelines
I am currently using Spark, so I will be parsing and querying the data on the fly in the notebook without building tables and filling them (CREATE & INSERT/COPY statements).
This is a great advantage of using Spark to do this job.
Once the fields to keep in the dimension tables and the structure of the fact table is clear, I will upload the files in parquet format in an S3 bucket.
From there I will redownload/read the parquet files directly into spark dataframes and query directly for demonstration purposes.

Below I am passing the credentials to access the AWS services, specifically I will use an S3 bucket as "data lake",
and configuring the environment.

I will then initiate the Spark builder to run the Spark job. Note that I requested additional calculating power to the Spark builder,
as these files are relatively heavy to upload/download to S3.

Below I define the schema so that I can apply it to the file when reading it.
Disclaimer, it was not working for all the files so I had to manually modify the data types of the fields to transform a pandas df into a spark df.

In [10]:
#forcing the schema onto the dataframes as spark reads all fields as string

listing_schema = StructType([
        StructField('id', IntegerType()),
        StructField('listing_url', StringType()),
        StructField('scrape_id', IntegerType()),
        StructField('last_scraped', TimestampType()),
        StructField('source', StringType()),
        StructField('name', StringType()),
        StructField('description', StringType()),
        StructField('neighborhood_overview', StringType()),
        StructField('picture_url', StringType()),
        StructField('host_id', IntegerType()),
        StructField('host_url', StringType()),
        StructField('host_name', StringType()),
        StructField('host_since', TimestampType()),
        StructField('host_location', StringType()),
        StructField('host_about', StringType()),
        StructField('host_response_time', IntegerType()),
        StructField('host_response_rate', IntegerType()),
        StructField('host_acceptance_rate', IntegerType()),
        StructField('host_is_superhost', StringType()),
        StructField('host_thumbnail_url', StringType()),
        StructField('host_picture_url', StringType()),
        StructField('host_neighbourhood', StringType()),
        StructField('host_listings_count', IntegerType()),
        StructField('host_total_listings_count', IntegerType()),
        StructField('host_verifications', StringType()),
        StructField('host_has_profile_pic', StringType()),
        StructField('host_identity_verified', StringType()),
        StructField('neighbourhood', StringType()),
        StructField('neighbourhood_cleansed', StringType()),
        StructField('neighbourhood_group_cleansed', StringType()),
        StructField('latitude', DoubleType()),
        StructField('longitude', DoubleType()),
        StructField('property_type', StringType()),
        StructField('room_type', StringType()),
        StructField('accommodates', StringType()),
        #StructField('bathrooms', StringType()),
        StructField('bathrooms_text', StringType()),
        StructField('bedrooms', IntegerType()),
        StructField('beds', IntegerType()),
        StructField('amenities', StringType()),
        StructField('price', DoubleType()),
        StructField('minimum_nights', IntegerType()),
        StructField('maximum_nights', IntegerType()),
        StructField('minimum_minimum_nights', IntegerType()),
        StructField('maximum_minimum_nights', IntegerType()),
        StructField('minimum_maximum_nights', IntegerType()),
        StructField('maximum_maximum_nights', IntegerType()),
        StructField('minimum_nights_avg_ntm', IntegerType()),
        StructField('maximum_nights_avg_ntm', IntegerType()),
        #StructField('calendar_updated', TimestampType()),
        StructField('has_availability', StringType()),
        StructField('availability_30', IntegerType()),
        StructField('availability_60', IntegerType()),
        StructField('availability_90', IntegerType()),
        StructField('availability_365', IntegerType()),
        StructField('calendar_last_scraped', StringType()),
        StructField('number_of_reviews', IntegerType()),
        StructField('number_of_reviews_ltm', IntegerType()),
        StructField('number_of_reviews_l30d', IntegerType()),
        StructField('first_review', TimestampType()),
        StructField('last_review', TimestampType()),
        StructField('review_scores_rating', IntegerType()),
        StructField('review_scores_accuracy', IntegerType()),
        StructField('review_scores_cleanliness', IntegerType()),
        StructField('review_scores_checkin', IntegerType()),
        StructField('review_scores_communication', IntegerType()),
        StructField('review_scores_location', IntegerType()),
        StructField('review_scores_value', IntegerType()),
        StructField('license', StringType()),
        StructField('instant_bookable', StringType()),
        StructField('calculated_host_listings_count', IntegerType()),
        StructField('calculated_host_listings_count_entire_homes', IntegerType()),
        StructField('calculated_host_listings_count_private_rooms', IntegerType()),
        StructField('calculated_host_listings_count_shared_rooms', IntegerType()),
        StructField('reviews_per_month', IntegerType())
    ])

calendar_schema = StructType([
        StructField('listing_id', StringType(), True),
        StructField('date', TimestampType(), True),
        StructField('available', BooleanType(), True),
        StructField('requested_price', DecimalType(), True),
        StructField('adjusted_price', DecimalType(), True)
    ])

hoods_schema = StructType([
        StructField('neighbourhood_group', StringType()),
        StructField('neighbourhood', StringType())
    ])

reviews_schema = StructType([
        StructField('listing_id', IntegerType()),
        StructField('id', IntegerType()),
        StructField('date', TimestampType()),
        StructField('reviewer_id', IntegerType()),
        StructField('reviewer_name', StringType()),
        StructField('comments', StringType())
    ])

the code below works better already inferring the schema, still doesn't work for listing.
For listing I have merged at the DF level already above, I need to create the spark dataframe from the pandas df instead of the csv.
Then recheck the schema

In [11]:
listing_df['id'] = listing_df['id'].astype('int32')
listing_df['listing_url'] = listing_df['listing_url'].astype('str')
listing_df['scrape_id'] = listing_df['scrape_id'].astype('int32')
listing_df['last_scraped'] = pd.to_datetime(listing_df['last_scraped'])
listing_df['source'] = listing_df['source'].astype('str')
listing_df['name'] = listing_df['name'].astype('str')
listing_df['description'] = listing_df['description'].astype('str')
listing_df['neighborhood_overview'] = listing_df['neighborhood_overview'].astype('str')
listing_df['picture_url'] = listing_df['picture_url'].astype('str')
listing_df['host_id'] = listing_df['host_id'].astype('int32')
listing_df['host_url'] = listing_df['host_url'].astype('str')
listing_df['host_name'] = listing_df['host_name'].astype('str')
listing_df['host_since'] = pd.to_datetime(listing_df['host_since'])
listing_df['host_location'] = listing_df['host_location'].astype('str')
listing_df['host_about'] = listing_df['host_about'].astype('str')
listing_df['host_response_time'] = listing_df['host_response_time'].astype('str')
listing_df['host_response_rate'] = listing_df['host_response_rate'].astype('str')
listing_df['host_acceptance_rate'] = listing_df['host_acceptance_rate'].astype('str')
listing_df['host_is_superhost'] = listing_df['host_is_superhost'].astype('str')
listing_df['host_thumbnail_url'] = listing_df['host_thumbnail_url'].astype('str')
listing_df['host_picture_url'] = listing_df['host_picture_url'].astype('str')
listing_df['host_neighbourhood'] = listing_df['host_neighbourhood'].astype('str')
listing_df['host_listings_count'] = listing_df['host_listings_count'].astype('str')
listing_df['host_total_listings_count'] = listing_df['host_total_listings_count'].astype('str')
listing_df['host_verifications'] = listing_df['host_verifications'].astype('str')
listing_df['host_has_profile_pic'] = listing_df['host_has_profile_pic'].astype('str')
listing_df['host_identity_verified'] = listing_df['host_identity_verified'].astype('str')
listing_df['neighbourhood'] = listing_df['neighbourhood'].astype('str')
listing_df['neighbourhood_cleansed'] = listing_df['neighbourhood_cleansed'].astype('str')
listing_df['neighbourhood_group_cleansed'] = listing_df['neighbourhood_group_cleansed'].astype('str')
listing_df['latitude'] = listing_df['latitude'].astype('float')
listing_df['longitude'] = listing_df['longitude'].astype('float')
listing_df['property_type'] = listing_df['property_type'].astype('str')
listing_df['room_type'] = listing_df['room_type'].astype('str')
listing_df['accommodates'] = listing_df['accommodates'].astype('int32')
listing_df['bathrooms_text'] = listing_df['bathrooms_text'].astype('str')
listing_df['bedrooms'] = listing_df['bedrooms'].astype('str')
listing_df['beds'] = listing_df['beds'].astype('int32')
listing_df['amenities'] = listing_df['amenities'].astype('str')
listing_df['price'] = listing_df['price'].replace('$', '', regex=True).astype('float', errors='ignore')
# listing_df['minimum_nights'] = listing_df['minimum_nights'].astype('int32')
# listing_df['maximum_nights'] = listing_df['maximum_nights'].astype('int32')
listing_df['minimum_minimum_nights'] = listing_df['minimum_minimum_nights'].astype('int32')
listing_df['maximum_minimum_nights'] = listing_df['maximum_minimum_nights'].astype('int32')
listing_df['minimum_maximum_nights'] = listing_df['minimum_maximum_nights'].astype('int32')
listing_df['maximum_maximum_nights'] = listing_df['maximum_maximum_nights'].astype('int32')
listing_df['minimum_nights_avg_ntm'] = listing_df['minimum_nights_avg_ntm'].astype('int32')
listing_df['maximum_nights_avg_ntm'] = listing_df['maximum_nights_avg_ntm'].astype('int32')
listing_df['has_availability'] = listing_df['has_availability'].astype('str')
listing_df['availability_30'] = listing_df['availability_30'].astype('int32')
listing_df['availability_60'] = listing_df['availability_60'].astype('int32')
listing_df['availability_90'] = listing_df['availability_90'].astype('int32')
listing_df['availability_365'] = listing_df['availability_365'].astype('int32')
#listing_df['calendar_last_scraped'] = pd.to_datetime(listing_df['calendar_last_scraped'])
listing_df['number_of_reviews'] = listing_df['number_of_reviews'].astype('int32')
listing_df['number_of_reviews_ltm'] = listing_df['number_of_reviews_ltm'].astype('int32')
listing_df['number_of_reviews_l30d'] = listing_df['number_of_reviews_l30d'].astype('int32')
listing_df['first_review'] = pd.to_datetime(listing_df['first_review'])
listing_df['last_review'] = pd.to_datetime(listing_df['last_review'])
listing_df['review_scores_rating'] = listing_df['review_scores_rating'].astype('int32')
listing_df['review_scores_accuracy'] = listing_df['review_scores_accuracy'].astype('int32')
listing_df['review_scores_cleanliness'] = listing_df['review_scores_cleanliness'].astype('int32')
listing_df['review_scores_checkin'] = listing_df['review_scores_checkin'].astype('int32')
listing_df['review_scores_communication'] = listing_df['review_scores_communication'].astype('int32')
listing_df['review_scores_location'] = listing_df['review_scores_location'].astype('int32')
listing_df['review_scores_value'] = listing_df['review_scores_value'].astype('int32')
#listing_df['requires_license'] = listing_df['requires_license'].astype('str')
listing_df['license'] = listing_df['license'].astype('str')
#listing_df['jurisdiction_names'] = listing_df['jurisdiction_names'].astype('str')
#listing_df['cancellation_policy'] = listing_df['cancellation_policy'].astype('str')
#listing_df['is_business_travel_ready'] = listing_df['is_business_travel_ready'].astype('str')
listing_df['instant_bookable'] = listing_df['instant_bookable'].astype('str')
#listing_df['require_guest_profile_picture'] = listing_df['require_guest_profile_picture'].astype('str')
#listing_df['require_guest_phone_verification'] = listing_df['require_guest_phone_verification'].astype('str')
listing_df['calculated_host_listings_count'] = listing_df['calculated_host_listings_count'].astype('str')
listing_df['calculated_host_listings_count_entire_homes'] = listing_df['calculated_host_listings_count_entire_homes'].astype('str')
listing_df['calculated_host_listings_count_private_rooms'] = listing_df['calculated_host_listings_count_private_rooms'].astype('str')
listing_df['calculated_host_listings_count_shared_rooms'] = listing_df['calculated_host_listings_count_shared_rooms'].astype('str')
listing_df['reviews_per_month'] = listing_df['reviews_per_month'].astype('float')

In [None]:
# #just a check to debug cell above
# print(listing_df['review_scores_communication'].unique())

In [12]:
#creating spark dataframes and passing the schemas to impart data type, exept for listing which converts from a pandas df
df_hoods = spark.read.csv(hoods_csv, header=True, sep=",", schema=hoods_schema)
df_reviews = spark.read.csv(reviews_csv, header=True, sep=",", schema=reviews_schema)
df_reviews = df_reviews.withColumnRenamed("id", "review_id").withColumnRenamed("date", "review_date")
df_listing = spark.createDataFrame(listing_df)
df_calendar = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').option('sep', ',').load(calendar_csv)

In [13]:
df_calendar = df_calendar.drop('price')
df_calendar.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights: integer (nullable = true)



In [14]:
#spark can run sql queries on the fly without create or insert statements
df_reviews.createOrReplaceTempView("reviews")
spark.sql("""
    SELECT listing_id, count(*) as reviews
    FROM reviews
    WHERE comments is not null
    GROUP by 1
    ORDER by reviews desc
    LIMIT 5
""").show()

+----------+-------+
|listing_id|reviews|
+----------+-------+
|    292864|    703|
|    517425|    700|
|    652366|    652|
|    264459|    627|
|  10103689|    598|
+----------+-------+



The query below is basically my fact table, see:

In [15]:
df_reviews.createOrReplaceTempView("reviews")
df_hoods.createOrReplaceTempView("hoods")
df_listing.createOrReplaceTempView("listing")
df_calendar.createOrReplaceTempView("calendar")

spark.sql("""
    SELECT *
    from calendar 
    left join listing 
        on listing.id=calendar.listing_id
    left join reviews 
        using (listing_id)
    limit 1
""").show()

+----------+----------+---------+--------------+--------------+--------------+-----+--------------------+---------+-------------------+-----------+--------------------+--------------------+---------------------+--------------------+-------+--------------------+---------+-------------------+---------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+---------------+----------------------+----------------------------+--------+---------+------------------+---------------+------------+--------------+--------+----+--------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+---------------+---------------+---------------+----------------+-----------------+

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model:
1. defining the destination bucket
2. defining the dimension "tables" to upload with the select method, and viewing them to make sure there's no error
3. creating the fact table "Bookings" by joining together the fact tables
4. writing all the tables to S3 with the write method, assessing proper partitioning and name
5. reading from S3 and creating again the tables objects 
6. quality checks that the rows retrieved match the uploaded in the next section

In [16]:
output_bucket = "s3a://udacitycapstone123/"

In [17]:
#preparing the various "tables" before uploading to the S3 as parquet

# REVIEW TABLE - extract columns to create review table
df_reviews = df_reviews.na.drop()
df_reviews = df_reviews.withColumn('month', month('review_date'))

reviews_table = df_reviews.select('listing_id', 'review_id', 'review_date', 'reviewer_id', 'reviewer_name', 'comments', 'month')
print(reviews_table.show(3))

#NEIGHBOURHOODS - extract columns to create Hoods table
hoods_table = df_hoods.select('neighbourhood_group', 'neighbourhood')
print(hoods_table.show(3))

#CALENDAR - extract columns to create CALENDAR table
df_calendar = df_calendar.withColumn('month', month('date'))
calendar_table = df_calendar.select('listing_id', 'month', 'date', 'available', 'adjusted_price', 'minimum_nights', 'maximum_nights')
print(calendar_table.show(3))

#LISTING - extract columns to create LISTING table
df_listing = df_listing.withColumn('month', month('host_since'))
#including only some columns in the listing table for demostration purposes and because it is very slow
listing_table = df_listing.select('id', 'month','name','description','host_id','host_name','host_since','source','latitude','longitude','price','review_scores_rating','reviews_per_month','room_type','neighbourhood')
print(listing_table.show(3))

+----------+---------+-------------------+-----------+-------------+--------------------+-----+
|listing_id|review_id|        review_date|reviewer_id|reviewer_name|            comments|month|
+----------+---------+-------------------+-----------+-------------+--------------------+-----+
|      3176|     4283|2009-06-20 00:00:00|      21475|        Milan|excellent stay, i...|    6|
|     22438|   218181|2011-04-05 00:00:00|     401483|    Alexandre|Javier gave us qu...|    4|
|      3176|   134722|2010-11-07 00:00:00|     263467|       George|Britta's apartmen...|   11|
+----------+---------+-------------------+-----------+-------------+--------------------+-----+
only showing top 3 rows

None
+--------------------+-------------------+
| neighbourhood_group|      neighbourhood|
+--------------------+-------------------+
|Charlottenburg-Wilm.|          Barstraße|
|Charlottenburg-Wilm.|Charlottenburg Nord|
|Charlottenburg-Wilm.|Düsseldorfer Straße|
+--------------------+------------------

In [18]:
##Creating the fact table BOOKINGS now
df_bookings_stg = df_calendar.join(listing_table, on=[df_calendar.listing_id == listing_table.id], how='left')
df_bookings = df_bookings_stg.join(df_reviews, on=[df_bookings_stg.listing_id == df_reviews.listing_id], how='left')
print(df_bookings.printSchema())
print(df_bookings.show(3))

#preparing BOOKINGS before uploading to the S3 as parquet
# extract columns to create review table
df_bookings = df_bookings.withColumn('month_review', month('review_date'))
#including only some columns in the listing table for demostration purposes and because it is very slow
booking_table = df_bookings.select('id', 'month_review','name','description','host_id','host_name','host_since','source','latitude','longitude','adjusted_price','review_id','review_date',
                                   'reviewer_id','review_scores_rating','reviews_per_month','room_type')
print(booking_table.show(3))

root
 |-- listing_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- maximum_nights: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- month: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: timestamp (nullable = true)
 |-- source: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- price: double (nullable = true)
 |-- review_scores_rating: long (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- listing_id: integer (nullable = true)
 |-- review_id: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)
 

I could include data quality checks before writing to S3 to make sure the data is uploaded in the right format

In [None]:
# from pyspark.sql.functions import col, isnull

# def apply_constraints(df, constraints):
#     '''
#     this function takes the spark dataframe and list of contraints
#     and applies them to the df
#     '''
#     for constraint in constraints:
#         df = df.filter(constraint)
#     return df

In [None]:
# #below the list of various constraints for the tables
# from pyspark.sql.functions import col

# review_constraints = [
#     (col('listing_id') + col('review_date')).isUnique(),
#     col('comments').isNotNull()]

# hood_constraints = [
#     col("neighbourhood").isUnique(),
#     col("neighbourhood_group").isNotNull()]

# calendar_constraints = [
#     (col("listing_id") + col("date")).isUnique(),
#     col("available").isNotNull(),
#     col("price").isNotNull()]

# listing_constraints = [
#     col("host_id").isUnique(),
#     col("host_since").isNotNull()]

# booking_constraints = [
#     col("review_id").isUnique(),
#     col("review_date").isNotNull(),
#     col("reviewer_id").isNotNull()]

In [None]:
# #I can now use the function to pass the contraints to the dfs
# reviews_table = apply_constraints(reviews_table, review_constraints)
# hoods_table = apply_constraints(reviews_table, hood_constraints)
# calendar_table = apply_constraints(reviews_table, calendar_constraints)
# listing_table = apply_constraints(reviews_table, listing_constraints)
# booking_table = apply_constraints(reviews_table, booking_constraints)

Now that the tables have been checked for quality, they are ready to get uploaded to the S3 "data lake".

Disclaimer: It can timeout!

In [19]:
# WRITING TABLES AS PARQUET TO S3

# write REVIEW table to parquet files partitioned by month 
#reviews_table.write.mode('overwrite').partitionBy('month').parquet(output_bucket + 'reviews')
# print('Review Table is created in the S3 bucket!')

# write HOODS table to parquet files partitioned by neighbourhood_group
hoods_table.write.mode('overwrite').partitionBy('neighbourhood_group').parquet(output_bucket + 'neighbourhoods')
print('Neighbourhoods Table is created in the S3 bucket!')

# write CALENDAR table to parquet files partitioned by month
calendar_table.write.mode('overwrite').partitionBy('month').parquet(output_bucket + 'calendar')
print('Calendar Table is created in the S3 bucket!')

# write LISTING table to parquet files partitioned by month and room type
listing_table.write.mode('overwrite').partitionBy('month', 'room_type').parquet(output_bucket + 'listing')
print('Listing Table is created in the S3 bucket!')

# write BOOKINGS (fact) table to parquet files partitioned month
#booking_table.write.mode('overwrite').partitionBy('month_review').parquet(output_bucket + 'booking') #, 'room_type'
# print('Booking Table is created in the S3 bucket!')

Neighbourhoods Table is created in the S3 bucket!
Calendar Table is created in the S3 bucket!
Listing Table is created in the S3 bucket!


**The two tables below TIME-OUT sometimes, so I isolated them here, this is not a syntax/code error (pls check the s3 to see they have been written before, they are too heavy (reviews..))**

In [None]:
reviews_table.write.mode('overwrite').partitionBy('month').parquet(output_bucket + 'reviews')
print('Review Table is created in the S3 bucket!')

booking_table.write.mode('overwrite').partitionBy('month_review').parquet(output_bucket + 'booking')
print('Booking Table is created in the S3 bucket!')

The tables are now available to parse by accessing the S3 bucket like so:

Disclaimer: It can timeout! (you can check the mentioned S3 bucket to see that it worked.

In [20]:
#reading the tables from the S3 in order to parse, run analysis etc

listing = spark.read.parquet(output_bucket + 'listing')
calendar = spark.read.parquet(output_bucket + 'calendar')
neighbourhoods = spark.read.parquet(output_bucket + 'neighbourhoods')
# reviews = spark.read.parquet(output_bucket + 'reviews')
# booking = spark.read.parquet(output_bucket + 'booking')

**The two tables below TIME-OUT sometimes, so I isolated them here, this is not a syntax/code error**

In [None]:
reviews = spark.read.parquet(output_bucket + 'reviews')
booking = spark.read.parquet(output_bucket + 'booking')

Quality check to see if all the rows are retrieve after I upload as parquet and retrieve the parquet

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [25]:
# Get the count of rows in the dataframe
uploaded_table = [listing_table,hoods_table,calendar_table] #'booking_table','reviews_table'
downloaded_tables = [listing,neighbourhoods,calendar] #'booking','reviews'

expected_count = []
row_count = []

for i in uploaded_table:
    expected_count.append(i)
    
for i in downloaded_tables:
    row_count.append(i)

# Compare the counts
for uploaded, downloaded in zip(uploaded_table, downloaded_tables):
    if downloaded.count() != uploaded.count():
        raise ValueError("Data is incomplete. Expected {} rows in {} but found {}".format(uploaded.count(), uploaded, downloaded.count()))

The quality check above shows matching number of rows by throwing no error

Finally to prove that the tables read from S3 can be used I run some SQL query via Spark:

In [36]:
neighbourhoods.createOrReplaceTempView("hoods")
listing.createOrReplaceTempView("listing")
calendar.createOrReplaceTempView("calendar")

In [33]:
spark.sql("""
    select * 
    from calendar 
    limit 10
""").show()

+------------------+----------+---------+--------------+--------------+--------------+-----+
|        listing_id|      date|available|adjusted_price|minimum_nights|maximum_nights|month|
+------------------+----------+---------+--------------+--------------+--------------+-----+
|652868795892201022|2022-09-16|        f|        $85.00|             1|          1125|    9|
|652868795892201022|2022-09-17|        f|        $85.00|             1|          1125|    9|
|652868795892201022|2022-09-18|        f|        $85.00|             1|          1125|    9|
|652868795892201022|2022-09-19|        f|        $85.00|             1|          1125|    9|
|652868795892201022|2022-09-20|        f|        $96.00|             1|          1125|    9|
|652868795892201022|2022-09-21|        f|        $97.00|             1|          1125|    9|
|652868795892201022|2022-09-22|        f|        $99.00|             1|          1125|    9|
|652868795892201022|2022-09-23|        f|       $103.00|             1

In [34]:
spark.sql("""
    select * 
    from listing 
    limit 10
""").show()

+------+--------------------+--------------------+-------+------------+-------------------+---------------+------------------+------------------+-----+--------------------+-----------------+---------------+-----+---------------+
|    id|                name|         description|host_id|   host_name|         host_since|         source|          latitude|         longitude|price|review_scores_rating|reviews_per_month|  neighbourhood|month|      room_type|
+------+--------------------+--------------------+-------+------------+-------------------+---------------+------------------+------------------+-----+--------------------+-----------------+---------------+-----+---------------+
|179102|   The Special Place|Our appartment is...| 857327|        Alke|2011-07-23 00:00:00|    city scrape|          52.57051|          13.39872|280.0|                   4|             1.15|Berlin, Germany|    7|Entire home/apt|
|180440|CITY STUDIO WEST@...|<b>The space</b><...| 864399|Roland & Sam|2011-07-25 00

In [35]:
spark.sql("""
    select * 
    from hoods 
    limit 10
""").show()

+---------------+-------------------+
|  neighbourhood|neighbourhood_group|
+---------------+-------------------+
|      Adlershof| Treptow - Köpenick|
|Allende-Viertel| Treptow - Köpenick|
|   Altglienicke| Treptow - Köpenick|
| Altstadt-Kietz| Treptow - Köpenick|
|   Alt  Treptow| Treptow - Köpenick|
| Baumschulenweg| Treptow - Köpenick|
|      Bohnsdorf| Treptow - Köpenick|
|   Dammvorstadt| Treptow - Köpenick|
|Friedrichshagen| Treptow - Köpenick|
|         Grünau| Treptow - Köpenick|
+---------------+-------------------+



In [37]:
spark.sql("""
    select distinct room_type,
        count(*) as num_listing 
    from listing
    group by 1 
    order by 2 desc 
    limit 10
""").show()

+---------------+-----------+
|      room_type|num_listing|
+---------------+-----------+
|Entire home/apt|       9904|
|   Private room|       6438|
|    Shared room|        192|
|     Hotel room|        146|
+---------------+-----------+



In [41]:
spark.sql("""
    select 
        date_trunc('month',calendar.date) as month,
        count(distinct listing.id) as num_listing_month
    from calendar 
    left join listing 
        on calendar.listing_id = listing.id
    where date_trunc('year',calendar.date) = '2022'
        and host_id like '89%'
    group by 1 
    order by 1 asc 
    limit 12
""").show()

+-------------------+-----------------+
|              month|num_listing_month|
+-------------------+-----------------+
|2022-09-01 00:00:00|                9|
|2022-10-01 00:00:00|                9|
|2022-11-01 00:00:00|                9|
|2022-12-01 00:00:00|                9|
+-------------------+-----------------+



#### 4.3 Data dictionary 
The data dictionary is included as separate file in the workspace or github repo you are checking.
I also included the database ERD diagram in a png format.

#### Step 5: Project Write Up
Questions + Answers:
* **Clearly state the rationale for the choice of tools and technologies for the project.**

The choice of tools and technologies for the project was based on the requirements and constraints of the project. Apache Spark was chosen for its ability to handle large data volumes and perform distributed processing, as well as its integration with S3. A star schema was chosen as the database design because it is a simple and effective way to organize data for querying and reporting, and the csv were already organised in a way that favoured this.

* **Propose how often the data should be updated and why.**

The data should be updated as often as is necessary to ensure that the dashboards are accurate and up-to-date. This will depend on the specific requirements and needs of the business. If the data is constantly changing and needs to be reflected in real-time, it may need to be updated more frequently. On the other hand, if the data only changes occasionally and the dashboard does not need to be updated in real-time, it may be sufficient to update the data less frequently, e.g. once a day.

* **Write a description of how you would approach the problem differently under the following scenarios:**

* **The data was increased by 100x.**
 
 If the data was increased by 100x, the current approach to the problem may not be sufficient to handle the larger data volume. One potential solution could be to use a distributed database such as Hadoop or Apache Cassandra, which are designed to handle very large data volumes. Another option could be to involve a cloud-based data warehousing solution such as Amazon Redshift, which is optimized for fast querying and analysis of large datasets.

 
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
 
 If the dashboard must be updated on a daily basis by 7am every day, it will be necessary to automate the data update process. This could involve setting up a scheduled job or workflow to load the data into the database on a daily basis. It may also be necessary to optimize the data loading process to ensure that it can be completed within the required time frame.
 Examples of tools that can be used to update the dashboards on a daily basis:
 -Workflow orchestration tools: Apache Airflow can be used to define and schedule data pipelines that include tasks such as data loading, transformation, and visualization.
 -Cloud-based schedulers: Cloud providers offer scheduling services that can be used to run jobs on a regular basis. Example, Amazon Web Services (AWS) offers the Simple Scheduler for EC2 instances.
 
 
 * **The database needed to be accessed by 100+ people.**
 
 If the database needed to be accessed by 100+ people, it may be necessary to scale up the infrastructure to support the increased demand. This could involve using a higher-capacity database server or a distributed database system that can scale horizontally to support more concurrent users. It may also be necessary to implement additional security measures to ensure that the database is protected against unauthorized access.
 
 Here are some tools that you could use to scale up the infrastructure and support more concurrent users:
-Cloud data warehouses: Amazon Redshift, Google BigQuery, Snowflake
-Relational databases: PostgreSQL, MySQL
-NoSQL databases: Apache Cassandra, MongoDB, Google Cloud Bigtable
-Data lakes: Amazon S3, Azure Data Lake Storage
-Big data processing frameworks: Apache Spark, Apache Hadoop
 
 