In [1]:
# Load modules

from google.cloud import bigquery
from google.oauth2 import service_account

import zipfile
import io
import os

import csv

import pandas as pd
import pandas_gbq


In [2]:
# Let wide transaction frames show every column instead of eliding the middle ones
pd.options.display.max_columns = None

In [3]:
# Establish pathway to Google BigQuery (GBQ) account.
# The key directory defaults to the original author's machine; set the
# GBQ_KEY_DIR environment variable to point at the key folder elsewhere
# instead of editing this cell. os.path.join(..., "") guarantees a trailing
# separator, so `service_path + service_file` stays a valid path either way.
service_path = os.path.join(
    os.environ.get(
        "GBQ_KEY_DIR",
        "/Users/Owner/Google Drive/MSBA/Classes/bmkt_670_applied_data_analytics/gbq_key/",
    ),
    "",
)
service_file = 'evident-catcher-327918-fa366e7e71f5.json'
gbq_proj_id = 'evident-catcher-327918'
gbq_dataset_id = 'wedge_master'

# Full path to the service-account JSON key file
private_key = os.path.join(service_path, service_file)

In [4]:
# Get credentials from the service-account key file (path assembled in the config cell)
credentials = service_account.Credentials.from_service_account_file(private_key)

# Create client to connect with GBQ
client = bigquery.Client(credentials=credentials, project=gbq_proj_id)

In [21]:
# Generating a list of the entries (2nd-level .zip files) in the main .zip file
# Just for reference

location = 'WedgeZipOfZips.zip'

# Only the archive's table of contents is needed, so read it and let `with` close the file
with zipfile.ZipFile(location, mode="r") as outer_archive:
    fold_list = [entry.filename for entry in outer_archive.infolist()]

fold_list

In [11]:
# Confirming that there is only one .csv in each .zip
# Code adapted from here:
# https://unix.stackexchange.com/questions/239898/listing-files-from-nested-zip-files-without-extracting

location = 'WedgeZipOfZips.zip'

def uz(f, parent=None):
    """Recursively list the leaf files inside a (possibly nested) zip archive.

    Args:
        f: path or binary file-like object holding a zip archive. A file-like
            object passed in is not closed by this function.
        parent: sequence of member names leading to ``f``; prefixed to every
            returned path. ``None`` (the default) means ``f`` is the outermost
            archive.

    Returns:
        List of "/"-joined paths, one per member whose name does not end in
        ".zip" (case-insensitive); members ending in ".zip" are opened and
        listed recursively. If an archive at some level is not a valid zip,
        the entries already gathered at that level are returned instead of
        raising (best effort, as in the original snippet).

    Raises:
        Any non-``zipfile.BadZipFile`` error (e.g. a missing path) propagates
        instead of being silently turned into an empty listing.
    """
    # Fresh list per call; avoids the shared-mutable-default pitfall of `parent=[]`
    parent = [] if parent is None else list(parent)
    result = []
    try:
        with zipfile.ZipFile(f) as zf:
            for e in zf.namelist():
                path = parent + [e]
                if e.lower().endswith(".zip"):
                    # Nested archive: buffer its bytes in memory and recurse into it
                    result += uz(io.BytesIO(zf.read(e)), path)
                else:
                    result.append("/".join(path))
    except zipfile.BadZipFile:
        # Corrupt / non-zip input: keep whatever was listed before the failure
        pass

    return result

# Open the outer archive in a `with` block so the handle is closed once listed
with open(location, "rb") as outer_zip_file:
    print("\n".join(uz(outer_zip_file)))

transArchive_201001_201003.zip/transArchive_201001_201003.csv
transArchive_201004_201006.zip/transArchive_201004_201006.csv
transArchive_201007_201009.zip/transArchive_201007_201009.csv
transArchive_201010_201012.zip/transArchive_201010_201012.csv
transArchive_201101_201103.zip/transArchive_201101_201103.csv
transArchive_201104.zip/transArchive_201104.csv
transArchive_201105.zip/transArchive_201105.csv
transArchive_201106.zip/transArchive_201106.csv
transArchive_201107_201109.zip/transArchive_201107_201109.csv
transArchive_201110_201112.zip/transArchive_201110_201112.csv
transArchive_201201_201203.zip/transArchive_201201_201203.csv
transArchive_201201_201203_inactive.zip/transArchive_201201_201203_inactive.csv
transArchive_201204_201206.zip/transArchive_201204_201206.csv
transArchive_201204_201206_inactive.zip/transArchive_201204_201206_inactive.csv
transArchive_201207_201209.zip/transArchive_201207_201209.csv
transArchive_201207_201209_inactive.zip/transArchive_201207_201209_inactive.

In [12]:
# Create list of all file paths (outer archive name + path inside it)
# Ended up not being needed, but preserved for posterity

# `with` closes the archive handle that the original left open
with open(location, "rb") as outer_zip_file:
    file_ext_list = uz(outer_zip_file)

full_ext_list = [location + "/" + name for name in file_ext_list]

full_ext_list

In [5]:
# Establish the schema of columns and variable types for the tables in GBQ.
# Written as (column, BigQuery type) pairs in file column order, then expanded
# into the list of {"name", "type"} dicts passed as `table_schema` to to_gbq.
_wedge_schema_pairs = [
    ("datetime", "TIMESTAMP"),
    ("register_no", "FLOAT"),
    ("emp_no", "FLOAT"),
    ("trans_no", "FLOAT"),
    ("upc", "STRING"),
    ("description", "STRING"),
    ("trans_type", "STRING"),
    ("trans_subtype", "STRING"),
    ("trans_status", "STRING"),
    ("department", "FLOAT"),
    ("quantity", "FLOAT"),
    ("Scale", "FLOAT"),
    ("cost", "FLOAT"),
    ("unitPrice", "FLOAT"),
    ("total", "FLOAT"),
    ("regPrice", "FLOAT"),
    ("altPrice", "FLOAT"),
    ("tax", "FLOAT"),
    ("taxexempt", "FLOAT"),
    ("foodstamp", "FLOAT"),
    ("wicable", "FLOAT"),
    ("discount", "FLOAT"),
    ("memDiscount", "FLOAT"),
    ("discountable", "FLOAT"),
    ("discounttype", "FLOAT"),
    ("voided", "FLOAT"),
    ("percentDiscount", "FLOAT"),
    ("ItemQtty", "FLOAT"),
    ("volDiscType", "FLOAT"),
    ("volume", "FLOAT"),
    ("VolSpecial", "FLOAT"),
    ("mixMatch", "FLOAT"),
    ("matched", "FLOAT"),
    ("memType", "BOOLEAN"),
    ("staff", "BOOLEAN"),
    ("numflag", "FLOAT"),
    ("itemstatus", "FLOAT"),
    ("tenderstatus", "FLOAT"),
    ("charflag", "STRING"),
    ("varflag", "FLOAT"),
    ("batchHeaderID", "BOOLEAN"),
    ("local", "FLOAT"),
    ("organic", "FLOAT"),
    ("display", "BOOLEAN"),
    ("receipt", "FLOAT"),
    ("card_no", "FLOAT"),
    ("store", "FLOAT"),
    ("branch", "FLOAT"),
    ("match_id", "FLOAT"),
    ("trans_id", "FLOAT"),
]

wedge_schema = [{"name": col, "type": bq_type} for col, bq_type in _wedge_schema_pairs]

In [6]:
# Clean each .csv, convert it to pandas data frame, and upload data frame to GBQ
#
# NOTE(review): if_exists="append" below means re-running this cell adds a
# second copy of every row to tables that were already loaded.

# The name of the main .zip file in which all of the sub-zip files reside
location = 'WedgeZipOfZips.zip'

# Scratch file each nested .csv is staged through before pandas parses it
temp_csv_path = "clean.csv"

# Each dataframe has these standardized column names assigned to them below, even if they already have headers
col_heads = ["datetime", "register_no", "emp_no", "trans_no", "upc", "description", 
             "trans_type", "trans_subtype", "trans_status", "department", "quantity", 
             "Scale", "cost", "unitPrice", "total", "regPrice", "altPrice", "tax", 
             "taxexempt", "foodstamp", "wicable", "discount", "memDiscount", "discountable", 
             "discounttype", "voided", "percentDiscount", "ItemQtty", "volDiscType", 
             "volume", "VolSpecial", "mixMatch", "matched", "memType", "staff", "numflag", 
             "itemstatus", "tenderstatus", "charflag", "varflag", "batchHeaderID", 
             "local", "organic", "display", "receipt", "card_no", "store", "branch", 
             "match_id", "trans_id"]

# Columns declared BOOLEAN in the GBQ schema; cast to bool before upload
cols_to_bool = ["memType", "staff", "batchHeaderID", "display"]


def _load_nested_csv(outer_zip, zip_name, csv_name, staging_path, column_names, bool_columns):
    r"""Read the single .csv inside one 2nd-level .zip into a cleaned DataFrame.

    Args:
        outer_zip: open ``zipfile.ZipFile`` for the top-level archive.
        zip_name: member of ``outer_zip`` that is itself a .zip.
        csv_name: name of the .csv member inside that nested .zip.
        staging_path: scratch file the decoded text is written to before parsing.
        column_names: names assigned to the parsed columns, in file order.
        bool_columns: subset of ``column_names`` cast with ``astype('bool')``.

    Returns:
        DataFrame with ``column_names`` as headers, ``bool_columns`` cast to
        bool, and cells equal to the literal string "\N" replaced by None.

    Raises:
        csv.Error: if the delimiter cannot be sniffed from the first line.
        ValueError: if the file's column count differs from ``column_names``.
    """
    # Buffer the 2nd-level archive in memory so it can be opened as a zip
    with outer_zip.open(zip_name) as inner_handle:
        inner_bytes = io.BytesIO(inner_handle.read())
    with zipfile.ZipFile(inner_bytes) as nested_zip:
        csv_text = nested_zip.read(csv_name).decode("utf-8")

    # First line including its "\n" (the same text a binary readline() returns).
    # It drives both delimiter sniffing and header detection; the original cell
    # read `first_line` from a commented-out assignment (NameError / stale value).
    head, newline, _ = csv_text.partition("\n")
    first_line = head + newline
    dialect = csv.Sniffer().sniff(sample=first_line, delimiters=[",", ";", "\t"])

    # 0 -> row 0 is a header (overwritten below); None -> the file has no header row
    header_status = 0 if "datetime" in first_line else None

    # Staging through a real text file instead of parsing from memory is what
    # made parsing work for the original author, so it is kept; both handles
    # are now closed by `with`.
    with open(staging_path, "w", encoding="utf-8") as out_file:
        out_file.write(csv_text)
    with open(staging_path, encoding="utf-8") as staged:
        frame = pd.read_csv(staged, header=header_status, sep=dialect.delimiter)

    # Uniform headers regardless of whether the source .csv had a header or not
    frame.columns = column_names

    # GBQ rejected nulls in BOOLEAN columns, so these are cast to bool.
    # NOTE(review): astype('bool') maps NaN (and a literal "\N" string) to True,
    # not to null -- confirm that is the intended meaning of a missing flag.
    frame[bool_columns] = frame[bool_columns].astype('bool')

    # Some source files mark nulls as \N, which GBQ cannot interpret for some column types
    return frame.replace({r"\N": None})


try:
    # Open main zip file
    with zipfile.ZipFile(location) as z:
        # Create list of all 2nd-level .zip files in the main zip file
        fold_list = z.namelist()
        for name in fold_list:
            # Print .zip name to display which file is currently in progress for monitoring purposes
            print(f"Starting on {name}.")
            # Each of the 2nd level .zip files has a single file in it with the same name (different file extension)
            # Using the .zip file name, generate names for .csv file and table name for GBQ
            file_name = name.replace(".zip", ".csv")
            my_table = name.replace(".zip", "")
            table_full_name = ".".join([gbq_proj_id, gbq_dataset_id, my_table])
            df = _load_nested_csv(z, name, file_name, temp_csv_path, col_heads, cols_to_bool)
            # Upload data frame to GBQ
            df.to_gbq(destination_table=table_full_name,
                      project_id=gbq_proj_id,
                      chunksize=500000,
                      if_exists="append",
                      table_schema=wedge_schema,
                      credentials=credentials)
            # Print .csv name to display which file just finished being uploaded, for monitoring purposes
            print(f"Done with {file_name}.")
finally:
    # Delete the scratch .csv even if a file failed part-way (or none was processed)
    if os.path.exists(temp_csv_path):
        os.remove(temp_csv_path)

Starting on transArchive_201001_201003.zip.


6it [10:45, 107.61s/it]


Done with transArchive_201001_201003.csv.
Starting on transArchive_201004_201006.zip.


7it [12:10, 104.29s/it]


Done with transArchive_201004_201006.csv.
Starting on transArchive_201007_201009.zip.


6it [11:15, 112.51s/it]


Done with transArchive_201007_201009.csv.
Starting on transArchive_201010_201012.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
6it [11:20, 113.34s/it]


Done with transArchive_201010_201012.csv.
Starting on transArchive_201101_201103.zip.


6it [11:15, 112.52s/it]


Done with transArchive_201101_201103.csv.
Starting on transArchive_201104.zip.


3it [04:00, 80.29s/it]


Done with transArchive_201104.csv.
Starting on transArchive_201105.zip.


3it [03:57, 79.03s/it]


Done with transArchive_201105.csv.
Starting on transArchive_201106.zip.


2it [03:49, 114.69s/it]


Done with transArchive_201106.csv.
Starting on transArchive_201107_201109.zip.


7it [11:16, 96.63s/it] 


Done with transArchive_201107_201109.csv.
Starting on transArchive_201110_201112.zip.


7it [11:39, 99.87s/it] 


Done with transArchive_201110_201112.csv.
Starting on transArchive_201201_201203.zip.


6it [11:55, 119.31s/it]


Done with transArchive_201201_201203.csv.
Starting on transArchive_201201_201203_inactive.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
1it [00:57, 57.31s/it]


Done with transArchive_201201_201203_inactive.csv.
Starting on transArchive_201204_201206.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
7it [11:46, 100.98s/it]


Done with transArchive_201204_201206.csv.
Starting on transArchive_201204_201206_inactive.zip.


1it [00:56, 56.07s/it]


Done with transArchive_201204_201206_inactive.csv.
Starting on transArchive_201207_201209.zip.


6it [10:51, 108.51s/it]


Done with transArchive_201207_201209.csv.
Starting on transArchive_201207_201209_inactive.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
1it [00:59, 59.23s/it]


Done with transArchive_201207_201209_inactive.csv.
Starting on transArchive_201210_201212.zip.


6it [12:08, 121.44s/it]


Done with transArchive_201210_201212.csv.
Starting on transArchive_201210_201212_inactive.zip.


1it [00:52, 52.05s/it]


Done with transArchive_201210_201212_inactive.csv.
Starting on transArchive_201301_201303.zip.


6it [11:17, 112.88s/it]


Done with transArchive_201301_201303.csv.
Starting on transArchive_201301_201303_inactive.zip.


1it [00:35, 35.69s/it]


Done with transArchive_201301_201303_inactive.csv.
Starting on transArchive_201304_201306.zip.


7it [12:36, 108.05s/it]


Done with transArchive_201304_201306.csv.
Starting on transArchive_201304_201306_inactive.zip.


1it [00:33, 33.64s/it]


Done with transArchive_201304_201306_inactive.csv.
Starting on transArchive_201307_201309.zip.


6it [11:19, 113.30s/it]


Done with transArchive_201307_201309.csv.
Starting on transArchive_201307_201309_inactive.zip.


1it [00:55, 55.22s/it]


Done with transArchive_201307_201309_inactive.csv.
Starting on transArchive_201310_201312.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
6it [10:52, 108.79s/it]


Done with transArchive_201310_201312.csv.
Starting on transArchive_201310_201312_inactive.zip.


1it [00:21, 21.34s/it]


Done with transArchive_201310_201312_inactive.csv.
Starting on transArchive_201401_201403.zip.


6it [11:38, 116.39s/it]


Done with transArchive_201401_201403.csv.
Starting on transArchive_201401_201403_inactive.zip.


1it [00:14, 14.95s/it]


Done with transArchive_201401_201403_inactive.csv.
Starting on transArchive_201404_201406.zip.


7it [12:08, 104.05s/it]


Done with transArchive_201404_201406.csv.
Starting on transArchive_201404_201406_inactive.zip.


1it [00:15, 15.70s/it]


Done with transArchive_201404_201406_inactive.csv.
Starting on transArchive_201407_201409.zip.


7it [12:00, 102.97s/it]


Done with transArchive_201407_201409.csv.
Starting on transArchive_201407_201409_inactive.zip.


1it [00:20, 20.84s/it]


Done with transArchive_201407_201409_inactive.csv.
Starting on transArchive_201410_201412.zip.


6it [11:51, 118.59s/it]


Done with transArchive_201410_201412.csv.
Starting on transArchive_201410_201412_inactive.zip.


1it [00:04,  4.68s/it]


Done with transArchive_201410_201412_inactive.csv.
Starting on transArchive_201501_201503.zip.


7it [11:58, 102.63s/it]


Done with transArchive_201501_201503.csv.
Starting on transArchive_201504_201506.zip.


7it [11:58, 102.69s/it]


Done with transArchive_201504_201506.csv.
Starting on transArchive_201507_201509.zip.


7it [11:53, 101.94s/it]


Done with transArchive_201507_201509.csv.
Starting on transArchive_201510.zip.


3it [04:08, 82.98s/it]


Done with transArchive_201510.csv.
Starting on transArchive_201511.zip.


  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
2it [03:25, 102.96s/it]


Done with transArchive_201511.csv.
Starting on transArchive_201512.zip.


2it [03:34, 107.49s/it]


Done with transArchive_201512.csv.
Starting on transArchive_201601.zip.


2it [03:28, 104.26s/it]


Done with transArchive_201601.csv.
Starting on transArchive_201602.zip.


2it [03:08, 94.10s/it]


Done with transArchive_201602.csv.
Starting on transArchive_201603.zip.


2it [03:26, 103.38s/it]


Done with transArchive_201603.csv.
Starting on transArchive_201604.zip.


2it [03:10, 95.44s/it]


Done with transArchive_201604.csv.
Starting on transArchive_201605.zip.


2it [03:19, 99.74s/it]


Done with transArchive_201605.csv.
Starting on transArchive_201606.zip.


2it [03:11, 95.65s/it]


Done with transArchive_201606.csv.
Starting on transArchive_201607.zip.


2it [03:24, 102.43s/it]


Done with transArchive_201607.csv.
Starting on transArchive_201608.zip.


2it [03:08, 94.03s/it]


Done with transArchive_201608.csv.
Starting on transArchive_201609.zip.


2it [03:05, 92.71s/it]


Done with transArchive_201609.csv.
Starting on transArchive_201610.zip.


2it [03:36, 108.02s/it]


Done with transArchive_201610.csv.
Starting on transArchive_201611.zip.


2it [03:17, 98.74s/it]


Done with transArchive_201611.csv.
Starting on transArchive_201612.zip.


2it [03:45, 112.61s/it]


Done with transArchive_201612.csv.
Starting on transArchive_201701.zip.


2it [03:47, 113.64s/it]

Done with transArchive_201701.csv.





In [7]:
# Run this whenever the load/upload cell has finished, whether it was successful
# or not, to make sure nothing it opened is left behind.

# `temp` exists only if that cell opened a nested .csv at module level; a
# NameError just means there is nothing to close. (The original also called
# `pd_input.close()`, but `pd_input` is a decoded str, so that call always
# failed and was hidden by a bare `except`.)
try:
    temp.close()
except NameError:
    pass

# Remove the scratch .csv an interrupted run may have left on disk
if os.path.exists("clean.csv"):
    os.remove("clean.csv")
