In [3]:
import os
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from google.cloud import storage
import time

In [46]:
# Preview the "YYYY-MM" keys covering every month of 2019 and 2020.
[
    f"{year:04d}-{month:02d}"
    for year in range(2019, 2021)
    for month in range(1, 13)
]

['2019-01',
 '2019-02',
 '2019-03',
 '2019-04',
 '2019-05',
 '2019-06',
 '2019-07',
 '2019-08',
 '2019-09',
 '2019-10',
 '2019-11',
 '2019-12',
 '2020-01',
 '2020-02',
 '2020-03',
 '2020-04',
 '2020-05',
 '2020-06',
 '2020-07',
 '2020-08',
 '2020-09',
 '2020-10',
 '2020-11',
 '2020-12']

In [72]:
# Destination bucket -- change this to your own bucket name.
BUCKET_NAME = "de-zoomcamp-449723-bucket-hw4"

# Service-account key file. When the file is not present (for example because
# you authenticated through the GCP SDK instead), fall back to the default
# credentials lookup of storage.Client() so that `client` is always defined.
CREDENTIALS_FILE = "de-zoomcamp-449723-3ef7d21812be.json"
if os.path.isfile(CREDENTIALS_FILE):
    client = storage.Client.from_service_account_json(CREDENTIALS_FILE)
else:
    client = storage.Client()

# Dataset to mirror: "yellow", "green" or "fhv". The release directory and the
# file-name prefix are both derived from it so they cannot get out of sync.
DATASET = "yellow"
BASE_DIR = DATASET
BASE_FILENAME = f"{DATASET}_tripdata_"

# Extension of the files to fetch ("parquet" is the other format used before).
BASE_FILETYPE = "csv.gz"

# Everything up to the "YYYY-MM" part of the download URL.
BASE_URL = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{BASE_DIR}/{BASE_FILENAME}"

# One "YYYY-MM" key per month of 2019 and 2020.
DATES = [f"{y:04d}-{m:02d}" for y in range(2019, 2021) for m in range(1, 13)]
DOWNLOAD_DIR = "./downloads"

# Value assigned to blob.chunk_size before uploading: 8 MiB.
CHUNK_SIZE = 8 * 1024 * 1024

os.makedirs(DOWNLOAD_DIR, exist_ok=True)

bucket = client.bucket(BUCKET_NAME)


In [59]:
def download_file(yyyymm):
    """Download one monthly trip-data file into DOWNLOAD_DIR.

    The URL is ``{BASE_URL}{yyyymm}.{BASE_FILETYPE}`` and the file is stored
    as ``{BASE_FILENAME}{yyyymm}.{BASE_FILETYPE}`` inside DOWNLOAD_DIR.

    Args:
        yyyymm: Year-month key such as "2019-01" (the format of DATES entries).

    Returns:
        The local path of the downloaded file, or None when the download
        failed (the error is printed rather than raised).
    """
    url = f"{BASE_URL}{yyyymm}.{BASE_FILETYPE}"
    file_path = os.path.join(DOWNLOAD_DIR, f"{BASE_FILENAME}{yyyymm}.{BASE_FILETYPE}")
    # Stage the transfer under a temporary name so that an interrupted or
    # truncated download never leaves a partial file at the final path.
    partial_path = f"{file_path}.part"

    try:
        print(f"Downloading {url}...")
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, file_path)
        print(f"Downloaded: {file_path}")
        return file_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        # Best-effort cleanup of whatever was written before the failure.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return None

In [7]:
def verify_gcs_upload(blob_name):
    """Report whether an object called ``blob_name`` exists in ``bucket``.

    Only existence is checked; the object's contents are not compared with
    the local file.
    """
    uploaded_blob = storage.Blob(name=blob_name, bucket=bucket)
    return uploaded_blob.exists(client)

In [8]:
def upload_to_gcs(file_path, max_retries=3):
    """Upload a local file to the bucket and confirm that the object exists.

    The object is named after the file's basename. Each attempt uploads the
    file and then calls verify_gcs_upload; a failed attempt is followed by a
    5-second pause before the next one.

    Args:
        file_path: Path of the local file to upload.
        max_retries: Maximum number of upload attempts.

    Returns:
        True as soon as an upload has been verified, False when every attempt
        failed (errors are printed rather than raised).
    """
    blob_name = os.path.basename(file_path)
    blob = bucket.blob(blob_name)
    blob.chunk_size = CHUNK_SIZE

    for attempt in range(max_retries):
        # Pause only between attempts, not after the last one has failed.
        if attempt > 0:
            time.sleep(5)

        try:
            print(f"Uploading {file_path} to {BUCKET_NAME} (Attempt {attempt + 1})...")
            blob.upload_from_filename(file_path)
            print(f"Uploaded: gs://{BUCKET_NAME}/{blob_name}")

            if verify_gcs_upload(blob_name):
                print(f"Verification successful for {blob_name}")
                return True
            print(f"Verification failed for {blob_name}, retrying...")
        except Exception as e:
            print(f"Failed to upload {file_path} to GCS: {e}")

    print(f"Giving up on {file_path} after {max_retries} attempts.")
    return False

In [73]:
# Fetch every month listed in DATES with four worker threads. The resulting
# list follows the order of DATES and holds None for each failed download.
with ThreadPoolExecutor(max_workers=4) as executor:
    download_results = executor.map(download_file, DATES)
    file_paths = list(download_results)


Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-02.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-03.csv.gz...
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-04.csv.gz...
Downloaded: ./downloads/yellow_tripdata_2019-02.csv.gz
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-05.csv.gz...
Downloaded: ./downloads/yellow_tripdata_2019-03.csv.gz
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-06.csv.gz...
Downloaded: ./downloads/yellow_tripdata_2019-01.csv.gz
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-07.csv.gz...
Downloaded: ./do

In [74]:

# Failed downloads are recorded as None in file_paths; upload only real paths.
downloaded_paths = [path for path in file_paths if path]
skipped_count = len(file_paths) - len(downloaded_paths)

with ThreadPoolExecutor(max_workers=4) as executor:
    # Drain the result iterator: Executor.map only re-raises an exception from
    # a worker when its result is retrieved, so an unconsumed iterator would
    # hide such errors.
    list(executor.map(upload_to_gcs, downloaded_paths))

if skipped_count:
    print(f"{skipped_count} file(s) failed to download and were not uploaded.")
# Per-file outcomes are printed by upload_to_gcs; do not claim blanket success.
print("Upload stage finished. Check the log above for any 'Giving up on' messages.")

Uploading ./downloads/yellow_tripdata_2019-01.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploading ./downloads/yellow_tripdata_2019-02.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploading ./downloads/yellow_tripdata_2019-03.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploading ./downloads/yellow_tripdata_2019-04.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploaded: gs://de-zoomcamp-449723-bucket-hw4/yellow_tripdata_2019-02.csv.gz
Verification successful for yellow_tripdata_2019-02.csv.gz
Uploading ./downloads/yellow_tripdata_2019-05.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploaded: gs://de-zoomcamp-449723-bucket-hw4/yellow_tripdata_2019-03.csv.gz
Verification successful for yellow_tripdata_2019-03.csv.gz
Uploading ./downloads/yellow_tripdata_2019-06.csv.gz to de-zoomcamp-449723-bucket-hw4 (Attempt 1)...
Uploaded: gs://de-zoomcamp-449723-bucket-hw4/yellow_tripdata_2019-01.csv.gz
Verification successful for yellow_tripdata_2019