# Part 1

## Building a Transaction Database in Google Big Query

Here, I will upload all clean Wedge transaction records to Google Big Query. 

In [1]:
# do imports for the code
from google.cloud import bigquery
from google.oauth2 import service_account
from zipfile import ZipFile, Path
import os
import io 
import csv
from shutil import rmtree

In [2]:
# feed in authentication info to gain credentials to my project and create a client to talk to GBQ
service_path = "/Users/austinsmith/Documents/Fall21/ADA/Wedge"
service_file = '/thewedge-austin-4c5ad634b17b.json' 
gbq_proj_id = 'thewedge-austin'  
gbq_dataset_id = 'the_wedge_A' 

private_key =service_path + service_file

credentials = service_account.Credentials.from_service_account_file(service_path + service_file)

client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

In [3]:
# function to check if a table exists in gbq

def tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

## Part 1: Extract the CSV Files from the Wedge Zip Files and Clean Them

Using the WedgeZipofZips as the "dirty" files

In [4]:
# make a directory to hold the clean files
if not os.path.isdir("clean_wedge_files") :
    os.mkdir("clean_wedge_files")
else :
    rmtree("clean_wedge_files")
    os.mkdir("clean_wedge_files")


# make current directory the file of dirty zip files - COME BACK TO THIS! 
# not sure how to point python to a whole list of zip files so it can iterate through themo
parent_directory = "/Users/austinsmith/Documents/Fall21/ADA/Wedge/"
output_directory = "/Users/austinsmith/Documents/Fall21/ADA/Wedge/clean_wedge_files/"
output_directory1 = "/Users/austinsmith/Documents/Fall21/ADA/Wedge/clean_wedge_files1/"

input_directory = "/Users/austinsmith/Documents/Fall21/ADA/Wedge/WedgeZipOfZips/"
zip_files = [zf for zf in os.listdir(input_directory) if ".zip" in zf]


In [5]:
# check for delimiters and headers 

delimiters = dict()
headers = dict()

for zip_file in zip_files :
    
    with ZipFile(input_directory + zip_file, 'r') as zf :
        zipped_files = zf.namelist()

        for file_name in zipped_files :
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding = "utf-8")
            
            first_line = input_file.readline()
            headers[file_name] = "datetime" in first_line

            dialect = csv.Sniffer().sniff(sample = input_file.readline(),
                                         delimiters = [",",";","\t"])

            delimiters[file_name] = dialect.delimiter

            #print(" ".join(["It looks like",
                          # file_name,
                           #"has delimter",
                           #dialect.delimiter,
                           #"."]))

            input_file.close()

In [7]:
# clean the files
# skip all the headers
# get rid of "" and null values
# split on tabs
# get rid of commas between yogurt, granola, and fruit 

for zip_file in zip_files :
    
    with ZipFile(input_directory + zip_file, 'r') as zf :
        zipped_files = zf.namelist()

        for file_name in zipped_files :
            clean_file = 'clean_' + file_name
            this_delimiter = delimiters[file_name]
            this_header = headers[file_name]
            input_file = zf.open(file_name, 'r')
            input_file = io.TextIOWrapper(input_file, encoding = "utf-8")
            
            # break
            
            
            with open (output_directory + clean_file, 'w') as outfile: # open the clean file for writing 
                if this_header :
                    next(input_file) # pass on every header line, so writing to file w/o header
            
            # process input file line by line
                rows_printed = 0
                for idx, line in enumerate(input_file) :
                    line = line.strip().split(this_delimiter) # gets rid of returns
                    line = [piece.replace('"','') for piece in line] # gets rid of ""
                    line = [piece.replace('\\N','') for piece in line] # gets rid of \\N
                    line = [piece.replace('NULL','') for piece in line] # gets rid of NULL
                    line = [piece.replace('null','') for piece in line] # gest rid of null
                    
                    # remove commas between yogurt, granola, and fruit
                    if len(line) != 50 :
                        new_column = (''.join(line[5:8]))
                        del line[5:8]
                        line.insert(5,new_column)            
            
            # write clean lines to clean file 
            # do clean files tab delimited
                    outfile.write('\t'.join(line) + '\n')
                    rows_printed +=1
            
            input_file.close()



    # break



## Part 2: Push the Clean Files to GQB

### Note: I uploaded the files manually, but the code below should (?) work. 

In [8]:
# access all of the clean csv files

clean_wedge_files = [file for file in os.listdir(output_directory) if file != ".DS_Store"]
clean_wedge_files1 = [file for file in os.listdir(output_directory1) if file != ".DS_Store"]

# get their names (without csv) so I can make tables for each of them in GBQ

clean_wedge_names = []

for file in clean_wedge_files :
    clean_wedge_names.append(file[:-4])

In [6]:
 # input code that allows me to modify tables
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION 
]

# input the schema for the tables
job_config.schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 0 # because there are no headers in my csv files
job_config.field_delimiter = '\t' # split on tabs, so specified that here


In [10]:
# load the contents of each csv into the tables

for file in clean_wedge_files : # need to look to the csv files now, not the names 
    my_table = file[:-4] # tell it to look at name without .csv at the end 
    table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table]) # create table name for each file
    
    # check to see if table exists. If it doesn't, create an empty table
       
    if not tbl_exists(client, table_full_name) :
        table_ref = client.create_table(
            table = table_full_name
        )
    else :
        table_ref = client.get_table(table_full_name) # needs to be in same for loop as upload
        
        # note to self: go check gbq and make sure there are a bunch of empty tables there now
        # also double check with this print statement
    
    table = client.get_table(table_ref)
    print("Table {} contains {} columns".format(table_ref.table_id,len(table.schema)))
    
    # this is where the API request is created ... 
    with open(output_directory + file, "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            table_ref, 
            location="US", 
            job_config=job_config,
        ) 
    
        # check to see if it's working/worked
        job.result()  # Waits for table load to complete.
        print(
            "Loaded {} rows into {}:{}.".format(
                job.output_rows , 'test', table_ref.table_id
            )
        )

        # Checks the updated length of the schema
        table = client.get_table(table)
        print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

Table clean_transArchive_201511 contains 0 columns
Loaded 993744 rows into test:clean_transArchive_201511.
Table clean_transArchive_201511 now contains 50 columns.
Table clean_transArchive_201304_201306_inactive contains 0 columns
Loaded 137628 rows into test:clean_transArchive_201304_201306_inactive.
Table clean_transArchive_201304_201306_inactive now contains 50 columns.
Table clean_transArchive_201510 contains 0 columns
Loaded 1006055 rows into test:clean_transArchive_201510.
Table clean_transArchive_201510 now contains 50 columns.
Table clean_transArchive_201512 contains 0 columns
Loaded 960017 rows into test:clean_transArchive_201512.
Table clean_transArchive_201512 now contains 50 columns.
Table clean_transArchive_201310_201312 contains 0 columns


ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))