## Trips (data.parquet) ETL

Connect to db

In [1]:
import os

from sqlalchemy import create_engine

# Connection URL for the target MySQL database. Set TRIPS_DB_URL in the
# environment to point at another server or to keep real credentials out of
# the notebook; the fallback is the local test database this notebook was
# originally written against.
DB_URL = os.environ.get(
    'TRIPS_DB_URL',
    'mysql+mysqlconnector://test:test123@192.168.99.100:3306/test',
)
source = create_engine(DB_URL)

Load parquet into dataframe

In [2]:
import pandas as pd
# Read the whole parquet file into a dataframe; the bare `df` on the last line
# makes the notebook render a (truncated) preview of it.
df = pd.read_parquet('data.parquet')
df

Unnamed: 0,vendor_id,pickup_at,dropoff_at,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-06-01 00:55:13,2019-06-01 00:56:17,1,0.00,1,N,145,145,2,3.0,0.5,0.5,0.00,0.0,0.3,4.300000,0.0
1,1,2019-06-01 00:06:31,2019-06-01 00:06:52,1,0.00,1,N,262,263,2,2.5,3.0,0.5,0.00,0.0,0.3,6.300000,2.5
2,1,2019-06-01 00:17:05,2019-06-01 00:36:38,1,4.40,1,N,74,7,2,17.5,0.5,0.5,0.00,0.0,0.3,18.799999,0.0
3,1,2019-06-01 00:59:02,2019-06-01 00:59:12,0,0.80,1,N,145,145,2,2.5,1.0,0.5,0.00,0.0,0.3,4.300000,0.0
4,1,2019-06-01 00:03:25,2019-06-01 00:15:42,1,1.70,1,N,113,148,1,9.5,3.0,0.5,2.65,0.0,0.3,15.950000,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6941019,1,2019-06-30 23:23:03,2019-06-30 23:39:48,1,0.90,1,N,68,158,1,11.0,3.0,0.5,2.00,0.0,0.3,16.799999,2.5
6941020,1,2019-06-30 23:50:22,2019-06-30 23:57:01,1,0.50,1,N,246,90,2,6.0,3.0,0.5,0.00,0.0,0.3,9.800000,2.5
6941021,1,2019-06-30 23:58:32,2019-07-01 00:00:42,1,0.20,1,N,90,186,1,3.5,3.0,0.5,1.45,0.0,0.3,8.750000,2.5
6941022,2,2019-06-30 23:23:10,2019-06-30 23:30:45,1,1.38,1,N,140,163,1,7.5,0.5,0.5,2.26,0.0,0.3,13.560000,2.5


Check NaN values and types

In [3]:
# Per column: True if at least one value is missing. (`.all()` would only flag
# columns that are missing everywhere and hide partially-missing ones.)
df.isna().any()

vendor_id                False
pickup_at                False
dropoff_at               False
passenger_count          False
trip_distance            False
rate_code_id             False
store_and_fwd_flag       False
pickup_location_id       False
dropoff_location_id      False
payment_type             False
fare_amount              False
extra                    False
mta_tax                  False
tip_amount               False
tolls_amount             False
improvement_surcharge    False
total_amount             False
congestion_surcharge     False
dtype: bool

In [4]:
# Show the dtype pandas assigned to each column.
df.dtypes

vendor_id                        object
pickup_at                datetime64[ns]
dropoff_at               datetime64[ns]
passenger_count                    int8
trip_distance                   float32
rate_code_id                     object
store_and_fwd_flag               object
pickup_location_id                int32
dropoff_location_id               int32
payment_type                     object
fare_amount                     float32
extra                           float32
mta_tax                         float32
tip_amount                      float32
tolls_amount                    float32
improvement_surcharge           float32
total_amount                    float32
congestion_surcharge            float32
dtype: object

Write dataframe to sql database (test), table (trips)
Note: only the first 100 rows of the dataframe are loaded.

In [5]:
# Only the first 100 rows are loaded; the `trips` table in schema `test` is
# replaced if it already exists and the dataframe index is not written.
data = df.head(100)
data.to_sql(
    name='trips',
    con=source,
    schema='test',
    index=False,
    if_exists='replace',
)

Check what's inside the table of the db

In [6]:
# Read the table back through the same engine to see what was stored.
pd.read_sql_query('SELECT * FROM trips', con=source)

Unnamed: 0,vendor_id,pickup_at,dropoff_at,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-06-01 00:55:13,2019-06-01 00:56:17,1,0.00,1,N,145,145,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0
1,1,2019-06-01 00:06:31,2019-06-01 00:06:52,1,0.00,1,N,262,263,2,2.5,3.0,0.5,0.00,0.0,0.3,6.30,2.5
2,1,2019-06-01 00:17:05,2019-06-01 00:36:38,1,4.40,1,N,74,7,2,17.5,0.5,0.5,0.00,0.0,0.3,18.80,0.0
3,1,2019-06-01 00:59:02,2019-06-01 00:59:12,0,0.80,1,N,145,145,2,2.5,1.0,0.5,0.00,0.0,0.3,4.30,0.0
4,1,2019-06-01 00:03:25,2019-06-01 00:15:42,1,1.70,1,N,113,148,1,9.5,3.0,0.5,2.65,0.0,0.3,15.95,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2019-06-01 00:11:49,2019-06-01 00:16:33,1,0.85,1,N,234,162,1,5.0,0.5,0.5,2.00,0.0,0.3,10.80,2.5
96,2,2019-06-01 00:55:18,2019-06-01 01:01:25,1,0.85,1,N,233,229,1,5.5,0.5,0.5,1.86,0.0,0.3,11.16,2.5
97,1,2019-06-01 00:34:58,2019-06-01 00:53:17,1,5.10,1,N,97,186,2,18.0,3.0,0.5,0.00,0.0,0.3,21.80,2.5
98,1,2019-06-01 00:54:49,2019-06-01 00:58:49,2,0.80,1,N,164,234,1,5.0,3.0,0.5,1.75,0.0,0.3,10.55,2.5


Write the DDL to the data.sql file

In [7]:
# Build the CREATE TABLE statement for the dataframe using the engine's dialect.
# NOTE(review): reset_index() adds an `index` column to the DDL, while the
# to_sql() load above used index=False -- confirm the extra column is intended.
create_table = pd.io.sql.get_schema(df.reset_index(), 'test.trips', con=source)

# get_schema() returns the statement without a terminating ';'. Add one so that
# statements appended to the file afterwards form a valid SQL script.
# The `with` block closes the file; no explicit close() is needed.
with open('../sql/data.sql', 'w') as file:
    file.write(create_table.strip().rstrip(';') + ';\n\n')

In [9]:
def sql_insert(frame=None, table='test.trips'):
    """Build one multi-row ``INSERT`` statement for a dataframe.

    Every cell is rendered as a SQL literal rather than a Python ``repr``:
    missing values become ``NULL``, timestamps and dates become quoted
    ``'YYYY-MM-DD HH:MM:SS'`` / ``'YYYY-MM-DD'`` strings, text is quoted with
    embedded quotes and backslashes escaped, and numbers are written as-is.

    Parameters
    ----------
    frame : pandas.DataFrame, optional
        Rows to insert. Defaults to the notebook-level ``data`` sample.
    table : str, optional
        Target table name, written verbatim after ``INSERT INTO``.

    Returns
    -------
    str
        The statement terminated by ``;``, or an empty string when ``frame``
        has no rows (an ``INSERT`` without values is not valid SQL).
    """
    from datetime import date, datetime
    from numbers import Number

    if frame is None:
        frame = data  # sample dataframe defined in an earlier cell

    def _quote(text):
        """Wrap text in single quotes, escaping backslashes and quotes."""
        return "'" + text.replace('\\', '\\\\').replace("'", "''") + "'"

    def _sql_literal(value):
        """Render a single cell value as a SQL literal."""
        # Null check comes first: NaT is also a datetime instance.
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return 'NULL'
        # bool before Number: bool is a subclass of int.
        if isinstance(value, bool):
            return 'TRUE' if value else 'FALSE'
        # datetime before date: datetime (and pd.Timestamp) subclass date.
        if isinstance(value, datetime):
            fmt = '%Y-%m-%d %H:%M:%S.%f' if value.microsecond else '%Y-%m-%d %H:%M:%S'
            return _quote(value.strftime(fmt))
        if isinstance(value, date):
            return _quote(value.isoformat())
        if isinstance(value, Number):
            return str(value)
        return _quote(str(value))

    # itertuples() walks rows by position, so the result does not depend on
    # the index labels being 0..n-1 (a slice or filtered frame works too).
    rows = [
        '(' + ', '.join(_sql_literal(value) for value in row) + ')'
        for row in frame.itertuples(index=False, name=None)
    ]
    if not rows:
        return ''

    columns = ', '.join(
        '`' + str(column).replace('`', '``') + '`' for column in frame.columns
    )
    header = 'INSERT INTO ' + table + ' (' + columns + ')\nVALUES '
    return header + ',\n'.join(rows) + ';'

# Append the INSERT statement to the SQL script. The `with` block closes the
# file on exit, so no explicit close() is needed.
with open('../sql/data.sql', 'a') as file:
    file.write(sql_insert())
