In [3]:
import io
import os
import requests
import pandas as pd
from google.cloud import storage

"""
Pre-reqs: 
1. `pip install pandas pyarrow google-cloud-storage`
2. Set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account key
3. Set GCP_GCS_BUCKET as your bucket or change default value of BUCKET
"""

# services = ['fhv','green','yellow']
# Base URL of the release downloads; "<service>/<file_name>" is appended to it.
init_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/'
# Target bucket: taken from the GCP_GCS_BUCKET environment variable when it is
# set, otherwise the default below (switch out the bucketname as needed).
BUCKET = os.environ.get("GCP_GCS_BUCKET", "ny-taxi-datalake-88")
# Fall back to the local key file only when GOOGLE_APPLICATION_CREDENTIALS has
# not been set already, so a value exported by the user is not overwritten.
os.environ.setdefault(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/drios/data-engineering-zoomcamp/dr_de_zoomcamp/04-analytics_engineering/my-creds.json',
)

# Column dtypes passed to pd.read_csv for the downloaded CSV files.
# The location IDs and SR_Flag use the nullable Int64 dtype so that empty
# cells can be read without turning the column into floats.
# Keys must match the CSV header exactly (no surrounding whitespace), otherwise
# the entry does not apply to the intended column.
fhv_dtypes = {
    'dispatching_base_num': str,
    'PUlocationID': pd.Int64Dtype(),
    'DOlocationID': pd.Int64Dtype(),
    'SR_Flag': pd.Int64Dtype(),
    'Affiliated_base_number': str,
}

# Columns that pd.read_csv should parse as datetimes.
fhv_parse_dates = ['pickup_datetime', 'dropOff_datetime']

def upload_to_gcs(bucket, object_name, local_file):
    """
    Upload the local file `local_file` to the bucket named `bucket`, storing it
    under the object name `object_name`.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    """
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    # Both module-level size settings are overridden with the same 3 MB value.
    three_mb = 3 * 1024 * 1024
    storage.blob._MAX_MULTIPART_SIZE = three_mb
    storage.blob._DEFAULT_CHUNKSIZE = three_mb

    # client -> bucket handle -> blob handle, then push the file contents
    target_blob = storage.Client().bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)


def web_to_gcs(year, service):
    """
    Download the twelve monthly trip-data files for `year` / `service`, convert
    each one to Parquet and upload it to the GCS bucket `BUCKET`.

    For every month 01..12 the file `{service}_tripdata_{year}-{month}.csv.gz`
    is fetched from `init_url`, saved in the current working directory, read
    with pandas, written next to it as `.parquet` and uploaded under the object
    name `{service}/{file_name}`. Both local files are left on disk.

    NOTE(review): every file is parsed with `fhv_dtypes` / `fhv_parse_dates`
    and the column clean-up below, whatever `service` is — presumably other
    services need their own schema; confirm before calling with anything
    but 'fhv'.

    Args:
        year: year used in the file name, e.g. '2019'.
        service: dataset name used in the URL, file name and object prefix,
            e.g. 'fhv'.

    Raises:
        requests.HTTPError: if a download does not return a success status.
    """
    for month_number in range(1, 13):

        # zero-padded month part of the file name: '01' .. '12'
        month = f"{month_number:02d}"

        # csv file_name
        file_name = f"{service}_tripdata_{year}-{month}.csv.gz"

        # download the compressed CSV and save it locally
        request_url = f"{init_url}{service}/{file_name}"
        r = requests.get(request_url)
        # Stop on 4xx/5xx here instead of saving the error page as a ".csv.gz"
        # file and failing later while decompressing it.
        r.raise_for_status()
        # `with` guarantees the file is flushed and closed before it is re-read
        with open(file_name, 'wb') as local_csv:
            local_csv.write(r.content)
        print(f"Local: {file_name}")

        # load the CSV into a DataFrame and write it back out as parquet
        df = pd.read_csv(file_name, compression='gzip', dtype=fhv_dtypes, parse_dates=fhv_parse_dates)
        # Replace every missing value with 0 so the casts below never see NaN/<NA>
        df.fillna(0, inplace=True)
        df['Affiliated_base_number'] = df['Affiliated_base_number'].astype(str)
        df['DOlocationID'] = df['DOlocationID'].astype(int)
        df['dispatching_base_num'] = df['dispatching_base_num'].astype(str)
        file_name = file_name.replace('.csv.gz', '.parquet')
        df.to_parquet(file_name, engine='pyarrow')
        print(f"Parquet: {file_name}")

        # upload it to gcs
        upload_to_gcs(BUCKET, f"{service}/{file_name}", file_name)
        print(f"GCS: {service}/{file_name}")


# Driver: only the 2019 FHV load is run; the green/yellow calls are disabled.
# NOTE(review): web_to_gcs always reads files with fhv_dtypes / fhv_parse_dates,
# so the disabled calls presumably need a matching schema first — confirm
# before re-enabling them.
# web_to_gcs('2019', 'green')
# web_to_gcs('2020', 'green')
# web_to_gcs('2019', 'yellow')
# web_to_gcs('2020', 'yellow')
web_to_gcs('2019', 'fhv')

Local: fhv_tripdata_2019-01.csv.gz
Parquet: fhv_tripdata_2019-01.parquet
GCS: fhv/fhv_tripdata_2019-01.parquet
Local: fhv_tripdata_2019-02.csv.gz
Parquet: fhv_tripdata_2019-02.parquet
GCS: fhv/fhv_tripdata_2019-02.parquet
Local: fhv_tripdata_2019-03.csv.gz
Parquet: fhv_tripdata_2019-03.parquet
GCS: fhv/fhv_tripdata_2019-03.parquet
Local: fhv_tripdata_2019-04.csv.gz
Parquet: fhv_tripdata_2019-04.parquet
GCS: fhv/fhv_tripdata_2019-04.parquet
Local: fhv_tripdata_2019-05.csv.gz
Parquet: fhv_tripdata_2019-05.parquet
GCS: fhv/fhv_tripdata_2019-05.parquet
Local: fhv_tripdata_2019-06.csv.gz
Parquet: fhv_tripdata_2019-06.parquet
GCS: fhv/fhv_tripdata_2019-06.parquet
Local: fhv_tripdata_2019-07.csv.gz
Parquet: fhv_tripdata_2019-07.parquet
GCS: fhv/fhv_tripdata_2019-07.parquet
Local: fhv_tripdata_2019-08.csv.gz
Parquet: fhv_tripdata_2019-08.parquet
GCS: fhv/fhv_tripdata_2019-08.parquet
Local: fhv_tripdata_2019-09.csv.gz
Parquet: fhv_tripdata_2019-09.parquet
GCS: fhv/fhv_tripdata_2019-09.parquet
L

In [1]:
pip install pandas pyarrow google-cloud-storage

Collecting pyarrow
  Downloading pyarrow-15.0.0-cp39-cp39-manylinux_2_28_x86_64.whl (38.3 MB)
[K     |████████████████████████████████| 38.3 MB 28 kB/s  eta 0:00:01     |████████████████▋               | 19.9 MB 450 kB/s eta 0:00:41
Installing collected packages: pyarrow
Successfully installed pyarrow-15.0.0
Note: you may need to restart the kernel to use updated packages.
