<h1>Imports and Initializations</h1>
<ul>
<li><b>dataset_id</b> is the id of the Google BigQuery dataset</li>
<li><b>table_id</b> is the id of the Google BigQuery table</li>
<li><b>bucket_name</b> is the name of the Google Cloud Storage bucket</li>
<li><b>root</b> is the local data folder (its path is also used as the path prefix inside the bucket): "data" is the directory with the entire dataset, whereas "data_sample" only has 80 data points for each class.</li>
</ul>
<p style="color:blue">It is never a good idea to save blobs in a database; instead we prefer to save only a reference to each blob in the database. The same approach is taken here: the actual data will be stored in a Google bucket, but it will be managed through the BigQuery table.</p>

In [1]:
import os
import time
import pathlib
import threading

from datetime import datetime
from ipywidgets import IntProgress

from google.cloud import storage
from google.cloud import bigquery

# Id of the BigQuery dataset that holds the metadata table.
dataset_id = 'dspd_aftabkhalil_dataset'
# Id of the BigQuery table that stores one (type, name, location) row per uploaded file.
table_id = 'sounds_sample'

# Name of the Google Cloud Storage bucket that receives the uploaded files.
bucket_name = "dspd_aftabkhalil_bucket"

# The actual data was uploaded from the './data' folder, but to reduce the submission size
# only './data_sample' is available now, which is enough to check that the code works.
root = './data_sample'

<h1>Create BigQuery dataset</h1>

In [3]:
def create_dataset(dataset_id):
    """Return the BigQuery dataset called ``dataset_id``, creating it if absent.

    The datasets visible to a fresh ``bigquery.Client()`` are listed and
    searched by id. A match is returned exactly as it came out of
    ``list_datasets()``; otherwise a new dataset is created in the client's
    project with a 30 second timeout.

    Args:
        dataset_id: Short dataset id, without the project prefix.

    Returns:
        The matching entry from ``list_datasets()`` or the object returned by
        ``Client.create_dataset()``.
    """
    bigquery_client = bigquery.Client()

    existing = next(
        (d for d in bigquery_client.list_datasets() if d.dataset_id == dataset_id),
        None,
    )

    # Identity test: ``!= None`` would be routed through any custom __ne__.
    if existing is not None:
        print(f'Dataset already exixts {bigquery_client.project}.{existing.dataset_id}')
        return existing

    dataset_full_id = f'{bigquery_client.project}.{dataset_id}'
    dataset = bigquery_client.create_dataset(bigquery.Dataset(dataset_full_id), timeout = 30)
    print(f'Created dataset {bigquery_client.project}.{dataset.dataset_id}')
    return dataset

# Make sure the dataset exists before a table is created in it; the returned handle is not used.
_ = create_dataset(dataset_id)

Dataset already exixts myfirstproject-305412.dspd_aftabkhalil_dataset


<h1>Create table</h1>

In [4]:
def create_table(dataset_id, table_id, schema):
    """Return the table ``table_id`` of ``dataset_id``, creating it if absent.

    The tables of ``<project>.<dataset_id>`` are listed and searched by id.
    When a match is found it is returned as listed and ``schema`` is not
    used at all (it is neither applied nor compared). Otherwise a new table
    is created with ``schema``.

    Args:
        dataset_id: Short dataset id, without the project prefix.
        table_id: Short table id.
        schema: Schema passed to ``bigquery.Table`` when the table is created.

    Returns:
        The matching entry from ``list_tables()`` or the object returned by
        ``Client.create_table()``.
    """
    bigquery_client = bigquery.Client()
    dataset_full_id = f'{bigquery_client.project}.{dataset_id}'

    existing = next(
        (t for t in bigquery_client.list_tables(dataset_full_id) if t.table_id == table_id),
        None,
    )

    # Identity test: ``!= None`` would be routed through any custom __ne__.
    if existing is not None:
        print(f'Table already exixts {dataset_full_id}.{existing.table_id}')
        return existing

    table_obj = bigquery.Table(f'{dataset_full_id}.{table_id}', schema = schema)
    table = bigquery_client.create_table(table_obj)
    print(f'Created table {dataset_full_id}.{table.table_id}')
    return table
              
# All three columns are required strings; they match the keys of the rows
# built by insert_row_into_table (type, name, location).
table_schema = [
    bigquery.SchemaField("type", "STRING", mode = "REQUIRED"),
    bigquery.SchemaField("name", "STRING", mode = "REQUIRED"),
    bigquery.SchemaField("location", "STRING", mode = "REQUIRED"),
]

# Create the table with the schema above, or reuse it when it already exists.
table = create_table(dataset_id, table_id, table_schema)

Created table myfirstproject-305412.dspd_aftabkhalil_dataset.sounds_sample


<h1>Function to insert data in BigQuery table</h1>

In [6]:
# Module-level client reused by every insert_row_into_table call instead of creating one per row.
bigquery_client = bigquery.Client()

def insert_row_into_table(dataset_id, table_id, resource_type, name, location):
    """Insert one metadata row into ``<project>.<dataset_id>.<table_id>``.

    Args:
        dataset_id: Short dataset id, without the project prefix.
        table_id: Short table id.
        resource_type: Value stored in the ``type`` column.
        name: Value stored in the ``name`` column.
        location: Value stored in the ``location`` column.

    Raises:
        RuntimeError: If ``insert_rows_json`` reports errors for the row.
    """
    rows = [{"type": resource_type, "name": name, "location": location}]
    table_full_id = f'{bigquery_client.project}.{dataset_id}.{table_id}'

    # insert_rows_json reports row failures through its return value rather
    # than by raising, so discarding the result hides failed inserts.
    errors = bigquery_client.insert_rows_json(table_full_id, rows)
    if errors:
        raise RuntimeError(f'Could not insert {name!r} into {table_full_id}: {errors}')

<h1>Create google bucket</h1>

In [7]:
def create_or_get_bucket(bucket_name):
    """Return the storage bucket called ``bucket_name``, creating it if absent.

    Args:
        bucket_name: Name of the bucket to look up or create.

    Returns:
        The matching entry from ``list_buckets()`` when the bucket already
        exists, otherwise the bucket returned by ``create_bucket()`` (created
        with ``location="us"``).
    """
    #Create storage client
    storage_client = storage.Client()

    #Search the listing lazily; next() stops at the first matching name, so
    #the full listing does not have to be materialised first
    existing = next((b for b in storage_client.list_buckets() if b.name == bucket_name), None)

    #If the bucket already exists, return it
    if existing is not None:
        print(f'Bucket already exixts {existing.name} in {existing.location} with storage class {existing.storage_class}')
        return existing

    #Otherwise create the bucket and return it
    new_bucket = storage_client.create_bucket(storage_client.bucket(bucket_name), location="us")
    print(f'Created bucket {new_bucket.name} in {new_bucket.location} with storage class {new_bucket.storage_class}')
    return new_bucket

# Make sure the bucket exists before anything is uploaded; the returned handle is not used.
_ = create_or_get_bucket(bucket_name)

Bucket already exixts dspd_aftabkhalil_bucket in US with storage class STANDARD


<h1>Function to upload blob to google bucket and insert entry in bigQuery table</h1>
<p>Note that if a file with the same name already exists in the bucket, it is uploaded again (and a new table row is inserted) only if force_upload is set to True</p>

In [13]:
# Module-level storage client and bucket handle; upload_blob reads `bucket` for every file.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

def upload_blob(root, resource_type, resource_name, force_upload = False):
    """Upload one file to the bucket and record it in the BigQuery table.

    The path ``<root>/<resource_type>/<resource_name>`` serves both as the
    local file name and as the blob name inside the bucket.

    Args:
        root: Folder that contains the per-type sub-folders.
        resource_type: Sub-folder name; stored in the ``type`` column.
        resource_name: File name; stored in the ``name`` column.
        force_upload: When true, upload and insert a row without checking
            whether the blob already exists.
    """
    target_path = f'{root}/{resource_type}/{resource_name}'
    target = bucket.blob(target_path)

    # Nothing to do when the blob is already present and no re-upload was requested.
    if not force_upload and target.exists():
        return

    target.upload_from_filename(target_path)
    insert_row_into_table(dataset_id, table_id, resource_type, resource_name, target_path)

<h1>Let's Upload!</h1>
<p>We will upload the data via a multithreaded approach</p>

In [12]:
threads = list()
def upload_dataset(root_folder, retry = True):
    """Upload every file in the sub-folders of ``root_folder``, one thread per file.

    Each sub-folder name is passed to ``upload_blob`` as the resource type and
    each file name inside it as the resource name. Entries of ``root_folder``
    that are not directories are skipped. The function returns once every
    thread recorded in the module-level ``threads`` list has been joined.

    The progress bar is sized by the number of ``*/*.wav`` files and advances
    when a thread is started, not when its upload has finished.

    Args:
        root_folder: Dataset root as a ``str`` or ``pathlib.Path``.
        retry: When true, sleep 0.01 s after starting each thread so the
            uploads are throttled. Despite its name it does not retry anything.
    """
    # Accept plain strings as well; .glob() below needs a Path.
    root_folder = pathlib.Path(root_folder)
    max_count = len(list(root_folder.glob('*/*.wav')))

    print(f'There are a total of {max_count} files in dataset.')
    upload_bar = IntProgress(min = 0, max = max_count)
    display(upload_bar)

    for type_name in os.listdir(root_folder):
        type_dir = os.path.join(root_folder, type_name)
        # A stray file next to the type folders would make os.listdir raise
        # NotADirectoryError, so only directories are descended into.
        if not os.path.isdir(type_dir):
            continue

        for file_name in os.listdir(type_dir):
            t = threading.Thread(target = upload_blob, args = (root_folder, type_name, file_name))
            t.start()
            threads.append(t)
            upload_bar.value += 1

            #Wait between threads, or api call limit will be reached
            if retry:
                time.sleep(0.01)

    for t in threads:
        t.join()
        
# upload_dataset joins all of its upload threads before returning, so the
# message below is printed only after every started upload thread has ended.
data_dir = pathlib.Path(root)          
upload_dataset(data_dir)
print('Upload complete')

There are a total of 2400 files in dataset.


IntProgress(value=0, max=2400)

Upload complete


<h1>Please wait until the above cell prints Upload complete.. 🐍</h1>

<hr><hr>