# **ETL Pipeline (SQL to Python)**

The purpose of this Jupyter notebook is to load data from a SQL database into Python.

In [1]:
!pip install sqlalchemy
!pip install dask
!pip install pyodbc



In [2]:
import os
import pandas as pd
import dask.dataframe as dd
from sqlalchemy import create_engine

from warnings import filterwarnings 
filterwarnings('ignore')

## **Extraction**

Load the data from the sensor, driver and safety tables in MSSQL into pandas DataFrames.

In [4]:
%%time

# set to your own desktop name
PC_name = os.environ['COMPUTERNAME']

# connect to database in mssql
server = f"{PC_name}\\SQLEXPRESS" # SQL Server instance name (backslash escaped)
database = "PAI_CA1" # database name
con_string = f'mssql+pyodbc://{server}/{database}?driver=SQL Server'
engine = create_engine(con_string)

# retrieve data from the database
connection = engine.connect()

# driver data
driver = connection.execute('SELECT * FROM Tempdriver')
driver_data = pd.DataFrame(data=driver.fetchall(), columns=driver.keys())

# safety data
safety = connection.execute('SELECT * FROM Tempsafety')
safety_data = pd.DataFrame(data=safety.fetchall(), columns=safety.keys())

connection.close() # close connection explicitly

CPU times: total: 31.2 ms
Wall time: 75 ms


We load the driver and safety data first because these tables are small compared to the sensor data (500 and 20,000 rows respectively), so each can be fetched with a single query.
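
For small tables like these, `pd.read_sql_query` can build the DataFrame straight from the query, which avoids pairing `fetchall()` with `keys()` by hand. The sketch below is only an illustration: it uses an in-memory SQLite database as a stand-in for the MSSQL instance, and the `load_table` helper and the sample rows are hypothetical.

```python
import sqlite3
from contextlib import closing

import pandas as pd


def load_table(connection, table_name):
    """Read a whole (small) table into a DataFrame with one call."""
    # Table names cannot be bound as query parameters, so only pass trusted names.
    return pd.read_sql_query(f"SELECT * FROM {table_name}", connection)


# In-memory SQLite stands in for the MSSQL database (assumption for the demo).
with closing(sqlite3.connect(":memory:")) as demo_connection:
    demo_connection.execute(
        "CREATE TABLE Tempdriver (driver_id INTEGER, driver_name TEXT)"
    )
    demo_connection.executemany(
        "INSERT INTO Tempdriver VALUES (?, ?)",
        [(1, "Driver A"), (2, "Driver B")],
    )
    demo_driver_data = load_table(demo_connection, "Tempdriver")

print(demo_driver_data)
```

Wrapping the connection in `closing(...)` releases it even if the query raises, which is easy to miss when `connection.close()` is called manually at the end of a cell.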

In [8]:
%%time
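# load the sensor data in chunks of 100,000 rows

# stream_results=True only takes effect if this connection is the one passed to pandas
connection = engine.connect().execution_options(stream_results=True)
sensor_data_generator = pd.read_sql_query('SELECT * FROM TempSensor', connection, chunksize=10**5)
sensor_data = pd.concat([chunk for chunk in sensor_data_generator])

connection.close() # close connection explicitly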

CPU times: total: 19.7 s
Wall time: 1min 30s


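We load the sensor data in chunks because it is much larger than the driver and safety data. Passing chunksize=10**5 to pd.read_sql_query returns the rows in chunks of 100,000, and .execution_options(stream_results=True) asks the database driver to stream results instead of buffering the whole result set on the client. This keeps memory use lower while reading than loading everything at once, although the final pd.concat still holds the full table in memory.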
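
The chunked read can be sketched on a small scale as follows. This is an illustration under assumptions, not the notebook's actual run: an in-memory SQLite table replaces the MSSQL `TempSensor` table, the rows are made up, and `CHUNK_ROWS` is a hypothetical tiny chunk size so the chunking is visible.

```python
import sqlite3
from contextlib import closing

import pandas as pd

CHUNK_ROWS = 4  # tiny chunk size for the demo; the notebook uses 10**5

with closing(sqlite3.connect(":memory:")) as demo_connection:
    demo_connection.execute("CREATE TABLE TempSensor (bookingID TEXT, speed REAL)")
    demo_connection.executemany(
        "INSERT INTO TempSensor VALUES (?, ?)",
        [(str(i), float(i)) for i in range(10)],
    )

    # With chunksize set, read_sql_query returns an iterator of DataFrames.
    chunks = pd.read_sql_query(
        "SELECT * FROM TempSensor", demo_connection, chunksize=CHUNK_ROWS
    )
    chunk_sizes = []
    parts = []
    for chunk in chunks:
        chunk_sizes.append(len(chunk))
        parts.append(chunk)

# ignore_index=True gives the combined frame one continuous index.
demo_sensor_data = pd.concat(parts, ignore_index=True)
print(chunk_sizes, len(demo_sensor_data))
```

Passing `ignore_index=True` is a design choice worth considering here: without it, the combined frame keeps each chunk's own index labels, which is why the `sensor_data.info()` output reports an index ending at 69655 rather than at the total row count.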

In [9]:
driver_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   driver_id       500 non-null    int64  
 1   driver_name     500 non-null    object 
 2   date_of_birth   500 non-null    object 
 3   years_of_exp    500 non-null    int64  
 4   gender          500 non-null    object 
 5   car_brand       500 non-null    object 
 6   car_model_year  500 non-null    object 
 7   driver_rating   500 non-null    float64
dtypes: float64(1), int64(2), object(5)
memory usage: 31.4+ KB


In [10]:
safety_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   bookingID  20000 non-null  object
 1   driver_id  20000 non-null  int64 
 2   label      20000 non-null  int64 
dtypes: int64(2), object(1)
memory usage: 468.9+ KB


In [11]:
sensor_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7469656 entries, 0 to 69655
Data columns (total 11 columns):
 #   Column          Dtype  
---  ------          -----  
 0   bookingID       object 
 1   Accuracy        float64
 2   Bearing         float64
 3   acceleration_x  float64
 4   acceleration_y  float64
 5   acceleration_z  float64
 6   gyro_x          float64
 7   gyro_y          float64
 8   gyro_z          float64
 9   second          float64
 10  speed           float64
dtypes: float64(10), object(1)
memory usage: 683.9+ MB


## **Transform and merge the data**

### **Remove Duplicated Rows**

In [12]:
driver_data = driver_data.drop_duplicates()
safety_data = safety_data.drop_duplicates() # reassign so the merge below uses the deduplicated rows
sensor_data = sensor_data.drop_duplicates()
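
Because `drop_duplicates()` returns a new DataFrame, the result has to be assigned to the name that later cells actually use. A minimal sketch, using a made-up three-row frame (`demo_safety` is hypothetical), that also reports how many rows were removed:

```python
import pandas as pd

# Hypothetical sample with one fully duplicated row.
demo_safety = pd.DataFrame(
    {"bookingID": ["a", "a", "b"], "driver_id": [1, 1, 2], "label": [0, 0, 1]}
)

rows_before = len(demo_safety)
# Reassign to the same name so every later step sees the deduplicated frame.
demo_safety = demo_safety.drop_duplicates()
rows_removed = rows_before - len(demo_safety)

print(f"removed {rows_removed} duplicated row(s)")
```

Logging the count makes it easier to spot a table where deduplication unexpectedly removes a large share of the rows.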

### **Merge Dataframes**

In [14]:
%%time

# merge driver and safety data
driver_safety = safety_data.merge(driver_data, on='driver_id', how='left')

# merge driver_safety and sensor data
driver_safety_sensor = sensor_data.merge(driver_safety, on='bookingID', how='left')

CPU times: total: 1.3 s
Wall time: 2.43 s
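
A left merge silently multiplies rows if the right-hand key is not unique, so it can help to let pandas check that assumption. The sketch below uses small made-up frames (all `demo_*` names are hypothetical) and assumes each `driver_id` and each `bookingID` appears once on the right-hand side:

```python
import pandas as pd

demo_driver = pd.DataFrame({"driver_id": [1, 2], "driver_rating": [4.5, 3.9]})
demo_safety = pd.DataFrame(
    {"bookingID": ["a", "b", "c"], "driver_id": [1, 2, 1], "label": [0, 1, 0]}
)
demo_sensor = pd.DataFrame(
    {"bookingID": ["a", "a", "b", "c"], "speed": [0.0, 1.2, 3.4, 5.6]}
)

# validate="many_to_one" raises MergeError if the right-hand key is not unique.
demo_driver_safety = demo_safety.merge(
    demo_driver, on="driver_id", how="left", validate="many_to_one"
)

# indicator=True adds a "_merge" column showing whether each row found a match.
demo_merged = demo_sensor.merge(
    demo_driver_safety,
    on="bookingID",
    how="left",
    validate="many_to_one",
    indicator=True,
)

unmatched_rows = int((demo_merged["_merge"] == "left_only").sum())
print(len(demo_merged), unmatched_rows)
```

If the row count after the merge equals the sensor row count and `unmatched_rows` is zero, every sensor reading was matched to exactly one booking.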


In [15]:
driver_safety_sensor.head()


| | bookingID | Accuracy | Bearing | acceleration_x | acceleration_y | acceleration_z | gyro_x | gyro_y | gyro_z | second | speed | driver_id | label | driver_name | date_of_birth | years_of_exp | gender | car_brand | car_model_year | driver_rating |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 206158430281 | 8.53 | 0.0 | 0.154 | 10.08 | -1.207 | 0.0616 | -0.0387 | -0.0972 | 649.0 | 0.0 | 84 | 0 | Juliane Brimm | 1980-07-23 | 15 | Female | BMW | 2008 | 3.1 |
| 1 | 558345748569 | 3.9 | 16.0 | 1.82 | 9.886 | -0.63 | -0.0803 | -0.0817 | 0.0548 | 810.0 | 6.32 | 129 | 0 | Blakeley Skerratt | 1972-10-17 | 5 | Female | Audi | 1993 | 4.6 |
| 2 | 541165879336 | 3.9 | 135.0 | 0.742 | 9.37 | 3.033 | 0.0015 | 0.0655 | 0.0209 | 207.0 | 7.11 | 455 | 0 | Rafael Oxenbury | 1974-09-03 | 19 | Male | Lexus | 2006 | 4.3 |
| 3 | 1486058684421 | 52.0 | 107.83 | -2.166 | -7.768 | 2.488 | -0.1458 | NaN | 0.0055 | 804.0 | 17.23 | 478 | 0 | Nellie Tomasino | 1973-01-04 | 20 | Female | Toyota | 2012 | 4.2 |
| 4 | 627065225376 | 3.13 | 88.0 | 0.898 | 9.39 | 1.959 | 0.0164 | 0.1021 | 0.0158 | 321.0 | 1.86 | 213 | 0 | Camille Muldownie | 1974-12-26 | 17 | Female | Hyundai | 2009 | 4.4 |


In [17]:
driver_safety_sensor.info(show_counts=True)


<class 'pandas.core.frame.DataFrame'>
Int64Index: 7469655 entries, 0 to 7469654
Data columns (total 20 columns):
 #   Column          Non-Null Count    Dtype  
---  ------          --------------    -----  
 0   bookingID       7469655 non-null  object 
 1   Accuracy        7326243 non-null  float64
 2   Bearing         7277836 non-null  float64
 3   acceleration_x  7271855 non-null  float64
 4   acceleration_y  7255719 non-null  float64
 5   acceleration_z  7388976 non-null  float64
 6   gyro_x          7310107 non-null  float64
 7   gyro_y          7330416 non-null  float64
 8   gyro_z          7299953 non-null  float64
 9   second          7346552 non-null  float64
 10  speed           7356706 non-null  float64
 11  driver_id       7469655 non-null  int64  
 12  label           7469655 non-null  int64  
 13  driver_name     7469655 non-null  object 
 14  date_of_birth   7469655 non-null  object 
 15  years_of_exp    7469655 non-null  int64  
 16  gender          7469655 non-null  ob

## **Exporting the Final Dataframe**

In [19]:
taxi_data = driver_safety_sensor[['bookingID', 'Accuracy', 'Bearing', 'acceleration_x',
                                  'acceleration_y', 'acceleration_z', 'gyro_x', 'gyro_y',
                                  'gyro_z', 'second', 'speed', 'driver_id',
                                  'label', 'driver_name', 'date_of_birth', 'years_of_exp', 'gender', 'car_brand',
                                  'car_model_year', 'driver_rating']]
taxi_data.info(show_counts=True)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7469655 entries, 0 to 7469654
Data columns (total 20 columns):
 #   Column          Non-Null Count    Dtype  
---  ------          --------------    -----  
 0   bookingID       7469655 non-null  object 
 1   Accuracy        7326243 non-null  float64
 2   Bearing         7277836 non-null  float64
 3   acceleration_x  7271855 non-null  float64
 4   acceleration_y  7255719 non-null  float64
 5   acceleration_z  7388976 non-null  float64
 6   gyro_x          7310107 non-null  float64
 7   gyro_y          7330416 non-null  float64
 8   gyro_z          7299953 non-null  float64
 9   second          7346552 non-null  float64
 10  speed           7356706 non-null  float64
 11  driver_id       7469655 non-null  int64  
 12  label           7469655 non-null  int64  
 13  driver_name     7469655 non-null  object 
 14  date_of_birth   7469655 non-null  object 
 15  years_of_exp    7469655 non-null  int64  
 16  gender          7469655 non-null  ob

In [22]:
%%time
# create the output directory if it does not already exist
os.makedirs('../Datasets/cleaned/', exist_ok=True)

# save data to csv
taxi_data.to_csv('../Datasets/cleaned/taxi_data.csv', index=False)

CPU times: total: 16.1 s
Wall time: 46.9 s
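
When the CSV is read back later, column types are inferred again, so an identifier column can change type. The sketch below writes a made-up two-row frame to a temporary directory and reloads it. It assumes `bookingID` holds digit-only strings, as its `object` dtype suggests, and `demo_taxi` is hypothetical.

```python
import os
import tempfile

import pandas as pd

demo_taxi = pd.DataFrame(
    {"bookingID": ["206158430281", "558345748569"], "speed": [0.0, 6.32]}
)

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "cleaned", "taxi_data.csv")
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
    demo_taxi.to_csv(csv_path, index=False)

    # Without dtype, the all-digit bookingID strings would be parsed as integers.
    reloaded = pd.read_csv(csv_path, dtype={"bookingID": str})

print(reloaded.dtypes)
```

Reading `bookingID` as a string keeps it consistent with the extracted tables, so later merges on that key do not fail on mismatched types.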
