# Question 1. Knowing docker tags

Answer 1: `-rm`

# Question 2. Understanding docker first run

Answer 2: `0.42.0`

# Loading Yellow Trip Data for January 2021 into PostgreSQL Database Server

## Import libraries

In [2]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
from time import time

## Establish Database Connection Engine using SQLAlchemy

In [3]:
## database-engine://user:pass@hostname:port/database
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1e1ef3a40b0>

## Ingest January 2021 Yellow Trip Data in Parquet format using pyarrow RecordBatch for Chunking

In [24]:
df = pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet')
df.head()


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [5]:
table_ytd = pq.ParquetFile(('.\\datasets\\yellow_tripdata_2021-01.parquet')).read()

num_rows = table_ytd.num_rows
offset = 0

while num_rows > 0:
    length = min(100_000, num_rows)

    start_time = time()
    table_ytd.slice(offset=offset, length=length)\
        .to_pandas()\
        .to_sql('yellow-taxi-data', con=engine, if_exists='append', index=False, method='multi')
    end_time = time()

    print(f'Inserted {length:,} rows in {(end_time - start_time):.2f} seconds \
          \t\t[{(offset+length)/table_ytd.num_rows:.2%}]({offset+1:,}-{offset+length:,})')
    num_rows -= length
    offset += length

print("Writing to database complete.")

Inserted 100,000 in 12.64 seconds [7.30%]		(1-100,000)
Inserted 100,000 in 11.35 seconds [14.60%]		(100,001-200,000)
Inserted 100,000 in 11.34 seconds [21.90%]		(200,001-300,000)
Inserted 100,000 in 13.23 seconds [29.20%]		(300,001-400,000)
Inserted 100,000 in 11.76 seconds [36.50%]		(400,001-500,000)
Inserted 100,000 in 12.63 seconds [43.80%]		(500,001-600,000)
Inserted 100,000 in 11.11 seconds [51.10%]		(600,001-700,000)
Inserted 100,000 in 15.14 seconds [58.40%]		(700,001-800,000)
Inserted 100,000 in 15.73 seconds [65.70%]		(800,001-900,000)
Inserted 100,000 in 12.10 seconds [73.01%]		(900,001-1,000,000)
Inserted 100,000 in 14.55 seconds [80.31%]		(1,000,001-1,100,000)
Inserted 100,000 in 14.25 seconds [87.61%]		(1,100,001-1,200,000)
Inserted 100,000 in 12.99 seconds [94.91%]		(1,200,001-1,300,000)
Inserted 69,769 in 9.53 seconds [100.00%]		(1,300,001-1,369,769)
Writing to database complete.


## Review schema of January 2021 Yellow Trip Data

In [6]:
schema = pd.io.sql.get_schema(table_ytd.to_pandas(), name='yellow_taxi_data')
print(schema)

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


###### **Note:** As can be observed above, data types are already correct. Hence, data transformation is not required for the purposes of this data ingestion process.

# Question 3-6

## Ingest September 2019 Green Trip Data to PostgreSQL Server

In [12]:
df_gtd = pd.read_csv('.\\datasets\\green_tripdata_2019-09.csv', iterator=True, chunksize=100000)

for df in df_gtd:
    start_time = time()
    
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])

    df.to_sql('green-taxi-data', con=engine, if_exists='append')

    total_time = time() - start_time

    print(f'{df.shape[0]:,} row(s) have been ingested into green-taxi-data table. Process took {total_time:.2f} seconds.')
    
print('Ingestion completed.')

100,000 row(s) have been ingested into green-taxi-data table. Process took 16.01 seconds.
100,000 row(s) have been ingested into green-taxi-data table. Process took 15.43 seconds.
100,000 row(s) have been ingested into green-taxi-data table. Process took 16.79 seconds.


  for df in df_gtd:


100,000 row(s) have been ingested into green-taxi-data table. Process took 15.52 seconds.
49,063 row(s) have been ingested into green-taxi-data table. Process took 5.87 seconds.
Ingestion completed.


In [14]:
df_gtd = pd.read_csv('.\\datasets\\green_tripdata_2019-09.csv')
schema = pd.io.sql.get_schema(df_gtd, name='green_taxi_data')
print(schema)

CREATE TABLE "green_taxi_data" (
"VendorID" REAL,
  "lpep_pickup_datetime" TEXT,
  "lpep_dropoff_datetime" TEXT,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" REAL,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


  df_gtd = pd.read_csv('.\\datasets\\green_tripdata_2019-09.csv')


## Ingest Taxi Zone data

In [10]:
taxi_zone = pd.read_csv('.\\datasets\\taxi+_zone_lookup.csv')
taxi_zone.to_sql(name='taxi-zone', con=engine, if_exists='replace', index=False, method='multi')

print(f'Dataset successfully ingested.')

Dataset successfully ingested.


## Question # 3

How many taxi trips were totally made on September 18th 2019?



In [18]:
query1 = '''
  SELECT COUNT(*) as "Total no. of taxi trips"
  FROM "green-taxi-data"
  WHERE lpep_pickup_datetime::DATE = '2019-09-18'
    AND lpep_dropoff_datetime::DATE = '2019-09-18'
'''

df_gtdcount = pd.read_sql_query(query1, con=engine)
df_gtdcount

Unnamed: 0,Total no. of taxi trips
0,15612


## Question # 4

Which was the pick up day with the largest trip distance? Use the pick up time for your calculations.

In [19]:
query2 = '''
  SELECT lpep_pickup_datetime::DATE as "Pickup day with the largest trip distance"
  FROM "green-taxi-data"
  WHERE trip_distance = (SELECT MAX(trip_distance) FROM "green-taxi-data")
'''

df_ltd = pd.read_sql_query(query2, con=engine)
df_ltd

Unnamed: 0,Pickup day with the largest trip distance
0,2019-09-26


## Question # 5

Consider lpep_pickup_datetime in '2019-09-18' and ignoring Borough has Unknown

Which were the 3 pick up Boroughs that had a sum of total_amount superior to 50000?



In [20]:
query3 = '''
  SELECT *
  FROM "taxi-zone"
'''
query4 = '''
  SELECT *
  FROM "green-taxi-data"
  WHERE lpep_pickup_datetime::DATE = '2019-09-18'
'''

df_taxi_zone = pd.read_sql_query(query3, con=engine)
borough_df = pd.read_sql_query(query4, con=engine)

df_borough50k = borough_df\
  .merge(df_taxi_zone, left_on='PULocationID', right_on='LocationID', how='left')\
  .loc[:, ['lpep_pickup_datetime', 'Borough', 'total_amount']]\
  .loc[lambda x: x['Borough'] != 'Unknown']\
  .pivot_table(values='total_amount', index='Borough', aggfunc='sum')\
  .reset_index()\
  .loc[lambda x: x['total_amount'] > 50000]\
  .sort_values(by='total_amount', ascending=False)\
  .style.format({'total_amount': '{:,.2f}'})

df_borough50k

Unnamed: 0,Borough,total_amount
1,Brooklyn,96333.24
2,Manhattan,92271.3
3,Queens,78671.71


## Question # 6: For the passengers picked up in September 2019 in the zone name Astoria, which was the drop off zone that had the largest tip? We want the name of the zone, not the id.

In [21]:
# Get zone data from postgresql database
query6 = '''
      SELECT "LocationID", "Zone" 
      FROM "taxi-zone"
'''
df_zone = pd.read_sql(query6, con=engine)


# Get tip amount data per pickup & dropoff location of green-taxi-data where pickup date is Sept. 2019 from postgresql database
query7 = '''
      SELECT "PULocationID","DOLocationID","tip_amount"
      FROM "green-taxi-data" AS gtd
      WHERE EXTRACT(YEAR FROM gtd."lpep_pickup_datetime") = 2019 AND
            EXTRACT(MONTH FROM gtd."lpep_pickup_datetime") = 9
'''
df_gtd = pd.read_sql(query7, con=engine)


# Join and transform selected zone and green taxi data 
df_largest_tip = df_gtd.merge(df_zone, how='left', left_on='PULocationID', right_on='LocationID')\
        .rename(columns={'Zone':'Pickup Zone'})\
        .drop(columns=['LocationID'])\
        .merge(df_zone, how='left', left_on='DOLocationID', right_on='LocationID')\
        .rename(columns={'Zone':'Dropoff Zone'})\
        .drop(columns=['LocationID','PULocationID','DOLocationID'])\
        .sort_index(axis=1)\
        .loc[lambda x: x['Pickup Zone'] == 'Astoria']\
        .nlargest(4, 'tip_amount')

# Dropoff Zone with the largest tip last September 2019 where the pickup zone is Astoria is 'JFK Airport'
df_largest_tip.nlargest(4,'tip_amount')

Unnamed: 0,Dropoff Zone,Pickup Zone,tip_amount
97703,JFK Airport,Astoria,62.31
181824,Woodside,Astoria,30.0
315059,Kips Bay,Astoria,28.0
21000,NV,Astoria,25.0


In [22]:
# Alternative solution: SQL query and transformation only 
query5 = '''
WITH zone AS (
  SELECT  "LocationID",
          "Zone" AS "Dropoff Zone",
          "Zone" AS "Pickup Zone"
  FROM "taxi-zone"
), 
  gtd AS (
SELECT "DOLocationID",
       "PULocationID",
       "lpep_pickup_datetime",
       "tip_amount"
FROM "green-taxi-data")

SELECT zone_a."Dropoff Zone",
       zone_b."Pickup Zone",
       gtd."tip_amount",
       DENSE_RANK() OVER(ORDER BY "tip_amount" DESC) as "Rank"
FROM gtd
JOIN zone as zone_a
  ON gtd."DOLocationID" = zone_a."LocationID"
JOIN zone as zone_b
  ON gtd."PULocationID" = zone_b."LocationID"
WHERE (EXTRACT(YEAR FROM gtd."lpep_pickup_datetime") = 2019 AND
       EXTRACT(MONTH FROM gtd."lpep_pickup_datetime") = 9) AND
       zone_b."Pickup Zone" = 'Astoria'
'''
df_lta = pd.read_sql_query(query5, con=engine)
df_lta.head(4)

Unnamed: 0,Dropoff Zone,Pickup Zone,tip_amount,Rank
0,JFK Airport,Astoria,62.31,1
1,Woodside,Astoria,30.0,2
2,Kips Bay,Astoria,28.0,3
3,NV,Astoria,25.0,4


In [None]:
# Below is the encapsulated data ingestion process (30% slower) 

# import pandas as pd
# import pyarrow as pa

# def ingest_dataframe_by_chunk(df: pd.core.frame.DataFrame, chunksize: int = 100_000, offset: int = 0):
#     print('Initiating data ingestion...\n')
#     table = pa.RecordBatch.from_pandas(df)
    
#     num_rows = table.num_rows
#     while num_rows > 0:
#         length = min(100_000, num_rows)
#         start_time = time()
#         table.slice(offset=offset, length=length)\
#             .to_pandas()\
#             .to_sql('green-taxi-data', con=engine, if_exists='append', index=False, method='multi')
#         end_time = time()

#         print(f'Inserted {length:,} rows: #{offset+1:,} - #{offset+length:,} in {(end_time - start_time):.2f} seconds')
#         num_rows -= length
#         offset += length

#     print("Writing to database complete.")
    
# ingest_dataframe_by_chunk(df, 100_000, 0)