In [1]:
import pandas as pd
from google.cloud import storage
from google.api_core import page_iterator
from datetime import datetime, date, timedelta
import os

# Utility Function to List Out Google Cloud Storage Directories for a Given File Path

In [4]:
def _item_to_value(iterator, item):
    return item

def list_directories(bucket_name, prefix, client=None):
    """List the immediate "directory" prefixes below ``prefix`` in a bucket.

    Issues an object-listing request against ``/b/<bucket_name>/o`` with a
    ``/`` delimiter and collects the ``prefixes`` entries of every response
    page.

    Args:
        bucket_name: Name of the bucket to inspect.
        prefix: Path inside the bucket to list under. A trailing ``/`` is
            appended when missing. A falsy value (``None`` or ``''``) lists
            from the bucket root.
        client: Optional ``storage.Client`` to reuse. When omitted a new
            client is created, which matches the previous behaviour.

    Returns:
        A list with the prefix entries as returned by the API (for example
        ``'raw/2022-04-22/'``).
    """
    if prefix and not prefix.endswith('/'):
        prefix += '/'

    extra_params = {
        "projection": "noAcl",
        "delimiter": '/',
    }
    # Only send a prefix when there is one, so that a missing prefix
    # (e.g. ``None``) never ends up as a query-string value.
    if prefix:
        extra_params["prefix"] = prefix

    # Reuse the caller's client when given; otherwise fall back to a new one.
    gcs = client if client is not None else storage.Client()

    iterator = page_iterator.HTTPIterator(
        client=gcs,
        # NOTE(review): this goes through the client's private ``_connection``
        # attribute; re-check when upgrading google-cloud-storage.
        api_request=gcs._connection.api_request,
        path="/b/" + bucket_name + "/o",
        items_key='prefixes',
        item_to_value=_item_to_value,
        extra_params=extra_params,
    )

    # Consuming the iterator walks every result page.
    return list(iterator)

# Instantiate Google Cloud Storage Client and Resources

In [5]:
# Name of the bucket that holds the keeper transaction files.
BUCKET_NAME = 'entropy-keeper-transactions'

# The bucket handle created here is reused by the aggregation loop further down.
client = storage.Client()
bucket = client.get_bucket(BUCKET_NAME)

# Get List of Dates Where Txns Have Been Recorded

In [6]:
# Turn each listed prefix (e.g. 'raw/2022-04-22/') into its bare date string.
# The leading 'raw/' is removed as a prefix: ``str.strip`` takes a *set of
# characters*, so the previous ``strip("'raw/")`` would also have eaten any
# leading/trailing 'r', 'a', 'w', "'" or '/' characters of the folder name itself.
RAW_PREFIX = 'raw/'
date_list = [
    (directory[len(RAW_PREFIX):] if directory.startswith(RAW_PREFIX) else directory).strip('/')
    for directory in list_directories('entropy-keeper-transactions', RAW_PREFIX)
]

In [7]:
date_list

['2022-04-22',
 '2022-04-23',
 '2022-04-24',
 '2022-04-25',
 '2022-04-26',
 '2022-04-27',
 '2022-04-28',
 '2022-04-29',
 '2022-04-30',
 '2022-05-01']

# Loop Through Dates, Group Txns Together, and Write to Google Cloud Storage

In [None]:
# For every recorded day: read all raw parquet files of that day, stack them
# into a single frame, write it to a local parquet file, upload that file to
# ``daily/<day>/`` in the bucket and remove the local copy again.
#
# The loop variable is called ``day`` so it does not shadow ``datetime.date``
# imported at the top of the notebook.
for day in date_list:
    print(datetime.now(), 'Starting on {}'.format(day))

    # The trailing slash restricts the listing to this day's folder; without it
    # any sibling whose name merely starts with the same text would match too.
    blobs = bucket.list_blobs(prefix='raw/' + day + '/')

    # Read every file first and concatenate once afterwards; growing a frame
    # with ``pd.concat`` inside the loop re-copies all rows on every iteration.
    frames = [
        pd.read_parquet('gs://entropy-keeper-transactions/{}'.format(raw_blob.name))
        for raw_blob in blobs
        if not raw_blob.name.endswith('/')  # skip "folder" placeholder objects
    ]

    # ``pd.concat`` raises on an empty list, and there is nothing to upload anyway.
    if not frames:
        print(datetime.now(), 'No files found for {}, skipping'.format(day))
        continue

    df_daily_agg = pd.concat(frames, axis=0)

    local_file = day + '-daily-aggregation.parquet'
    try:
        df_daily_agg.to_parquet(local_file)

        print(datetime.now(), 'Uploading file to GCS...')
        daily_blob = bucket.blob('daily/' + day + '/' + local_file)
        daily_blob.upload_from_filename(local_file)
    finally:
        # Clean up even when the write or the upload failed, so partial files
        # do not pile up in the working directory.
        print(datetime.now(), 'Deleting file from local memory')
        if os.path.exists(local_file):
            os.remove(local_file)