In [1]:
!pip install pandas polars duckdb pyarrow



#Part 1: Data Ingestion

**1.** Programmatic Download

In [2]:
import requests
import os
from pathlib import Path

down_dir = Path("data/raw")
down_dir.mkdir(parents=True, exist_ok=True)

download = [
    {
        'url': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet',
        'filename': down_dir / 'yellow_tripdata_2024_01.parquet'
    },
    {
        'url': 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv',
        'filename': down_dir / 'taxi_zone_lookup.csv'
    }
]

for file in download:
  print(f'Downloading {file['url']}...')

  response = requests.get(file['url'], stream=True)

  response.raise_for_status()

  with open(file['filename'], 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
      f.write(chunk)

  print(f'Downloaded to {file['filename']}')

print('All downloads completed.')

Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet...
Downloaded to data/raw/yellow_tripdata_2024_01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv...
Downloaded to data/raw/taxi_zone_lookup.csv
All downloads completed.


The above uses a "requests" implementation to download and store necessary files.

**2.** Data validation

In [10]:
import polars as pl
import time

#a) verifing expected columns exist
ex_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 'total_amount', 'payment_type']

taxi_schema = pl.read_parquet_schema(down_dir / 'yellow_tripdata_2024_01.parquet')


actual_cols = list(taxi_schema.keys())
print(f"Actual columns in file: {actual_cols}")

missing_cols = [col for col in ex_cols if col not in actual_cols]

if not missing_cols:
  print('\nAll expected columns exist.')
else:
  # d.a)
  print(f'\nDataset is missing the following expected coloumns: {missing_cols}')
  raise Exception('Missing expected columns.')

#b) checking for valid datatime type in datetime columns
date_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

for col in date_cols:
  if col in taxi_schema:
    datet = taxi_schema[col]
    if datet == pl.Datetime:
      print(f'\ncolumn {col} is of {datet} type')
    else:
      print(f'\ncolumn {col} is not of datetime type')
      raise Exception(f'column {col} is not of datetime type')

#c) Report total row count and print a summary to the console


Actual columns in file: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee']

All expected columns exist.

column tpep_pickup_datetime is of Datetime(time_unit='ns', time_zone=None) type

column tpep_dropoff_datetime is of Datetime(time_unit='ns', time_zone=None) type
