In [2]:
import os
# Copy the service-account key file's full text into the
# DESTINATION__BIGQUERY__CREDENTIALS environment variable
# (presumably picked up by the dlt BigQuery destination — confirm).
_key_file_path = '/workspaces/Data-Engineering-Zoomcamp/XX-Workshop1/new-de-zoomcamp-449719-9c27773d9a31.json'
with open(_key_file_path) as key_file:
    os.environ["DESTINATION__BIGQUERY__CREDENTIALS"] = key_file.read()

In [3]:
import os
import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import logging
import dlt
from dlt.sources import DltResource
from typing import List, Dict, Iterator, Any, Tuple, Optional

@dlt.source
def nyc_taxi_data(
    year: int = datetime.now().year,
    start_month: int = 1,
    end_month: int = 7,
    max_workers: int = 1,
    base_url: str = "https://d37ci6vzurychx.cloudfront.net/trip-data"
) -> List[DltResource]:
    """
    A dlt source for NYC yellow taxi trip data.

    Args:
        year: Year of the data to download. The default is the current year as
            evaluated once, when this function is defined (not on each call).
        start_month: First month to download, inclusive (default: 1)
        end_month: Last month to download, inclusive (default: 7). If it is
            smaller than start_month, nothing is downloaded.
        max_workers: Maximum number of parallel download threads (default: 1)
        base_url: Base URL the monthly parquet files are fetched from
            (default: "https://d37ci6vzurychx.cloudfront.net/trip-data")

    Returns:
        List holding the single ``yellow_taxi_trips`` resource
    """
    # Function-scope import so the block does not depend on file-level imports.
    import tempfile

    # Set up logging (a no-op if the root logger is already configured)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    # (connect, read) timeouts in seconds, so a stalled connection cannot hang
    # a worker thread forever.
    request_timeout = (10, 300)
    # Write the download in 1 MiB pieces rather than 1 kB ones.
    download_chunk_bytes = 1024 * 1024

    # NOTE(review): with write_disposition="merge", rows sharing the same
    # (vendorID, tpep_pickup_datetime) are presumably collapsed into one;
    # confirm that this pair really identifies a single trip.
    @dlt.resource(write_disposition="merge", primary_key=["vendorID", "tpep_pickup_datetime"])
    def yellow_taxi_trips() -> Iterator[Dict[str, Any]]:
        """
        Resource that yields NYC yellow taxi trip records for the requested months.

        Each monthly parquet file is downloaded to a temporary file, loaded
        completely into memory, and then yielded record by record, in month
        order. Months that fail to download or parse are logged and skipped.
        """
        def download_month_data(year: int, month: int) -> Optional[List[Dict[str, Any]]]:
            """
            Download one monthly parquet file and return its rows as dicts.

            Returns None (after logging the error) when anything goes wrong, so
            one bad month does not abort the whole load.
            """
            file_name = f"yellow_tripdata_{year}-{month:02d}.parquet"
            url = f"{base_url}/{file_name}"
            temp_path: Optional[str] = None

            try:
                logger.info(f"Downloading {file_name}")
                # The context manager releases the connection even on failure.
                with requests.get(url, stream=True, timeout=request_timeout) as response:
                    response.raise_for_status()

                    # A uniquely named temp file avoids collisions between
                    # concurrent runs that download the same month.
                    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
                        temp_path = tmp.name
                        for chunk in response.iter_content(chunk_size=download_chunk_bytes):
                            tmp.write(chunk)

                # Read parquet file into a pandas DataFrame
                import pandas as pd
                df = pd.read_parquet(temp_path)

                # Add metadata columns for tracking
                df['source_file'] = file_name
                df['ingestion_timestamp'] = datetime.now().isoformat()

                logger.info(f"Successfully processed {file_name}, found {len(df)} records")
                return df.to_dict('records')

            except Exception as e:
                # Deliberate best-effort: log and skip this month.
                logger.error(f"Error downloading {file_name}: {str(e)}")
                return None

            finally:
                # Always clean up the temporary file, including on errors.
                if temp_path is not None:
                    try:
                        os.remove(temp_path)
                    except OSError as cleanup_error:
                        logger.warning(
                            f"Could not remove temporary file {temp_path}: {cleanup_error}"
                        )

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submission order is month order; results are consumed in that
            # same order regardless of which download finishes first.
            futures = [
                executor.submit(download_month_data, year, month)
                for month in range(start_month, end_month + 1)
            ]

            for future in futures:
                result = future.result()
                if result:
                    yield from result

    return [yellow_taxi_trips]

def run_nyc_taxi_pipeline():
    """
    Create the BigQuery pipeline, run the NYC taxi source through it, and
    print the resulting load info.

    Returns:
        The load info object returned by ``pipeline.run``.
    """
    # Pipeline configuration targeting BigQuery
    pipeline_settings = {
        "pipeline_name": 'nyc_taxi_data',
        "destination": 'bigquery',
        "dataset_name": 'nyc_taxi',
        "dev_mode": True,
    }
    taxi_pipeline = dlt.pipeline(**pipeline_settings)

    # Source arguments are fixed here; each of them can be parameterized
    source_params = {
        "year": 2024,
        "start_month": 1,
        "end_month": 2,
        "max_workers": 1,
    }
    result = taxi_pipeline.run(nyc_taxi_data(**source_params))

    print(f"Load info: {result}")

    # Hand the info back to the caller for verification
    return result

# Run the pipeline only when this module is executed as the entry point,
# not when it is imported.
if __name__ == "__main__":
    run_nyc_taxi_pipeline()

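# To customize the pipeline for BigQuery, you can adapt the following sketch.
# It sits inside a string literal, so it is never executed; verify the
# credential helper and pipeline arguments against your installed dlt version.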
"""
import dlt

# Create credentials from service account key file
credentials = dlt.secrets.service_account_key_file('/path/to/credentials.json')

# Initialize pipeline with BigQuery-specific configuration
pipeline = dlt.pipeline(
    pipeline_name='nyc_taxi_data',
    destination='bigquery',
    dataset_name='nyc_taxi',
    credentials=credentials,
    full_refresh=False, # set to True to replace all data
    progress=True
)

# Run the pipeline
pipeline.run(nyc_taxi_data())
"""

# You can also run from command line with:
# python nyc_taxi_dlt.py

2025-02-18 05:16:47,480 - INFO - Downloading yellow_tripdata_2024-01.parquet
2025-02-18 05:16:48,536 - INFO - Successfully processed yellow_tripdata_2024-01.parquet, found 2964624 records
2025-02-18 05:17:14,269 - INFO - Downloading yellow_tripdata_2024-02.parquet
2025-02-18 05:17:17,306 - INFO - Successfully processed yellow_tripdata_2024-02.parquet, found 3007526 records


: 