In [None]:
import gzip
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from google.cloud import storage

In [7]:
# Configuration constants
CHUNK_SIZE = 8 * 1024 ** 2  # 8 MiB; assigned to blob.chunk_size before uploading
DOWNLOAD_DIR = "dbt"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
BUCKET_NAME = "mimosa-de-zoomcamp-bucket"
CREDENTIALS_FILE = "key/key.json"
# Create the GCS client from the service-account key file, then get the target bucket.
client = storage.Client.from_service_account_json(CREDENTIALS_FILE)
bucket = client.bucket(BUCKET_NAME)

BASE_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_"
FILE_NAME = "fhv_tripdata_"

# Years and months to download
YEARS = list(range(2019, 2022))
MONTHS = [format(month, "02d") for month in range(1, 13)]  # zero-padded "01" .. "12"

In [8]:
def verify_file_exists_local(file_path):
    """Return True when `file_path` exists on the local filesystem, else False."""
    is_present = os.path.exists(file_path)
    return is_present

def download_file(year, month):
    """Download one month's ``.csv.gz`` archive into ``DOWNLOAD_DIR``.

    The URL is built from ``BASE_URL`` and the local name from ``FILE_NAME``,
    both suffixed with ``{year}-{month}.csv.gz``. If the target file already
    exists locally the download is skipped.

    The body is streamed into a temporary ``<target>.part`` file which is
    renamed onto the target only after the whole body has been written. A
    failed or interrupted download therefore never leaves a truncated archive
    under the final name, which a later run would otherwise skip as
    "already exists".

    Args:
        year: Year component of the file name (e.g. 2019).
        month: Month component of the file name, as it should appear in the
            URL (e.g. "01").

    Returns:
        The local path of the archive, or None if the download failed.
    """
    url = f"{BASE_URL}{year}-{month}.csv.gz"
    file_path = os.path.join(DOWNLOAD_DIR, f"{FILE_NAME}{year}-{month}.csv.gz")

    # Skip the download when the archive is already on disk.
    if verify_file_exists_local(file_path):
        print(f"File {file_path} already exists locally. Skipping download.")
        return file_path

    partial_path = f"{file_path}.part"
    try:
        print(f"Downloading {url}...")
        # Using the response as a context manager releases the connection
        # even when streaming is aborted by an exception.
        with requests.get(url, stream=True, timeout=10) as response:
            response.raise_for_status()  # raises on HTTP 4xx/5xx status codes

            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)

        # Publish the file under its final name only once it is complete.
        os.replace(partial_path, file_path)
        print(f"Downloaded: {file_path}")
        return file_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        # Best-effort cleanup of the incomplete temporary file.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return None



def verify_gcs_upload(blob_name):
    """Return whether a blob named `blob_name` exists in the module-level bucket.

    The existence check is issued through the module-level `client`.
    """
    uploaded_blob = storage.Blob(name=blob_name, bucket=bucket)
    return uploaded_blob.exists(client)

def verify_file_exists(blob_name):
    """Check whether a blob named `blob_name` already exists in the GCS bucket."""
    # blob.exists() answers with True or False
    return bucket.blob(blob_name).exists()

def upload_to_gcs(file_path, max_retries=3, retry_delay=5):
    """Upload a local file under the ``dbt/`` prefix of the configured bucket.

    The upload is skipped when a blob with the same name already exists.
    Otherwise up to ``max_retries`` attempts are made. An attempt succeeds
    only if the upload call returns without raising and the follow-up
    existence check passes.

    Args:
        file_path: Path of the local file to upload. Only its base name is
            used to build the blob name.
        max_retries: Maximum number of upload attempts.
        retry_delay: Seconds to wait between two consecutive attempts.

    Returns:
        None. The outcome is reported through printed messages only.
    """
    # Files are uploaded into the dbt/ "directory" of the bucket.
    blob_name = f"dbt/{os.path.basename(file_path)}"

    # Skip the upload when the blob is already present.
    if verify_file_exists(blob_name):
        print(f"File {blob_name} already exists in GCS. Skipping upload.")
        return

    blob = bucket.blob(blob_name)
    blob.chunk_size = CHUNK_SIZE

    for attempt in range(max_retries):
        try:
            print(f"Uploading {file_path} to {BUCKET_NAME} (Attempt {attempt + 1})...")
            blob.upload_from_filename(file_path)
            print(f"Uploaded: gs://{BUCKET_NAME}/{blob_name}")

            if verify_gcs_upload(blob_name):
                print(f"Verification successful for {blob_name}")
                return
            else:
                print(f"Verification failed for {blob_name}, retrying...")
        except Exception as e:
            print(f"Failed to upload {file_path} to GCS: {e}")

        # Pause only when another attempt follows; waiting after the final
        # failure would just delay the "giving up" message.
        if attempt < max_retries - 1:
            time.sleep(retry_delay)

    print(f"Giving up on {file_path} after {max_retries} attempts.")


def decompress_gz_file(gz_file_path):
    """Decompress a ``.gz`` file next to itself, dropping the ``.gz`` suffix.

    Only the trailing ``.gz`` is removed from the path, so a ``.gz`` that
    appears elsewhere (for example in a directory name) is left alone. The
    data is copied in chunks rather than read into memory in one piece, and
    it is written to a temporary ``<output>.part`` file that is renamed onto
    the output path only after decompression finished. A failed run therefore
    leaves no partial output file behind.

    Args:
        gz_file_path: Path (str) of the gzip-compressed input file. It must
            end in ``.gz``; otherwise the output path would equal the input
            path and the input would be overwritten.

    Returns:
        The path of the decompressed file, or None if decompression failed.
    """
    suffix = ".gz"
    if not gz_file_path.endswith(suffix):
        print(f"Failed to decompress {gz_file_path}: expected a path ending in '{suffix}'")
        return None

    csv_file_path = gz_file_path[: -len(suffix)]  # e.g. x.csv.gz -> x.csv
    partial_path = f"{csv_file_path}.part"
    try:
        with gzip.open(gz_file_path, "rb") as f_in, open(partial_path, "wb") as f_out:
            # Stream in chunks so memory use stays bounded for large files.
            shutil.copyfileobj(f_in, f_out)
        # Publish the output under its final name only once it is complete.
        os.replace(partial_path, csv_file_path)
    except Exception as e:
        print(f"Failed to decompress {gz_file_path}: {e}")
        # Best-effort cleanup of the incomplete temporary file.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return None

    print(f"Decompressed {gz_file_path} to {csv_file_path}")
    return csv_file_path

def process_and_upload_file(year, month):
    """Run download -> decompress -> upload for one (year, month) file.

    Returns the path of the decompressed CSV, or None when the download or
    the decompression step did not produce a file. The upload step's outcome
    does not affect the return value.
    """
    # Step 1: download the archive
    archive_path = download_file(year, month)
    if archive_path:
        # Step 2: decompress it
        extracted_path = decompress_gz_file(archive_path)
        if extracted_path:
            # Step 3: upload the decompressed file to GCS
            upload_to_gcs(extracted_path)
            return extracted_path
    return None

if __name__ == "__main__":
    # Every (year, month) combination to process.
    tasks = [(year, month) for year in YEARS for month in MONTHS]

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Run download, decompression and upload for each task.
        # executor.map returns results in input order, so they line up with `tasks`.
        file_paths = list(executor.map(lambda args: process_and_upload_file(args[0], args[1]), tasks))

    # process_and_upload_file returns None when the download or decompression
    # failed. Upload failures are only printed by upload_to_gcs and are not
    # reflected in its return value, so they cannot be detected here.
    failed = [f"{year}-{month}" for (year, month), path in zip(tasks, file_paths) if path is None]
    if failed:
        print(f"{len(failed)} of {len(tasks)} files could not be processed: {', '.join(failed)}")
    else:
        print("All files processed and verified.")

File dbt\fhv_tripdata_2019-01.csv.gz already exists locally. Skipping download.
File dbt\fhv_tripdata_2019-02.csv.gz already exists locally. Skipping download.
File dbt\fhv_tripdata_2019-03.csv.gz already exists locally. Skipping download.
File dbt\fhv_tripdata_2019-04.csv.gz already exists locally. Skipping download.
Decompressed dbt\fhv_tripdata_2019-03.csv.gz to dbt\fhv_tripdata_2019-03.csv
Decompressed dbt\fhv_tripdata_2019-02.csv.gz to dbt\fhv_tripdata_2019-02.csv
Decompressed dbt\fhv_tripdata_2019-04.csv.gz to dbt\fhv_tripdata_2019-04.csv
Uploading dbt\fhv_tripdata_2019-04.csv to mimosa-de-zoomcamp-bucket (Attempt 1)...
Uploading dbt\fhv_tripdata_2019-03.csv to mimosa-de-zoomcamp-bucket (Attempt 1)...
Uploading dbt\fhv_tripdata_2019-02.csv to mimosa-de-zoomcamp-bucket (Attempt 1)...
Decompressed dbt\fhv_tripdata_2019-01.csv.gz to dbt\fhv_tripdata_2019-01.csv
Uploading dbt\fhv_tripdata_2019-01.csv to mimosa-de-zoomcamp-bucket (Attempt 1)...
Uploaded: gs://mimosa-de-zoomcamp-bucket