<p>In this third phase, we will : </p> 
<ul> 
    <li> retrieve the data from the data lake   </li> 
    <li> clean and structure the data and save it in a relational database </li> 
</ul>

### Table of Contents

* [1. Download the data from the Data Lake](#section1)
* [2. Relational Data Storage](#section2)
* [3. Structure and save cities coordinates data in the DB](#section3)
* [4. Structure and save weather data in the DB](#section4)
* [5. Structure and save hotels data in the DB](#section5)
* [6. DB schema](#section6)

In [6]:
# wrapping data
import pandas as pd
import numpy as np

# predefined functions
from modules import Funct as F

# global params
bucket_name = 'kayak-project'

# 1. Download the data from the Data Lake <a class="anchor" id="section1"></a>

In [7]:
F.download_file_dl(bucket_name, 'cities_coordinates.csv', 'data/cities_coordinates.csv')
F.download_file_dl(bucket_name, 'cities_weather.csv', 'data/cities_weather.csv')
F.download_file_dl(bucket_name, 'hotels.csv', 'data/hotels.csv')

The object does not exist !!
The object does not exist !!


In [7]:
geo_df = pd.read_csv('data/cities_coordinates.csv')
weather_df = pd.read_csv('data/cities_weather.csv')
hotels_df = pd.read_csv('data/hotels.csv')

# 2. Relational Data Storage <a class="anchor" id="section2"></a>

In [8]:
# Amazon Relational Database Service (Amazon RDS)
# create engine using the SQL toolkit and Object Relational Mapper sqlalchemy
# to get access to the database where we will save our structured data
from sqlalchemy import create_engine, text
#PostgreSQL database adapter
import psycopg2

dbuser = ''
dbpass = ''
dbhost = ''
dbname = ''

engine = create_engine(f"postgresql+psycopg2://{dbuser}:{dbpass}@{dbhost}/{dbname}", echo=True)

# 3. Structure and save cities coordinates data in the DB <a class="anchor" id="section3"></a>

In [9]:
# save the retrieved data in Amazon Relational Database
geo_df.to_sql(
    "city",
    engine,
    if_exists='replace',
    index =False
)

2022-01-16 20:42:58,995 INFO sqlalchemy.engine.base.Engine select version()
2022-01-16 20:42:58,996 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:42:59,161 INFO sqlalchemy.engine.base.Engine select current_schema()
2022-01-16 20:42:59,162 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:42:59,327 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2022-01-16 20:42:59,328 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:42:59,411 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2022-01-16 20:42:59,412 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:42:59,494 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings
2022-01-16 20:42:59,495 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:42:59,660 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
20

In [10]:
# define the id column as a primary key for JOINs with other data later
conn = engine.connect()
stmt = text("ALTER TABLE city ADD PRIMARY KEY (id)")
result = conn.execute(stmt)

2022-01-16 20:47:59,905 INFO sqlalchemy.engine.base.Engine ALTER TABLE city ADD PRIMARY KEY (id)
2022-01-16 20:47:59,907 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 20:48:00,233 INFO sqlalchemy.engine.base.Engine COMMIT


# 4. Structure and save weather data in the DB <a class="anchor" id="section4"></a>

In [15]:
# save the retrieved data in Amazon Relational Database
weather_df.to_sql(
    "weather",
    engine,
    if_exists='replace',
)

2022-01-16 21:09:54,175 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2022-01-16 21:09:54,176 INFO sqlalchemy.engine.base.Engine {'name': 'weather'}
2022-01-16 21:09:54,422 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE weather (
	index BIGINT, 
	day_time TEXT, 
	temperature FLOAT(53), 
	precipitation_p FLOAT(53), 
	humidity BIGINT, 
	weather TEXT, 
	cid BIGINT
)


2022-01-16 21:09:54,423 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 21:09:54,591 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-16 21:09:54,674 INFO sqlalchemy.engine.base.Engine CREATE INDEX ix_weather_index ON weather (index)
2022-01-16 21:09:54,675 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 21:09:54,841 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-16 21:09:54,927 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2022-01-16 21:09:54,930 INFO sqlalchemy.engine.base.Engine IN

In [16]:
# define the index as a primary key
conn = engine.connect()
stmt = text("ALTER TABLE weather ADD PRIMARY KEY (index)")
result = conn.execute(stmt)

2022-01-16 21:10:14,976 INFO sqlalchemy.engine.base.Engine ALTER TABLE weather ADD PRIMARY KEY (index)
2022-01-16 21:10:14,978 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 21:10:15,151 INFO sqlalchemy.engine.base.Engine COMMIT


In [17]:
# define the cid column as a foreign key
stmt = text("ALTER TABLE weather ADD CONSTRAINT fk_city FOREIGN KEY (cid) REFERENCES city (id)")
result = conn.execute(stmt)

2022-01-16 21:10:17,949 INFO sqlalchemy.engine.base.Engine ALTER TABLE weather ADD CONSTRAINT fk_city FOREIGN KEY (cid) REFERENCES city (id)
2022-01-16 21:10:17,951 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 21:10:18,115 INFO sqlalchemy.engine.base.Engine COMMIT


# 5. Structure and save hotels data in the DB <a class="anchor" id="section5"></a>

In [63]:
hotels_df.head(2)

Unnamed: 0,cid,name,star,rating,rating_title,number_of_ratings,price,url,lat,lon
0,0,Le Relais Saint Michel,4 étoiles,7.8,Bien,1 910 expériences vécues,"€ 78,32",https://www.booking.com/hotel/fr/le-relais-sai...,48.617587,-1.510396
1,0,Les Terrasses Poulard,3 étoiles,7.3,Bien,2 170 expériences vécues,"€ 126,84",https://www.booking.com/hotel/fr/les-terrasses...,48.635349,-1.510379


In [64]:
hotels_df.dtypes

cid                    int64
name                  object
star                  object
rating               float64
rating_title          object
number_of_ratings     object
price                 object
url                   object
lat                  float64
lon                  float64
dtype: object

In [65]:
# Clean and Structure
# extract numeric data from columns = [star, number_of_ratings, price]

def get_int(text, split_word):
    if text != text:
        return np.nan
    else:
        text = int(text.split(split_word)[0].replace(u'\xa0', u''))
        return text

def get_float(text, split_word):
    if text != text:
        return np.nan
    else:
        text = float(text.split(split_word)[1].replace(u'\xa0', u'').replace(',', '.'))
        return text

In [66]:
hotels_df['number_of_ratings'] = hotels_df['number_of_ratings'].apply(get_int, args =['expériences'])
hotels_df['star'] = hotels_df['star'].apply(get_int, args =['étoiles']) 
hotels_df['price'] = hotels_df['price'].apply(get_float, args =['€']) 

In [67]:
hotels_df.dtypes

cid                    int64
name                  object
star                 float64
rating               float64
rating_title          object
number_of_ratings      int64
price                float64
url                   object
lat                  float64
lon                  float64
dtype: object

In [70]:
hotels_df.head(2)

Unnamed: 0,cid,name,star,rating,rating_title,number_of_ratings,price,url,lat,lon
0,0,Le Relais Saint Michel,4.0,7.8,Bien,1910,78.32,https://www.booking.com/hotel/fr/le-relais-sai...,48.617587,-1.510396
1,0,Les Terrasses Poulard,3.0,7.3,Bien,2170,126.84,https://www.booking.com/hotel/fr/les-terrasses...,48.635349,-1.510379


In [71]:
# save the retrieved data in Amazon Relational Database
hotels_df.to_sql(
    "hotels",
    engine,
    if_exists='replace',
)

2022-01-16 22:14:18,091 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2022-01-16 22:14:18,093 INFO sqlalchemy.engine.base.Engine {'name': 'hotels'}
2022-01-16 22:14:18,341 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE hotels (
	index BIGINT, 
	cid BIGINT, 
	name TEXT, 
	star FLOAT(53), 
	rating FLOAT(53), 
	rating_title TEXT, 
	number_of_ratings BIGINT, 
	price FLOAT(53), 
	url TEXT, 
	lat FLOAT(53), 
	lon FLOAT(53)
)


2022-01-16 22:14:18,342 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 22:14:18,511 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-16 22:14:18,595 INFO sqlalchemy.engine.base.Engine CREATE INDEX ix_hotels_index ON hotels (index)
2022-01-16 22:14:18,596 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 22:14:18,761 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-16 22:14:18,847 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2022-0

In [72]:
# define the index as a primary key
conn = engine.connect()
stmt = text("ALTER TABLE hotels ADD PRIMARY KEY (index)")
result = conn.execute(stmt)

2022-01-16 22:14:44,389 INFO sqlalchemy.engine.base.Engine ALTER TABLE hotels ADD PRIMARY KEY (index)
2022-01-16 22:14:44,390 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 22:14:44,559 INFO sqlalchemy.engine.base.Engine COMMIT


In [73]:
# define the cid column as a foreign key
stmt = text("ALTER TABLE hotels ADD CONSTRAINT fk_city FOREIGN KEY (cid) REFERENCES city (id)")
result = conn.execute(stmt)

2022-01-16 22:14:44,649 INFO sqlalchemy.engine.base.Engine ALTER TABLE hotels ADD CONSTRAINT fk_city FOREIGN KEY (cid) REFERENCES city (id)
2022-01-16 22:14:44,652 INFO sqlalchemy.engine.base.Engine {}
2022-01-16 22:14:44,817 INFO sqlalchemy.engine.base.Engine COMMIT


# 6. DB schema <a class="anchor" id="section6"></a>

<img src="data/schema.png" width=1000 height=800>

<image src="data/schema.png" width=100 height=50>