In [14]:
import pandas as pd
import os 
from google.cloud import storage
import pandas_gbq
import requests
from io import BytesIO, StringIO
import urllib.request
import gzip

In [None]:

# Set Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../../keys/zoomcamp-sa.json"

# GCP Details
project_id = "coral-velocity-451115-d9"
bucket_name = "rimsha-kestra"

# Base URL for GitHub releases (corrected)
base_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"

# Define years and taxi types
years = ["2019", "2020"]
taxi_types = {
    "green": "green",
    "yellow": "yellow"
}

# Initialize GCS client
storage_client = storage.Client()

def upload_to_gcs(bucket_name, destination_blob_name, file_content):
    """Uploads file content to GCS directly from memory."""
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    
    blob.upload_from_string(file_content.getvalue(), content_type="text/csv")
    
    print(f"Uploaded to GCS: gs://{bucket_name}/{destination_blob_name}")

# Function to process .csv.gz files and upload as .csv
def process_and_upload(year, taxi_type, github_tag):
    # for yellow and green tables -- for fhv set taxi_type and github_tag as both "fhv"
    file_name_template = f"{taxi_type}_tripdata_{year}-{{month}}.csv.gz" 

    for month in range(1, 13):  # Loop through all months
        month_str = f"{month:02d}"  # Format as "01", "02", ..., "12"
        file_name = file_name_template.format(month=month_str)
        file_url = f"{base_url}/{github_tag}/{file_name}"  # Corrected download URL
        
        print(f"Downloading {file_url}...")

        try:
            # Download the .csv.gz file in memory
            with urllib.request.urlopen(file_url) as response:
                compressed_file = BytesIO(response.read())

            # Read the gzip file into a Pandas DataFrame
            with gzip.GzipFile(fileobj=compressed_file, mode="rb") as f:
                df = pd.read_csv(f)

            # Convert DataFrame to CSV in memory (without index)
            csv_buffer = StringIO()
            df.to_csv(csv_buffer, index=False)

            # Convert to BytesIO for GCS upload
            csv_bytes = BytesIO(csv_buffer.getvalue().encode("utf-8"))

            # Define GCS file path (without .gz)
            gcs_file_path = f"{taxi_type}/{year}/{taxi_type}_tripdata_{year}-{month_str}.csv"
            
            # Upload to GCS
            upload_to_gcs(bucket_name, gcs_file_path, csv_bytes)

        except urllib.error.HTTPError as e:
            print(f"Failed to download {file_url} - HTTPError: {e.code}")
        except urllib.error.URLError as e:
            print(f"Failed to download {file_url} - URLError: {e.reason}")
        except Exception as e:
            print(f"Error processing {file_url}: {str(e)}")

# Run for all years and taxi types
"""for taxi_type, github_tag in taxi_types.items():
    for year in years:
        process_and_upload(year, taxi_type, github_tag)"""


process_and_upload(2020, "yellow", "yellow")



ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-01.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-01.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-02.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-02.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-03.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-03.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-04.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-04.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-05.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-05.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-06.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-06.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-07.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-07.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-08.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-08.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-09.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-09.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-10.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-10.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-11.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-11.csv
ðŸ“¥ Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-12.csv.gz...


  df = pd.read_csv(f)


âœ… Uploaded to GCS: gs://rimsha-kestra/yellow/2020/yellow_tripdata_2020-12.csv
