In [62]:
import io
import os
import requests
import pandas as pd
import pyarrow
from google.cloud import storage
from os.path import exists
# Replace the placeholder with the path to your service-account key file
# before running. Note that this assignment overwrites any
# GOOGLE_APPLICATION_CREDENTIALS value already present in the environment.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "SET YOUR CREDENTIALS"


In [63]:
%%bash
# Remove files left over from a previous run.
# -f keeps the cell from failing (non-zero exit -> CalledProcessError in
# %%bash) when no matching files exist, e.g. on a fresh checkout.
# -- guards against file names that start with a dash.
rm -f -- *.parquet
rm -f -- *.csv

In [64]:

"""
Pre-reqs: 
1. `pip install pandas pyarrow google-cloud-storage`
2. Set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account key
3. Set GCP_GCS_BUCKET as your bucket or change default value of BUCKET
"""

# Service names, kept for reference only (this line is commented out and unused):
# services = ['fhv','green','yellow']
# Base URL; web_to_gcs appends the monthly CSV file name to it.
init_url = 'https://nyc-tlc.s3.amazonaws.com/trip+data/'
# Destination bucket name, read from the GCP_GCS_BUCKET environment variable.
# The fallback is only a placeholder and must be replaced with a real bucket
# name. NOTE(review): the placeholder text contains a typo ("BUCKKET").
BUCKET = os.environ.get("GCP_GCS_BUCKET", "SET-YOUR-BUCKKET")


def upload_to_gcs(bucket, object_name, local_file):
    """
    Upload a local file to a Google Cloud Storage bucket.

    A new `storage.Client()` is created on every call.

    Args:
        bucket: Name of the destination bucket.
        object_name: Name of the object (blob) to create inside the bucket.
        local_file: Path of the local file whose contents are uploaded.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    """
    # Disabled workaround to prevent timeouts for files > 6 MB on an
    # 800 kbps upload link
    # (Ref: https://github.com/googleapis/python-storage/issues/74):
    #   storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    #   storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB

    gcs_client = storage.Client()
    target_blob = gcs_client.bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)


def web_to_gcs(year, service):
    """
    Fetch one year of monthly trip-data CSVs, convert them to Parquet and
    upload each Parquet file to GCS.

    For every month 01..12 the file `<service>_tripdata_<year>-<MM>.csv` is
    loaded from the current directory when a local copy exists; otherwise it
    is read from `init_url` and a local CSV copy is saved. The frame is then
    written as `<service>_tripdata_<year>-<MM>.parquet` using pyarrow and
    uploaded to `BUCKET` as `<service>/<parquet file name>`.

    Args:
        year: Year of the data set, e.g. '2019'. Both str and int are
            accepted (the value is formatted into the file name).
        service: Data-set prefix used in the file names and as the folder
            name inside the bucket, e.g. 'green' or 'yellow'.
    """
    for month in range(1, 13):
        # Shared stem so the CSV and Parquet names can never drift apart and
        # only the extension differs between them.
        stem = f"{service}_tripdata_{year}-{month:02d}"
        csv_name = stem + '.csv'
        parquet_name = stem + '.parquet'

        if exists(csv_name):
            # Reuse the locally cached copy instead of downloading again.
            df = pd.read_csv(csv_name)
        else:
            df = pd.read_csv(init_url + csv_name)
            df.to_csv(csv_name, index=False)

        df.to_parquet(parquet_name, engine='pyarrow', index=False)
        print(f"Parquet: {parquet_name}")

        object_name = f"{service}/{parquet_name}"
        upload_to_gcs(BUCKET, object_name, parquet_name)
        print(f"GCS: {object_name}")



In [65]:
%%time
web_to_gcs('2019', 'green')


Parquet: green_tripdata_2019-01.parquet
GCS: green/green_tripdata_2019-01.parquet
Parquet: green_tripdata_2019-02.parquet
GCS: green/green_tripdata_2019-02.parquet
Parquet: green_tripdata_2019-03.parquet
GCS: green/green_tripdata_2019-03.parquet
Parquet: green_tripdata_2019-04.parquet
GCS: green/green_tripdata_2019-04.parquet
Parquet: green_tripdata_2019-05.parquet
GCS: green/green_tripdata_2019-05.parquet
Parquet: green_tripdata_2019-06.parquet
GCS: green/green_tripdata_2019-06.parquet


  """Entry point for launching an IPython kernel.


Parquet: green_tripdata_2019-07.parquet
GCS: green/green_tripdata_2019-07.parquet
Parquet: green_tripdata_2019-08.parquet
GCS: green/green_tripdata_2019-08.parquet
Parquet: green_tripdata_2019-09.parquet
GCS: green/green_tripdata_2019-09.parquet
Parquet: green_tripdata_2019-10.parquet
GCS: green/green_tripdata_2019-10.parquet
Parquet: green_tripdata_2019-11.parquet
GCS: green/green_tripdata_2019-11.parquet
Parquet: green_tripdata_2019-12.parquet
GCS: green/green_tripdata_2019-12.parquet
CPU times: user 1min 14s, sys: 4.31 s, total: 1min 18s
Wall time: 1min 41s


In [66]:
%time web_to_gcs('2020', 'green')


  """Entry point for launching an IPython kernel.


Parquet: green_tripdata_2020-01.parquet
GCS: green/green_tripdata_2020-01.parquet
Parquet: green_tripdata_2020-02.parquet
GCS: green/green_tripdata_2020-02.parquet
Parquet: green_tripdata_2020-03.parquet
GCS: green/green_tripdata_2020-03.parquet
Parquet: green_tripdata_2020-04.parquet
GCS: green/green_tripdata_2020-04.parquet
Parquet: green_tripdata_2020-05.parquet
GCS: green/green_tripdata_2020-05.parquet
Parquet: green_tripdata_2020-06.parquet
GCS: green/green_tripdata_2020-06.parquet
Parquet: green_tripdata_2020-07.parquet
GCS: green/green_tripdata_2020-07.parquet
Parquet: green_tripdata_2020-08.parquet
GCS: green/green_tripdata_2020-08.parquet
Parquet: green_tripdata_2020-09.parquet
GCS: green/green_tripdata_2020-09.parquet
Parquet: green_tripdata_2020-10.parquet
GCS: green/green_tripdata_2020-10.parquet
Parquet: green_tripdata_2020-11.parquet
GCS: green/green_tripdata_2020-11.parquet
Parquet: green_tripdata_2020-12.parquet
GCS: green/green_tripdata_2020-12.parquet
CPU times: user 

In [67]:
%time web_to_gcs('2019', 'yellow')


Parquet: yellow_tripdata_2019-01.parquet
GCS: yellow/yellow_tripdata_2019-01.parquet
Parquet: yellow_tripdata_2019-02.parquet
GCS: yellow/yellow_tripdata_2019-02.parquet
Parquet: yellow_tripdata_2019-03.parquet
GCS: yellow/yellow_tripdata_2019-03.parquet
Parquet: yellow_tripdata_2019-04.parquet
GCS: yellow/yellow_tripdata_2019-04.parquet
Parquet: yellow_tripdata_2019-05.parquet
GCS: yellow/yellow_tripdata_2019-05.parquet
Parquet: yellow_tripdata_2019-06.parquet
GCS: yellow/yellow_tripdata_2019-06.parquet


  """Entry point for launching an IPython kernel.


Parquet: yellow_tripdata_2019-07.parquet
GCS: yellow/yellow_tripdata_2019-07.parquet
Parquet: yellow_tripdata_2019-08.parquet
GCS: yellow/yellow_tripdata_2019-08.parquet
Parquet: yellow_tripdata_2019-09.parquet
GCS: yellow/yellow_tripdata_2019-09.parquet
Parquet: yellow_tripdata_2019-10.parquet
GCS: yellow/yellow_tripdata_2019-10.parquet
Parquet: yellow_tripdata_2019-11.parquet
GCS: yellow/yellow_tripdata_2019-11.parquet
Parquet: yellow_tripdata_2019-12.parquet
GCS: yellow/yellow_tripdata_2019-12.parquet
CPU times: user 16min 19s, sys: 1min 3s, total: 17min 23s
Wall time: 19min 13s


In [68]:
%time web_to_gcs('2020', 'yellow')


  """Entry point for launching an IPython kernel.


Parquet: yellow_tripdata_2020-01.parquet
GCS: yellow/yellow_tripdata_2020-01.parquet
Parquet: yellow_tripdata_2020-02.parquet
GCS: yellow/yellow_tripdata_2020-02.parquet
Parquet: yellow_tripdata_2020-03.parquet
GCS: yellow/yellow_tripdata_2020-03.parquet
Parquet: yellow_tripdata_2020-04.parquet
GCS: yellow/yellow_tripdata_2020-04.parquet
Parquet: yellow_tripdata_2020-05.parquet
GCS: yellow/yellow_tripdata_2020-05.parquet
Parquet: yellow_tripdata_2020-06.parquet
GCS: yellow/yellow_tripdata_2020-06.parquet
Parquet: yellow_tripdata_2020-07.parquet
GCS: yellow/yellow_tripdata_2020-07.parquet
Parquet: yellow_tripdata_2020-08.parquet
GCS: yellow/yellow_tripdata_2020-08.parquet
Parquet: yellow_tripdata_2020-09.parquet
GCS: yellow/yellow_tripdata_2020-09.parquet
Parquet: yellow_tripdata_2020-10.parquet
GCS: yellow/yellow_tripdata_2020-10.parquet
Parquet: yellow_tripdata_2020-11.parquet
GCS: yellow/yellow_tripdata_2020-11.parquet
Parquet: yellow_tripdata_2020-12.parquet
GCS: yellow/yellow_tripd