In [None]:
# Start from a clean target: recursively delete everything under the
# retail_db_parquet prefix left by a previous run. Destructive.
# NOTE(review): on a first run the prefix does not exist yet, and gsutil
# presumably reports an error here; confirm that is acceptable.
!gsutil rm -r gs://stage-nelson/retail_db_parquet

In [None]:
# List the top-level contents of the staging bucket.
!gsutil ls gs://stage-nelson/

In [34]:
import glob
import os

In [35]:
def get_file_names(src_base_dir):
    """Return every file under ``src_base_dir`` whose path ends with ``part-00000``.

    The directory tree is searched recursively with ``glob``; directories
    and files whose path does not end in ``part-00000`` are skipped.

    Args:
        src_base_dir: Root directory to search (e.g. the retail_db folder).

    Returns:
        A list of matching file paths, in the order ``glob`` yields them.
    """
    pattern = f'{src_base_dir}/**'
    return [
        path
        for path in glob.glob(pattern, recursive=True)
        if os.path.isfile(path) and path.endswith('part-00000')
    ]


In [36]:
# Root of the local retail_db export; each dataset lives in its own
# sub-directory (e.g. orders/part-00000).
src_base_dir = '../../data/retail_db'

In [None]:
# Preview the part-00000 data files found under the source directory.
get_file_names(src_base_dir)

In [38]:
import json

In [None]:
# Load the per-dataset column schemas. The context manager closes the file
# deterministically instead of leaving it to garbage collection.
with open('../../data/retail_db/schemas.json') as schemas_fp:
    schemas = json.load(schemas_fp)
schemas

In [None]:
# Raw schema entries for the orders dataset, as stored in the JSON file.
schemas['orders']

In [None]:
# Orders schema entries put into column order by their column_position.
sorted(
    schemas['orders'],
    key=lambda entry: entry['column_position'],
)

In [None]:
# Keep the ordered orders schema for extracting column names next.
ds_schema = sorted(
    schemas['orders'],
    key=lambda entry: entry['column_position'],
)
ds_schema

In [None]:
# Column names in the same order as ds_schema.
[entry['column_name'] for entry in ds_schema]

In [44]:
def get_column_names(schemas_file, ds_name):
    """Return the column names of one dataset, ordered by ``column_position``.

    Args:
        schemas_file: Path to a JSON file mapping each dataset name to a list
            of column descriptors, each with at least ``column_name`` and
            ``column_position`` keys.
        ds_name: Dataset name to look up (e.g. ``'orders'``).

    Returns:
        A list of column names in ascending ``column_position`` order.

    Raises:
        KeyError: If ``ds_name`` is not a key of the schemas file.
    """
    # Close the schema file deterministically instead of relying on GC
    # (a bare open() here is reported as an unclosed file / ResourceWarning).
    with open(schemas_file) as fp:
        schemas = json.load(fp)
    ds_schema = sorted(schemas[ds_name], key=lambda col: col['column_position'])
    return [col['column_name'] for col in ds_schema]

In [None]:
# Quick check of the helper on the orders dataset.
get_column_names('../../data/retail_db/schemas.json', 'orders')

In [None]:
# Print the ordered column list of every retail_db dataset.
for ds in (
    'departments', 'categories', 'products',
    'customers', 'orders', 'order_items',
):
    column_names = get_column_names('../../data/retail_db/schemas.json', ds)
    print(f'''columns for {ds} are {','.join(column_names)}''')

In [47]:
import pandas as pd

In [48]:
# Conversion settings: local source tree, its schema file, target GCS bucket,
# and the list of part-00000 source files discovered under the source tree.
src_base_dir = '../../data/retail_db'
schemas_file = '../../data/retail_db/schemas.json'
bucket = 'stage-nelson'
files = get_file_names(src_base_dir)

In [None]:
# Pick one source file to prototype the path-parsing steps below.
# NOTE(review): glob does not guarantee result order, so which dataset
# files[0] refers to may differ between machines.
file = files[0]
file

In [None]:
# Last two path components ('<dataset>/<file name>'), used as the blob suffix.
# Assumes '/' path separators.
'/'.join(file.split('/')[-2:])

In [None]:
# The dataset name is the parent directory of the data file
# (e.g. '.../retail_db/orders/part-00000' -> 'orders'). Index [-2] is required:
# the bare split() result is a list, which cannot be used as a schemas key.
ds_name = file.split('/')[-2]
ds_name

In [None]:
# Column names for the sample file's dataset, read from the schema file.
# NOTE(review): ds_name must be a dataset-name string; passing the list from a
# bare file.split('/') raises TypeError (unhashable list) inside the lookup.
columns = get_column_names(schemas_file, ds_name)
columns

In [None]:
# Convert every local part-00000 CSV file to Parquet and write it to
# gs://<bucket>/<tgt_base_dir>/<dataset>/<file name>.snappy.parquet.
src_base_dir = '../../data/retail_db'
tgt_base_dir = 'retail_db_parquet'
schemas_file = '../../data/retail_db/schemas.json'
bucket = 'stage-nelson'
files = get_file_names(src_base_dir)
# Fail loudly rather than silently uploading nothing when the source tree is
# missing (e.g. the notebook was started from a different working directory).
assert files, f'No part-00000 files found under {src_base_dir}'
for file in files:
    print(f'Uploading file {file}')
    # os.path extracts the dataset (parent dir) and file name regardless of
    # the OS path separator; the blob name itself always uses '/'.
    ds_name = os.path.basename(os.path.dirname(file))
    blob_suffix = f'{ds_name}/{os.path.basename(file)}'
    blob_name = f'gs://{bucket}/{tgt_base_dir}/{blob_suffix}.snappy.parquet'
    columns = get_column_names(schemas_file, ds_name)
    df = pd.read_csv(file, names=columns)
    # Explicit codec keeps the file contents consistent with the
    # '.snappy.parquet' suffix even if pandas' default ever changes.
    df.to_parquet(blob_name, index=False, compression='snappy')

In [None]:
# Recursively list the objects written under the retail_db_parquet prefix.
!gsutil ls -r gs://stage-nelson/retail_db_parquet

In [None]:
# Local orders CSV (no header row) for comparison with the Parquet copy.
pd.read_csv('../../data/retail_db/orders/part-00000', header=None)

In [None]:
# Read the orders Parquet object back from GCS to spot-check the conversion.
pd.read_parquet('gs://stage-nelson/retail_db_parquet/orders/part-00000.snappy.parquet')

In [None]:
# Shape (rows, columns) of each dataset's local CSV source file.
for ds in (
    'departments', 'categories', 'products',
    'customers', 'orders', 'order_items',
):
    csv_path = f'../../data/retail_db/{ds}/part-00000'
    df = pd.read_csv(csv_path, header=None)
    print(f'''Shape of {ds} in local files system is {df.shape}''')

In [None]:
# Validate the Parquet copies in GCS against the local CSV sources: print each
# GCS shape and assert it equals the local file's shape, so a bad upload fails
# the run instead of relying on a visual comparison of two printouts.
for ds in [
    'departments', 'categories', 'products',
    'customers', 'orders', 'order_items'
]:
    df = pd.read_parquet(f'gs://{bucket}/{tgt_base_dir}/{ds}/part-00000.snappy.parquet')
    print(f'''Shape of {ds} in gcs is {df.shape}''')
    local_shape = pd.read_csv(f'{src_base_dir}/{ds}/part-00000', header=None).shape
    assert df.shape == local_shape, (
        f'{ds}: gcs shape {df.shape} != local shape {local_shape}'
    )