In [1]:
import io
import os
import requests
import pandas as pd
from google.cloud import storage

In [None]:
# Show what is in the current working directory. Uses os.listdir instead of a
# shell escape so the cell also works where no `ls` executable is available.
sorted(os.listdir())

In [2]:
# Tell the Google Cloud client which service-account key file to use.
# setdefault keeps a GOOGLE_APPLICATION_CREDENTIALS value that is already
# exported in the environment; the machine-specific path below is only the
# fallback for when nothing has been configured.
os.environ.setdefault(
    'GOOGLE_APPLICATION_CREDENTIALS',
    "D:/Data_Engineering_Zoomcamp_2024/data-engineering-zoomcamp/cohorts/2024/01-docker-terraform/1_terraform_gcp/keys/my-creds.json",
)

In [3]:
# Base URL of the DataTalksClub nyc-tlc-data release downloads. Code below
# appends "<service>/<file_name>" to it to form the URL of one file.
init_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/'

In [4]:
# Destination GCS bucket name: taken from the GCP_GCS_BUCKET environment
# variable when it is set, otherwise the literal default below.
BUCKET = os.getenv("GCP_GCS_BUCKET", "cloud_bucket_dbt")

In [None]:
def fhv_taxi_schema():
    """Return a column-name -> pandas dtype-string mapping for FHV trip CSVs.

    The mapping is meant to be passed as ``dtype=`` to ``pd.read_csv``. A new
    dict is built on every call, so callers may modify the result freely.
    """
    column_dtypes = [
        ('dispatching_base_num', 'string'),
        ('PUlocationID', 'Int64'),
        ('DOlocationID', 'Int64'),
        ('SR_Flag', 'string'),
        ('Affiliated_base_number', 'string'),
    ]
    return dict(column_dtypes)

In [5]:
# Column names handed to pd.read_csv(parse_dates=...) when loading trip CSVs.
# NOTE(review): web_to_gcs renames 'dropOff_datetime' to 'dropoff_datetime'
# after reading, which suggests some raw files spell the column differently;
# confirm these names match the CSV headers.
parse_dates = ['pickup_datetime','dropoff_datetime']

In [6]:
def upload_to_gcs(bucket, object_name, local_file):
    """Upload one local file to a Google Cloud Storage bucket.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python

    Args:
        bucket: Name of the destination bucket.
        object_name: Name (path) the object gets inside the bucket.
        local_file: Path of the file on disk to upload.
    """
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    # Both module-level size settings are overridden with the same 5 MB value.
    five_mb = 5 * 1024 * 1024
    storage.blob._MAX_MULTIPART_SIZE = five_mb
    storage.blob._DEFAULT_CHUNKSIZE = five_mb

    gcs_client = storage.Client()
    target_blob = gcs_client.bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)

In [9]:
def _read_trip_csv(csv_path, encoding=None):
    """Load one gzipped trip CSV and apply the notebook's basic cleaning.

    Args:
        csv_path: Path of the local ``.csv.gz`` file.
        encoding: Text encoding passed to ``pd.read_csv``; ``None`` means the
            pandas default.

    Returns:
        DataFrame with nulls in ``PUlocationID``, ``DOlocationID`` and
        ``SR_Flag`` replaced by 0 and ``dropOff_datetime`` renamed to
        ``dropoff_datetime``.

    Raises:
        KeyError: if one of the three filled columns is missing.
        UnicodeDecodeError: if the file cannot be decoded with ``encoding``.
    """
    # The rename below implies some files spell the drop-off column as
    # ``dropOff_datetime``. read_csv rejects parse_dates names that are not in
    # the header, so match the configured names against the real header
    # case-insensitively and parse whichever spelling the file uses.
    header = pd.read_csv(csv_path, compression='gzip', nrows=0, encoding=encoding).columns
    wanted = {name.lower() for name in parse_dates}
    date_columns = [column for column in header if column.lower() in wanted]

    df = pd.read_csv(csv_path, compression='gzip', parse_dates=date_columns, encoding=encoding)
    for column in ('PUlocationID', 'DOlocationID', 'SR_Flag'):
        df[column] = df[column].fillna(0)
    # rename ignores keys that are absent, so this is a no-op for files that
    # already use the lower-case spelling.
    return df.rename(columns={'dropOff_datetime': 'dropoff_datetime'})


def web_to_gcs(year, service, gcs_prefix='fhv_new'):
    """Download a year of monthly trip CSVs, convert to parquet, upload to GCS.

    For each month 01..12 the file ``{service}_tripdata_{year}-{month}.csv.gz``
    is downloaded from ``init_url`` into the working directory, loaded with
    pandas, written next to it as ``.parquet`` and uploaded to ``BUCKET``.

    Args:
        year: Year as it appears in the file name, e.g. ``'2019'``.
        service: Service prefix used in the URL and file name, e.g. ``'fhv'``.
        gcs_prefix: Folder inside the bucket the parquet files are uploaded
            under. Defaults to ``'fhv_new'``, the location used previously.

    Raises:
        requests.HTTPError: if a download returns an error status.
    """
    for month_number in range(1, 13):
        month = f"{month_number:02d}"

        # csv file_name
        csv_name = f"{service}_tripdata_{year}-{month}.csv.gz"

        # Download the file and save it locally. Checking the status first
        # avoids writing an HTTP error page to disk as if it were a gzip file.
        request_url = f"{init_url}{service}/{csv_name}"
        response = requests.get(request_url)
        response.raise_for_status()
        with open(csv_name, 'wb') as csv_file:
            csv_file.write(response.content)
        print(f"Local: {csv_name}")

        # Load the CSV; only a decoding failure triggers the ISO-8859-1 retry,
        # any other error would fail identically on a second attempt.
        try:
            df = _read_trip_csv(csv_name)
        except UnicodeDecodeError:
            df = _read_trip_csv(csv_name, encoding='ISO-8859-1')

        parquet_name = csv_name.replace('.csv.gz', '.parquet')
        df.to_parquet(parquet_name, engine='pyarrow')
        print(f"Parquet: {parquet_name}")

        # upload it to gcs, and log the object name that was actually written
        object_name = f"{gcs_prefix}/{parquet_name}"
        upload_to_gcs(BUCKET, object_name, parquet_name)
        print(f"GCS: {object_name}")

In [10]:
# Download, convert and upload the twelve monthly FHV files for 2019.
web_to_gcs('2019', 'fhv')

0


KeyboardInterrupt: 

In [None]:
# Column -> dtype mapping for trip CSVs. The text columns use the 'string'
# dtype name as a quoted literal; a bare `string` is an undefined name and
# raises NameError when the cell runs.
taxi_dtypes = {
                    'dispatching_base_num': 'string',
                    'PULocationID': pd.Int64Dtype(),
                    'DOLocationID': pd.Int64Dtype(),
                    'SR_Flag': 'string',
                  }

In [None]:
# Scratch check: fetch a single month (2020-06) and inspect the resulting
# column dtypes. Nothing is converted or uploaded in this cell.
file_name = "fhv_tripdata_2020-06.csv.gz"

# Download the gzipped CSV into the working directory.
request_url = f"{init_url}fhv/{file_name}"
r = requests.get(request_url)
r.raise_for_status()  # fail here rather than saving an HTTP error page as .csv.gz
with open(file_name, 'wb') as fh:
    fh.write(r.content)
print(f"Local: {file_name}")

# Load it into a DataFrame and normalise the column dtypes.
try:
    df = pd.read_csv(file_name, compression='gzip', parse_dates=parse_dates)
    # Cast each text column from itself. The previous right-hand side selected
    # ['SR_Flag', 'experience'], which copied SR_Flag values into
    # dispatching_base_num and referenced a column used nowhere else here.
    df[['dispatching_base_num', 'SR_Flag']] = df[['dispatching_base_num', 'SR_Flag']].astype(str)
    df[['PULocationID', 'DOLocationID']] = df[['PULocationID', 'DOLocationID']].fillna(0).astype('int64')
except Exception as e:
    # Fallback: re-read with the explicit schema and ISO-8859-1 decoding.
    # The reason is reported so the first failure is not hidden.
    print(f"Default read failed ({e!r}); retrying with explicit schema and ISO-8859-1")
    df = pd.read_csv(file_name, compression='gzip',dtype=fhv_taxi_schema(),parse_dates=parse_dates, encoding='ISO-8859-1')
# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of being wrapped in print().
df.info()
# file_name = file_name.replace('.csv.gz', '.parquet')
# df.to_parquet(file_name, engine='pyarrow')
# print(f"Parquet: {file_name}")

In [None]:
# Download, convert and upload the twelve monthly FHV files for 2021.
web_to_gcs('2021', 'fhv')

In [None]:
# upload_to_gcs(BUCKET, 'fhv/fhv_tripdata_2019-01.parquet', 'fhv_tripdata_2019-01.parquet')