# Wedge Data Engineering Project

In [1]:
import os
import io
import random
from google.cloud import bigquery
from google.oauth2 import service_account

In [2]:
# Names of the cleaned transaction files, sorted so that the load order (and any
# positional slicing such as cleaned[:24]) is deterministic: os.listdir returns
# entries in arbitrary order.
cleaned = sorted(os.listdir("clean-files"))

In [11]:
# Connection settings. The service-account folder and key-file name (the first
# two values) will be different on your machine; the project and dataset IDs
# name the BigQuery destination used throughout the notebook.
service_path = "/Users/ajeck/Documents/ADA/Wedge Project/"
service_file = 'WedgeProject-843a50287012.json'
gbq_proj_id = 'wedgeproject-290522'
gbq_dataset_id = 'wedge'

# Full path to the service-account key file.
private_key = os.path.join(service_path, service_file)

In [12]:
# Authenticate with the service-account key file and open a BigQuery client
# bound to the Wedge project.
credentials = service_account.Credentials.from_service_account_file(private_key)
client = bigquery.Client(project=gbq_proj_id, credentials=credentials)

In [13]:
# Load-job settings shared by every file load: append rows to the destination
# table, and allow a load to add columns the table does not have yet.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

In [14]:
# Explicit schema for the cleaned transaction CSVs, as (column name, BigQuery
# type) pairs in file column order. Every column is NULLABLE.
job_config.schema = [
    bigquery.SchemaField(col_name, col_type, mode="NULLABLE")
    for col_name, col_type in (
        ("datetime", "TIMESTAMP"),
        ("register_no", "FLOAT"),
        ("emp_no", "FLOAT"),
        ("trans_no", "FLOAT"),
        ("upc", "STRING"),
        ("description", "STRING"),
        ("trans_type", "STRING"),
        ("trans_subtype", "STRING"),
        ("trans_status", "STRING"),
        ("department", "FLOAT"),
        ("quantity", "FLOAT"),
        ("Scale", "FLOAT"),
        ("cost", "FLOAT"),
        ("unitPrice", "FLOAT"),
        ("total", "FLOAT"),
        ("regPrice", "FLOAT"),
        ("altPrice", "FLOAT"),
        ("tax", "FLOAT"),
        ("taxexempt", "FLOAT"),
        ("foodstamp", "FLOAT"),
        ("wicable", "FLOAT"),
        ("discount", "FLOAT"),
        ("memDiscount", "FLOAT"),
        ("discountable", "FLOAT"),
        ("discounttype", "FLOAT"),
        ("voided", "FLOAT"),
        ("percentDiscount", "FLOAT"),
        ("ItemQtty", "FLOAT"),
        ("volDiscType", "FLOAT"),
        ("volume", "FLOAT"),
        ("VolSpecial", "FLOAT"),
        ("mixMatch", "FLOAT"),
        ("matched", "FLOAT"),
        ("memType", "BOOLEAN"),
        ("staff", "BOOLEAN"),
        ("numflag", "FLOAT"),
        ("itemstatus", "FLOAT"),
        ("tenderstatus", "FLOAT"),
        ("charflag", "STRING"),
        ("varflag", "FLOAT"),
        ("batchHeaderID", "BOOLEAN"),
        ("local", "FLOAT"),
        ("organic", "FLOAT"),
        ("display", "BOOLEAN"),
        ("receipt", "FLOAT"),
        ("card_no", "FLOAT"),
        ("store", "FLOAT"),
        ("branch", "FLOAT"),
        ("match_id", "FLOAT"),
        ("trans_id", "FLOAT"),
    )
]
# The files are CSV whose first row is a header, so skip it.
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

In [20]:
def texists(client, table_ref):
    """Return whether ``table_ref`` names an existing BigQuery table.

    Asks ``client.get_table`` for the table: success means it exists (True);
    ``google.cloud.exceptions.NotFound`` means it does not (False). Any other
    error propagates to the caller.
    """
    from google.cloud.exceptions import NotFound

    try:
        client.get_table(table_ref)
    except NotFound:
        return False
    else:
        return True

In [19]:
# Used once to reload the first 24 files, which had been loaded twice:
#fixclean = cleaned[:24]
#print(fixclean)

['transArchive_201001_201003_clean.csv', 'transArchive_201004_201006_clean.csv', 'transArchive_201007_201009_clean.csv', 'transArchive_201010_201012_clean.csv', 'transArchive_201101_201103_clean.csv', 'transArchive_201104_clean.csv', 'transArchive_201105_clean.csv', 'transArchive_201106_clean.csv', 'transArchive_201107_201109_clean.csv', 'transArchive_201110_201112_clean.csv', 'transArchive_201201_201203_clean.csv', 'transArchive_201201_201203_inactive_clean.csv', 'transArchive_201204_201206_clean.csv', 'transArchive_201204_201206_inactive_clean.csv', 'transArchive_201207_201209_clean.csv', 'transArchive_201207_201209_inactive_clean.csv', 'transArchive_201210_201212_clean.csv', 'transArchive_201210_201212_inactive_clean.csv', 'transArchive_201301_201303_clean.csv', 'transArchive_201301_201303_inactive_clean.csv', 'transArchive_201304_201306_clean.csv', 'transArchive_201304_201306_inactive_clean.csv', 'transArchive_201307_201309_clean.csv', 'transArchive_201307_201309_inactive_clean.csv

In [23]:
# Load each cleaned CSV into its own BigQuery table, named after the file.
filepath = "/Users/ajeck/Documents/ADA/Wedge Project/clean-files/"
clean_suffix = "_clean.csv"
for file in cleaned:
    # The table name is the file name minus "_clean.csv", e.g.
    # "transArchive_201001_201003_clean.csv" -> "transArchive_201001_201003".
    # Skip anything else in the directory (e.g. macOS ".DS_Store") instead of
    # failing on it.
    if not file.endswith(clean_suffix):
        print("Skipping {}: not a cleaned CSV file".format(file))
        continue
    tab = file[:-len(clean_suffix)]
    table_full_name = ".".join([gbq_proj_id, gbq_dataset_id, tab])

    # Create the table on first run, otherwise fetch it; both calls return a
    # Table object, so no further lookup is needed before loading.
    if not texists(client, table_full_name):
        table_ref = client.create_table(table=table_full_name)
    else:
        table_ref = client.get_table(table_full_name)
    print("Table {} contains {} columns".format(table_ref.table_id, len(table_ref.schema)))

    # NOTE: job_config appends (WRITE_APPEND), so re-running this cell loads
    # the same rows into the table a second time.
    with open(os.path.join(filepath, file), "rb") as my_file:
        job = client.load_table_from_file(
            my_file,
            table_ref,
            location="US",
            job_config=job_config,
        )

    job.result()  # wait for the load job to finish

    print("Finished loading {} rows into {}:{}.".format(job.output_rows, gbq_dataset_id, table_ref.table_id))
    # Re-fetch the table to report the schema the load added.
    table = client.get_table(table_ref)
    print("Finished loading {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

Table transArchive_201001_201003 contains 0 columns
Finished loading 2998329 rows into wedge_example:transArchive_201001_201003.
Finished loading transArchive_201001_201003 now contains 50 columns.
Table transArchive_201004_201006 contains 0 columns
Finished loading 3185806 rows into wedge_example:transArchive_201004_201006.
Finished loading transArchive_201004_201006 now contains 50 columns.
Table transArchive_201007_201009 contains 0 columns
Finished loading 2992584 rows into wedge_example:transArchive_201007_201009.
Finished loading transArchive_201007_201009 now contains 50 columns.
Table transArchive_201010_201012 contains 0 columns
Finished loading 2957585 rows into wedge_example:transArchive_201010_201012.
Finished loading transArchive_201010_201012 now contains 50 columns.
Table transArchive_201101_201103 contains 0 columns
Finished loading 2920825 rows into wedge_example:transArchive_201101_201103.
Finished loading transArchive_201101_201103 now contains 50 columns.
Table tran

In [24]:
# Select each distinct owner card_no, excluding card_no 3 (the coding for
# non-members). The adjacent literals are joined with no separator, so each
# piece carries its own trailing space.
query_card_nos = (
    "SELECT DISTINCT card_no "
    "FROM `wedgeproject-290522.wedge.transArchive*` "
    "WHERE card_no != 3"
)
query_get_card_nos = client.query(query_card_nos, location="US")

In [25]:
# Pull the card_no (first column) out of every result row.
owners = [record[0] for record in query_get_card_nos]

print(len(owners))

In [27]:
# Draw a reproducible sample of distinct owners. random.sample draws without
# replacement (random.choices would draw with replacement and could pick the
# same owner more than once). If there are fewer owners than samp_size, all
# owners are taken.
random.seed(312)
samp_size = 250
samp_owners = random.sample(owners, k=min(samp_size, len(owners)))
print(len(samp_owners))

250


In [28]:
# Fetch every transaction row belonging to the sampled owners' card numbers.
owners_join = ",".join(map(str, samp_owners))
query = f"Select * From `wedgeproject-290522.wedge.transArchive*` Where card_no in ({owners_join})"
query_own = client.query(query, location='US')

In [29]:
# Write the sampled owners' rows to a tab-delimited text file, header row first.
# The header list is intended to mirror the table schema column order.
# Each value is written with str(), so NULLs appear as "None".
headers = ["datetime","register_no","emp_no","trans_no","upc","description","trans_type","trans_subtype","trans_status","department","quantity","Scale","cost","unitPrice","total","regPrice","altPrice","tax","taxexempt","foodstamp","wicable","discount","memDiscount","discountable","discounttype","voided","percentDiscount","ItemQtty","volDiscType","volume","VolSpecial","mixMatch","matched","memType","staff","numflag","itemstatus","tenderstatus","charflag","varflag","batchHeaderID","local","organic","display","receipt","card_no","store","branch","match_id","trans_id"]
with open("sample_of_owners.txt", "w") as outfile:
    outfile.write("\t".join(headers) + "\n")
    outfile.writelines("\t".join(map(str, rec)) + "\n" for rec in query_own)

# Part 3
## Sales by Date by Hour

In [30]:
# Hourly sales summary. For every distinct (calendar date, hour) pair compute:
#   Sales        - SUM(total), i.e. total spend
#   Transactions - the number of distinct (date, trans_no, emp_no, register_no)
#                  combinations, which together identify one transaction
#   Items        - net items sold: +1 per row but -1 for returns ('R') and voids
#                  ('V'), so they count against an item sold. For example, an item
#                  scanned twice and then voided once nets to 1.
# Filters: card_no != 3 (the non-member coding), departments 0 and 15 excluded,
# and only trans_status '', ' ', 'V' or 'R' rows kept (the != 'M'/'C'/'J'
# conditions are already implied by that final group).
# The adjacent string literals are joined with no separator, so every piece
# ends in a space.

query = ("SELECT (EXTRACT(date FROM datetime)) AS Date, " 
"(EXTRACT(hour FROM datetime)) AS Hour, " 
"SUM(total) AS Sales, " 
"COUNT(DISTINCT(Date(datetime) || trans_no || emp_no || register_no)) AS Transactions, " 
"SUM(CASE WHEN(trans_status = 'R' OR trans_status = 'V') THEN -1 ELSE 1 END) as Items "  
"FROM `wedgeproject-290522.wedge.transArchive*` " 
"WHERE card_no != 3 " 
"AND department != 0 " 
"AND department != 15 " 
"AND trans_status != 'M' " 
"AND trans_status != 'C' " 
"AND trans_status != 'J' " 
"AND (trans_status = '' " 
"OR trans_status = ' ' " 
"OR trans_status =  'V' " 
"OR trans_status = 'R') " 
"GROUP BY Date, Hour " 
"ORDER BY Date, Hour ")

# Submit the query as a BigQuery job in the US location.
query_sales_by_date_by_hour = client.query(query,location ='US',)

In [31]:
# Dump the hourly summary to a tab-delimited text file, header row first.
headers = ["date", "hours", "sales", "transactions", "items"]
with open("sales_by_date_by_hour.txt", "w") as outfile:
    outfile.write("\t".join(headers) + "\n")
    outfile.writelines(
        "\t".join(map(str, record)) + "\n" for record in query_sales_by_date_by_hour
    )

In [32]:
import sqlite3

# Open (creating it if absent) the local SQLite database for the summary
# tables, plus a cursor for issuing statements against it.
db = sqlite3.connect(database="wedge_aj.db")
cur = db.cursor()

Create the sales_by_date_by_hour table in the SQLite database

In [33]:
# Rebuild the SQLite table from the tab-delimited hourly summary file.
input_txt = "sales_by_date_by_hour.txt"
cur.execute('''DROP TABLE IF EXISTS sales_by_date_by_hour''')
cur.execute('''CREATE TABLE sales_by_date_by_hour (
    date TIMESTAMP, 
    hour INTEGER, 
    sales INTEGER,
    transactions INTEGER,
    items INTEGER)''')

with open(input_txt, 'r', encoding="Latin-1") as source:
    next(source)  # discard the header row
    records = [text.strip().split("\t") for text in source.readlines()]
    cur.executemany('''
            INSERT INTO sales_by_date_by_hour (date,hour,sales,transactions,items)
            VALUES (?,?,?,?,?)''', records)

db.commit()

# Sales by Owner by Year by Month

In [34]:
# Monthly sales per owner. For every (card_no, year, month) combination compute:
#   Sales        - SUM(total), i.e. total spend
#   Transactions - the number of distinct (date, trans_no, emp_no, register_no)
#                  combinations, which together identify one transaction
#   Items        - net items sold: +1 per row but -1 for returns ('R') and voids
#                  ('V'), so an item scanned twice and then voided once nets to 1.
# Filters match the hourly query: card_no != 3, departments 0 and 15 excluded,
# trans_status limited to '', ' ', 'V' or 'R'.
# In the ORDER BY, DESC applies only to Month; Owner and Year sort ascending.

query = ('''SELECT card_no As Owner,
    (EXTRACT(year FROM datetime)) AS Year,
    (EXTRACT(month FROM datetime)) AS Month,
    SUM(total) AS Sales,
    COUNT(DISTINCT(Date(datetime) || trans_no || emp_no || register_no)) AS Transactions,
    SUM(CASE WHEN(trans_status = 'R' OR trans_status = 'V') THEN -1 ELSE 1 END) as Items
    FROM `wedgeproject-290522.wedge.transArchive*`
    WHERE card_no != 3
    AND department != 0
    AND department != 15
    AND trans_status != 'M'
    AND trans_status != 'C'
    AND trans_status != 'J'
    AND (trans_status = ''
    OR trans_status = ' '
    OR trans_status =  'V'
    OR trans_status = 'R')
    GROUP BY Owner, Year, Month
    ORDER BY Owner, Year, Month DESC''')

# Submit the query as a BigQuery job in the US location.
query_sales_by_owner_by_year_by_month = client.query(query,location ='US',)

In [35]:
# Dump the per-owner monthly summary to a tab-delimited text file, header first.
headers = ["owner","year","month","sales","transactions","items"]
with open("sales_by_owner_by_year_by_month.txt", "w") as outfile:
    outfile.write("\t".join(headers) + "\n")
    outfile.writelines(
        "\t".join(map(str, record)) + "\n"
        for record in query_sales_by_owner_by_year_by_month
    )

Create the sales_by_owner_by_year_by_month table in the SQLite database

In [36]:
# Rebuild the SQLite table from the tab-delimited per-owner monthly file.
input_txt = "sales_by_owner_by_year_by_month.txt"
cur.execute('''DROP TABLE IF EXISTS sales_by_owner_by_year_by_month''')
cur.execute('''CREATE TABLE sales_by_owner_by_year_by_month (
    owner INTEGER, year INTEGER, month INTEGER, sales INTEGER, transactions INTEGER, items INTEGER)''')

with open(input_txt, 'r', encoding="Latin-1") as source:
    next(source)  # discard the header row
    records = [text.strip().split("\t") for text in source.readlines()]
    cur.executemany('''
            INSERT INTO sales_by_owner_by_year_by_month (owner,year,month,sales,transactions,items)
            VALUES (?,?,?,?,?,?)''', records)
db.commit()

# Sales by Product Description by Year by Month

In [37]:
# Monthly sales per product. Result columns, in SELECT order: dept_name,
# department, description, Year, Month, upc, Sales, Transactions, Items, dept_no
# (dept_no repeats trans.department).
#   Sales        - SUM(total)
#   Transactions - distinct (date, trans_no, emp_no, register_no) combinations
#   Items        - +1 per row, -1 for returns ('R') and voids ('V')
# The LEFT OUTER JOIN keeps transactions whose department has no row in
# department_lookup; their dept_name comes back NULL.
# Filters match the other summaries: card_no != 3, departments 0 and 15
# excluded, trans_status limited to '', ' ', 'V' or 'R'.
# In the ORDER BY, DESC applies only to Month.
query = ('''SELECT deplook.dept_name, trans.department, description,
    (EXTRACT(year FROM datetime)) AS Year,
    (EXTRACT(month FROM datetime)) AS Month,
    upc,
    SUM(total) AS Sales,
    COUNT(DISTINCT(Date(datetime) || trans_no || emp_no || register_no)) AS Transactions,
    SUM(CASE WHEN(trans_status = 'R' OR trans_status = 'V') THEN -1 ELSE 1 END) as Items,
    trans.department AS dept_no
    FROM `wedgeproject-290522.wedge.transArchive*` AS trans
    LEFT OUTER JOIN `wedgeproject-290522.wedge.department_lookup` AS deplook ON trans.department = deplook.department
    WHERE card_no != 3
    AND trans.department != 0 
    AND trans.department != 15
    AND trans_status != 'M'
    AND trans_status != 'C'
    AND trans_status != 'J'
    AND (trans_status = ''
    OR trans_status = ' '
    OR trans_status =  'V'
    OR trans_status = 'R')
    GROUP BY Year, Month, upc, description, dept_no, deplook.dept_name
    ORDER BY description, Year, Month DESC''')

# Submit the query as a BigQuery job in the US location.
query_sales_by_product_description_by_year_by_month = client.query(query,location ='US',)

In [38]:
# Write the per-product monthly summary to a tab-delimited text file, header row
# first. The header order must match the query's SELECT list, which returns
# dept_name before the numeric department. The file is written as UTF-8 so that
# non-ASCII product descriptions survive intact.
# Each value is written with str(), so NULLs appear as "None".
headers = ["dept_name","department","description", "year","month", "upc", "sales", "transactions", "items","dept_no"]
with open("sales_by_product_description_by_year_by_month.txt", "w", encoding="utf-8") as outfile:
    outfile.write("\t".join(headers) + "\n")
    for row in query_sales_by_product_description_by_year_by_month:
        outfile.write("\t".join(str(item) for item in row) + "\n")

In [39]:
# Rebuild the SQLite product summary table from the tab-delimited export.
# The file's columns follow the BigQuery SELECT order:
#   dept_name, department, description, year, month, upc, sales,
#   transactions, items, dept_no
# so the INSERT names the columns in that order. Column types:
#   - dept_name and description are TEXT.
#   - upc is TEXT: it is a STRING in BigQuery, and INTEGER affinity would drop
#     leading zeros.
#   - sales is REAL.
input_txt = "sales_by_product_description_by_year_by_month.txt"
cur.execute('''DROP TABLE IF EXISTS sales_by_product_description_by_year_by_month''')
cur.execute('''CREATE TABLE sales_by_product_description_by_year_by_month (
    Department INTEGER,
    Dept_name TEXT,
    Description TEXT,
    Year INTEGER,
    Month INTEGER,
    upc TEXT,
    Sales REAL,
    Transactions INTEGER,
    Items INTEGER,
    dept_no INTEGER)''')

# Read with the same encoding the export is written in (UTF-8; Latin-1 would
# garble non-ASCII descriptions).
with open(input_txt, 'r', encoding="utf-8") as inputfile:
    next(inputfile)  # skip the header row
    # Remove only the line terminator so that whitespace inside the first and
    # last fields is preserved.
    rows = (line.rstrip("\r\n").split("\t") for line in inputfile)
    cur.executemany('''
            INSERT INTO sales_by_product_description_by_year_by_month (Dept_name,Department,Description,Year,Month,upc,Sales,Transactions,Items,dept_no)
            VALUES (?,?,?,?,?,?,?,?,?,?)''', rows)

db.commit()