# The Wedge

## Task 1: Building a Transaction Database in Google BigQuery!


## Python Modules

In [1]:
import os
import io
import shutil
import re
import datetime 
import csv

import pandas as pd
import numpy as np
import pandas_gbq
import janitor

from zipfile import ZipFile # usually you'd do all these imports at the beginning

# Do our imports for the code
from google.cloud import bigquery
from google.oauth2 import service_account

# Python program to illustrate the concept
# of threading
# importing the threading module
import threading
import time

from multiprocessing.pool import ThreadPool as Pool

import multiprocessing

## Define Global Variables

In [2]:
# Which archive to process -- keep exactly one zip_file_name uncommented.
# zip_file_name = "WedgeZipOfZips_Small.zip"          # small file sample
zip_file_name = "WedgeZipOfZips.zip"                   # full data set
# zip_file_name = "WedgeFiles_Clean.zip"              # clean data set
# zip_file_name = "WedgeZipOfZips_Small_Clean.zip"    # small clean data set

# Scratch directory for extracted files (already listed in .gitignore).
# Keep the trailing slash: file names are appended with a plain string "+".
# working_directory = "/media/psf/Home/Repos/BMKT670.V60-72020-Fall2022-Wedge-Project/eggs/"
working_directory = "/home/blackvwgolf95/BMKT670.V60-72020-Fall2022-Wedge-Project/eggs/"

## Define Functions

In [3]:
def extract_zip(zf):
    """Extract the real members of the outer archive into working_directory.

    macOS metadata (names starting with ``__`` or ending in ``.DS_Store``)
    and folder entries (names ending in ``/``) are ignored. A member that
    already exists on disk is not extracted again. Every member that is not
    ignored is appended to the global ``zip_files`` list, whether or not it
    was extracted. That list must exist before this is called.

    Parameters
    ----------
    zf : zipfile.ZipFile
        An archive opened for reading.
    """
    print('Extracting all the files now...')

    for member in zf.namelist():
        is_noise = (member.startswith('__')
                    or member.endswith('.DS_Store')
                    or member.endswith('/'))
        if is_noise:
            continue

        already_there = os.path.exists(working_directory + member)
        print("File Exists, skipping" if already_there else "Need to Extract")
        print(member)
        if not already_there:
            zf.extract(member, working_directory)

        zip_files.append(member)
        
def extract_single_zip(zf):
    """Extract the data files of one inner archive into working_directory.

    Entries named ``__*`` or ``*.DS_Store`` (macOS metadata) and folder
    entries are skipped. Files already present on disk are not extracted
    again. Every kept member name is appended to the global ``data_files``
    list, which must exist before this is called.

    Parameters
    ----------
    zf : zipfile.ZipFile
        An inner archive opened for reading.
    """
    for member in zf.namelist():
        # Metadata and directory entries carry no transaction data.
        if member.startswith('__') or member.endswith(('.DS_Store', '/')):
            continue

        needs_extract = not os.path.exists(working_directory + member)
        if needs_extract:
            print("Need to Extract")
        else:
            print("File Exists, skipping")
        print(member)
        if needs_extract:
            zf.extract(member, working_directory)

        data_files.append(member)


def display_zip_contents(zipped_files):
    """Print the name and on-disk size of each real archive member.

    Sizes are read from ``working_directory``, so the members must already
    be extracted. macOS metadata and folder entries are skipped.

    Parameters
    ----------
    zipped_files : iterable of str
        Member names, e.g. from ``ZipFile.namelist()``.
    """
    for name in zipped_files:
        skip = name.startswith('__') or name.endswith(('.DS_Store', '/'))
        if not skip:
            size = os.path.getsize(working_directory + name)
            print("File: ", name, " Size:", size)

def display_file_contents(files):
    """Print the name and on-disk size of each extracted data file.

    Names starting with ``__``, ending in ``.DS_Store``, or ending in ``/``
    are ignored. Sizes come from files under ``working_directory``.

    Parameters
    ----------
    files : iterable of str
        File names relative to ``working_directory``.
    """
    kept = (f for f in files
            if not (f.startswith('__') or f.endswith('.DS_Store') or f.endswith('/')))
    for name in kept:
        print("File: ", name, " Size:", os.path.getsize(working_directory + name))

def get_delimiter(file_name, directory=None):
    """Guess the field delimiter of a data file from its first line.

    Only ``,``, ``;`` and tab are considered.

    Parameters
    ----------
    file_name : str
        File name relative to ``directory``.
    directory : str, optional
        Folder prefix, concatenated directly with ``file_name`` (so it should
        end with a path separator). Defaults to the global
        ``working_directory``.

    Returns
    -------
    str
        The detected delimiter character.

    Raises
    ------
    csv.Error
        If none of the candidate delimiters can be detected.
    """
    if directory is None:
        directory = working_directory

    # The `with` block closes the handle even when sniffing fails; the
    # previous open()/close() pair leaked it whenever sniff() raised.
    with open(directory + file_name, 'r') as input_file:
        first_line = input_file.readline()

    dialect = csv.Sniffer().sniff(sample=first_line,
                                  delimiters=[",", ";", "\t"])
    return dialect.delimiter

def get_header(file_name, directory=None):
    """Return the ``header=`` value for ``pd.read_csv`` for this file.

    A file is treated as having a header row when its first line starts
    with ``datetime``, bare or wrapped in single or double quotes.

    Parameters
    ----------
    file_name : str
        File name relative to ``directory``.
    directory : str, optional
        Folder prefix, concatenated directly with ``file_name``. Defaults to
        the global ``working_directory``.

    Returns
    -------
    int or None
        ``0`` if the first line is a header row, otherwise ``None``.
    """
    if directory is None:
        directory = working_directory

    with open(directory + file_name) as f:
        first_line = f.readline()

    # A UTF-8 byte-order mark would otherwise hide the leading "datetime".
    # The header row would then be parsed as data and later integer casts
    # would fail.
    first_line = first_line.lstrip('\ufeff')

    if first_line.startswith(('datetime', '"datetime"', "'datetime'")):
        return 0
    return None

def upload_data(data):
    # https://stackoverflow.com/a/24083253
    grouped = data.groupby(pd.Grouper(freq='M'))
    for name, group in grouped:

        # Construct table name from index
        # table_name = "dram_items_"+reformat_date(name.strftime('%Y-%m-%d'))

        # 3. For each month in the file, subset the data to that month and 
        #    upload the data to a table called `dram_items_YYYYMM01`. 
        # table_id = ".".join([gbq_proj_id,dataset_id,table_name])
        # print(table_id)
        # pandas_gbq.to_gbq(item_lu, table_id, project_id=gbq_proj_id,if_exists="replace") # let's discuss this last bit
        print("Data Uploaded!")
        
def cleanup_data(data):
    """Return ``data`` with column names normalised by ``janitor.clean_names``.

    Name cleaning is currently the only active step. The other Part 1
    cleaning steps are disabled:

    * strip ``$`` and ``,`` from ``gross_sales``, ``discounts``,
      ``net_sales`` and ``tax``, then cast them to float;
    * cast ``modifiers_applied`` to str;
    * replace ``sku`` with empty strings;
    * index the frame by ``pd.to_datetime(data['date'])``.
    """
    return janitor.clean_names(data)


def process_file(file_name):
    """Read one CSV from working_directory, clean it, and pass it to upload_data.

    NOTE(review): upload_data groups on the index by month, but the frame
    built here keeps pandas' default integer index. Confirm an index is set
    before relying on this path.
    """
    raw_frame = pd.read_csv(working_directory + file_name, low_memory=False)
    # Part 1 cleaning: see cleanup_data for which steps are currently active.
    cleaned_frame = cleanup_data(raw_frame)
    upload_data(cleaned_frame)


def data_columns():
    """Return the 50 Wedge transaction column names in file order.

    Passed as ``names=`` to ``pd.read_csv`` because not every file has a
    header row. A fresh list is built on each call, so callers may mutate it.
    """
    # https://www.geeksforgeeks.org/add-column-names-to-dataframe-in-pandas/
    return [
        'datetime',         # 1  timestamp of the transaction-row creation
        'register_no',      # 2  register for the transaction
        'emp_no',           # 3  employee number of the cashier
        'trans_no',         # 4  counts up by day; unique only with date, register and employee
        'Upc',              # 5  Universal Product Code; 0 for non-items
        'description',      # 6  product description; also Tax, tender type, etc.
        'trans_type',       # 7  D departmental ring, G green-patch donation, A tax,
                            #    T tender (payment row), I items (incl. discounts)
        'trans_subtype',    # 8  e.g. payment method: CK check, CA cash, CP coupon,
                            #    EF EBT food stamps, WC WIC; often blank otherwise
        'trans_status',     # 9  blank typical, M member discount, V void, C coupon,
                            #    0 (early blanks, until ~Feb 2010), R return, J juice club
        'department',       # 10 department number
        'quantity',         # 11 purchased quantity; per-cent priced items show huge values
        'Scale',            # 12 scale reading (capital S is intentional)
        'cost',             # 13 per-unit cost to the Wedge; not uniformly populated
        'unitPrice',        # 14 per-unit price to an owner; negative for returns/discounts
        'total',            # 15 price times quantity; can be negative
        'regPrice',         # 16 regular price; unitPrice plus discount should equal it
        'altPrice',         # 17
        'tax',              # 18 whether the item is taxable
        'taxexempt',        # 19 mostly zero
        'foodstamp',        # 20 purchasable with food stamps?
        'wicable',          # 21 purchasable with WIC?
        'discount',         # 22 marker of any discounts
        'memDiscount',      # 23 member discount on the item
        'discountable',     # 24 meaning unknown
        'discounttype',     # 25 not yet decoded
        'voided',           # 26 apparently marks voids / items run up then voided
        'percentDiscount',  # 27 unused
        'ItemQtty',         # 28 meaning unknown
        'volDiscType',      # 29 meaning unknown
        'volume',           # 30 meaning unknown
        'VolSpecial',       # 31 meaning unknown
        'mixMatch',         # 32 meaning unknown
        'matched',          # 33 meaning unknown
        'memType',          # 34 mostly NULL or 1; maybe institutional memberships
        'staff',            # 35 possibly flags staff transactions
        'numflag',          # 36 complex bitflag; not critical here
        'Itemstatus',       # 37 meaning unknown
        'tenderstatus',     # 38 meaning unknown
        'charflag',         # 39 meaning unknown
        'varflag',          # 40 meaning unknown
        'batchHeaderID',    # 41 meaning unknown
        'local',            # 42 is the item local?
        'organic',          # 43 is the item organic?
        'display',          # 44 meaning unknown
        'receipt',          # 45 meaning unknown
        'card_no',          # 46 masked owner number (integer); 3 = non-owner;
                            #    very frequent numbers (e.g. 11572) are likely other co-ops
        'store',            # 47 1 main store, 512 catering
        'branch',           # 48 0 main store, 3 Wedge Table (opened January 2015)
        'match_id',         # 49 meaning unknown
        'trans_id',         # 50 counter of line items within a receipt
    ]


def dtype_columns(columns=None):
    """Return a ``dtype=`` mapping that reads every column as pandas ``'string'``.

    Reading everything as text first stops pandas from guessing types; the
    numeric and datetime columns are converted explicitly after loading.

    Parameters
    ----------
    columns : iterable of str, optional
        Column names to include, in order. Defaults to ``data_columns()``,
        so the dtype map can never drift out of sync with the column list
        (the two were previously maintained as separate 50-entry copies).

    Returns
    -------
    dict
        ``{column_name: 'string'}`` in the same order as ``columns``.
    """
    if columns is None:
        columns = data_columns()
    return {name: 'string' for name in columns}


## GBQ Setup

In [4]:
# Machine-specific settings: point these at your own service-account key
# file and GCP project.
# Alternatives used on other machines:
#   service_path = "/Users/chandler/Dropbox/Teaching/"
#   service_file = 'umt-msba-037daf11ee16.json'
#   gbq_proj_id = 'umt-msba'
#   service_path = "/media/psf/Home/Repos/"
service_path = "/home/blackvwgolf95/"
service_file = 'bmkt670-fall2022-wedge-project-6ce4398b80e4.json'
gbq_proj_id = 'bmkt670-fall2022-wedge-project'
dataset_id = 'wedgedataset'

# Full path to the service-account JSON key; this should not need editing.
private_key = service_path + service_file

# Load the key so the BigQuery client is authorised for the project.
credentials = service_account.Credentials.from_service_account_file(private_key)
client = bigquery.Client(credentials=credentials, project=gbq_proj_id)

# To list the datasets visible to this client:
# for item in client.list_datasets():
#     print(item.full_dataset_id)

## Phase 1, Upload Clean Files

In [5]:
# Phase 1: unpack the outer "zip of zips" into working_directory.
# extract_zip() appends the name of every real member (the inner archives)
# to this master list.
zip_files = []

with ZipFile(zip_file_name, 'r') as zf:
    extract_zip(zf)
    print('Done Extracting!')

print("Done building file list")

Extracting all the files now...
File Exists, skipping
transArchive_201001_201003.zip
File Exists, skipping
transArchive_201004_201006.zip
File Exists, skipping
transArchive_201007_201009.zip
File Exists, skipping
transArchive_201010_201012.zip
File Exists, skipping
transArchive_201101_201103.zip
File Exists, skipping
transArchive_201104.zip
File Exists, skipping
transArchive_201105.zip
File Exists, skipping
transArchive_201106.zip
File Exists, skipping
transArchive_201107_201109.zip
File Exists, skipping
transArchive_201110_201112.zip
File Exists, skipping
transArchive_201201_201203.zip
File Exists, skipping
transArchive_201201_201203_inactive.zip
File Exists, skipping
transArchive_201204_201206.zip
File Exists, skipping
transArchive_201204_201206_inactive.zip
File Exists, skipping
transArchive_201207_201209.zip
File Exists, skipping
transArchive_201207_201209_inactive.zip
File Exists, skipping
transArchive_201210_201212.zip
File Exists, skipping
transArchive_201210_201212_inactive.zip

## Verify ZIP Files

In [7]:
# display_file_contents(zip_files)

## Extract Inner Zips


In [8]:
# extract_single_zip() appends every extracted data file name to this list.
data_files = []

for inner_zip_file_name in zip_files:
    # Only inner .zip archives are opened; any other member is skipped.
    if inner_zip_file_name.endswith('.zip'):
        with ZipFile(working_directory + inner_zip_file_name, 'r') as zf:
            extract_single_zip(zf)

File Exists, skipping
transArchive_201001_201003.csv
File Exists, skipping
transArchive_201004_201006.csv
File Exists, skipping
transArchive_201007_201009.csv
File Exists, skipping
transArchive_201010_201012.csv
File Exists, skipping
transArchive_201101_201103.csv
File Exists, skipping
transArchive_201104.csv
File Exists, skipping
transArchive_201105.csv
File Exists, skipping
transArchive_201106.csv
File Exists, skipping
transArchive_201107_201109.csv
File Exists, skipping
transArchive_201110_201112.csv
File Exists, skipping
transArchive_201201_201203.csv
File Exists, skipping
transArchive_201201_201203_inactive.csv
File Exists, skipping
transArchive_201204_201206.csv
File Exists, skipping
transArchive_201204_201206_inactive.csv
File Exists, skipping
transArchive_201207_201209.csv
File Exists, skipping
transArchive_201207_201209_inactive.csv
File Exists, skipping
transArchive_201210_201212.csv
File Exists, skipping
transArchive_201210_201212_inactive.csv
File Exists, skipping
transArch

## Verify Data Files

In [10]:
# display_file_contents(data_files)


### Checking for and deleting previous tables

We'll list all the tables in our Wedge dataset whose names match the `transArchive_` pattern, then delete them. We do not want to accidentally delete any other table in this dataset, such as the item lookup table we put in it in class.


In [14]:
# Match only tables created from the transArchive_*.csv files, so that no
# other table in the dataset is deleted. The previous pattern
# r"^transArchive_*" meant "transArchive" followed by ZERO or more
# underscores, so it matched any table id that merely began with
# "transArchive".
transArchive_pattern = re.compile(r"^transArchive_")

tables = client.list_tables(dataset_id)

for table in tables:

    print(f'Looking at {table.table_id}')

    # Delete the table only when its id matches the pattern.
    if transArchive_pattern.match(table.table_id):
        print(f'She swiped right, we have a MATCH! {table.table_id}')
        # NOTE: this call is live and really deletes the table.
        client.delete_table(table, not_found_ok=True)
        print(f"She blocked us, all hope is lost {table.table_id}.")


Looking at transArchive_201001_201003
She swiped right, we have a MATCH! transArchive_201001_201003
She blocked us, all hope is lost transArchive_201001_201003.
Looking at transArchive_201004_201006
She swiped right, we have a MATCH! transArchive_201004_201006
She blocked us, all hope is lost transArchive_201004_201006.
Looking at transArchive_201007_201009
She swiped right, we have a MATCH! transArchive_201007_201009
She blocked us, all hope is lost transArchive_201007_201009.
Looking at transArchive_201010_201012
She swiped right, we have a MATCH! transArchive_201010_201012
She blocked us, all hope is lost transArchive_201010_201012.
Looking at transArchive_201101_201103
She swiped right, we have a MATCH! transArchive_201101_201103
She blocked us, all hope is lost transArchive_201101_201103.
Looking at transArchive_201104
She swiped right, we have a MATCH! transArchive_201104
She blocked us, all hope is lost transArchive_201104.
Looking at transArchive_201105
She swiped right, we hav

She blocked us, all hope is lost transArchive_201612.
Looking at transArchive_201701
She swiped right, we have a MATCH! transArchive_201701
She blocked us, all hope is lost transArchive_201701.


## Uploading


In [12]:


# def upload_file(file_name):
#     # https://stackoverflow.com/a/27232309
#     transactions = pd.read_csv(working_directory+file_name, 
#                                header=None, 
#                                names=data_columns(), 
#                                dtype=dtype_columns()
#                               ) # 

#     # Construct table name from index
#     table_name = "wedge_"+file_name.replace(".","-").replace("/","-")
#     # print(type(name))

#     # 3. For each month in the file, subset the data to that month and 
#     #    upload the data to a table called `dram_items_YYYYMM01`. 
#     table_id = ".".join([gbq_proj_id,dataset_id,table_name])
    
#     # print(table_id)
    
#     job = client.load_table_from_dataframe(
#         transactions, table_id, job_config=job_config
#     ) # 

#     # Wait for the load job to complete. (I omit this step)
#     print(job.result())

def upload_file(file_name):
    # print(file_name)
    print("Starting Processing File: " + file_name + "\n")
    delimiter = get_delimiter(file_name)
    
    header = get_header(file_name)
    
    # https://stackoverflow.com/a/27232309
    transactions = pd.read_csv(working_directory+file_name, 
                               header=header, 
                               names=data_columns(), 
                               dtype=dtype_columns(),
                               delimiter=delimiter,
                               na_filter=False,
                               na_values=['nan', 'NaN', 'null', 'NULL', '\\N'],
                               escapechar="\\"
                              ) #
    
    transactions['datetime'] = transactions['datetime'].astype('datetime64[ns]')
    
    transactions['register_no'] = transactions['register_no'].astype('int')
    transactions['emp_no'] = transactions['register_no'].astype('int')
    transactions['trans_no'] = transactions['register_no'].astype('int')
    transactions['department'] = transactions['department'].astype('int')
    transactions['quantity'] = transactions['quantity'].astype('float')
    transactions['Scale'] = transactions['Scale'].astype('float')
    transactions['cost'] = transactions['cost'].astype('float')
    transactions['unitPrice'] = transactions['unitPrice'].astype('float')
    transactions['total'] = transactions['total'].astype('float')
    transactions['regPrice'] = transactions['regPrice'].astype('float')
    # transactions['altPrice'] = transactions['altPrice'].astype('float')
    transactions['tax'] = transactions['tax'].astype('int')
    transactions['foodstamp'] = transactions['foodstamp'].astype('int')
    
    transactions['trans_id'] = transactions['trans_id'].astype('int')
  

    # Construct table name from index
    table_name = file_name.replace(".csv","").replace("_small","").replace(".","-").replace("/","-")
    # print(type(name))
    print("Uploading Table: " + table_name + "\n")
    table_id = ".".join([gbq_proj_id,dataset_id,table_name])
    pandas_gbq.to_gbq(transactions, table_id, project_id=gbq_proj_id,if_exists="replace") # let's discuss this last bit
    
    print("Finished Processing File: " + file_name + "\n")
    
    

## Multi Threading

In [None]:
# https://stackoverflow.com/a/68806012
# Dry run of the threaded approach: prints up to six file names, pausing
# 5 s after each one. Both ways of actually uploading (direct call and a
# Thread) are commented out, so nothing is sent to BigQuery here.

# job_config is used only by the commented-out load_table_from_dataframe
# version of upload_file. A (partial) schema could be passed here instead
# so string columns get the correct BigQuery types.
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

x = 0

# 1. Walk the data files one at a time.
for file_name in data_files:
    print("Processing File: " + file_name)
    # upload_file(file_name)
    # threading.Thread(target=upload_file, args=(file_name,)).start()

    time.sleep(5)
    x += 1
    if x > 5:
        break
print("Done!")

## Pooling

In [None]:
start_time = time.time()

# Pool is multiprocessing.pool.ThreadPool, so these workers are threads.
pool_size = 50  # maximum number of concurrent workers
pool = Pool(pool_size)

# Task submission is commented out, so the pool currently does no work.
for file_name in data_files:
    # pool.apply_async(upload_file, (file_name,))
    pass

pool.close()  # no further tasks will be submitted
pool.join()   # block until every worker has exited

print("Done!")

elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

## Multiprocessing

In [16]:
start_time = time.time()

# Launch one upload process per data file, staggering starts by 0.5 s.
# NOTE(review): concurrency is unbounded -- every file gets its own process,
# and each process reads its whole CSV into memory.
jobs = []
job_files = []
for file_name in data_files:
    p = multiprocessing.Process(target=upload_file, args=(file_name,))
    jobs.append(p)
    job_files.append(file_name)
    p.start()
    time.sleep(.5)

# Wait for every upload. An exception inside upload_file only prints a
# traceback in the child and leaves a non-zero exit code, so check exit
# codes instead of silently treating every file as uploaded.
failed_files = []
for p, job_file in zip(jobs, job_files):
    p.join()
    if p.exitcode != 0:
        failed_files.append(job_file)

if failed_files:
    print("Failed uploads (non-zero exit code): " + ", ".join(failed_files))

print("Done!")

print("--- %s seconds ---" % (time.time() - start_time))

Starting Processing File: transArchive_201001_201003.csv

Starting Processing File: transArchive_201004_201006.csv

Starting Processing File: transArchive_201007_201009.csv

Starting Processing File: transArchive_201010_201012.csv

Starting Processing File: transArchive_201101_201103.csv

Starting Processing File: transArchive_201104.csv

Starting Processing File: transArchive_201105.csv

Starting Processing File: transArchive_201106.csv

Starting Processing File: transArchive_201107_201109.csv

Starting Processing File: transArchive_201110_201112.csv

Starting Processing File: transArchive_201201_201203.csv

Starting Processing File: transArchive_201201_201203_inactive.csv

Starting Processing File: transArchive_201204_201206.csv

Starting Processing File: transArchive_201204_201206_inactive.csv

Starting Processing File: transArchive_201207_201209.csv

Starting Processing File: transArchive_201207_201209_inactive.csv

Starting Processing File: transArchive_201210_201212.csv

Starting


Finished Processing File: transArchive_201307_201309.csv

Finished Processing File: transArchive_201110_201112.csv

Finished Processing File: transArchive_201310_201312.csv

Finished Processing File: transArchive_201404_201406.csv

Done!
--- 218.95518279075623 seconds ---


# Cleanup ALL Local Files

In [None]:
# https://linuxize.com/post/python-delete-files-and-directories/
# The actual removal is commented out, so this cell currently only reports
# success. Uncomment shutil.rmtree to really delete working_directory.
try:
    # shutil.rmtree(working_directory)
    print('Done Cleanup')
    print("Completed Exit Code 0")
except OSError as err:
    print("Error: %s : %s" % (working_directory, err.strerror))
    print("Completed Exit Code -1")
