In [1]:
### IMPORTING THE NECESSARY LIBRARIES

import time
import uuid
import os
import os.path
from subprocess import Popen
from datetime import datetime
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import pandas as pd
import glob
import sys
import subprocess

In [2]:
### DEFINING THE REQUIRED FUNCTIONS

#function to instantiate Google Cloud credentials and bigquery service

def create_service(svc_type='bigquery'):
    """Build a version 'v2' Google API client authenticated with application-default credentials.

    Inputs:
        svc_type - string - API name passed to discovery.build (default 'bigquery').

    Returns:
        the discovery client object for that API.
    """
    api_version = 'v2'
    default_creds = GoogleCredentials.get_application_default()
    client = discovery.build(svc_type, api_version, credentials=default_creds)
    return client

In [3]:
# function to poll a BigQuery job until it is complete

def poll_job(bigquery, job, poll_interval=1, timeout=None):
    """Block until a BigQuery job reaches the DONE state.

    Inputs:
        bigquery      - BigQuery service object (e.g. from create_service()).
        job           - dict - job resource returned by jobs().insert(...).execute();
                        only job['jobReference']['projectId'] and ['jobId'] are read.
        poll_interval - number - seconds to sleep between status checks
                        (default 1, the previously hard-coded delay).
        timeout       - number or None - maximum seconds to wait; None (default)
                        waits indefinitely, as before.

    Returns:
        dict - the final job resource (previously None; callers that ignore the
        return value are unaffected).

    Raises:
        RuntimeError - if the job finishes with an 'errorResult'.
        TimeoutError - if `timeout` elapses before the job is DONE.
    """
    print('Waiting for job to finish...')

    request = bigquery.jobs().get(
        projectId=job['jobReference']['projectId'],
        jobId=job['jobReference']['jobId'])

    # monotonic() is immune to wall-clock adjustments while we wait.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        result = request.execute(num_retries=2)
        status = result['status']

        if status['state'] == 'DONE':
            if 'errorResult' in status:
                raise RuntimeError(status['errorResult'])
            print('Job complete.')
            return result

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError('BigQuery job %s did not finish within %s seconds'
                               % (job['jobReference']['jobId'], timeout))

        time.sleep(poll_interval)

In [4]:
import subprocess
# function to download the necessary csv files
            

def download_from_storage(uri , dest_dir):
    """Copy every object matching a Cloud Storage URI into a local directory with gsutil.

    Inputs:
        uri      - string - gs:// URI of the objects to copy (wildcards such as
                   'gs://bucket/dir/*' are expanded by gsutil itself).
        dest_dir - string - local directory the files are copied into; it is created,
                   including missing parents, if it does not exist.

    Returns:
        string - "Pull completed at <timestamp>" after gsutil exits successfully.
        (An earlier header comment claimed a Boolean; the function has always
        returned this status string.)

    Raises:
        RuntimeError - if gsutil exits with a non-zero status. Failing here keeps the
        caller from reading a partial download and then deleting the exported source.
    """
    # This method requires that the user's .boto configuration file limit the number
    # of threads. The file is located in:
    #     C:\Users\ --LDAP_ID-- \AppData\Roaming\gcloud\legacy_credentials\ --firstname_lastname@homedepot.com-- \
    # and should contain the following lines:
    #     [GSUtil]
    #     parallel_thread_count = 3
    import os
    import shutil
    import subprocess
    from datetime import datetime

    # Arguments are passed as a list without a shell, so the URI can no longer inject
    # shell commands. Resolving the executable via shutil.which lets Windows find
    # gsutil.cmd, which a shell-less bare-name lookup would miss.
    gsutil = shutil.which('gsutil') or 'gsutil'
    command = [gsutil, '-m', 'cp', uri, '.']

    print("Pull beginning at " + str(datetime.now()))

    # Replaces the stat()/bare-except/mkdir sequence. That sequence created
    # dirname(dest_dir), which is the parent when dest_dir has no trailing slash.
    os.makedirs(dest_dir, exist_ok=True)

    # cwd=dest_dir makes the '.' target above refer to the destination directory.
    returncode = subprocess.call(command, cwd=dest_dir)
    if returncode != 0:
        raise RuntimeError('gsutil cp of %s into %s failed with exit status %d'
                           % (uri, dest_dir, returncode))

    return "Pull completed at " + str(datetime.now())



In [5]:
#function to clear the directory in the cloud storage
def clear_storage_dir(uri, conf = True):
    """Recursively delete everything under a Cloud Storage URI with `gsutil rm -r`.

    CAUTION: deletion is irreversible; make sure the URI is correct.

    Inputs:
        uri  - string - gs:// URI of the directory/objects to delete (wildcards allowed).
        conf - bool   - when True (default), the user must type 'DELETE' to proceed.

    Returns:
        False if the user declined the confirmation; otherwise the gsutil exit status
        as an int (0 on success, non-zero on failure). Because 0 is falsy, compare the
        result with 0 instead of testing its truthiness.
    """
    import shutil
    import subprocess

    # Build an argument list (no shell) so the URI cannot inject shell commands.
    # The previous version also appended a stray local '.' (copied from the cp
    # command in download_from_storage) as an extra target of `gsutil rm -r`.
    command = ['gsutil', 'rm', '-r', uri]
    print(' '.join(command))

    if conf:
        confirmation = input('You are about to delete the following URI:\n' + uri +"\nPlease type 'DELETE' to confirm:")
        if confirmation != 'DELETE':
            print("Delete aborted")
            return False

    # shutil.which resolves gsutil.cmd on Windows, where a shell-less bare name is not found.
    executable = shutil.which('gsutil') or 'gsutil'
    return subprocess.call([executable] + command[1:])


In [6]:
# function to read multiple local CSV files into a single pandas DataFrame

def fil_cons_csv(path):
    """Read every file matching `path + "*.csv"` and stack them into one DataFrame.

    Inputs:
        path - string - glob prefix, normally a directory ending in '/'
               (e.g. the directory filled by download_from_storage()).

    Returns:
        pandas.DataFrame - rows of all matching files in file-name order, with a
        fresh 0..n-1 index.

    Raises:
        ValueError - if no file matches (for example, when the download produced nothing).
    """
    # glob order depends on the filesystem; sorting makes the row order of the
    # combined frame reproducible between runs.
    csv_files = sorted(glob.glob(path + "*.csv"))
    if not csv_files:
        raise ValueError('No CSV files found matching ' + path + '*.csv')
    return pd.concat((pd.read_csv(csv_file) for csv_file in csv_files), ignore_index=True)


In [7]:
#function to upload the csv file to cloud storage
    
def upload_csv(project_id,filepath,wrkg_dir,filename=None,out_fmt=None,bucket='sc-lab-data/PXJ06AA'):
    """Upload a local file to Cloud Storage with gsutil and delete the local copy on success.

    Inputs:
        project_id - string - not used by this function; kept for backward compatibility.
        filepath   - string - path of the local file to upload.
        wrkg_dir   - string - when `filename` is None, the object name is `filepath`
                     with every occurrence of wrkg_dir + '/' removed.
        filename   - string - optional object name without extension.
        out_fmt    - string - extension appended to `filename` (default 'csv');
                     ignored when `filename` is None.
        bucket     - string - destination "bucket/prefix" (default is the value that
                     used to be hard-coded).

    Returns:
        string - the gs:// URI on success (the local file is removed), otherwise 'Failed'.
    """
    import os
    import shutil
    import subprocess

    if filename is None:
        filename = filepath.replace(wrkg_dir + '/', '')
    else:
        filename = filename + '.' + ('csv' if out_fmt is None else out_fmt)
    gs_uri = 'gs://' + bucket + '/' + filename
    print(gs_uri)

    # The previous version combined an argument list with shell=True. On POSIX that
    # runs only `gsutil`; the remaining items become arguments of /bin/sh, so nothing
    # was uploaded. Run gsutil directly instead. shutil.which also finds gsutil.cmd
    # on Windows.
    gsutil = shutil.which('gsutil') or 'gsutil'
    command = [gsutil, '-o', 'GSUtil:parallel_composite_upload_threshold=150M',
               'cp', filepath, gs_uri]
    try:
        returncode = subprocess.call(command)
    except OSError:
        # gsutil could not be launched at all (not installed / not on PATH).
        return 'Failed'

    if returncode == 0:
        os.remove(filepath)
        return gs_uri
    return 'Failed'


In [8]:
# function to load a CSV file from Cloud Storage into a BigQuery table
    
def load_table(bigquery, project_id, dataset_id, table_name,
               source_path,schema,write_mode=None):
    """Submit a BigQuery load job that reads a CSV from Cloud Storage into a table.

    Inputs:
        bigquery    - BigQuery service object.
        project_id  - string - project of both the job and the destination table.
        dataset_id  - string - destination dataset.
        table_name  - string - destination table (created if needed).
        source_path - string - gs:// URI of the CSV to load.
        schema      - list   - BigQuery field definitions for the table.
        write_mode  - string or None - 'wa' -> WRITE_APPEND, 'we' -> WRITE_EMPTY
                      (case-insensitive); None or anything else -> WRITE_TRUNCATE.

    Returns:
        the job resource returned by jobs().insert(...).execute(num_retries=5).
    """
    # Lookup table for the short write-mode codes; unknown codes truncate.
    disposition_by_code = {'wa': 'WRITE_APPEND', 'we': 'WRITE_EMPTY'}
    if write_mode is None:
        wmode = 'WRITE_TRUNCATE'
    else:
        wmode = disposition_by_code.get(write_mode.lower(), 'WRITE_TRUNCATE')

    destination = {
        'projectId': project_id,
        'datasetId': dataset_id,
        'tableId': table_name,
    }
    # CSV options: ',' delimiter, '~' as the quote character, first row skipped
    # (presumably a header), unknown values ignored.
    load_config = {
        'sourceUris': [source_path],
        'schema': {'fields': schema},
        'createDisposition': 'CREATE_IF_NEEDED',
        'writeDisposition': wmode,
        'fieldDelimiter': ',',
        'skipLeadingRows': 1,
        'quote': '~',
        'destinationTable': destination,
        'ignoreUnknownValues': True,
    }
    job_data = {
        'jobReference': {'projectId': project_id, 'jobId': str(uuid.uuid4())},
        'configuration': {'load': load_config},
    }

    jobs_api = bigquery.jobs()
    insert_request = jobs_api.insert(projectId=project_id, body=job_data)
    return insert_request.execute(num_retries=5)

In [9]:
#function to get schema

def get_bq_table_schema(bigquery,dataset_id,table_id,project_id = 'analytics-supplychain-thd'):
    """Return the list of field definitions of a BigQuery table.

    Inputs:
        bigquery   - BigQuery service object.
        dataset_id - string - dataset containing the table.
        table_id   - string - table name.
        project_id - string - project containing the dataset.

    Returns:
        the table resource's ['schema']['fields'] value.
    """
    table_resource = bigquery.tables().get(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id,
    ).execute(num_retries=100)
    schema = table_resource['schema']
    return schema['fields']

In [10]:
### STARTING THE DATA TRANSFER FROM BIG QUERY TO PANDAS DATAFRAMES  
# export a BigQuery table to Cloud Storage, download the CSVs and read them into pandas

def fetch_data(bq_project=None, bq_target_dataset=None, bq_target_table=None,
               dest_bucket=None, dest_fn=None, download_dir=None):
    """Export a BigQuery table to Cloud Storage as CSV, download it and load it into pandas.

    Steps:
    1. Delete leftover objects under `dest_bucket`.
    2. Run an extract job that writes `dest_bucket/dest_fn` and wait for it.
    3. Copy the files into `download_dir`.
    4. Delete the exported objects from Cloud Storage.
    5. Concatenate the local CSV files.

    Every argument is optional. When one is omitted, the notebook-level global of the
    same name is used, so the existing no-argument call `fetch_data()` keeps working.
    Passing the values explicitly removes the hidden dependency on cell execution order.

    Inputs:
        bq_project, bq_target_dataset, bq_target_table - strings - source table.
        dest_bucket  - string - gs:// prefix the table is exported to.
        dest_fn      - string - export file name pattern (e.g. 'OPTDATA_csv_*.csv').
        download_dir - string - local directory (ending in '/') the CSVs are copied to.

    Returns:
        pandas.DataFrame - the concatenated contents of the exported CSV files.
    """
    def _resolve(value, name):
        # Fall back to the notebook global only when the caller passed nothing.
        return globals()[name] if value is None else value

    bq_project = _resolve(bq_project, 'bq_project')
    bq_target_dataset = _resolve(bq_target_dataset, 'bq_target_dataset')
    bq_target_table = _resolve(bq_target_table, 'bq_target_table')
    dest_bucket = _resolve(dest_bucket, 'dest_bucket')
    dest_fn = _resolve(dest_fn, 'dest_fn')
    download_dir = _resolve(download_dir, 'download_dir')

    # Best-effort cleanup before exporting; there may be nothing to delete yet.
    # This replaces a bare `except:`, which also swallowed KeyboardInterrupt.
    try:
        clear_storage_dir(dest_bucket + '/*', conf=False)
    except Exception as exc:
        print('Pre-export cleanup skipped: ' + str(exc))

    job_data1 = {
        'jobReference': {
            # The extract job is created in this fixed project, independent of bq_project.
            'projectId': 'analytics-supplychain-thd',
            'jobId': str(uuid.uuid4()),
        },
        'configuration': {
            'extract': {
                'sourceTable': {
                    'projectId': bq_project,
                    'datasetId': bq_target_dataset,
                    'tableId': bq_target_table,
                },
                'destinationUri': dest_bucket + '/' + dest_fn,
            }
        },
    }

    bq_conn = create_service()
    export_job = bq_conn.jobs().insert(projectId='analytics-supplychain-thd', body=job_data1).execute(num_retries=5)
    poll_job(bq_conn, export_job)

    # Download the exported CSVs, then remove them from Cloud Storage.
    download_from_storage(dest_bucket + "/*", download_dir)
    clear_storage_dir(dest_bucket, False)

    print("data fetch start - " + str(datetime.now()))
    print('reading the data from the csv file')
    DATA = fil_cons_csv(download_dir)
    print("data fetch complete - " + str(datetime.now()))
    return DATA
    

In [11]:
#### INPUT DIRECTORIES AND PARAMETERS
# Export each BigQuery table in `file_name` to CSV and save it locally under the
# matching path in `upload_file`.
# NOTE: fetch_data() reads bq_project, bq_target_dataset, bq_target_table,
# dest_bucket, dest_fn and download_dir as notebook globals, so they stay
# module-level names assigned in this cell.

# details of the big query table (identical for every file, so set once)
bq_project = 'analytics-supplychain-thd'
bq_target_dataset = 'PXJ06AA_ASL2'
project_id = 'analytics-supplychain-thd'
dest_fn = 'OPTDATA_csv_*.csv'

# local scratch disk for the downloads and the final upload files
scratch_disk = '/Users/CXL6PMD/Documents'
upload_dir = scratch_disk + '/ASL_Override/Data/UPLOAD_FILE/WEB/'

N_FILES = 5
file_name = ['ASL_SSTK_DMND_NULLS_WEB_FILE%d' % n for n in range(N_FILES)]
upload_file = [upload_dir + 'upload_file%d.csv' % n for n in range(N_FILES)]

# zip() pairs each table with its output path, so the loop cannot drift out of
# sync with the list lengths the way the hard-coded range(5) could.
for bq_table, upload_path in zip(file_name, upload_file):
    bq_target_table = bq_table
    # microsecond timestamp gives each table its own GCS prefix and local folder
    now = datetime.now()
    bucket_now = now.strftime('%Y%m%d%H%M%S%f')
    dest_bucket = 'gs://sc-lab-data/PXJ06AA/' + bq_table + '__' + bucket_now
    download_dir = scratch_disk + '/ASL_Override/Data/BQTOPYTHON_OPT_' + bucket_now + '/'

    DATA = fetch_data()
    DATA.to_csv(upload_path, index=False)


gsutil rm -r gs://sc-lab-data/PXJ06AA/ASL_SSTK_DMND_NULLS_WEB_FILE0__20200102094042064889/* .
Waiting for job to finish...
Job complete.
Pull beginning at 2020-01-02 09:41:45.551020
gsutil rm -r gs://sc-lab-data/PXJ06AA/ASL_SSTK_DMND_NULLS_WEB_FILE0__20200102094042064889 .
data fetch start - 2020-01-02 09:42:06.361012
reading the data from the csv file
data fetch complete - 2020-01-02 09:42:14.205121
gsutil rm -r gs://sc-lab-data/PXJ06AA/ASL_SSTK_DMND_NULLS_WEB_FILE1__20200102094330130759/* .
Waiting for job to finish...
Job complete.
Pull beginning at 2020-01-02 09:44:44.569778
gsutil rm -r gs://sc-lab-data/PXJ06AA/ASL_SSTK_DMND_NULLS_WEB_FILE1__20200102094330130759 .
data fetch start - 2020-01-02 09:45:16.414710
reading the data from the csv file
data fetch complete - 2020-01-02 09:45:25.022212
gsutil rm -r gs://sc-lab-data/PXJ06AA/ASL_SSTK_DMND_NULLS_WEB_FILE2__20200102094642817507/* .
Waiting for job to finish...
Job complete.
Pull beginning at 2020-01-02 09:47:59.750044
gsutil rm 

In [12]:
#### INPUT DIRECTORIES AND PARAMETERS
# Export the override table to CSV and save it locally under the matching path in
# `upload_file`.
# NOTE: fetch_data() reads bq_project, bq_target_dataset, bq_target_table,
# dest_bucket, dest_fn and download_dir as notebook globals, so they stay
# module-level names assigned in this cell.

# details of the big query table (identical for every file, so set once)
bq_project = 'analytics-supplychain-thd'
bq_target_dataset = 'CXL6PMD_SF_EFFECT'
project_id = 'analytics-supplychain-thd'
dest_fn = 'OPTDATA_csv_*.csv'

# local scratch disk for the downloads and the final upload files
scratch_disk = '/Users/CXL6PMD/Documents'
upload_dir = scratch_disk + '/ASL_Override/Data/UPLOAD_FILE/WEB/'

file_name = ['ITEMLOC_SVLVL_SSTK_D30C22_OVERRIDE']
upload_file = [upload_dir + 'upload_file5.csv']
# zip() would silently drop unmatched entries, so require the lists to line up.
assert len(file_name) == len(upload_file), 'file_name and upload_file must have the same length'

for bq_table, upload_path in zip(file_name, upload_file):
    bq_target_table = bq_table
    # microsecond timestamp gives each table its own GCS prefix and local folder
    now = datetime.now()
    bucket_now = now.strftime('%Y%m%d%H%M%S%f')
    dest_bucket = 'gs://sc-lab-data/PXJ06AA/' + bq_table + '__' + bucket_now
    download_dir = scratch_disk + '/ASL_Override/Data/BQTOPYTHON_OPT_' + bucket_now + '/'

    DATA = fetch_data()
    DATA.to_csv(upload_path, index=False)


gsutil rm -r gs://sc-lab-data/PXJ06AA/ITEMLOC_SVLVL_SSTK_D30C22_OVERRIDE__20200102095548143631/* .
Waiting for job to finish...
Job complete.
Pull beginning at 2020-01-02 09:56:12.950729
gsutil rm -r gs://sc-lab-data/PXJ06AA/ITEMLOC_SVLVL_SSTK_D30C22_OVERRIDE__20200102095548143631 .
data fetch start - 2020-01-02 09:56:21.486730
reading the data from the csv file
data fetch complete - 2020-01-02 09:56:22.902006
