This notebook handles the tasks of Phase 1: getting the transaction data out of the zip files and uploading it to Google BigQuery (GBQ). The data is in a folder of 53 zip files on my machine.

In [None]:
import os
import zipfile 
import io
import csv
from google.cloud import bigquery
from google.oauth2 import service_account

from wedge_functions import * # bring in our functions

# If True, delete each clean CSV from the clean file location after it has
# been uploaded to GBQ.
clean_up_clean_files = True

# If True, empty the GBQ dataset and load every file from scratch.
# If False, keep what is already loaded and restart at the table named by
# `next_table_to_fill` (set a couple of cells below).
clean_out_gbq = True

Let's do all of our GBQ setup (credentials, project, and dataset) so that piece is ready.

In [None]:
# These values will be different on your machine. Each one can be overridden
# with an environment variable, so the notebook runs without editing this cell.
service_path = os.environ.get("GBQ_SERVICE_PATH", "C:\\users\\jchan\\dropbox\\teaching\\")
# Service-account key file: this is your authentication information
# (keep it out of version control).
service_file = os.environ.get("GBQ_SERVICE_FILE", 'UMT-MSBA-7b4265df0ca4.json')
gbq_proj_id = os.environ.get("GBQ_PROJECT_ID", 'umt-msba')  # your project_id
gbq_dataset_id = os.environ.get("GBQ_DATASET_ID", 'wedge_transactions')  # your dataset ID

# os.path.join also works when an overridden GBQ_SERVICE_PATH lacks a trailing separator.
credentials = service_account.Credentials.from_service_account_file(
    os.path.join(service_path, service_file))
client = bigquery.Client(credentials=credentials, project=gbq_proj_id)

In [None]:
# Decide where the load starts.
# - If we empty the dataset, every file gets loaded, so there is no restart point.
# - Otherwise, name the first table to (re)load; the upload loop below skips
#   every file before it.
if clean_out_gbq:
    clean_out_gbq_tables(client, gbq_proj_id, gbq_dataset_id)
    # Keep the name bound in both branches so later cells never hit a NameError.
    next_table_to_fill = None
else:
    next_table_to_fill = "transArchive_201401_201403"

Set up the file-location constants and get our sorted list of zip files.

In [None]:
# Staging folder for the cleaned CSVs, and the folder holding the source zips.
clean_file_location = "C:/users/jchan/dropbox/teaching/corporatepartners/wedge/data/holder/"
zip_location = "C:/users/jchan/dropbox/teaching/corporatepartners/wedge/data/WedgeZipOfZips/"

# Sort by name so the processing order is deterministic; the restart logic
# in the upload loop depends on a stable file order.
zip_files = os.listdir(zip_location)
zip_files.sort()

In [None]:
# Each transaction row must end up with exactly this many fields. Descriptions
# that contain the delimiter get split into extra fields and are glued back together.
EXPECTED_FIELD_COUNT = 50
DESCRIPTION_FIELD_IDX = 5   # index of the (possibly split) description field
TRAILING_FIELD_COUNT = EXPECTED_FIELD_COUNT - DESCRIPTION_FIELD_IDX - 1  # fields after it (44)
PROGRESS_EVERY = 10000      # print a progress line every this many data rows


def _clean_line(raw_line, delimiter):
    """Convert one raw data row into a comma-separated line ending in "\\n".

    Removes every double quote, re-joins a description that was split on the
    delimiter (pieces joined with ";"), and blanks out NULL markers.

    Raises:
        ValueError: if the row does not end up with EXPECTED_FIELD_COUNT fields.
    """
    # The files use double quotes inconsistently, so just drop them all.
    fields = raw_line.replace('"', '').strip().split(delimiter)

    if len(fields) > EXPECTED_FIELD_COUNT:
        # The description was split into more than one field, probably because
        # it's something like "Fruit, Granola, Yogurt". Glue the pieces back
        # together with ";" so the row has the expected width.
        fields = (fields[:DESCRIPTION_FIELD_IDX]
                  + [";".join(fields[DESCRIPTION_FIELD_IDX:-TRAILING_FIELD_COUNT])]
                  + fields[-TRAILING_FIELD_COUNT:])

    if len(fields) != EXPECTED_FIELD_COUNT:
        raise ValueError("expected {} fields, found {}".format(EXPECTED_FIELD_COUNT, len(fields)))

    out_line = ",".join(fields) + "\n"

    # Blank out NULL markers. Order matters for the last two replacements:
    # r"\\N".replace(r"\N", "") == "\\" would leave a stray backslash behind.
    # NOTE(review): these are substring replacements over the whole line, so
    # they also remove "NULL" inside free-text fields such as descriptions.
    out_line = out_line.replace("NULL", "")
    out_line = out_line.replace(r"\\N", "")
    out_line = out_line.replace(r"\N", "")
    return out_line


def _write_clean_file(input_file, clean_path, input_file_name):
    """Write a cleaned, header-less, comma-delimited copy of input_file to clean_path.

    input_file is a seekable text stream, expected to be at its start.
    input_file_name is used only in progress and error messages.
    """
    first_line = input_file.readline()

    # Figure out the delimiter for this file from its first line.
    dialect = csv.Sniffer().sniff(sample=first_line, delimiters=[",", ";", "\t"])

    # A header row (spotted by its "datetime" column) isn't needed for GBQ, so
    # leave the stream just past it; otherwise rewind so no data row is lost.
    if "datetime" not in first_line:
        input_file.seek(0)

    # Write UTF-8 explicitly: the platform default encoding (e.g. cp1252 on
    # Windows) can fail on, or mangle, non-ASCII characters in the data.
    with open(clean_path, "w", encoding="utf-8") as outfile:
        for idx, raw_line in enumerate(input_file):
            try:
                outfile.write(_clean_line(raw_line, dialect.delimiter))
            except ValueError as err:
                raise ValueError("{}, data row {}: {}".format(input_file_name, idx, err)) from err

            if idx % PROGRESS_EVERY == 0:
                print("On line {} in {}.".format(idx, input_file_name))


def _upload_clean_file(bq_client, clean_path, full_table_name):
    """Load the CSV at clean_path into full_table_name and wait for the load job.

    Uses get_upload_job_config() (presumably from wedge_functions) for the job
    settings. Returns the finished load job.
    """
    job_config = get_upload_job_config()
    with open(clean_path, "rb") as source_file:
        job = bq_client.load_table_from_file(
            source_file,
            full_table_name,
            location="US",
            job_config=job_config,
        )
    job.result()  # wait for the load to finish before reporting / deleting the file
    return job


# If we cleaned out the GBQ data set, process every file. Otherwise we're
# restarting in the middle of the data: skip files until we reach
# next_table_to_fill. This relies on zip_files keeping a stable (sorted)
# order and is not robust to reordering of files.
have_hit_restart = clean_out_gbq

for this_zip_file in zip_files:
    if not this_zip_file.endswith(".zip"):
        continue

    print("Working on " + this_zip_file)

    # `with` closes the archive even when we skip it or something fails.
    with zipfile.ZipFile(os.path.join(zip_location, this_zip_file)) as zf:
        compressed_files = zf.infolist()

        # Make sure we only have one file in the zip.
        if len(compressed_files) != 1:
            raise ValueError("{} contains {} files; expected exactly 1.".format(
                this_zip_file, len(compressed_files)))

        member = compressed_files[0]
        input_file_name = member.filename
        clean_file_name = input_file_name.replace(".csv", "_clean.csv")
        table_name = input_file_name.replace(".csv", "")
        clean_path = os.path.join(clean_file_location, clean_file_name)
        full_table_name = ".".join([gbq_proj_id, gbq_dataset_id, table_name])

        # Check whether we're supposed to process this table yet.
        if not have_hit_restart:
            if table_name != next_table_to_fill:
                continue  # not at the restart point yet; move to the next zip file

            have_hit_restart = True

            # Start the restart table from scratch: drop it from GBQ
            # (no error if it doesn't exist) ...
            client.delete_table(full_table_name, not_found_ok=True)

            # ... and get rid of any leftover clean csv.
            if os.path.isfile(clean_path):
                print("Deleting {} from clean file location.".format(clean_file_name))
                os.unlink(clean_path)

        with zf.open(member, "r") as raw_file, io.TextIOWrapper(raw_file, encoding="utf-8") as input_file:
            _write_clean_file(input_file, clean_path, input_file_name)

    # The clean file is written out. Now upload it to GBQ.
    job = _upload_clean_file(client, clean_path, full_table_name)
    print("Loaded {} rows into {}:{}.".format(job.output_rows, gbq_dataset_id, table_name))

    if clean_up_clean_files:
        print("Deleting {} from clean file location.".format(clean_file_name))
        os.unlink(clean_path)

# If the restart table was never found, every file was skipped; say so instead of
# finishing silently.
if not have_hit_restart:
    print("Warning: never reached table {}; nothing was loaded.".format(next_table_to_fill))


In [None]:
# Marks the end of the notebook run.
print("Done", end="\n")