# Tecnologias de Processamento de dados - 2019/2020

## Phase II - Group 12


| Student          | Student ID | Contribution in hours |
|------------------|------------|-----------------------|
| Beatriz Lima     | 49377      |                       |
| David Almeida    | 54120      |                       |
| João Castanheira | 55052      |                       |
| Pedro Cotovio    | 55053      |                       |





## 0. Get the data

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from functools import reduce

from scipy import stats
from scipy.stats import norm

import psycopg2 as pg
import psycopg2.extras
import pandas.io.sql as sqlio
from datetime import datetime

The main datasets used in this warehouse are:

- listings.csv for Lisbon, Portugal, from http://insideairbnb.com/get-the-data.html
- Alojamento_Local.csv, from https://dadosabertos.turismodeportugal.pt/datasets/alojamento-local

In [2]:
listings_file_path = '../data/airbnb/listings.csv'
al_file_path = '../data/Alojamento_Local.csv'
df_al = pd.read_csv(al_file_path)
df_listings = pd.read_csv(listings_file_path)

FileNotFoundError: [Errno 2] File b'../data/Alojamento_Local.csv' does not exist: b'../data/Alojamento_Local.csv'

#### Merge _df_listings_ with _alojamento_local.csv_

In order to enrich the main dataset, we can cross it with the dataset from Registo Nacional de Alojamento Local (RNAL) to obtain further information regarding each listing's property, as well as refine already available data, particularly in the case of location data.

In [None]:
def intTryParse(value):
    """Returns True if value can be parsed as an integer, False otherwise"""
    try:
        int(value)
        return True
    except ValueError:
        return False

In [None]:
# keep only listings whose 'license' is neither null nor 'Exempt';
# .copy() avoids SettingWithCopyWarning when the 'NrRNAL' column is added below
df_listings_with_license = df_listings[(~df_listings['license'].isnull())
                                        & (df_listings['license'] != 'Exempt')].copy()

# remove '/AL' and '.' from the license code
df_listings_with_license['NrRNAL'] = [s.replace('/AL','').replace('.','')
                                      for s in df_listings_with_license['license']]

# keep only records whose license number can be converted to int
df_listings_with_license = df_listings_with_license[[intTryParse(s)
                                                     for s in df_listings_with_license['NrRNAL']]].copy()

# convert NrRNAL to int before merging the two dataframes
df_listings_with_license['NrRNAL'] = df_listings_with_license['NrRNAL'].astype(np.int64)

# inner join two dataframes
df_listings_al = pd.merge(df_listings_with_license, df_al, how='inner', on='NrRNAL')
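
The license clean-up above can also be written with vectorized pandas string methods. The following is a minimal sketch on made-up license values: the frame `df_demo` and its contents are assumptions for illustration, not rows taken from listings.csv. Note that `pd.to_numeric` is somewhat more permissive than `int()`, so the two approaches may disagree on unusual codes.

```python
import pandas as pd

# Hypothetical sample; the real values come from the 'license' column of listings.csv
df_demo = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'license': ['12345/AL', '67.890/AL', 'Exempt', None, 'abc/AL'],
})

# keep rows whose license is neither null nor 'Exempt'
mask = df_demo['license'].notnull() & (df_demo['license'] != 'Exempt')
df_lic = df_demo[mask].copy()

# strip '/AL' and '.' literally (regex=False), then turn non-numeric codes into NaN
codes = (df_lic['license']
         .str.replace('/AL', '', regex=False)
         .str.replace('.', '', regex=False))
df_lic['NrRNAL'] = pd.to_numeric(codes, errors='coerce')

# drop codes that could not be parsed and store the rest as int64
df_lic = df_lic.dropna(subset=['NrRNAL'])
df_lic['NrRNAL'] = df_lic['NrRNAL'].astype('int64')
```

Only the first two sample rows survive here, since 'Exempt', the missing value, and the non-numeric code are all filtered out.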

Save the intersection of the two files to disk:

In [None]:
listings_al_file_path = '../data/listings_al.csv'
df_listings_al.to_csv(listings_al_file_path,index=False)
print('Dataset size: {}'.format(len(df_listings_al)))

## 1. Dimensions and facts tables of the data warehouse

+ Define and model them in SQL
+ Identify hierarchies and fact granularity
+ Create the dimensions and facts tables in the DBMS (PostgreSQL)

These steps are all described in the individual ETL notebooks for each dimension.

Regarding granularity: a listings fact corresponds to one advertisement on the Airbnb website. Each advertisement is owned by a host, was posted on a given date, refers to a property at a given location, and has a review profile with scores and a price per night of stay. A bookings fact corresponds to one calendar day on which a given property is booked.
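
One way to sanity-check the stated granularity is to verify that no two fact rows share the same grain key. The sketch below assumes `listing_id` identifies a listings fact and the pair (`property_id`, `date_id`) identifies a bookings fact, which matches the keys used later in this notebook. The sample rows and the helper `is_unique_grain` are hypothetical.

```python
import pandas as pd

# Hypothetical rows; column names follow the fact tables used later in this notebook
df_listings_demo = pd.DataFrame({
    'listing_id': [10, 11, 12],
    'host_id': [1, 1, 2],
    'price_per_night': [50, 80, 120],
})
df_bookings_demo = pd.DataFrame({
    'property_id': [1, 1, 2],
    'date_id': [28012020, 29012020, 28012020],
    'price_per_night': [50, 50, 80],
})

def is_unique_grain(df, key_columns):
    """Return True when no two rows share the same values in key_columns."""
    return not df.duplicated(subset=key_columns).any()

listings_ok = is_unique_grain(df_listings_demo, ['listing_id'])
bookings_ok = is_unique_grain(df_bookings_demo, ['property_id', 'date_id'])
# a coarser key (property only) is not unique for bookings
property_only_ok = is_unique_grain(df_bookings_demo, ['property_id'])
```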

### General schema

![Star schema](images/Schema.png)

## 2. Define an ETL workflow

![ETL pipeline](images/ETL_overall.png)

### 2.1. Dimensions

+ Identify all data sources for all dimensions. Add URL links to all data that should be available. If the data is not public, point to Dropbox files, Google Drive, or similar
+ For each dimension show the code used for modeling, filtering and inserting data
+ Describe the process for inserting facts data

The ETL workflow is defined in separate notebooks for each dimension:

 - ETL_Property.ipynb
 - ETL_Host.ipynb
 - ETL_Review.ipynb
 - ETL_Date.ipynb
 - ETL_Location.ipynb

### 2.2. Processing and inserting facts

In [6]:
from db_connection import dbconnection 

In [103]:
def get_listing_price(listing_id):
    return int(df_listings_al[df_listings_al['id']==listing_id].price.values[0].strip().split('.')[0].replace(',','').replace('$',''))

# function to query table and convert it to pandas dataframe
def query_table(conn, table_name):
    """Returns DataFrame with queried database table"""
    sql = "select * from {};".format(table_name)
    #return dataframe
    return sqlio.read_sql_query(sql, conn)

# for this function to run, the dataframes must have the same columns, in the same order
def get_data_to_insert(df_etl, df_sql,pk):
    """Returns data valid for insertion in dimension from a new ETL-processed DataFrame"""
    if isinstance(pk, list): 
        df_insert = df_etl[~df_etl[pk].apply(tuple,1).isin(df_sql[pk].apply(tuple,1))]
        df_insert = df_insert.drop_duplicates(subset=pk)
    else:  
        # use ~ (logical not) to negate the boolean mask; unary minus is not supported on boolean Series
        df_insert = df_etl[~df_etl[pk].astype(int).isin(df_sql[pk].astype(int))].dropna(how = 'all')
        df_insert = df_insert.drop_duplicates(subset=[pk])
    return df_insert

# function for bulk insert; the caller owns the connection and is responsible for closing it
def insert_data(df, table_name, conn):
    """Inserts selected data into a table in the database and reports success"""
    df_columns = list(df)
    columns = ",".join(df_columns)
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
    insert_stmt = "INSERT INTO {} ({}) {}".format(table_name,columns,values)
    try:
        cursor = conn.cursor()
        psycopg2.extras.execute_batch(cursor, insert_stmt, df.values)
        conn.commit()
        success = True
    except pg.DatabaseError as error:
        # undo the partial batch so the connection is left in a clean state
        conn.rollback()
        success = False
        print('error:{}'.format(error))
    return success
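
The commit-or-rollback behaviour of a batch insert can be tried without a PostgreSQL server. The sketch below swaps in the standard-library `sqlite3` module as a stand-in, so it uses `?` placeholders where psycopg2 uses `%s`. The function `insert_rows`, the in-memory table, and the sample rows are all assumptions made for illustration.

```python
import sqlite3
import pandas as pd

def insert_rows(df, table_name, conn):
    """Insert all rows of df in one transaction; roll back if any row fails."""
    columns = ",".join(df.columns)
    placeholders = ",".join("?" for _ in df.columns)  # psycopg2 would use %s
    stmt = "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, placeholders)
    rows = df.values.tolist()  # tolist() converts numpy integers to Python ints
    try:
        conn.executemany(stmt, rows)
        conn.commit()
        return True
    except sqlite3.DatabaseError as error:
        conn.rollback()
        print('error:{}'.format(error))
        return False

# in-memory stand-in for the 'booking' table, with a composite primary key
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE booking (property_id INTEGER, date_id INTEGER, "
             "price_per_night INTEGER, PRIMARY KEY (property_id, date_id))")

df_ok = pd.DataFrame({'property_id': [1, 1], 'date_id': [28012020, 29012020],
                      'price_per_night': [50, 50]})
# second batch: one new row followed by a row that violates the primary key
df_dup = pd.DataFrame({'property_id': [2, 1], 'date_id': [28012020, 28012020],
                       'price_per_night': [80, 50]})

first_ok = insert_rows(df_ok, 'booking', conn)
second_ok = insert_rows(df_dup, 'booking', conn)
row_count = conn.execute("SELECT COUNT(*) FROM booking").fetchone()[0]
conn.close()
```

Because the second batch is rolled back as a whole, the new row it contained is not left behind in the table.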

#### 3.1. _'Listings'_ fact table

Load the mappings between each dimension and the listing fact table


In [143]:
listings_date_path = '../processed_dt/df_listings_date.csv'
listings_host_path = '../processed_dt/df_listings_host.csv'
listings_property_path = '../processed_dt/df_listings_property.csv'
listings_review_path = '../processed_dt/df_listings_review.csv'
listings_location_path = '../processed_dt/location_fk.csv'

df_listings_date = pd.read_csv(listings_date_path)[['listing_id','date_id']]
df_listings_host = pd.read_csv(listings_host_path)[['listing_id','host_id']]
df_listings_property = pd.read_csv(listings_property_path).rename(columns={'ID':'listing_id','Property':'property_id'})[['listing_id','property_id']]
df_listings_review = pd.read_csv(listings_review_path)[['listing_id','review_id']]
df_listings_location = pd.read_csv(listings_location_path).rename(columns={'fk':'location_id','listings_id':'listing_id'})[['listing_id','location_id']]

Inner join all dataframes by 'listing_id'

In [144]:
#inner join all dataframes, by listing_id
dfs = [df_listings_date, df_listings_host, df_listings_property, df_listings_review, df_listings_location]
df_listings_facts_etl = reduce(lambda  left,right: pd.merge(left,right,on=['listing_id'], how='inner'), dfs)
#get the fact metric
df_listings_facts_etl['price_per_night'] = [get_listing_price(i) for i in df_listings_facts_etl['listing_id']]

Remove listings where price_per_night = 0

In [145]:
df_listings_facts_etl = df_listings_facts_etl[df_listings_facts_etl['price_per_night']>0]
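
The price parsing and the zero-price filter can be illustrated on a few made-up price strings. This is a sketch assuming prices are formatted like `$1,250.00`, as in the Airbnb files. The series `prices` is hypothetical, and the cents are truncated in the same way as in `get_listing_price`.

```python
import pandas as pd

# Hypothetical price strings in the same format as the 'price' column
prices = pd.Series(['$120.00', '$1,250.00', ' $0.00 ', '$89.50'])

# strip whitespace, drop '$' and thousands separators, keep the part before the decimal point
price_per_night = (prices.str.strip()
                   .str.replace('$', '', regex=False)
                   .str.replace(',', '', regex=False)
                   .str.split('.').str[0]
                   .astype(int))

# keep only strictly positive prices, as done for the listings facts
positive_prices = price_per_night[price_per_night > 0]
```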

Query listings table and convert it to dataframe:

In [146]:
conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
df_listings_facts_sql = query_table(conn, 'listings')
conn.close()
df_listings_facts_sql.head()

Unnamed: 0,listing_id,host_id,date_id,location_id,property_id,review_id,price_per_night


Get only new listings that are not in the database:

In [151]:
df_listings_insert = get_data_to_insert(df_listings_facts_etl,df_listings_facts_sql,'listing_id')
df_listings_insert

Unnamed: 0,listing_id,date_id,host_id,property_id,review_id,location_id,price_per_night
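
The "only new rows" selection is an anti-join, which can also be expressed with `merge(..., indicator=True)`. The sketch below is an alternative illustration, not the method used in `get_data_to_insert`. The helper `rows_not_in_db` and the sample frames are hypothetical, and `pk` is expected to be a list of key columns.

```python
import pandas as pd

# Hypothetical ETL output and current database content
df_etl_demo = pd.DataFrame({
    'property_id': [1, 1, 2, 2],
    'date_id': [28012020, 29012020, 28012020, 28012020],
    'price_per_night': [50, 50, 80, 80],
})
df_sql_demo = pd.DataFrame({
    'property_id': [1],
    'date_id': [28012020],
    'price_per_night': [50],
})

def rows_not_in_db(df_etl, df_sql, pk):
    """Return rows of df_etl whose key (list of columns pk) is absent from df_sql."""
    keys = df_sql[pk].drop_duplicates()
    merged = df_etl.merge(keys, on=pk, how='left', indicator=True)
    new_rows = merged[merged['_merge'] == 'left_only'].drop(columns='_merge')
    # remove duplicated keys inside the ETL data itself
    return new_rows.drop_duplicates(subset=pk).reset_index(drop=True)

df_new = rows_not_in_db(df_etl_demo, df_sql_demo, ['property_id', 'date_id'])
```

When the database table is empty, the key columns read from it may not have the same dtype as the ETL data, so the dtypes may need to be aligned before merging.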


Insert listings into the database

In [148]:
conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
df_date_sql = query_table(conn, 'date')
conn.close()

In [149]:
#https://stackoverflow.com/questions/50626058/psycopg2-cant-adapt-type-numpy-int64
from psycopg2.extensions import register_adapter, AsIs
# let psycopg2 pass numpy int64 values to SQL as plain integers
register_adapter(np.int64, AsIs)

if len(df_listings_insert) > 0:
    table_name = 'listings'
    conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
    success = insert_data(df_listings_insert,table_name, conn)
    conn.close()
    if success: print('Data inserted successfully')
else: print('No data to insert')

Data inserted successfully


Load data in listings fact table and save it in `df_listings_facts_sql` dataframe.

In [150]:
conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
df_listings_facts_sql = query_table(conn, 'listings')
conn.close()

#### 3.2. _'Bookings'_ facts table

In this section we will load the Bookings fact table.

In [20]:
def date_pk(date):
    """Builds date primary key"""
    return int(date.strftime('%d%m%Y'))
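
Because the key is built as day, month, year and then converted to an integer, a leading zero in the day is dropped, and the keys do not sort chronologically. The sketch below shows this on two dates. `date_pk_sortable` is a hypothetical alternative shown only for comparison, not the key format used in this warehouse.

```python
from datetime import datetime

def date_pk(date):
    """Same key format as in this notebook: day, month, year as one integer."""
    return int(date.strftime('%d%m%Y'))

def date_pk_sortable(date):
    """Hypothetical alternative: year, month, day, which sorts chronologically."""
    return int(date.strftime('%Y%m%d'))

d1 = datetime(2020, 1, 31)
d2 = datetime(2020, 2, 1)

key_1, key_2 = date_pk(d1), date_pk(d2)                    # 31012020 and 1022020
sortable_1, sortable_2 = date_pk_sortable(d1), date_pk_sortable(d2)
```

With the notebook's format the earlier date gets the larger key, so range queries on dates should rely on the date dimension's attributes rather than on the key itself.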

Read data from calendar.csv, which contains the data to insert into the Bookings fact table.

In [18]:
df_calendar = pd.read_csv('../data/airbnb/calendar.csv')
print(df_calendar.shape)

(9125846, 7)


In [30]:
df_calendar.head()

Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
0,41791859,2020-01-28,t,$120.00,$120.00,7.0,1125.0
1,41791859,2020-01-29,t,$120.00,$120.00,7.0,1125.0
2,41791859,2020-01-30,t,$120.00,$120.00,7.0,1125.0
3,41791859,2020-01-31,t,$120.00,$120.00,7.0,1125.0
4,41791859,2020-02-01,t,$120.00,$120.00,7.0,1125.0


The calendar file has more than 9M records, roughly the number of listings times 365 days. For the purpose of this project, we only use the first k matching rows of the calendar file.

We retrieve only the records where available = 'f', which correspond to future bookings.

In [137]:
#read just the first k items
k = 100000
df_bookings_etl = df_calendar[df_calendar['available'] == 'f'].iloc[:k][['listing_id','date','price']]

#create columns with the date primary key
df_bookings_etl['date_id'] = [date_pk(datetime.strptime(d, "%Y-%m-%d")) for d in df_bookings_etl['date']]

#remove date column
df_bookings_etl = df_bookings_etl.drop(['date'], axis=1)

#rename columns price
df_bookings_etl = df_bookings_etl.rename(columns={'price':'price_per_night'})

#format column to int
df_bookings_etl['price_per_night'] = [int(i.strip().split('.')[0].replace(',','').replace('$','')) for i in df_bookings_etl['price_per_night']]

#drop duplicates if exists
df_bookings_etl = df_bookings_etl.drop_duplicates(subset=['listing_id','date_id'])

print(df_bookings_etl.shape)

(100000, 3)
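
Since only the first k booked rows are needed, the calendar file does not have to be loaded in full. The sketch below reads a CSV in chunks with the `chunksize` parameter of `pd.read_csv` and stops once enough rows are collected. The helper `first_k_booked` and the small inline CSV are assumptions used in place of calendar.csv.

```python
import io
import pandas as pd

# Hypothetical stand-in for calendar.csv with the columns used in this section
csv_text = """listing_id,date,available,price
1,2020-01-28,t,$120.00
1,2020-01-29,f,$120.00
2,2020-01-28,f,$80.00
2,2020-01-29,t,$80.00
3,2020-01-28,f,$60.00
"""

def first_k_booked(source, k, chunksize):
    """Collect the first k rows with available == 'f', reading chunk by chunk."""
    parts, collected = [], 0
    for chunk in pd.read_csv(source, chunksize=chunksize):
        booked = chunk[chunk['available'] == 'f']
        parts.append(booked)
        collected += len(booked)
        if collected >= k:
            break  # stop reading as soon as enough rows were found
    return pd.concat(parts).iloc[:k]

df_booked = first_k_booked(io.StringIO(csv_text), k=2, chunksize=2)
```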


In [131]:
df_bookings_etl.head()

Unnamed: 0,listing_id,price_per_night,date_id
0,41791859,120,28012020
1,41791859,120,29012020
2,41791859,120,30012020
3,41791859,120,31012020
4,41791859,120,1022020


Now let's merge the selected bookings data (the first 100k records) with the listings already in the database. This ensures that we only insert booking facts that have a value for every foreign key.

This is necessary because the dimensions' ETL process drops some records, for instance those with missing values.

In [138]:
df_bookings_etl = pd.merge(df_listings_facts_sql,df_bookings_etl,how='inner',on='listing_id')[['property_id','date_id_y','host_id','location_id','price_per_night_y']]
df_bookings_etl = df_bookings_etl.rename(columns = {'price_per_night_y':'price_per_night'})
df_bookings_etl = df_bookings_etl.rename(columns = {'date_id_y':'date_id'})
df_bookings_etl.shape

(57566, 5)
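
The `_y` suffixes in the cell above come from `pd.merge`, which by default renames columns present in both frames with `_x` and `_y`. The sketch below shows the same inner join on made-up rows, with explicit suffixes via the `suffixes` parameter to make the origin of each column visible. The sample frames are hypothetical.

```python
import pandas as pd

# Hypothetical listings facts (as read from the database) and calendar rows
df_facts = pd.DataFrame({
    'listing_id': [1, 2],
    'property_id': [10, 20],
    'host_id': [100, 200],
    'location_id': [7, 8],
    'date_id': [1012019, 2012019],
    'price_per_night': [50, 80],
})
df_cal = pd.DataFrame({
    'listing_id': [1, 1, 3],
    'date_id': [28012020, 29012020, 28012020],
    'price_per_night': [55, 55, 90],
})

# inner join: calendar rows for listing 3 are dropped because it is not in the facts
df_merged = pd.merge(df_facts, df_cal, how='inner', on='listing_id',
                     suffixes=('_listing', '_booking'))

# keep the foreign keys from the listing and the date and price from the booking
df_bookings_demo = (
    df_merged[['property_id', 'date_id_booking', 'host_id',
               'location_id', 'price_per_night_booking']]
    .rename(columns={'date_id_booking': 'date_id',
                     'price_per_night_booking': 'price_per_night'})
)
```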

Query Bookings fact table:

In [139]:
conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
df_bookings_sql = query_table(conn, 'booking')
conn.close()
df_bookings_sql

Unnamed: 0,property_id,date_id,host_id,location_id,price_per_night


Retrieve only the data that still needs to be inserted:

In [141]:
df_bookings_insert = get_data_to_insert(df_bookings_etl,df_bookings_sql,['property_id','date_id'])
df_bookings_insert

Unnamed: 0,property_id,date_id,host_id,location_id,price_per_night
0,1,28012020,107347,1194,50
1,1,29012020,107347,1194,50
2,1,30012020,107347,1194,50
3,1,31012020,107347,1194,50
4,1,1022020,107347,1194,50
...,...,...,...,...,...
57561,112,22012021,7665008,743,89
57562,112,23012021,7665008,743,89
57563,112,24012021,7665008,743,89
57564,112,25012021,7665008,743,87


Insert Bookings data into database:

In [142]:
from psycopg2.extensions import register_adapter, AsIs
# let psycopg2 pass numpy int64 values to SQL as plain integers
register_adapter(np.int64, AsIs)

if len(df_bookings_insert) > 0:
    table_name = 'booking'
    conn = psycopg2.connect(host = dbconnection.server_host,database = dbconnection.dbname, user = dbconnection.dbusername,password = dbconnection.dbpassword,sslmode=dbconnection.sslmode,gssencmode=dbconnection.gssencmode)
    success = insert_data(df_bookings_insert,table_name, conn)
    conn.close()
    if success: print('Data inserted successfully')
else: print('No data to insert')

Data inserted successfully


## 4. Critical assessment of the work
+ Describe potential issues with the ETL procedure used
+ Compare your schema to the one previously defined in phase I
+ Discuss the issues for updating the data warehouse with novel data

One potential problem with this ETL is that the dimensions end up storing records that no fact references. Each dimension's ETL discarded some records independently because of the amount of missing data. The dimensions were only merged at a late stage of the processing, once each dimension's data was ready for loading. As a result, we keep only the facts that have foreign keys for all dimensions, which means that some dimension records are never used.

Nevertheless, these unreferenced dimension records do not take up much space compared with the overall size of the data warehouse, since most of the space is taken by the fact tables.
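
The number of unreferenced dimension records can be estimated by comparing a dimension's keys with the foreign keys present in a fact table. The sketch below does this for a host dimension. The frames `df_host_dim` and `df_facts_demo` are hypothetical and do not reflect the actual warehouse content.

```python
import pandas as pd

# Hypothetical host dimension and listings facts
df_host_dim = pd.DataFrame({'host_id': [100, 200, 300, 400]})
df_facts_demo = pd.DataFrame({
    'listing_id': [1, 2, 3],
    'host_id': [100, 100, 300],
})

# dimension records whose key never appears as a foreign key in the facts
unused = df_host_dim[~df_host_dim['host_id'].isin(df_facts_demo['host_id'])]
unused_share = len(unused) / len(df_host_dim)
```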

Critical assessments of ETL for each dimension are explained in each dimension's notebook, including the issues for updating the data warehouse with novel data.