In [1]:
import os
import pandas as pd
import math
import threading
from google.cloud import bigquery
from tqdm.notebook import tqdm

In [2]:
PROJECT_ID = "integrated-bit-312717"
DATASET_NAME = 'customs'
TABLE_NAME = "customs_test3"

table_id = "{project_id}.{dataset_name}.{table_name}".format(project_id=PROJECT_ID, dataset_name=DATASET_NAME, 
            table_name=TABLE_NAME)

In [3]:
client = bigquery.Client(project=PROJECT_ID, location="US")

In [4]:
try:
    client.create_dataset('{dataset_name}'.format(dataset_name=DATASET_NAME))
except Exception as e:
    print(e)
else:
    print('Dataset {dataset_name} created'.format(dataset_name=DATASET_NAME))

409 POST https://bigquery.googleapis.com/bigquery/v2/projects/integrated-bit-312717/datasets?prettyPrint=false: Already Exists: Dataset integrated-bit-312717:customs


In [5]:
schema = [
    bigquery.SchemaField("country", "STRING"),
    bigquery.SchemaField("hs_code", "STRING"),
    bigquery.SchemaField("type", "STRING"),
    bigquery.SchemaField("year_month", "STRING"),
    bigquery.SchemaField("value", "INTEGER")
]

In [6]:
table = bigquery.Table(table_id, schema=schema)

In [7]:
try:
    client.create_table(table)
except Exception as e:
    print(e)
else:
    print("table created")

table created


In [8]:
base = "bucket/merge_dataset_customs"

In [9]:
main_dirs = []
category_dirs = [os.path.join(base, main_dir) for main_dir in os.listdir(base) if os.path.isdir(os.path.join(base, main_dir))]
for category_dir in category_dirs:
    main_dirs += [os.path.join(category_dir, year_dir) for year_dir in os.listdir(category_dir)]
main_dirs

['bucket/merge_dataset_customs/export/2001',
 'bucket/merge_dataset_customs/export/2002',
 'bucket/merge_dataset_customs/export/2003',
 'bucket/merge_dataset_customs/export/2004',
 'bucket/merge_dataset_customs/export/2005',
 'bucket/merge_dataset_customs/export/2006',
 'bucket/merge_dataset_customs/export/2007',
 'bucket/merge_dataset_customs/export/2008',
 'bucket/merge_dataset_customs/export/2009',
 'bucket/merge_dataset_customs/export/2010',
 'bucket/merge_dataset_customs/export/2011',
 'bucket/merge_dataset_customs/export/2012',
 'bucket/merge_dataset_customs/export/2013',
 'bucket/merge_dataset_customs/export/2014',
 'bucket/merge_dataset_customs/export/2015',
 'bucket/merge_dataset_customs/export/2016',
 'bucket/merge_dataset_customs/export/2017',
 'bucket/merge_dataset_customs/export/2018',
 'bucket/merge_dataset_customs/export/2019',
 'bucket/merge_dataset_customs/export/2020',
 'bucket/merge_dataset_customs/export/2021',
 'bucket/merge_dataset_customs/import/2001',
 'bucket/m

In [10]:
table_id

'integrated-bit-312717.customs.customs_test3'

In [11]:
def check_value(value):
    if math.isnan(value):
        return 0
    return int(value)

def tranform_merge_df2jsonBQ(file_path):
    df = pd.read_csv(file_path)
    metas = file_path.split('/')
    hs_code = metas[-1].split('.')[0]
    data_type = metas[-3]
    datas = []
    for row_id, row in df.iterrows():
        keys = list(row.keys())
        keys.remove('Country')
        datas += [
            {
                'country':row['Country'], 
                'hs_code':hs_code,
                'type':data_type,
                'year_month':key, 
                'value':check_value(row[key])
            } 
        for key in keys]
    return datas

def upload2bq_thread(main_path):
    paths = [os.path.join(main_path, file) for file in os.listdir(main_path)]
    for path in tqdm(paths):
        rows_to_insert = tranform_merge_df2jsonBQ(path)
        if rows_to_insert != []:
            try:
                errors = client.insert_rows_json(table_id, rows_to_insert)
            except Exception as e:
                bq_errs.append(e)
            else:
                if errors != []:
                    bq_errs.append(errors)

# Parallel upload to BQ

In [None]:
t=[]
bq_errs = []
for main_path in main_dirs:
    x = threading.Thread(target=upload2bq_thread, args=(main_path,))
    x.start()
    t.append(x)
for th_id, thread in enumerate(t):
    print("Main    : before joining thread {}".format(th_id))
    thread.join()
    print("Main.   : thread {} done".format(th_id))

Main    : before joining thread 0


  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]

  0%|          | 0/1253 [00:00<?, ?it/s]

  0%|          | 0/1254 [00:00<?, ?it/s]