In [1]:
!pip install -r requirements.txt



In [1]:
import os
import pandas as pd
from sqlalchemy import create_engine, inspect
import psycopg2
import shutil

In [8]:
connection_uri = "postgres+psycopg2://postgres:postgres@database:5432/planetly"

In [9]:
engine = create_engine(connection_uri, pool_pre_ping=True)
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f75bc6d84f0>

In [10]:
engine

Engine(postgres+psycopg2://postgres:***@database:5432/planetly)

In [11]:
conn = psycopg2.connect(database="planetly", user='postgres',password='postgres', host='database', port='5432')

In [12]:
conn.autocommit = True
cursor = conn.cursor()

## Drivers

In [11]:
df_drivers_raw = pd.read_csv("data/drivers.csv")

In [12]:
df_drivers_clean = df_drivers_raw[['name', 'first_name']].sort_values('name').reset_index(drop=True)

In [13]:
df_drivers_clean = df_drivers_clean.drop_duplicates(subset=['name', 'first_name']).reset_index(drop=True)

In [14]:
df_drivers_clean

Unnamed: 0,name,first_name
0,Aaron,Louise
1,Abbyss,Enrika
2,Abrahmson,Issie
3,Adin,Adlai
4,Adnams,Flynn
...,...,...
995,Zotto,Melanie
996,Zum Felde,Gloriane
997,de Courcey,Dalila
998,de Leon,Kathi


In [20]:
cursor.execute('''
    CREATE TABLE IF NOT EXISTS drivers(
        driver_id SERIAL PRIMARY KEY,
        name VARCHAR(50),
        first_name VARCHAR(50)
    );''')

In [21]:
df_drivers_clean.to_sql('drivers', engine, if_exists='append', index=False)

## Cars

In [15]:
df_veh_cons_raw = pd.read_csv("data/vehicle_fuel_consumptions.csv")

In [16]:
df_veh_cons_raw.columns

Index(['BRAND', 'MODEL', 'VEHICLE CLASS', 'ENGINE SIZE L', 'CYLINDERS',
       'TRANSMISSION', 'FUEL_TYPE', 'FUEL CONSUMPTION (L/100 km)',
       'HWY (L/100 km)', 'COMB (L/100 km)', 'COMB (mpg)',
       'CO2_Emissions(g/km)'],
      dtype='object')

In [17]:
df_veh_cons_raw.rename(columns={'BRAND':'brand', 'MODEL':'model', 'VEHICLE CLASS':'vehicle_class', 'ENGINE SIZE L':'engine_size_l', 'CYLINDERS':'cylinders',
       'TRANSMISSION':'transmission', 'FUEL_TYPE':'fuel_type', 'FUEL CONSUMPTION (L/100 km)':'fuel_consumption_l_per_hundred_km',
       'HWY (L/100 km)':'hwy_l_per_hundred_km', 'COMB (L/100 km)':'comb_l_per_hundred', 'COMB (mpg)':'comb_mpg',
       'CO2_Emissions(g/km)':'co2_emission_g_per_km'}, inplace=True)

In [18]:
df_veh_cons_raw.sort_values('brand', inplace=True)

In [19]:
df_veh_cons_raw = df_veh_cons_raw.reset_index(drop=True)

In [52]:
df_cars_clean = df_veh_cons_raw.drop_duplicates(subset=['brand', 'model','vehicle_class', 'engine_size_l', 'cylinders',
       'transmission', 'fuel_type']).reset_index(drop=True)

In [55]:
cursor.execute('''
    CREATE TABLE IF NOT EXISTS cars(
        car_id SERIAL PRIMARY KEY,
        brand VARCHAR(50), 
        model VARCHAR(50), 
        vehicle_class VARCHAR(50), 
        engine_size_l FLOAT, 
        cylinders FLOAT, 
        transmission VARCHAR(50), 
        fuel_type VARCHAR(50),
        fuel_consumption_l_per_hundred_km FLOAT,
        hwy_l_per_hundred_km FLOAT,
        comb_l_per_hundred FLOAT,
        comb_mpg INT,
        co2_emission_g_per_km INT
    );''')

In [57]:
df_cars_clean.to_sql('cars', engine, if_exists='append', index=False)

## Brand

In [42]:
df_brand = df_cars[['brand']].drop_duplicates()

In [None]:
df_brand.reset_index(drop=True)

## Car consumptions

In [23]:
df_veh_cons_raw.columns

Index(['brand', 'model', 'vehicle_class', 'engine_size_l', 'cylinders',
       'transmission', 'fuel_type', 'fuel_consumption_l_per_hundred_km',
       'hwy_l_per_hundred_km', 'comb_l_per_hundred', 'comb_mpg',
       'co2_emission_g_per_km'],
      dtype='object')

In [26]:
df_cars_db = pd.read_sql_query("SELECT * FROM cars;", engine)

In [None]:
#df_cars_consumption = df_cars_db.merge(df_veh_cons_raw, how='left', on=['brand', 'model', 'vehicle_class', 'engine_size_l',
#       'cylinders', 'transmission', 'fuel_type'], suffixes=['__left', '__right'])[['car_id','fuel_consumption_l_per_hundred_km',
#       'hwy_l_per_hundred_km', 'comb_l_per_hundred', 'comb_mpg',
#       'co2_emission_g_per_km']]

In [None]:
#df_cars_consumption.head()

Unnamed: 0,car_id,fuel_consumption_l_per_hundred_km,hwy_l_per_hundred_km,comb_l_per_hundred,comb_mpg,co2_emission_g_per_km
0,1,8.0,7.5,7.7,37,180
1,2,12.2,9.1,10.7,26,251
2,3,12.4,8.6,10.7,26,249
3,4,11.2,7.5,9.6,29,226
4,5,12.7,9.1,11.1,25,254


In [None]:
#cursor.execute('''
#        CREATE TABLE IF NOT EXISTS cars_consumptions(
#            consumption_id SERIAL PRIMARY KEY,
#            car_id INT,
#            fuel_consumption_l_per_hundred_km FLOAT,
#            hwy_l_per_hundred_km FLOAT,
#            comb_l_per_hundred FLOAT,
#            comb_mpg INT,
#            co2_emission_g_per_km INT,
#            CONSTRAINT fk__cars__car_id__cars_consumptions_car_id
#            FOREIGN KEY (car_id)
#            REFERENCES cars(car_id)
#            ON UPDATE CASCADE ON DELETE RESTRICT
#        );''')

In [None]:
#df_cars_consumption.to_sql('cars_consumptions', engine, if_exists='append', index=False)

In [59]:
df_drivers_logbook_raw = pd.read_csv("data/drivers_logbook.csv")

In [60]:
df_drivers_logbook_raw

Unnamed: 0,brand,model,engine_size_l,cylinders,fuel_type,transmission,name,first_name,start_city,start_country,target_city,target_country,distance_km,date
0,FORD,F-150 (Payload Pkg),3.5,6.0,X,AS6,Schlagman,Blake,Kihniö,Finland,Haukivuori,Finland,232.235123,2014-05-16
1,MINI,JOHN COOPER WORKS COUNTRYMAN ALL4,1.6,4.0,Z,M6,Chitham,Ruthie,Liminka,Finland,Karkkila,Finland,675.654591,2014-07-14
2,TOYOTA,TACOMA,2.7,4.0,X,AS6,Spyvye,Silas,Luvia,Finland,Mellilä,Finland,319.300665,2014-08-14
3,PORSCHE,CAYENNE,3.6,6.0,Z,A8,Maddy,Verine,Kinnula,Finland,Pöytyä,Finland,939.400418,2014-03-07
4,TOYOTA,TACOMA 4WD,3.5,6.0,,AS6,Wellstood,Shawnee,Valkeakoski,Finland,Särkisalo,Finland,746.029780,2014-08-28
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,HYUNDAI,ELANTRA,1.8,4.0,X,AS6,Champney,Tessi,Askainen,Finland,Kempele,Finland,730.565888,2014-07-29
4996,GMC,YUKON,5.3,8.0,X,A6,Mateo,Tally,Ruotsinpyhtää,Finland,Porvoo,Finland,761.012486,2014-08-20
4997,FORD,EXPLORER FFV AWD,3.5,6.0,X,AS6,Polsin,Viola,Suolahti,Finland,Lumijoki,Finland,488.798424,2014-02-16
4998,KIA,SOUL,1.6,4.0,X,M6,Bristo,Vivienne,Kerava,Finland,Joroinen,Finland,124.987769,2014-04-14


In [61]:
df_car_log = df_cars_db.merge(df_drivers_logbook_raw, how='right', on=['brand', 'model', 'engine_size_l', 'cylinders', 'fuel_type', 'transmission'])

In [62]:
df_car_log

Unnamed: 0,car_id,brand,model,vehicle_class,engine_size_l,cylinders,transmission,fuel_type,fuel_consumption_l_per_hundred_km,hwy_l_per_hundred_km,...,comb_mpg,co2_emission_g_per_km,name,first_name,start_city,start_country,target_city,target_country,distance_km,date
0,379,FORD,F-150 (Payload Pkg),PICKUP TRUCK - STANDARD,3.5,6.0,AS6,X,14.7,10.7,...,22,303,Schlagman,Blake,Kihniö,Finland,Haukivuori,Finland,232.235123,2014-05-16
1,746,MINI,JOHN COOPER WORKS COUNTRYMAN ALL4,COMPACT,1.6,4.0,M6,Z,9.4,7.6,...,33,201,Chitham,Ruthie,Liminka,Finland,Karkkila,Finland,675.654591,2014-07-14
2,942,TOYOTA,TACOMA,PICKUP TRUCK - SMALL,2.7,4.0,AS6,X,12.0,10.0,...,25,261,Spyvye,Silas,Luvia,Finland,Mellilä,Finland,319.300665,2014-08-14
3,843,PORSCHE,CAYENNE,SUV - STANDARD,3.6,6.0,A8,Z,13.5,9.7,...,24,277,Maddy,Verine,Kinnula,Finland,Pöytyä,Finland,939.400418,2014-03-07
4,921,TOYOTA,TACOMA 4WD,PICKUP TRUCK - SMALL,3.5,6.0,AS6,,13.1,10.5,...,24,280,Wellstood,Shawnee,Valkeakoski,Finland,Särkisalo,Finland,746.029780,2014-08-28
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,481,HYUNDAI,ELANTRA,MID-SIZE,1.8,4.0,AS6,X,8.5,6.3,...,38,176,Champney,Tessi,Askainen,Finland,Kempele,Finland,730.565888,2014-07-29
4996,423,GMC,YUKON,SUV - STANDARD,5.3,8.0,A6,X,15.1,10.4,...,22,304,Mateo,Tally,Ruotsinpyhtää,Finland,Porvoo,Finland,761.012486,2014-08-20
4997,354,FORD,EXPLORER FFV AWD,SUV - STANDARD,3.5,6.0,AS6,X,14.4,10.4,...,22,295,Polsin,Viola,Suolahti,Finland,Lumijoki,Finland,488.798424,2014-02-16
4998,587,KIA,SOUL,STATION WAGON - SMALL,1.6,4.0,M6,X,9.9,7.8,...,31,208,Bristo,Vivienne,Kerava,Finland,Joroinen,Finland,124.987769,2014-04-14


In [63]:
values = df_car_log.distance_km * df_car_log.co2_emission_g_per_km

In [64]:
values

0        70367.242330
1       135806.572737
2        83337.473579
3       260213.915921
4       208888.338486
            ...      
4995    128579.596259
4996    231347.795821
4997    144195.535106
4998     25997.455974
4999    284462.655909
Length: 5000, dtype: float64

In [65]:
df_car_log['total_emission'] = values

In [68]:
df_car_log

Unnamed: 0,car_id,brand,model,vehicle_class,engine_size_l,cylinders,transmission,fuel_type,fuel_consumption_l_per_hundred_km,hwy_l_per_hundred_km,...,co2_emission_g_per_km,name,first_name,start_city,start_country,target_city,target_country,distance_km,date,total_emission
0,379,FORD,F-150 (Payload Pkg),PICKUP TRUCK - STANDARD,3.5,6.0,AS6,X,14.7,10.7,...,303,Schlagman,Blake,Kihniö,Finland,Haukivuori,Finland,232.235123,2014-05-16,70367.242330
1,746,MINI,JOHN COOPER WORKS COUNTRYMAN ALL4,COMPACT,1.6,4.0,M6,Z,9.4,7.6,...,201,Chitham,Ruthie,Liminka,Finland,Karkkila,Finland,675.654591,2014-07-14,135806.572737
2,942,TOYOTA,TACOMA,PICKUP TRUCK - SMALL,2.7,4.0,AS6,X,12.0,10.0,...,261,Spyvye,Silas,Luvia,Finland,Mellilä,Finland,319.300665,2014-08-14,83337.473579
3,843,PORSCHE,CAYENNE,SUV - STANDARD,3.6,6.0,A8,Z,13.5,9.7,...,277,Maddy,Verine,Kinnula,Finland,Pöytyä,Finland,939.400418,2014-03-07,260213.915921
4,921,TOYOTA,TACOMA 4WD,PICKUP TRUCK - SMALL,3.5,6.0,AS6,,13.1,10.5,...,280,Wellstood,Shawnee,Valkeakoski,Finland,Särkisalo,Finland,746.029780,2014-08-28,208888.338486
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,481,HYUNDAI,ELANTRA,MID-SIZE,1.8,4.0,AS6,X,8.5,6.3,...,176,Champney,Tessi,Askainen,Finland,Kempele,Finland,730.565888,2014-07-29,128579.596259
4996,423,GMC,YUKON,SUV - STANDARD,5.3,8.0,A6,X,15.1,10.4,...,304,Mateo,Tally,Ruotsinpyhtää,Finland,Porvoo,Finland,761.012486,2014-08-20,231347.795821
4997,354,FORD,EXPLORER FFV AWD,SUV - STANDARD,3.5,6.0,AS6,X,14.4,10.4,...,295,Polsin,Viola,Suolahti,Finland,Lumijoki,Finland,488.798424,2014-02-16,144195.535106
4998,587,KIA,SOUL,STATION WAGON - SMALL,1.6,4.0,M6,X,9.9,7.8,...,208,Bristo,Vivienne,Kerava,Finland,Joroinen,Finland,124.987769,2014-04-14,25997.455974


In [66]:
df_car_log.loc[df_car_log['car_id']==357]

Unnamed: 0,car_id,brand,model,vehicle_class,engine_size_l,cylinders,transmission,fuel_type,fuel_consumption_l_per_hundred_km,hwy_l_per_hundred_km,...,co2_emission_g_per_km,name,first_name,start_city,start_country,target_city,target_country,distance_km,date,total_emission
80,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Sorro,Adorne,Säynätsalo,Finland,Parola,Finland,324.348518,2014-04-22,57734.03629
1931,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Lawless,Chaddie,Saari,Finland,Karkkila,Finland,879.892396,2014-06-29,156620.846539
2625,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Kinchlea,Emory,Kärkölä,Finland,Merimasku,Finland,905.628761,2014-05-09,161201.91946
2659,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Clemintoni,Conney,Askainen,Finland,Salo,Finland,486.579992,2014-03-30,86611.23864
2679,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Bartot,Thomasine,Ristinummi,Finland,Houtskär,Finland,949.650853,2014-11-22,169037.851847
3261,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Thurley,Felizio,Raahe,Finland,Längelmäki,Finland,74.656471,2014-01-22,13288.851913
3388,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Mansour,Garrett,Karkkila,Finland,Porvoo,Finland,285.65046,2014-06-01,50845.781912
3467,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Sorro,Adorne,Kiihtelysvaara,Finland,Keuruu,Finland,18.606779,2014-01-25,3312.006628
3630,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Lawless,Chaddie,Saari,Finland,Piippola,Finland,570.945463,2014-10-26,101628.292466
4169,357,FORD,FOCUS FFV,COMPACT,2.0,4.0,AM6,X,8.9,6.0,...,178,Whoolehan,Teodor,Eno,Finland,Nakkila,Finland,347.750794,2014-11-03,61899.641367


In [69]:
df_drivers_db = pd.read_sql_query("SELECT * FROM drivers;", engine)

In [76]:
df_car_driver_log.columns

Index(['car_id', 'brand', 'model', 'vehicle_class', 'engine_size_l',
       'cylinders', 'transmission', 'fuel_type',
       'fuel_consumption_l_per_hundred_km', 'hwy_l_per_hundred_km',
       'comb_l_per_hundred', 'comb_mpg', 'co2_emission_g_per_km', 'name',
       'first_name', 'start_city', 'start_country', 'target_city',
       'target_country', 'distance_km', 'date', 'total_emission', 'driver_id'],
      dtype='object')

In [78]:
df_car_driver_log_clean = df_car_log.merge(df_drivers_db, how='left', on=['name', 'first_name'])[['car_id', 'driver_id', 'start_city', 'start_country', 'target_city',
       'target_country', 'distance_km', 'date', 'total_emission']]

In [80]:
df_car_driver_log_clean[pd.isnull(df_car_driver_log_clean['driver_id'])]

Unnamed: 0,car_id,driver_id,start_city,start_country,target_city,target_country,distance_km,date,total_emission


In [172]:
df_car_driver_log_clean

Unnamed: 0,car_id,driver_id,start_city,start_country,target_city,target_country,distance_km,date,total_emission
0,379,815,Kihniö,Finland,Haukivuori,Finland,232.235123,2014-05-16,70367.242330
1,746,156,Liminka,Finland,Karkkila,Finland,675.654591,2014-07-14,135806.572737
2,942,866,Luvia,Finland,Mellilä,Finland,319.300665,2014-08-14,83337.473579
3,843,600,Kinnula,Finland,Pöytyä,Finland,939.400418,2014-03-07,260213.915921
4,921,962,Valkeakoski,Finland,Särkisalo,Finland,746.029780,2014-08-28,208888.338486
...,...,...,...,...,...,...,...,...,...
4995,481,147,Askainen,Finland,Kempele,Finland,730.565888,2014-07-29,128579.596259
4996,423,619,Ruotsinpyhtää,Finland,Porvoo,Finland,761.012486,2014-08-20,231347.795821
4997,354,735,Suolahti,Finland,Lumijoki,Finland,488.798424,2014-02-16,144195.535106
4998,587,107,Kerava,Finland,Joroinen,Finland,124.987769,2014-04-14,25997.455974


In [173]:
df_car_driver_log_clean.to_sql('car_driver_log', engine, if_exists='append', index=False)

In [82]:
def connect_db():
    print('Connecting to db')
    connection_uri = "postgres+psycopg2://postgres:postgres@database:5432/planetly"
    engine = create_engine(connection_uri, pool_pre_ping=True)
    engine.connect()
    return engine

In [85]:
db_engine = connect_db()

Connecting to db


In [86]:
db_engine

Engine(postgres+psycopg2://postgres:***@database:5432/planetly)

In [27]:
drivers = 'data/drivers.csv'
vehicle_fuel_consumptions = 'data/vehicle_fuel_consumptions.csv'
drivers_logbook = 'data/incoming_data/drivers_logbook.csv'

In [16]:
def extract(drivers, vehicle_fuel_consumptions, drivers_logbook):
    print("extracting the data")
    df_drivers_raw = pd.read_csv(drivers)
    df_veh_cons_raw = pd.read_csv(vehicle_fuel_consumptions)
    #if os.path.isfile(drivers_logbook):
    df_drivers_logbook_raw = pd.read_csv(drivers_logbook)
        #shutil.move("data/incoming_data/drivers_logbook.csv", "data/used_data/drivers_logbook.csv")
    #else:
        #print('No new data!')
        #df_drivers_logbook_raw = pd.DataFrame(columns=['brand', 'model', 'engine_size_l', 'cylinders', 'fuel_type',
       #'transmission', 'name', 'first_name', 'start_city', 'start_country',
       #'target_city', 'target_country', 'distance_km', 'date'])
    return df_drivers_raw, df_veh_cons_raw, df_drivers_logbook_raw

In [170]:
df_drivers_raw, df_veh_cons_raw, df_drivers_logbook_raw = extract(drivers, vehicle_fuel_consumptions, drivers_logbook)

extracting the data


In [157]:
del df_drivers_logbook_raw

In [175]:
len(df_drivers_logbook_raw)

5000

In [177]:
df_drivers_logbook_raw.to_sql('drivers_logbook_raw', engine, if_exists='append', index=False)

In [179]:
df_drivers_logbook_raw.columns

Index(['brand', 'model', 'engine_size_l', 'cylinders', 'fuel_type',
       'transmission', 'name', 'first_name', 'start_city', 'start_country',
       'target_city', 'target_country', 'distance_km', 'date'],
      dtype='object')

In [8]:
drivers = 'data/drivers.csv'
vehicle_fuel_consumptions = 'data/vehicle_fuel_consumptions.csv'
drivers_logbook = 'data/incoming_data/drivers_logbook.csv'

In [2]:
def connect_db():
    connection_uri = "postgres+psycopg2://postgres:postgres@database:5432/planetly"
    engine = create_engine(connection_uri, pool_pre_ping=True)
    engine.connect()
    conn = psycopg2.connect(database="planetly", user='postgres',password='postgres', host='database', port='5432')
    conn.autocommit = True
    cursor = conn.cursor()
    return engine, cursor

In [3]:
def load(df, table_name, if_exists='append'): 
    db_engine, cursor = connect_db()
    df.to_sql(table_name, db_engine, if_exists=if_exists, index=False)
    print(f'loaded {table_name}')


In [4]:
def select_table_from_db(table):
    db_engine, cursor = connect_db()
    df = pd.read_sql_query(f"SELECT * from {table};", db_engine)
    return df

In [5]:
def create_tables():
    db_engine, cursor = connect_db()

    #Creating drivers table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS drivers(
        driver_id SERIAL PRIMARY KEY,
        name VARCHAR(50),
        first_name VARCHAR(50)
    );''')
    print('drivers done')

    #creating cars table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS cars(
        car_id SERIAL PRIMARY KEY,
        brand VARCHAR(50), 
        model VARCHAR(50), 
        vehicle_class VARCHAR(50), 
        engine_size_l FLOAT, 
        cylinders FLOAT, 
        transmission VARCHAR(50), 
        fuel_type VARCHAR(50),
        fuel_consumption_l_per_hundred_km FLOAT,
        hwy_l_per_hundred_km FLOAT,
        comb_l_per_hundred FLOAT,
        comb_mpg INT,
        co2_emission_g_per_km INT
    );''')
    print('car done')

    cursor.execute('''
    CREATE TABLE IF NOT EXISTS car_driver_log(
        car_id INT,
        driver_id INT,
        start_city VARCHAR(50),
        start_country VARCHAR(50),
        target_city VARCHAR(50),
        target_country VARCHAR(50),
        distance_km FLOAT,
        date DATE,
        total_emission FLOAT,
        CONSTRAINT fk__cars__car_id__car_driver_log__car_id
        FOREIGN KEY (car_id)
        REFERENCES cars(car_id)
        ON UPDATE CASCADE ON DELETE RESTRICT,
        CONSTRAINT fk__drivers__driver_id__car_driver_log__driver_id
        FOREIGN KEY (driver_id)
        REFERENCES drivers(driver_id)
        ON UPDATE CASCADE ON DELETE RESTRICT
    );''')   
    print('car_driver_log done')

In [35]:
def extract(drivers, vehicle_fuel_consumptions, drivers_logbook):
    print("extracting the data")
    df_drivers_raw = pd.read_csv(drivers)
    df_veh_cons_raw = pd.read_csv(vehicle_fuel_consumptions)
    df_drivers_logbook_raw = pd.read_csv(drivers_logbook)
    
    #it will check if the data is there or not, if the data is not there it will initiate an empty dataframe.
    #if os.path.isfile(drivers_logbook):
    #    df_drivers_logbook_raw = pd.read_csv(drivers_logbook)
    #    shutil.move("data/incoming_data/drivers_logbook.csv", "data/used_data/drivers_logbook.csv")
    #else:
    #    print('No new data!')
    #    empty dataframe
    #    df_drivers_logbook_raw = pd.DataFrame(columns=['brand', 'model', 'engine_size_l', 'cylinders', 'fuel_type',
    #   'transmission', 'name', 'first_name', 'start_city', 'start_country',
    #   'target_city', 'target_country', 'distance_km', 'date'])
    return df_drivers_raw, df_veh_cons_raw, df_drivers_logbook_raw

In [36]:
def transform_and_load():
    db_engine, cursor = connect_db()
    df_drivers_raw, df_veh_cons_raw, df_drivers_logbook_raw = extract(drivers, vehicle_fuel_consumptions, drivers_logbook)
    print('Fetched the data')
    #driver table transformations
    df_drivers_clean = df_drivers_raw[['name', 'first_name']].sort_values('name').reset_index(drop=True)
    #deduping the data
    df_drivers_clean = df_drivers_clean.drop_duplicates(subset=['name', 'first_name']).reset_index(drop=True)
    df_drivers_count = pd.read_sql_query("SELECT COUNT(driver_id) FROM drivers;", db_engine)
    drivers_count = df_drivers_count.iloc[0].values[0]
    if drivers_count == 0:
        load(df_drivers_clean, table_name='drivers')
    else:
        pass

    #cars table
    df_veh_cons_raw.rename(columns={'BRAND':'brand', 'MODEL':'model', 'VEHICLE CLASS':'vehicle_class', 'ENGINE SIZE L':'engine_size_l', 'CYLINDERS':'cylinders',
       'TRANSMISSION':'transmission', 'FUEL_TYPE':'fuel_type', 'FUEL CONSUMPTION (L/100 km)':'fuel_consumption_l_per_hundred_km',
       'HWY (L/100 km)':'hwy_l_per_hundred_km', 'COMB (L/100 km)':'comb_l_per_hundred', 'COMB (mpg)':'comb_mpg',
       'CO2_Emissions(g/km)':'co2_emission_g_per_km'}, inplace=True)
    df_veh_cons_raw.sort_values('brand', inplace=True)
    df_veh_cons_raw = df_veh_cons_raw.reset_index(drop=True)
    #deduping
    df_cars_clean = df_veh_cons_raw.drop_duplicates(subset=['brand', 'model','vehicle_class', 'engine_size_l', 'cylinders', 'transmission', 'fuel_type']).reset_index(drop=True)
    
    # this logic should only be there if the car and drivers data is static.
    df_cars_count = pd.read_sql_query("SELECT COUNT(car_id) FROM cars;", db_engine)
    cars_count = df_cars_count.iloc[0].values[0]
    if cars_count == 0:
        load(df_cars_clean, table_name='cars')
    else:
        pass
    
    load(df_drivers_logbook_raw, table_name='drivers_logbook_raw')

    df_cars_db = select_table_from_db(table="cars")
    df_drivers_db = select_table_from_db(table="drivers")
    
    # this if condition will check if we have received the new data or not.
    if len(df_drivers_logbook_raw) > 0:
        df_car_log = df_cars_db.merge(df_drivers_logbook_raw, how='right', on=['brand', 'model', 'engine_size_l', 'cylinders', 'fuel_type', 'transmission'])
        # adding a total_emission column
        values = df_car_log.distance_km * df_car_log.co2_emission_g_per_km
        df_car_log['total_emission'] = values
        df_car_driver_log_raw = df_car_log.merge(df_drivers_db, how='left', on=['name', 'first_name'])[['car_id', 'driver_id', 'start_city', 'start_country', 'target_city',
            'target_country', 'distance_km', 'date', 'total_emission']]
        df_cdl_count = pd.read_sql_query("SELECT COUNT(car_id) FROM car_driver_log;", db_engine)
        cdl_count = df_cdl_count.iloc[0].values[0]
        if cdl_count == 0:
            df_car_driver_log_clean = df_car_driver_log_raw.drop_duplicates(subset=['car_id', 'driver_id', 'start_city', 'start_country', 'target_country', 'target_city'])
            load(df_car_driver_log_clean, table_name='car_driver_log')
        else:
            df_car_driver_log_db = select_table_from_db(table='car_driver_log')
            df_car_driver_log_merge = df_car_driver_log_raw.merge(df_car_driver_log_db, on=['car_id', 'driver_id', \
            'start_city', 'start_country', 'target_country', 'target_city'], how='left', indicator=True)
            df_car_driver_log_clean = df_car_driver_log_merge[df_car_driver_log_merge['_merge'] == 'left_only'].drop(['distance_km_y', 'date_y', \
            'total_emission_y', '_merge'], axis=1).rename(columns={'distance_km_x':'distance_km', 'date_x':'date', 'total_emission_x':'total_emission'})
            df_car_driver_log_clean['date']= pd.to_datetime(df_car_driver_log_clean['date']) # making sure the datatype of date column
            load(df_car_driver_log_clean, 'car_driver_log')
            print(df_car_driver_log_clean)
    else:
        print('No new data!')

In [34]:
create_tables()

drivers done
car done
car_driver_log done


In [38]:
transform_and_load()

extracting the data
Fetched the data
loaded drivers_logbook_raw
loaded car_driver_log
Empty DataFrame
Columns: [car_id, driver_id, start_city, start_country, target_city, target_country, distance_km, date, total_emission]
Index: []
