In [1]:
import fnmatch
import gzip
import os
import shutil
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import pyarrow as pa
from dotenv import load_dotenv
from google.cloud import bigquery, storage

In [2]:
# Read KEY=value pairs from a local .env file into os.environ; variables that are
# already set in the real environment keep their values (override=False).
load_dotenv(override=False)

True

In [4]:
def _require_env(name):
    """Return environment variable ``name``, failing fast if it is unset or empty.

    os.getenv() silently returns None for a missing variable, which would only
    surface much later (e.g. a download URL like "Nonefhv_tripdata_2019-01.csv.gz"
    or a TypeError from os.makedirs(None)).
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Required environment variable {name!r} is not set (check your .env file)"
        )
    return value


BASE_URL = _require_env("BASE_URL")  # file names are appended directly, so it must end with "/"
MONTHS = [f"{i:02d}" for i in range(1, 13)]  # "01" ... "12"
DOWNLOAD_DIR = _require_env("DOWNLOAD_DIR")
CHUNK_SIZE = 8 * 1024 * 1024  # 8 MiB
# Optional settings: read with os.getenv so a missing value stays None.
BUCKET_NAME = os.getenv("BUCKET_NAME")
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = _require_env("DATASET_ID")
TABLE_ID = _require_env("TABLE_ID")

In [5]:
os.makedirs(name=DOWNLOAD_DIR, exist_ok=True)  # create the download folder; no error if it already exists

In [6]:
# Credentials: read the service-account key path from the environment
# (e.g. GOOGLE_APPLICATION_CREDENTIALS=... in the .env file loaded by load_dotenv)
# instead of hard-coding an absolute local path to a secret in the notebook.
# If it is unset, fall back to Application Default Credentials, e.g. after
# `gcloud auth application-default login`.
GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

if GOOGLE_APPLICATION_CREDENTIALS:
    client = storage.Client.from_service_account_json(GOOGLE_APPLICATION_CREDENTIALS)
    bigquery_client = bigquery.Client.from_service_account_json(GOOGLE_APPLICATION_CREDENTIALS)
else:
    # When PROJECT_ID is None the clients infer the project from the ambient credentials.
    client = storage.Client(project=PROJECT_ID)
    bigquery_client = bigquery.Client(project=PROJECT_ID)

# Handle to the Cloud Storage bucket named in the environment.
bucket = client.bucket(BUCKET_NAME)

In [7]:
def download_and_unzip_file(month, year=2019):
    """Download one monthly FHV trip file and decompress it into DOWNLOAD_DIR.

    Args:
        month: Two-digit month string, e.g. "01" (as produced by MONTHS).
        year: Year of the trip data; defaults to 2019, the year this notebook loads.

    Returns:
        Path of the decompressed CSV on success, or None if the download or
        decompression failed. The error is printed rather than raised, so one
        bad month does not abort a batch run over all months.
    """
    file_stem = f"fhv_tripdata_{year}-{month}.csv"
    url = f"{BASE_URL}{file_stem}.gz"
    gz_file_path = os.path.join(DOWNLOAD_DIR, f"{file_stem}.gz")
    csv_file_path = os.path.join(DOWNLOAD_DIR, file_stem)
    # Decompress under a temporary name first so a failed or interrupted run
    # never leaves a truncated CSV under the final name.
    partial_path = csv_file_path + ".part"

    try:
        print(f"Downloading {url}...")
        urllib.request.urlretrieve(url, gz_file_path)
        print(f"Downloaded: {gz_file_path}")

        # Stream in CHUNK_SIZE pieces instead of f_in.read(): the decompressed
        # monthly files are large, and several months may be unzipped
        # concurrently, so holding each whole file in memory is wasteful.
        with gzip.open(gz_file_path, "rb") as f_in, open(partial_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out, CHUNK_SIZE)
        os.replace(partial_path, csv_file_path)
        print(f"Unzipped: {csv_file_path}")

        return csv_file_path
    except Exception as e:
        print(f"Failed to download or unzip {url}: {e}")
        # Best-effort cleanup of a half-written output file.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return None

In [8]:
# Fetch and unzip all months with 4 worker threads; results keep MONTHS order,
# and a month that failed shows up as None.
with ThreadPoolExecutor(max_workers=4) as pool:
    file_paths = [path for path in pool.map(download_and_unzip_file, MONTHS)]

Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-02.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-03.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-04.csv.gz...
Downloaded: ./data\fhv_tripdata_2019-04.csv.gz
Downloaded: ./data\fhv_tripdata_2019-02.csv.gz
Unzipped: ./data\fhv_tripdata_2019-04.csv
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-05.csv.gz...
Downloaded: ./data\fhv_tripdata_2019-03.csv.gz
Unzipped: ./data\fhv_tripdata_2019-02.csv
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-06.csv.gz...
Unzipped: ./data\fhv_tripdata_2019-03.csv
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/

In [28]:
# BigQuery column layout for the FHV trip CSVs; names mirror the CSV header row,
# including its mixed casing (e.g. "dropOff_datetime", "PUlocationID").
schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in [
        ("dispatching_base_num", "STRING"),
        ("pickup_datetime", "TIMESTAMP"),
        ("dropOff_datetime", "TIMESTAMP"),
        ("PUlocationID", "INTEGER"),
        ("DOlocationID", "INTEGER"),
        ("SR_Flag", "INTEGER"),
        ("Affiliated_base_number", "STRING"),
    ]
]

In [29]:
def create_bq_table_with_schema(dataset_id, table_id, schema, exists_ok=True):
    """Create ``dataset_id.table_id`` in the BigQuery client's project.

    Args:
        dataset_id: Dataset that will hold the table.
        table_id: Name of the table to create.
        schema: List of ``bigquery.SchemaField`` describing the columns.
        exists_ok: When True (default), an already-existing table is not an
            error, so the notebook can be re-run top to bottom. In that case the
            existing table is returned as-is, even if its schema differs from
            ``schema``. Pass False to raise on conflict instead.
    """
    # DatasetReference replaces the deprecated Client.dataset() helper;
    # the project is taken from the client, as before.
    table_ref = bigquery.DatasetReference(bigquery_client.project, dataset_id).table(table_id)

    table = bigquery.Table(table_ref, schema=schema)
    table = bigquery_client.create_table(table, exists_ok=exists_ok)

    print(f"Created table in BQ {table.project}.{table.dataset_id}.{table.table_id}")

In [30]:
# Reference to the destination table. DatasetReference is used instead of the
# deprecated Client.dataset() helper; the project comes from the client.
table_ref = bigquery.DatasetReference(bigquery_client.project, DATASET_ID).table(TABLE_ID)

In [31]:
create_bq_table_with_schema(DATASET_ID, TABLE_ID, schema)  # destination table for all monthly loads

Created table in BQ de-zoomcamp2025-448100.trips_data_all.fhv_tripdata


In [32]:
file_paths  # CSV paths from the download step, in month order; a failed month would appear as None

['./data\\fhv_tripdata_2019-01.csv',
 './data\\fhv_tripdata_2019-02.csv',
 './data\\fhv_tripdata_2019-03.csv',
 './data\\fhv_tripdata_2019-04.csv',
 './data\\fhv_tripdata_2019-05.csv',
 './data\\fhv_tripdata_2019-06.csv',
 './data\\fhv_tripdata_2019-07.csv',
 './data\\fhv_tripdata_2019-08.csv',
 './data\\fhv_tripdata_2019-09.csv',
 './data\\fhv_tripdata_2019-10.csv',
 './data\\fhv_tripdata_2019-11.csv',
 './data\\fhv_tripdata_2019-12.csv']

In [24]:
# Spot-check one month (September). Build the path from DOWNLOAD_DIR instead of
# hard-coding a Windows-style './data\\...' string, so the cell works on any OS
# and follows the configured download directory.
df = pd.read_csv(os.path.join(DOWNLOAD_DIR, "fhv_tripdata_2019-09.csv"))

In [25]:
df.head(n=5)  # first five rows of the September sample

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-09-01 00:35:00,2019-09-01 00:59:00,264.0,264.0,,B00009
1,B00009,2019-09-01 00:48:00,2019-09-01 01:09:00,264.0,264.0,,B00009
2,B00014,2019-09-01 00:16:18,2019-09-02 00:35:37,264.0,264.0,,B00014
3,B00014,2019-09-01 00:55:03,2019-09-01 01:09:35,264.0,264.0,,B00014
4,B00014,2019-09-01 00:13:08,2019-09-02 01:12:31,264.0,264.0,,B00014


In [33]:
def _load_csv_to_bigquery(file_path, table_ref, job_config):
    """Read one trip CSV, convert its datetime columns, load it into ``table_ref``, and return the finished job."""
    # Keeping the DataFrame local to this helper means it is released when the
    # helper returns, instead of staying alive while the next month is read.
    df = pd.read_csv(file_path)

    # TIMESTAMP columns need real datetimes, not the strings read_csv produces.
    df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"])
    df["dropOff_datetime"] = pd.to_datetime(df["dropOff_datetime"])

    job = bigquery_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for the load job to complete (raises if the job failed)
    return job


def upload_to_bigquery(file_paths):
    """Load each CSV in ``file_paths`` into DATASET_ID.TABLE_ID, one load job per file.

    Files are processed sequentially. A failure on one file is printed and the
    loop continues with the next file, so one bad month does not stop the batch.

    NOTE(review): no write disposition is set, so BigQuery's load-job default
    (append) applies; re-running this against a populated table likely
    duplicates rows. Confirm before re-running.
    """
    # Same destination and schema for every file, so build them once.
    table_ref = bigquery.DatasetReference(bigquery_client.project, DATASET_ID).table(TABLE_ID)
    job_config = bigquery.LoadJobConfig(schema=schema)

    for file_path in file_paths:
        try:
            job = _load_csv_to_bigquery(file_path, table_ref, job_config)
            print(f'Loaded {job.output_rows} rows from {file_path} into {DATASET_ID}.{TABLE_ID}.')
        except Exception as e:
            print(f"Failed to upload data from {file_path} to BigQuery: {e}")

In [34]:
upload_to_bigquery([path for path in file_paths if path])  # skip months whose download failed (None)

Loaded 23143222 rows from ./data\fhv_tripdata_2019-01.csv into trips_data_all.fhv_tripdata.
Loaded 1707649 rows from ./data\fhv_tripdata_2019-02.csv into trips_data_all.fhv_tripdata.
Loaded 1475564 rows from ./data\fhv_tripdata_2019-03.csv into trips_data_all.fhv_tripdata.
Loaded 1937844 rows from ./data\fhv_tripdata_2019-04.csv into trips_data_all.fhv_tripdata.
Loaded 2073045 rows from ./data\fhv_tripdata_2019-05.csv into trips_data_all.fhv_tripdata.
Loaded 2009886 rows from ./data\fhv_tripdata_2019-06.csv into trips_data_all.fhv_tripdata.
Loaded 1947739 rows from ./data\fhv_tripdata_2019-07.csv into trips_data_all.fhv_tripdata.
Loaded 1880407 rows from ./data\fhv_tripdata_2019-08.csv into trips_data_all.fhv_tripdata.
Loaded 1248514 rows from ./data\fhv_tripdata_2019-09.csv into trips_data_all.fhv_tripdata.
Loaded 1897493 rows from ./data\fhv_tripdata_2019-10.csv into trips_data_all.fhv_tripdata.
Loaded 1879137 rows from ./data\fhv_tripdata_2019-11.csv into trips_data_all.fhv_tripdata

In [36]:
def purge_files(directory, pattern="*"):
    """Delete the regular files directly inside ``directory`` whose names match ``pattern``.

    Subdirectories and their contents are left alone. Errors are printed, not
    raised: if the directory cannot be listed, "Failed to purge files" is
    printed and nothing is deleted. A file that cannot be removed is reported
    on its own line, and the remaining files are still processed.

    Args:
        directory: Directory to clean, e.g. DOWNLOAD_DIR.
        pattern: fnmatch-style filename filter. The default "*" matches every
            file (the original behaviour); pass e.g. "fhv_tripdata_*" to avoid
            deleting unrelated files that happen to live in the directory.
    """
    try:
        file_names = os.listdir(directory)
    except Exception as e:
        print(f"Failed to purge files: {e}")
        return

    for file_name in file_names:
        if not fnmatch.fnmatch(file_name, pattern):
            continue
        file_path = os.path.join(directory, file_name)
        if not os.path.isfile(file_path):
            continue
        # Per-file handling: one locked/undeletable file must not stop the rest.
        try:
            os.remove(file_path)
            print(f"Deleted file: {file_path}")
        except OSError as e:
            print(f"Failed to delete {file_path}: {e}")

In [37]:
 # Purge files in the download directory
purge_files(directory=DOWNLOAD_DIR)  # removes the regular files in the download directory
print("All files processed, uploaded to BigQuery, and purged from the download directory.")

Deleted file: ./data\fhv_tripdata_2019-01.csv
Deleted file: ./data\fhv_tripdata_2019-01.csv.gz
Deleted file: ./data\fhv_tripdata_2019-02.csv
Deleted file: ./data\fhv_tripdata_2019-02.csv.gz
Deleted file: ./data\fhv_tripdata_2019-03.csv
Deleted file: ./data\fhv_tripdata_2019-03.csv.gz
Deleted file: ./data\fhv_tripdata_2019-04.csv
Deleted file: ./data\fhv_tripdata_2019-04.csv.gz
Deleted file: ./data\fhv_tripdata_2019-05.csv
Deleted file: ./data\fhv_tripdata_2019-05.csv.gz
Deleted file: ./data\fhv_tripdata_2019-06.csv
Deleted file: ./data\fhv_tripdata_2019-06.csv.gz
Deleted file: ./data\fhv_tripdata_2019-07.csv
Deleted file: ./data\fhv_tripdata_2019-07.csv.gz
Deleted file: ./data\fhv_tripdata_2019-08.csv
Deleted file: ./data\fhv_tripdata_2019-08.csv.gz
Deleted file: ./data\fhv_tripdata_2019-09.csv
Deleted file: ./data\fhv_tripdata_2019-09.csv.gz
Deleted file: ./data\fhv_tripdata_2019-10.csv
Deleted file: ./data\fhv_tripdata_2019-10.csv.gz
Deleted file: ./data\fhv_tripdata_2019-11.csv
Dele