In [1]:
import pandas as pd

In [2]:
# Show the current working directory (IPython shell escape)
!pwd

/home/bri/zoomcamp/data_engineering_zoomcamp_workshop/01-docker-terraform/pipeline


In [3]:
# List installed packages and versions with uv (IPython shell escape)
!uv pip list

Package                   Version
------------------------- -----------
anyio                     4.12.1
argon2-cffi               25.1.0
argon2-cffi-bindings      25.1.0
arrow                     1.4.0
asttokens                 3.0.1
async-lru                 2.1.0
attrs                     25.4.0
babel                     2.17.0
beautifulsoup4            4.14.3
bleach                    6.3.0
certifi                   2026.1.4
cffi                      2.0.0
charset-normalizer        3.4.4
cli-helpers               2.8.2
click                     8.1.7
comm                      0.2.3
configobj                 5.0.9
debugpy                   1.8.19
decorator                 5.2.1
defusedxml                0.7.1
executing                 2.2.1
fastjsonschema            2.21.2
fqdn                      1.5.1
greenlet                  3.3.1
h11                       0.16.0
httpcore                  1.0.9
httpx                     0.28.1
idna                      3.11
ipykernel           

In [4]:
# Parquet file with one month of trip records. Change these values to load
# another taxi type or month (assumes the other files follow the same
# <color>_tripdata_<YYYY>-<MM>.parquet naming pattern).
TAXI_COLOR = 'green'
YEAR = 2025
MONTH = 11
url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{TAXI_COLOR}_tripdata_{YEAR}-{MONTH:02d}.parquet'

In [5]:
# Download and parse the trip file. Reading parquet needs an engine
# (pyarrow or fastparquet). Errors are raised rather than printed: every later
# cell needs df_trip, so continuing would only fail later with a confusing
# NameError.
try:
    df_trip = pd.read_parquet(url)
except ImportError as e:
    raise ImportError(
        "Please install pyarrow or fastparquet to read parquet files."
    ) from e

df_trip.head()

   VendorID lpep_pickup_datetime lpep_dropoff_datetime store_and_fwd_flag  \
0         2  2025-11-01 00:34:48   2025-11-01 00:41:39                  N   
1         2  2025-11-01 00:18:52   2025-11-01 00:24:27                  N   
2         2  2025-11-01 01:03:14   2025-11-01 01:15:24                  N   
3         2  2025-11-01 00:10:57   2025-11-01 00:24:53                  N   
4         1  2025-11-01 00:03:48   2025-11-01 00:19:38                  N   

   RatecodeID  PULocationID  DOLocationID  passenger_count  trip_distance  \
0         1.0            74            42              1.0           0.74   
1         1.0            74            42              2.0           0.95   
2         1.0            83           160              1.0           2.19   
3         1.0           166           127              1.0           5.44   
4         1.0           166           262              1.0           3.20   

   fare_amount  ...  mta_tax  tip_amount  tolls_amount  ehail_fee  \
0    

In [6]:
# Column dtypes and non-null counts of the trip data as read from parquet
df_trip.info()

<class 'pandas.DataFrame'>
RangeIndex: 46912 entries, 0 to 46911
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               46912 non-null  int32         
 1   lpep_pickup_datetime   46912 non-null  datetime64[us]
 2   lpep_dropoff_datetime  46912 non-null  datetime64[us]
 3   store_and_fwd_flag     41343 non-null  str           
 4   RatecodeID             41343 non-null  float64       
 5   PULocationID           46912 non-null  int32         
 6   DOLocationID           46912 non-null  int32         
 7   passenger_count        41343 non-null  float64       
 8   trip_distance          46912 non-null  float64       
 9   fare_amount            46912 non-null  float64       
 10  extra                  46912 non-null  float64       
 11  mta_tax                46912 non-null  float64       
 12  tip_amount             46912 non-null  float64       
 13  tolls_amount

In [7]:
# Taxi zone lookup table (CSV) from the DataTalksClub nyc-tlc-data releases
url_zones = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv'

In [8]:
# Download the zone lookup table. There is no try/except here: read_csv needs
# no optional parquet engine, and a failed download should stop the notebook at
# this cell rather than leave df_zones undefined for later cells.
df_zones = pd.read_csv(url_zones)
df_zones.head()

   LocationID        Borough                     Zone service_zone
0           1            EWR           Newark Airport          EWR
1           2         Queens              Jamaica Bay    Boro Zone
2           3          Bronx  Allerton/Pelham Gardens    Boro Zone
3           4      Manhattan            Alphabet City  Yellow Zone
4           5  Staten Island            Arden Heights    Boro Zone


In [9]:
# Column dtypes and non-null counts of the zone lookup table
df_zones.info()

<class 'pandas.DataFrame'>
RangeIndex: 265 entries, 0 to 264
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype
---  ------        --------------  -----
 0   LocationID    265 non-null    int64
 1   Borough       265 non-null    str  
 2   Zone          264 non-null    str  
 3   service_zone  263 non-null    str  
dtypes: int64(1), str(3)
memory usage: 16.9 KB


In [10]:
# Print a frequency table for every zone column, each under its own header
for column_name in df_zones.columns:
    print(f"--- Column: {column_name} ---")
    counts = df_zones[column_name].value_counts()
    print(counts)
    print("\n")

--- Column: LocationID ---
LocationID
1      1
2      1
3      1
4      1
5      1
      ..
261    1
262    1
263    1
264    1
265    1
Name: count, Length: 265, dtype: int64


--- Column: Borough ---
Borough
Queens           69
Manhattan        69
Brooklyn         61
Bronx            43
Staten Island    20
Unknown           2
EWR               1
Name: count, dtype: int64


--- Column: Zone ---
Zone
Governor's Island/Ellis Island/Liberty Island    3
Corona                                           2
Newark Airport                                   1
Jamaica Bay                                      1
Allerton/Pelham Gardens                          1
                                                ..
Woodside                                         1
World Trade Center                               1
Yorkville East                                   1
Yorkville West                                   1
NV                                               1
Name: count, Length: 261, dtype: in

In [11]:
def multi_data_check_with_names(datasets, dataset_names):
    """
    Build a per-column data-quality summary for several DataFrames.

    For every column of every dataset the summary reports the dtype, the
    number and percentage of missing values, the dataset's total row count,
    and its number of duplicated rows. The last two values are repeated on
    every row belonging to the same dataset.

    :param datasets: A list of pandas DataFrames to check.
    :param dataset_names: A list of names matching ``datasets`` by position.
        If it is shorter than ``datasets``, the remaining frames are labelled
        ``Dataset_<n>`` (1-based position). Extra names are ignored.
    :return: A DataFrame with columns ``Dataset``, ``Column``, ``Data Type``,
        ``Missing Values``, ``Percentage Missing``, ``Total Rows`` and
        ``Duplicate Rows``. When ``datasets`` is empty, returns an empty frame
        that still has those columns.
    """
    summary_columns = [
        'Dataset', 'Column', 'Data Type', 'Missing Values',
        'Percentage Missing', 'Total Rows', 'Duplicate Rows',
    ]
    summary_list = []  # one per-column frame per dataset

    for i, data in enumerate(datasets):
        # Fall back to a positional label when no name was supplied
        dataset_name = dataset_names[i] if i < len(dataset_names) else f'Dataset_{i+1}'

        total_rows = len(data)
        null_counts = data.isnull().sum()
        # Avoid 0/0 on a frame with no rows: report 0% missing instead of NaN
        if total_rows:
            null_percentage = null_counts / total_rows * 100
        else:
            null_percentage = null_counts * 0.0
        # Rows that repeat an earlier row across all columns
        dup_count = data.duplicated().sum()
        n_cols = len(data.columns)

        summary_list.append(pd.DataFrame({
            'Dataset': dataset_name,
            'Column': data.columns,
            'Data Type': data.dtypes.values,
            'Missing Values': null_counts.values,
            'Percentage Missing': null_percentage.values,
            'Total Rows': [total_rows] * n_cols,
            'Duplicate Rows': [dup_count] * n_cols,  # same for all columns
        }))

    if not summary_list:
        # pd.concat raises ValueError when given an empty list
        return pd.DataFrame(columns=summary_columns)

    return pd.concat(summary_list, ignore_index=True)

In [12]:
# Frames to profile, paired by position with the labels shown in the summary
names = ['df_trip', 'df_zones']
dataset = [df_trip, df_zones]

In [13]:
# Per-column missing-value / duplicate summary for both frames
multi_data_check_with_names(dataset, names)

Unnamed: 0,Dataset,Column,Data Type,Missing Values,Percentage Missing,Total Rows,Duplicate Rows
0,df_trip,VendorID,int32,0,0.0,46912,0
1,df_trip,lpep_pickup_datetime,datetime64[us],0,0.0,46912,0
2,df_trip,lpep_dropoff_datetime,datetime64[us],0,0.0,46912,0
3,df_trip,store_and_fwd_flag,str,5569,11.871163,46912,0
4,df_trip,RatecodeID,float64,5569,11.871163,46912,0
5,df_trip,PULocationID,int32,0,0.0,46912,0
6,df_trip,DOLocationID,int32,0,0.0,46912,0
7,df_trip,passenger_count,float64,5569,11.871163,46912,0
8,df_trip,trip_distance,float64,0,0.0,46912,0
9,df_trip,fare_amount,float64,0,0.0,46912,0


## Insights from multiple data check

1. The `ehail_fee` column in df_trip is 100% missing. After consulting with software engineers and business stakeholders, we confirmed it is a valid field for green taxis following a 2013 court decision, so we keep it as is.
2. Meanwhile, the following 6 columns in df_trip each have roughly 11% to 12% missing values; we keep them as they are. We can analyze further whether patterns emerge in those missing values (e.g. whether specific areas have higher missing rates), which could start a business-process investigation.
   - store_and_fwd_flag
   - RatecodeID
   - passenger_count
   - payment_type
   - trip_type
   - congestion_surcharge
3. **TODO:** add further insights.

# Convert columns to the correct dtypes

- `pd.read_parquet` has no `parse_dates` parameter like `read_csv` has, and it does not need one.
- When you use `pandas.read_parquet`, the data types (dtypes) are taken from the schema stored in the Parquet file itself. Parquet is a columnar storage format that preserves data types, unlike formats such as CSV, which require type inference.
- `read_parquet` reads the data using this embedded type information. The underlying engine (pyarrow by default) converts the Parquet data types to the corresponding pandas/NumPy types.
- So, to change a dtype after loading, we use `.astype({'col_1': 'int64', 'col_2': 'category'})`.


In [14]:
# Distribution of RatecodeID values (value_counts drops missing values by default)
df_trip["RatecodeID"].value_counts()

RatecodeID
1.0     38725
5.0      2432
2.0       112
4.0        53
3.0        20
99.0        1
Name: count, dtype: int64

In [15]:
# All trip column names (reference for the dtype mapping)
df_trip.columns

Index(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
       'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge',
       'cbd_congestion_fee'],
      dtype='str')

In [16]:
# Target dtype for every trip column, applied in one astype call.
# NOTE(review): with pandas < 3, astype("str") turns missing values into the
# literal string "nan". Here store_and_fwd_flag keeps its nulls (41343
# non-null before and after), which is consistent with pandas >= 3 string
# dtype behaviour — confirm the pinned pandas version.
TRIP_DTYPES = {
    "VendorID": "str",  # changed from int32
    "lpep_pickup_datetime": "datetime64[us]",  # already this dtype after read_parquet
    "lpep_dropoff_datetime": "datetime64[us]",  # already this dtype after read_parquet
    "store_and_fwd_flag": "str",
    "RatecodeID": "float64",  # kept as float64: it behaved oddly in PostgreSQL
    "PULocationID": "str",  # changed from int32
    "DOLocationID": "str",  # changed from int32
    "passenger_count": "Int64",  # nullable integer, because the column has missing values
    "trip_distance": "float64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "ehail_fee": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
    # Nullable integer. Unlike RatecodeID/trip_type this IS a cast to Int64.
    # NOTE(review): confirm it behaves correctly in PostgreSQL.
    "payment_type": "Int64",
    "trip_type": "float64",  # kept as float64: it behaved oddly in PostgreSQL
    "congestion_surcharge": "float64",
    "cbd_congestion_fee": "float64",
}

df_trip = df_trip.astype(TRIP_DTYPES)

In [17]:
# Confirm the dtypes after the cast
df_trip.info()

<class 'pandas.DataFrame'>
RangeIndex: 46912 entries, 0 to 46911
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               46912 non-null  str           
 1   lpep_pickup_datetime   46912 non-null  datetime64[us]
 2   lpep_dropoff_datetime  46912 non-null  datetime64[us]
 3   store_and_fwd_flag     41343 non-null  str           
 4   RatecodeID             41343 non-null  float64       
 5   PULocationID           46912 non-null  str           
 6   DOLocationID           46912 non-null  str           
 7   passenger_count        41343 non-null  Int64         
 8   trip_distance          46912 non-null  float64       
 9   fare_amount            46912 non-null  float64       
 10  extra                  46912 non-null  float64       
 11  mta_tax                46912 non-null  float64       
 12  tip_amount             46912 non-null  float64       
 13  tolls_amount

In [18]:
# Zone lookup column names
df_zones.columns

Index(['LocationID', 'Borough', 'Zone', 'service_zone'], dtype='str')

In [19]:
# Cast every zone column to "str". Only LocationID actually changes (int64),
# which gives it the same text dtype used for PULocationID/DOLocationID.
# NOTE(review): Zone and service_zone contain missing values; under
# pandas < 3, astype("str") would store them as the text "nan".
zone_columns = ['LocationID', 'Borough', 'Zone', 'service_zone']
df_zones = df_zones.astype(dict.fromkeys(zone_columns, "str"))

# Create database connection

In [20]:
from sqlalchemy import create_engine

In [21]:
import os
from urllib.parse import quote_plus

# Connection settings are read from the environment so credentials are not
# hard-coded in the notebook. The fallbacks give the same URL as before:
# postgresql://root:root@localhost:5432/ny_taxi
PG_USER = os.environ.get('PG_USER', 'root')
PG_PASSWORD = os.environ.get('PG_PASSWORD', 'root')
PG_HOST = os.environ.get('PG_HOST', 'localhost')
PG_PORT = os.environ.get('PG_PORT', '5432')
PG_DB = os.environ.get('PG_DB', 'ny_taxi')

# URL-escape user and password so characters such as '@', ':' or '/' don't
# break the connection URL
engine = create_engine(
    f'postgresql://{quote_plus(PG_USER)}:{quote_plus(PG_PASSWORD)}@{PG_HOST}:{PG_PORT}/{PG_DB}'
)

In [22]:
# Smoke-test the engine: check out one connection and release it at once.
# A failure is printed rather than raised.
try:
    with engine.connect() as _connection:
        print("Connected to PostgreSQL database successfully!")
except Exception as err:
    print(f"Connection failed: {err}")

Connected to PostgreSQL database successfully!


# Get DDL Schema

In [23]:
# Preview the CREATE TABLE statement pandas generates for df_trip
print(pd.io.sql.get_schema(df_trip, name='green_taxi_data', con=engine))


CREATE TABLE green_taxi_data (
	"VendorID" TEXT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" TEXT, 
	"DOLocationID" TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type BIGINT, 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




In [24]:
# Preview the CREATE TABLE statement pandas generates for df_zones
print(pd.io.sql.get_schema(df_zones, name='taxi_zone', con=engine))


CREATE TABLE taxi_zone (
	"LocationID" TEXT, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)




# Create Table in PostgreSQL without inserting data

In [25]:
# head(n=0) is an empty frame, so this only (re)creates the table; no rows
# are inserted. if_exists='replace' drops an existing table first.
# NOTE(review): to_sql writes the DataFrame index as an extra column by
# default (index=True) — confirm that column is wanted in green_taxi_data.
df_trip.head(n=0).to_sql(name='green_taxi_data', con=engine, if_exists='replace')

0

In [26]:
# Create (or replace) an empty taxi_zone table with df_zones' columns; no rows inserted.
# NOTE(review): the default index=True adds the DataFrame index as a column.
df_zones.head(n=0).to_sql(name='taxi_zone', con=engine, if_exists='replace')

0

# Insert data into PostgreSQL

In [27]:
%%time
# Append every trip row to green_taxi_data; %%time reports how long it takes
df_trip.to_sql(name='green_taxi_data', con=engine, if_exists='append')

CPU times: user 2.3 s, sys: 72.4 ms, total: 2.37 s
Wall time: 3.58 s


912

In [29]:
%%time
# Append every zone row to taxi_zone; %%time reports how long it takes
df_zones.to_sql(name='taxi_zone', con=engine, if_exists='append')

CPU times: user 8.52 ms, sys: 0 ns, total: 8.52 ms
Wall time: 14.6 ms


265