# ETL & Datawarehouse
**Purpose** : load, process and save data from a datalake (S3) to a datawarehouse (RDS)

In [1]:
# import 
import os
import boto3
import pandas as pd
from dotenv import load_dotenv
from IPython.display import display
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker 
from sqlalchemy import Column, Integer, String, Float 
from sqlalchemy.ext.declarative import declarative_base

In [2]:
# load dotenv variables
load_dotenv()
DBUSER = os.getenv('DBUSER')
DBPASS = os.getenv('DBPASS')
DBHOST = os.getenv('DBHOST')
DBNAME = os.getenv('DBNAME')

DBNAME

'postgres'

In [3]:
# engine : connected to our db
connection_string = f"postgresql+psycopg2://{DBUSER}:{DBPASS}@{DBHOST}/{DBNAME}"
engine = create_engine(connection_string, echo=True, future=True)


In [4]:
# sqlalchemy session to upload database
Session = sessionmaker(bind=engine)
local_session = Session()

In [5]:
# declarative base
Base = declarative_base()

# create city table
# Let's define our table 
class City(Base):
    __tablename__ = "cities"

    uuid = Column(String, primary_key=True)
    name = Column(String)
    full_address = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)

    def __repr__(self):
        return f"<City(name={self.name})>"

# weather table
class Weather(Base):
    __tablename__ = "weathers"

    weather_id = Column(Integer, primary_key=True)
    city_uuid = Column(String)
    volume_rain = Column(Float)

    def __repr__(self):
        return f"<Weather(uuid={self.city_uuid}, volume={self.volume_rain})>"

In [6]:
# create_table
INIT = True
if INIT : 
    Base.metadata.create_all(engine)

2021-12-04 23:01:10,702 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2021-12-04 23:01:10,703 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-12-04 23:01:10,712 INFO sqlalchemy.engine.Engine select current_schema()
2021-12-04 23:01:10,714 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-12-04 23:01:10,723 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2021-12-04 23:01:10,724 INFO sqlalchemy.engine.Engine [raw sql] {}
2021-12-04 23:01:10,732 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-12-04 23:01:10,735 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2021-12-04 23:01:10,737 INFO sqlalchemy.engine.Engine [generated in 0.00282s] {'name': 'cities'}
2021-12-04 23:01:10,746 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(nam

### Read Datalake files
* weather_csv
* hotels_booking_csv

In [7]:
# s3 bucket
# dotenv variables
load_dotenv()
AWSS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# aws session
session = boto3.Session(aws_access_key_id=AWSS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

# s3 service
s3 = session.resource("s3")

# bucket 
bucket_name = 'kayak-mahadou'
kayak_bucket = s3.Bucket(bucket_name)

In [8]:
# read weather data from s3
weather_key = "weather_data.csv"
weather_csv_obj = s3.Object(bucket_name, weather_key)
weather_csv = weather_csv_obj.get()['Body'].read().decode('utf-8') 

# dataframe
weather_df = pd.read_csv(weather_csv, index_col=[0])
display(weather_df.sample(2))
weather_df.columns

Unnamed: 0,uuid,cities,full_address,latitude,longitude,volume_rain_7days
29,009d2a7b-a86f-474e-96bb-924cf9c48feb,Ariege,"Ariège, Occitanie, France métropolitaine, France",42.945537,1.406554,3.574
4,e211db59-98ca-4f5c-a64c-0af6490a83d1,Rouen,"Rouen, Seine-Maritime, Normandie, France métro...",49.440459,1.093966,3.425


Index(['uuid', 'cities', 'full_address', 'latitude', 'longitude',
       'volume_rain_7days'],
      dtype='object')

In [9]:
# read hotels data from s3
hotels_key = "hotels_booking.csv"
hotels_key_obj = s3.Object(bucket_name, hotels_key)
hotels_csv = hotels_key_obj.get()['Body'].read().decode('utf-8') 

# dataframe
hotels_df = pd.read_csv(hotels_csv, index_col=[0])
display(hotels_df.sample(2))

Unnamed: 0,city,name,url,image_url,score,description
237,saintes-maries-de-la-mer,Hôtel Les Arcades,booking.com/hotel/fr/ha-tel-les-arcades.fr.htm...,https://cf.bstatic.com/xdata/images/hotel/squa...,8.7,L'Hôtel 2 étoiles Les Arcades propose une conn...
244,carcassonne,ibis budget Carcassonne Aéroport - A61,booking.com/hotel/fr/etap-carcassonne-aeroport...,https://cf.bstatic.com/xdata/images/hotel/squa...,7.8,L'ibis budget Carcassonne Aéroport vous accuei...


### City

In [10]:
# get cities df
keep_col = ['uuid', 'cities', 'full_address', 'latitude', 'longitude']
city_df = weather_df[keep_col].copy()

display(city_df.sample(2))

Unnamed: 0,uuid,cities,full_address,latitude,longitude
23,fbfaeec5-dced-417d-b231-6b3e9f9fa000,Uzes,"Uzès, Nîmes, Gard, Occitanie, France métropoli...",44.012128,4.419672
24,4880e451-d96c-442c-a01c-3a8b2cdfeadb,Nimes,"Nîmes, Gard, Occitanie, France métropolitaine,...",43.837425,4.360069


In [11]:
# # all_cities in database
# cities_list = local_session.query(City)..filter_by(name='Joe').first()
# session.query(db.users).filter_by(name='Joe', surname='Dodson')
# user = User.query.filter_by(email=email).first()

In [12]:
# fill City TABLE
for idx in range(city_df.shape[0]) : 
    # get row
    city_row = city_df.iloc[idx]

    # create city object
    city_uuid         = city_row["uuid"]
    city_name         = city_row["cities"]
    city_fulladdress  = city_row["full_address"]
    city_latitude     = city_row["latitude"]
    city_longitude    = city_row["longitude"]
    city = City(uuid=city_uuid, name=city_name, full_address=city_fulladdress,
                latitude=city_latitude, longitude=city_longitude)

    # add & commit
    # verify if cities in table
    city_in_table = local_session.query(City).filter_by(name=city_name).first()

    if not city_in_table : 
        # Add values to db 
        local_session.add(city)

        # Commit the results 
        local_session.commit()   
        
    print(city)

2021-12-04 23:01:11,673 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-12-04 23:01:11,682 INFO sqlalchemy.engine.Engine SELECT cities.uuid AS cities_uuid, cities.name AS cities_name, cities.full_address AS cities_full_address, cities.latitude AS cities_latitude, cities.longitude AS cities_longitude 
FROM cities 
WHERE cities.name = %(name_1)s 
 LIMIT %(param_1)s
2021-12-04 23:01:11,684 INFO sqlalchemy.engine.Engine [generated in 0.00305s] {'name_1': 'Mont Saint Michel', 'param_1': 1}
2021-12-04 23:01:11,700 INFO sqlalchemy.engine.Engine INSERT INTO cities (uuid, name, full_address, latitude, longitude) VALUES (%(uuid)s, %(name)s, %(full_address)s, %(latitude)s, %(longitude)s)
2021-12-04 23:01:11,702 INFO sqlalchemy.engine.Engine [generated in 0.00399s] {'uuid': '87f71cf6-82e0-4d02-b54e-e291624d4a7a', 'name': 'Mont Saint Michel', 'full_address': 'Mont Saint-Michel, Le Mont-Saint-Michel, Avranches, Manche, Normandie, France métropolitaine, 50170, France', 'latitude': 48.6359541, 'lo

### Weather

In [13]:
# weather df
keep_col = ['uuid', 'volume_rain_7days']
weather_df_volume = weather_df[keep_col].copy()
weather_df_volume = weather_df_volume.reset_index()
display(weather_df_volume.sample(2))

Unnamed: 0,index,uuid,volume_rain_7days
11,11,1562b761-a84a-4fca-9d87-c9c474f9ac46,4.777
30,30,4de7a96b-c5e9-4ee2-9d88-e8f2d3587913,3.214


In [14]:
# fill Weather TABLE
for idx in range(weather_df_volume.shape[0]) : 
    # get row
    weather_row = weather_df_volume.iloc[idx]

    # create city object
    weather_id        = int(weather_row["index"])
    city_uuid         = weather_row["uuid"]
    weather_volume    = float(weather_row["volume_rain_7days"])
    weather = Weather(weather_id=weather_id, city_uuid=city_uuid, volume_rain=weather_volume)

    # add & commit
    # verify if cities in table
    weather_in_table = local_session.query(Weather).filter_by(weather_id=weather_id).first()

    if not weather_in_table : 
        # Add values to db 
        local_session.add(weather)

        # Commit the results 
        local_session.commit()   
        
    print(weather)

2021-12-04 23:01:15,471 INFO sqlalchemy.engine.Engine SELECT weathers.weather_id AS weathers_weather_id, weathers.city_uuid AS weathers_city_uuid, weathers.volume_rain AS weathers_volume_rain 
FROM weathers 
WHERE weathers.weather_id = %(weather_id_1)s 
 LIMIT %(param_1)s
2021-12-04 23:01:15,474 INFO sqlalchemy.engine.Engine [generated in 0.00344s] {'weather_id_1': 0, 'param_1': 1}
2021-12-04 23:01:15,482 INFO sqlalchemy.engine.Engine INSERT INTO weathers (weather_id, city_uuid, volume_rain) VALUES (%(weather_id)s, %(city_uuid)s, %(volume_rain)s)
2021-12-04 23:01:15,483 INFO sqlalchemy.engine.Engine [generated in 0.00160s] {'weather_id': 0, 'city_uuid': '87f71cf6-82e0-4d02-b54e-e291624d4a7a', 'volume_rain': 7.118}
2021-12-04 23:01:15,489 INFO sqlalchemy.engine.Engine COMMIT
2021-12-04 23:01:15,496 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-12-04 23:01:15,500 INFO sqlalchemy.engine.Engine SELECT weathers.weather_id AS weathers_weather_id, weathers.city_uuid AS weathers_city_uui

### Hotel