In [1]:
import polars as pl
import requests
from pprint import pprint
import glob
import pandas as pd
import geopandas as gpd
from polars._typing import SchemaDict
import polars_st as st
import os

In [2]:
from transformation import order_columns_by_schema, add_location_ids, col_rename, values_map, add_missing_columns, enforce_schema_types

In [3]:

SHAPEFILE_PATH = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\taxi_zones\\"
zones_gdf = gpd.read_file(SHAPEFILE_PATH)
zones_df = st.from_geopandas(zones_gdf.to_crs("EPSG:4326"))
zones_lazy = zones_df.select(['geometry', 'LocationID']).lazy()

In [4]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\2025\*.parquet"
files = glob.glob(DIR)


In [5]:
target_schema_2025_dict = pl.scan_parquet(files[0]).collect_schema()

In [6]:
def run_transformation(
    source_lazy_df: pl.LazyFrame,
    zones_df: pl.DataFrame,
    target_schema: SchemaDict
) -> pl.LazyFrame:
    """
        renvoie Un nouveau LazyFrame aligné sur le schéma cible.
    """
    print("--- START ---")

   
    df_step1 = col_rename(source_lazy_df)

    
    df_step2 = add_location_ids(df_step1, zones_df)

    
    df_step3 = values_map(df_step2)

    
    df_step4 = add_missing_columns(df_step3, target_schema)

    
    df_step5 = enforce_schema_types(df_step4, target_schema)
    
    
    final_lazy_df = order_columns_by_schema(df_step5, target_schema)

    print("--- END ---")
    
    return final_lazy_df

 ## TEST POUR 2009

In [21]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\2009\*.parquet"
files = glob.glob(DIR)


In [22]:
print(files)

['C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2009\\yellow_tripdata_2009_all.parquet']


In [None]:
for file in files : 
    data = pl.scan_parquet(file)
    file_name = file
    file_again = file.split('\\')
    file_again[6] = 'test_out'
    file_name = ('\\').join(file_again)
    print(file_name)
    result = run_transformation(data,zones_lazy,target_schema_2025_dict)
    result.sink_parquet(file_name) 

C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2009\yellow_tripdata_2009_all.parquet
--- START ---
Mapping started (robust version)...
  - Converting 'store_and_fwd_flag'...
  - Converting 'payment_type'...
  - Missing columns detected and will be added: ['Airport_fee', 'cbd_congestion_fee', 'congestion_surcharge', 'improvement_surcharge']
-> Dynamically checking and converting column types to match target schema...
  - Mismatch found for 'VendorID': Current is Int64, Target is Int32. Converting...
  - Mismatch found for 'tpep_pickup_datetime': Current is String, Target is Datetime(time_unit='us', time_zone=None). Converting...
  - Mismatch found for 'tpep_dropoff_datetime': Current is String, Target is Datetime(time_unit='us', time_zone=None). Converting...
  - Mismatch found for 'RatecodeID': Current is Float64, Target is Int64. Converting...
-> Step 5: Ordering columns to match the final schema...
--- END ---


In [13]:
new_2009 = pl.scan_parquet(r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2009\yellow_tripdata_2009_all.parquet").collect()

In [14]:
print(new_2009)

shape: (170_896_064, 20)
┌──────────┬──────────────┬──────────────┬──────────────┬───┬──────────────┬──────────────┬─────────────┬──────────────┐
│ VendorID ┆ tpep_pickup_ ┆ tpep_dropoff ┆ passenger_co ┆ … ┆ total_amount ┆ congestion_s ┆ Airport_fee ┆ cbd_congesti │
│ ---      ┆ datetime     ┆ _datetime    ┆ unt          ┆   ┆ ---          ┆ urcharge     ┆ ---         ┆ on_fee       │
│ i32      ┆ ---          ┆ ---          ┆ ---          ┆   ┆ f64          ┆ ---          ┆ f64         ┆ ---          │
│          ┆ datetime[μs] ┆ datetime[μs] ┆ i64          ┆   ┆              ┆ f64          ┆             ┆ f64          │
╞══════════╪══════════════╪══════════════╪══════════════╪═══╪══════════════╪══════════════╪═════════════╪══════════════╡
│ 2        ┆ 2009-01-12   ┆ 2009-01-12   ┆ 1            ┆ … ┆ 10.1         ┆ null         ┆ null        ┆ null         │
│          ┆ 19:54:00     ┆ 20:02:00     ┆              ┆   ┆              ┆              ┆             ┆              │
│ 2    

# test pour 2010

In [7]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\2010\*.parquet"
files = glob.glob(DIR)
print(files)

['C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2010\\yellow_tripdata_2010-01.parquet']


In [8]:
print(pl.scan_parquet(files[0]).collect())

shape: (14_863_778, 18)
┌───────────┬────────────────┬────────────────┬───────────────┬───┬─────────┬────────────┬──────────────┬──────────────┐
│ vendor_id ┆ pickup_datetim ┆ dropoff_dateti ┆ passenger_cou ┆ … ┆ mta_tax ┆ tip_amount ┆ tolls_amount ┆ total_amount │
│ ---       ┆ e              ┆ me             ┆ nt            ┆   ┆ ---     ┆ ---        ┆ ---          ┆ ---          │
│ str       ┆ ---            ┆ ---            ┆ ---           ┆   ┆ f64     ┆ f64        ┆ f64          ┆ f64          │
│           ┆ str            ┆ str            ┆ i64           ┆   ┆         ┆            ┆              ┆              │
╞═══════════╪════════════════╪════════════════╪═══════════════╪═══╪═════════╪════════════╪══════════════╪══════════════╡
│ VTS       ┆ 2010-01-26     ┆ 2010-01-26     ┆ 1             ┆ … ┆ 0.5     ┆ 0.0        ┆ 0.0          ┆ 5.0          │
│           ┆ 07:41:00       ┆ 07:45:00       ┆               ┆   ┆         ┆            ┆              ┆              │
│ DDS   

In [10]:
for file in files : 
    data = pl.scan_parquet(file)
    file_name = file
    file_again = file.split('\\')
    file_again[6] = 'test_out'
    file_name = ('\\').join(file_again)
    print(file_name)
    result = run_transformation(data,zones_lazy,target_schema_2025_dict)
    result.sink_parquet(file_name) 

C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2010\yellow_tripdata_2010-01.parquet
--- START ---
pickup_datetime tpep_pickup_datetime
dropoff_datetime tpep_dropoff_datetime
rate_code RatecodeID
surcharge extra
vendor_id VendorID
pickup_longitude Start_Lon
pickup_latitude Start_Lat
dropoff_longitude End_Lon
dropoff_latitude End_Lat
Schema({'VendorID': String, 'tpep_pickup_datetime': String, 'tpep_dropoff_datetime': String, 'passenger_count': Int64, 'trip_distance': Float64, 'Start_Lon': Float64, 'Start_Lat': Float64, 'RatecodeID': String, 'store_and_fwd_flag': String, 'End_Lon': Float64, 'End_Lat': Float64, 'payment_type': String, 'fare_amount': Float64, 'extra': Float64, 'mta_tax': Float64, 'tip_amount': Float64, 'tolls_amount': Float64, 'total_amount': Float64})
Mapping started (robust version)...
  - Converting 'payment_type'...
  - Missing columns detected and will be added: ['Airport_fee', 'cbd_congestion_fee', 'congestion_surcharge', 'improvement_surcharge']
-> Dynam

  if not required_cols.issubset(data_lazy.columns):
  current_columns = set(lazy_df.columns)


In [12]:
new_2010 = pl.scan_parquet(r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2010\yellow_tripdata_2010-01.parquet").collect()

In [13]:
print(new_2010)

shape: (14_863_786, 20)
┌──────────┬──────────────┬──────────────┬──────────────┬───┬──────────────┬──────────────┬─────────────┬──────────────┐
│ VendorID ┆ tpep_pickup_ ┆ tpep_dropoff ┆ passenger_co ┆ … ┆ total_amount ┆ congestion_s ┆ Airport_fee ┆ cbd_congesti │
│ ---      ┆ datetime     ┆ _datetime    ┆ unt          ┆   ┆ ---          ┆ urcharge     ┆ ---         ┆ on_fee       │
│ i32      ┆ ---          ┆ ---          ┆ ---          ┆   ┆ f64          ┆ ---          ┆ f64         ┆ ---          │
│          ┆ datetime[μs] ┆ datetime[μs] ┆ i64          ┆   ┆              ┆ f64          ┆             ┆ f64          │
╞══════════╪══════════════╪══════════════╪══════════════╪═══╪══════════════╪══════════════╪═════════════╪══════════════╡
│ 1        ┆ 2010-01-12   ┆ 2010-01-12   ┆ 1            ┆ … ┆ 8.4          ┆ null         ┆ null        ┆ null         │
│          ┆ 12:16:34     ┆ 12:25:23     ┆              ┆   ┆              ┆              ┆             ┆              │
│ 1     

# TEST pour 2011


In [15]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\2011\*.parquet"
files = glob.glob(DIR)
print(files)

['C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2011\\yellow_tripdata_2011-01.parquet']


In [16]:
for file in files : 
    data = pl.scan_parquet(file)
    file_name = file
    file_again = file.split('\\')
    file_again[6] = 'test_out'
    file_name = ('\\').join(file_again)
    print(file_name)
    result = run_transformation(data,zones_lazy,target_schema_2025_dict)
    result.sink_parquet(file_name) 

C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2011\yellow_tripdata_2011-01.parquet
--- START ---
Schema({'VendorID': Int64, 'tpep_pickup_datetime': Datetime(time_unit='ns', time_zone=None), 'tpep_dropoff_datetime': Datetime(time_unit='ns', time_zone=None), 'passenger_count': Int64, 'trip_distance': Float64, 'RatecodeID': Int64, 'store_and_fwd_flag': String, 'PULocationID': Int64, 'DOLocationID': Int64, 'payment_type': Int64, 'fare_amount': Float64, 'extra': Float64, 'mta_tax': Float64, 'tip_amount': Float64, 'tolls_amount': Float64, 'improvement_surcharge': Float64, 'total_amount': Float64, 'congestion_surcharge': Float64, 'airport_fee': Float64})
pas de coords longitude/ latitude
Mapping started (robust version)...
  - No value mapping needed for store_and_fwd_flag or payment_type.
  - Missing columns detected and will be added: ['Airport_fee', 'cbd_congestion_fee']
-> Dynamically checking and converting column types to match target schema...
  - Mismatch found for 'Vend

In [17]:
new_2011 = pl.scan_parquet(r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2011\yellow_tripdata_2011-01.parquet").collect()
print(new_2011)

shape: (13_464_997, 20)
┌──────────┬──────────────┬──────────────┬──────────────┬───┬──────────────┬──────────────┬─────────────┬──────────────┐
│ VendorID ┆ tpep_pickup_ ┆ tpep_dropoff ┆ passenger_co ┆ … ┆ total_amount ┆ congestion_s ┆ Airport_fee ┆ cbd_congesti │
│ ---      ┆ datetime     ┆ _datetime    ┆ unt          ┆   ┆ ---          ┆ urcharge     ┆ ---         ┆ on_fee       │
│ i32      ┆ ---          ┆ ---          ┆ ---          ┆   ┆ f64          ┆ ---          ┆ f64         ┆ ---          │
│          ┆ datetime[μs] ┆ datetime[μs] ┆ i64          ┆   ┆              ┆ f64          ┆             ┆ f64          │
╞══════════╪══════════════╪══════════════╪══════════════╪═══╪══════════════╪══════════════╪═════════════╪══════════════╡
│ 2        ┆ 2011-01-01   ┆ 2011-01-01   ┆ 4            ┆ … ┆ 4.18         ┆ null         ┆ null        ┆ null         │
│          ┆ 00:10:00     ┆ 00:12:00     ┆              ┆   ┆              ┆              ┆             ┆              │
│ 2     

# TEST POUR 2012


In [7]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\2012\*.parquet"
files = glob.glob(DIR)
print(files)

['C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2012\\yellow_tripdata_2012-01.parquet']


In [8]:
print(pl.scan_parquet(files[0]).collect_schema())

Schema({'VendorID': Int64, 'tpep_pickup_datetime': Datetime(time_unit='ns', time_zone=None), 'tpep_dropoff_datetime': Datetime(time_unit='ns', time_zone=None), 'passenger_count': Int64, 'trip_distance': Float64, 'RatecodeID': Int64, 'store_and_fwd_flag': String, 'PULocationID': Int64, 'DOLocationID': Int64, 'payment_type': Int64, 'fare_amount': Float64, 'extra': Float64, 'mta_tax': Float64, 'tip_amount': Float64, 'tolls_amount': Float64, 'improvement_surcharge': Float64, 'total_amount': Float64, 'congestion_surcharge': Null, 'airport_fee': Null})


In [9]:
target_schema_2025_dict

Schema([('VendorID', Int32),
        ('tpep_pickup_datetime', Datetime(time_unit='us', time_zone=None)),
        ('tpep_dropoff_datetime', Datetime(time_unit='us', time_zone=None)),
        ('passenger_count', Int64),
        ('trip_distance', Float64),
        ('RatecodeID', Int64),
        ('store_and_fwd_flag', String),
        ('PULocationID', Int32),
        ('DOLocationID', Int32),
        ('payment_type', Int64),
        ('fare_amount', Float64),
        ('extra', Float64),
        ('mta_tax', Float64),
        ('tip_amount', Float64),
        ('tolls_amount', Float64),
        ('improvement_surcharge', Float64),
        ('total_amount', Float64),
        ('congestion_surcharge', Float64),
        ('Airport_fee', Float64),
        ('cbd_congestion_fee', Float64)])

In [10]:
for file in files : 
    data = pl.scan_parquet(file)
    file_name = file
    file_again = file.split('\\')
    file_again[6] = 'test_out'
    file_name = ('\\').join(file_again)
    print(file_name)
    result = run_transformation(data,zones_lazy,target_schema_2025_dict)
    result.sink_parquet(file_name) 

C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2012\yellow_tripdata_2012-01.parquet
--- START ---
airport_fee Airport_fee
Schema({'VendorID': Int64, 'tpep_pickup_datetime': Datetime(time_unit='ns', time_zone=None), 'tpep_dropoff_datetime': Datetime(time_unit='ns', time_zone=None), 'passenger_count': Int64, 'trip_distance': Float64, 'RatecodeID': Int64, 'store_and_fwd_flag': String, 'PULocationID': Int64, 'DOLocationID': Int64, 'payment_type': Int64, 'fare_amount': Float64, 'extra': Float64, 'mta_tax': Float64, 'tip_amount': Float64, 'tolls_amount': Float64, 'improvement_surcharge': Float64, 'total_amount': Float64, 'congestion_surcharge': Null, 'Airport_fee': Null})
pas de coords longitude/ latitude
Mapping started (robust version)...
  - No value mapping needed for store_and_fwd_flag or payment_type.
  - Missing columns detected and will be added: ['cbd_congestion_fee']
-> Dynamically checking and converting column types to match target schema...
  - Mismatch found for 'V

  if not required_cols.issubset(data_lazy.columns):
  current_columns = set(lazy_df.columns)


# TEST POUR TOUTES LES ANNEES

In [11]:
DIR = r"C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test-in\yellow\**\*.parquet"
files = glob.glob(DIR)
print(files)

['C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2009\\yellow_tripdata_2009_all.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2010\\yellow_tripdata_2010-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2011\\yellow_tripdata_2011-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2012\\yellow_tripdata_2012-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2013\\yellow_tripdata_2013-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2014\\yellow_tripdata_2014-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2015\\yellow_tripdata_2015-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2016\\yellow_tripdata_2016-01.parquet', 'C:\\Users\\stgadmin\\Desktop\\Taxi_AirFlow\\data\\test-in\\yellow\\2017\\yellow_tripdata_2017-01.parquet', 'C:\\Users\\stgadmin\\Desk

In [None]:
for file in files : 
    data = pl.scan_parquet(file)
    file_name = file
    file_again = file.split('\\')
    file_again[6] = 'test_out'
    file_name = ('\\').join(file_again)
    print(file_name)
    result = run_transformation(data,zones_lazy,target_schema_2025_dict)
    result.sink_parquet(file_name) 

C:\Users\stgadmin\Desktop\Taxi_AirFlow\data\test_out\yellow\2009\yellow_tripdata_2009_all.parquet
--- START ---
Trip_Pickup_DateTime tpep_pickup_datetime
Trip_Dropoff_DateTime tpep_dropoff_datetime
Passenger_Count passenger_count
Trip_Distance trip_distance
Rate_Code RatecodeID
store_and_forward store_and_fwd_flag
Payment_Type payment_type
Fare_Amt fare_amount
surcharge extra
Tip_Amt tip_amount
Tolls_Amt tolls_amount
Total_Amt total_amount
Schema({'vendor_name': String, 'tpep_pickup_datetime': String, 'tpep_dropoff_datetime': String, 'passenger_count': Int64, 'trip_distance': Float64, 'Start_Lon': Float64, 'Start_Lat': Float64, 'RatecodeID': Float64, 'store_and_fwd_flag': Float64, 'End_Lon': Float64, 'End_Lat': Float64, 'payment_type': String, 'fare_amount': Float64, 'extra': Float64, 'mta_tax': Float64, 'tip_amount': Float64, 'tolls_amount': Float64, 'total_amount': Float64})
Mapping started (robust version)...
  - Converting 'store_and_fwd_flag'...
  - Converting 'payment_type'...
  