In this notebook we'll work through some of the issues we will identify in class that we'll need to solve for the Wedge project. The key tasks are as follows:

1. We need to get our file or files out of the zip file.
1. We need to be able to read in the file. 
1. We need to do a few tests: looking for a header row, checking for delimiters, checking for quotes.
1. We need to identify the owner number, which has index `45` in the row.
1. We need to find the destination row.
1. We need to write out the row. 

I've built some toy examples for us to play with in class. First, let's get a list of the files. The `os` package has a handy function `listdir` that will help. 

In [1]:
import os

import io

import csv 

from zipfile import ZipFile

import glob 

import random 

from google.cloud import bigquery

from google.oauth2 import service_account

import sqlite3 




In [None]:
# Everything in the raw-data folder: the archives plus any macOS "._" files.
raw_dir = "wedge/"
zip_files = os.listdir(raw_dir)

In [None]:
# Split the wedge/ listing into real archives (good_files) and macOS
# AppleDouble "._<name>" metadata files; bad_file_list records the name
# each metadata file shadows.
good_files = []
bad_file_list = []
for filename in zip_files:
    if filename.startswith("._"):
        # Slice off exactly the two-character prefix. str.strip('._') would
        # also remove any '.'/'_' characters at either end of the real name.
        bad_file_list.append(filename[len("._"):])
    else:
        good_files.append(filename)
        

        

In [None]:
# Canonical column names for the transArchive files, in file order
# (50 columns; card_no, the owner number, is at index 45).
headers = [
    "datetime", "register_no", "emp_no", "trans_no", "upc",
    "description", "trans_type", "trans_subtype", "trans_status", "department",
    "quantity", "Scale", "cost", "unitPrice", "total",
    "regPrice", "altPrice", "tax", "taxexempt", "foodstamp",
    "wicable", "discount", "memDiscount", "discountable", "discounttype",
    "voided", "percentDiscount", "ItemQtty", "volDiscType", "volume",
    "VolSpecial", "mixMatch", "matched", "memType", "staff",
    "numflag", "itemstatus", "tenderstatus", "charflag", "varflag",
    "batchHeaderID", "local", "organic", "display", "receipt",
    "card_no", "store", "branch", "match_id", "trans_id",
]

In [None]:
# Folder that will hold the cleaned, extracted CSV files.
temp_folder_name = "wedge_clean"

# exist_ok=True turns a re-run of this cell into a no-op. It also replaces the
# separate isdir() check, whose answer could go stale before mkdir() ran.
os.makedirs(temp_folder_name, exist_ok=True)

In [None]:
# Extract every CSV inside each good archive, normalise its rows and write a
# cleaned copy, prefixed with the canonical `headers` row, into wedge_clean/.
#
# Changes from the first version of this cell:
# * The delimiter is detected once per file, from its first line, instead of
#   re-running csv.Sniffer on every line. A one-line sniff picks ',' whenever
#   a comma appears, so a description containing a comma in a ';'- or
#   tab-delimited file was split on the wrong character. The sniffer also
#   raised on blank lines.
# * Each line is decoded once instead of twice.
# * Rows are written with csv.writer, so a field that still contains a comma
#   is quoted instead of shifting every later column.
# * The description repair adapts to however many surplus columns appear.
import itertools


def _detect_delimiter(text, candidates=",;|\t"):
    """Return whichever candidate delimiter occurs most often in *text*.

    Ties, including a line that contains none of them, resolve to the first
    candidate (a comma).
    """
    return max(candidates, key=text.count)


def _clean_fields(text, delimiter):
    """Split one decoded line on *delimiter* and scrub quotes and null markers.

    If the split produced more columns than ``headers`` (the description at
    index 5 contained the delimiter), the surplus pieces are joined back into
    a single description column so the row lines up with ``headers``.
    """
    fields = [
        piece.replace('"', '').replace("//N", '').replace('NULL', '').replace('\\N', '')
        for piece in text.strip().split(delimiter)
    ]
    extra = len(fields) - len(headers)
    if extra > 0:
        fields[5:6 + extra] = ["".join(fields[5:6 + extra])]
    return fields


for zip_file in good_files:  # archives listed from the wedge/ directory
    print(zip_file)
    with ZipFile("wedge/" + zip_file, 'r') as my_zip_file:
        for zipped_file in my_zip_file.namelist():
            if zipped_file.endswith("/"):
                continue  # directory entry: nothing to extract
            # Keep only the base name so members stored in sub-folders are
            # written directly into the output folder.
            clean_file_name = os.path.basename(zipped_file).replace('.csv', '_clean.csv')
            out_path = os.path.join(temp_folder_name, clean_file_name)

            with my_zip_file.open(zipped_file, 'r') as input_file:
                with open(out_path, 'w', newline='', encoding='utf-8') as outfile:
                    lines = (raw.decode("utf-8") for raw in input_file)
                    first_line = next(lines, "")
                    delimiter = _detect_delimiter(first_line)

                    # lineterminator="\n" keeps the original "\n" line endings
                    writer = csv.writer(outfile, lineterminator="\n")
                    writer.writerow(headers)

                    for idx, text in enumerate(itertools.chain([first_line], lines)):
                        if not text.strip():
                            continue  # blank line: no data
                        fields = _clean_fields(text, delimiter)
                        if idx == 0 and 'datetime' in fields[0]:
                            continue  # the file's own header; ours is already written
                        writer.writerow(fields)

print(f"Hell yes!!!!")

In [None]:
# Absolute path of the folder holding the cleaned CSVs, with a trailing
# separator so file names can be appended directly. It is resolved from the
# working directory, the same relative "wedge_clean" folder the next cell
# lists, instead of a path that exists only on one machine.
path_to_files = os.path.join(os.path.abspath("wedge_clean"), "")

In [None]:
# Names of the files written by the extraction cell.
clean_dir = "wedge_clean"
clean_wedge = os.listdir(clean_dir)

In [None]:
# Split the wedge_clean/ listing into cleaned CSVs (good_files) and macOS
# AppleDouble "._<name>" metadata files; bad_file_list records the name
# each metadata file shadows.
good_files = []
bad_file_list = []
for filename in clean_wedge:
    if filename.startswith("._"):
        # Slice off exactly the prefix; str.strip('._') would also eat '.'/'_'
        # characters at either end of the real name.
        bad_file_list.append(filename[len("._"):])
    else:
        good_files.append(filename)

Loading the Clean Files into GBQ

In [None]:
# Machine-specific settings: point these at your own key file, project and
# dataset (the first two values will differ on your machine).
service_path = "/Volumes/EXTERNAL/MSBA/ada/assignments/ada-wedge/"
service_file = 'Wedge Project-b683332c35a5.json'  # your service-account key file
gbq_proj_id = 'affable-operand-291614'  # your GBQ project ID
gbq_dataset_id = 'wedge_data'  # your dataset ID

# Full path to the key file; derived from the values above, so leave as is.
private_key = "".join([service_path, service_file])

In [None]:
# Authenticate with the service-account key file, then build the BigQuery
# client that the following cells use.
key_path = service_path + service_file
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(project=gbq_proj_id, credentials=credentials)

In [None]:
def tbl_exists(client, table_ref):
    """Return True if *table_ref* names an existing BigQuery table, else False.

    Relies on ``client.get_table`` raising ``NotFound`` for a missing table;
    any other exception propagates to the caller.
    """
    from google.cloud.exceptions import NotFound

    try:
        client.get_table(table_ref)
    except NotFound:
        return False
    return True

In [None]:
# Load-job settings shared by every upload: append rows to an existing table,
# and let the job add columns the table does not have yet.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

In [None]:
# Column name -> BigQuery type for the transArchive tables, in the same order
# as the header row of the cleaned files. Every column is NULLABLE.
# NOTE(review): memType, staff, batchHeaderID and display are declared
# BOOLEAN; confirm the raw data only holds values BigQuery accepts as BOOL.
wedge_column_types = [
    ("datetime", "TIMESTAMP"),
    ("register_no", "FLOAT"),
    ("emp_no", "FLOAT"),
    ("trans_no", "FLOAT"),
    ("upc", "STRING"),
    ("description", "STRING"),
    ("trans_type", "STRING"),
    ("trans_subtype", "STRING"),
    ("trans_status", "STRING"),
    ("department", "FLOAT"),
    ("quantity", "FLOAT"),
    ("Scale", "FLOAT"),
    ("cost", "FLOAT"),
    ("unitPrice", "FLOAT"),
    ("total", "FLOAT"),
    ("regPrice", "FLOAT"),
    ("altPrice", "FLOAT"),
    ("tax", "FLOAT"),
    ("taxexempt", "FLOAT"),
    ("foodstamp", "FLOAT"),
    ("wicable", "FLOAT"),
    ("discount", "FLOAT"),
    ("memDiscount", "FLOAT"),
    ("discountable", "FLOAT"),
    ("discounttype", "FLOAT"),
    ("voided", "FLOAT"),
    ("percentDiscount", "FLOAT"),
    ("ItemQtty", "FLOAT"),
    ("volDiscType", "FLOAT"),
    ("volume", "FLOAT"),
    ("VolSpecial", "FLOAT"),
    ("mixMatch", "FLOAT"),
    ("matched", "FLOAT"),
    ("memType", "BOOLEAN"),
    ("staff", "BOOLEAN"),
    ("numflag", "FLOAT"),
    ("itemstatus", "FLOAT"),
    ("tenderstatus", "FLOAT"),
    ("charflag", "STRING"),
    ("varflag", "FLOAT"),
    ("batchHeaderID", "BOOLEAN"),
    ("local", "FLOAT"),
    ("organic", "FLOAT"),
    ("display", "BOOLEAN"),
    ("receipt", "FLOAT"),
    ("card_no", "FLOAT"),
    ("store", "FLOAT"),
    ("branch", "FLOAT"),
    ("match_id", "FLOAT"),
    ("trans_id", "FLOAT"),
]
job_config.schema = [
    bigquery.SchemaField(column, column_type, mode="NULLABLE")
    for column, column_type in wedge_column_types
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1  # each cleaned file starts with a header row

In [None]:
# Load each cleaned CSV into its own BigQuery table, named after the file with
# its "_clean..." suffix removed, creating the table first if needed.
# NOTE: job_config uses WRITE_APPEND, so re-running this cell duplicates rows
# unless the tables are dropped first.
for file in good_files:
    my_table, sep, _ = file.partition("_clean")
    if not sep:
        # Not produced by the extraction cell (e.g. .DS_Store). Tuple-unpacking
        # file.split("_clean") raised ValueError here.
        print(f"Skipping {file}: not a cleaned file")
        continue
    table_full_name = ".".join([gbq_proj_id, gbq_dataset_id, my_table])  # GBQ table name

    # Reuse the table if it exists, otherwise create an empty one; the load
    # job's schema then supplies its columns.
    if tbl_exists(client, table_full_name):
        table = client.get_table(table_full_name)
    else:
        table = client.create_table(table=table_full_name)
    print("Table {} contains {} columns".format(table.table_id, len(table.schema)))

    with open(os.path.join(path_to_files, file), "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            table,
            location="US",  # Must match the destination dataset location.
            job_config=job_config,
        )  # API request

    job.result()  # Waits for the load to complete.
    print("Loaded {} rows into {}:{}.".format(job.output_rows, gbq_dataset_id, table.table_id))

    # Re-fetch to report the schema after the load may have added columns.
    table = client.get_table(table)
    print("Table {} now contains {} columns.".format(table.table_id, len(table.schema)))
    

If you need to remove the tables from the dataset, use the code below.

In [None]:
# Drop every table that was created from a cleaned file (useful before
# re-loading, because the load job appends).
# * DROP TABLE IF EXISTS lets the cell run even if some tables are already gone.
# * The separate query_config leaves the load cell's `job_config` (a
#   LoadJobConfig) untouched. The old code overwrote it with a QueryJobConfig,
#   which broke any later re-run of the load cell.
query_config = bigquery.QueryJobConfig()
for file in clean_wedge:
    my_table, sep, _ = file.partition("_clean")
    if file.startswith("._") or not sep:
        continue  # macOS metadata or unrelated file: no table was made from it
    table_full_name = ".".join([gbq_proj_id, gbq_dataset_id, my_table])  # GBQ table name
    query_text = "".join(['DROP TABLE IF EXISTS `', table_full_name, '`'])
    query_job = client.query(
        query_text,
        location="US",
        job_config=query_config,
    )  # API request - starts the query
    query_job.result()  # Waits for the query to finish

    print(f'Table {table_full_name} was dropped')
   

## Task 2

For this task we will connect to the GBQ instance, build a list of owners, take a random sample of those owners, extract all the records associated with them, and write those records to a txt file.

In [2]:
# Connection settings for Task 2, repeated so this section can run on its own.
# Point these at your own key file, project and dataset.
service_path = "/Volumes/EXTERNAL/MSBA/ada/assignments/ada-wedge/"
service_file = 'Wedge Project-b683332c35a5.json'  # your service-account key file
gbq_proj_id = 'affable-operand-291614'  # your GBQ project ID
gbq_dataset_id = 'wedge_data'  # your dataset ID

private_key = "".join([service_path, service_file])

# Authenticate and build the BigQuery client used by the queries below.
credentials = service_account.Credentials.from_service_account_file(private_key)
client = bigquery.Client(project=gbq_proj_id, credentials=credentials)

In [None]:
# Distinct owner numbers across all transArchive tables, excluding card 3
# (the same card the other queries in this notebook exclude).
# The clauses are now separated by spaces and the stray comma after card_no
# is gone; the old pieces concatenated to "card_no,From `...`Where ...".
query = (
    "SELECT DISTINCT card_no "
    f"FROM `{gbq_proj_id}.{gbq_dataset_id}.transArchive*` "
    "WHERE card_no != 3"
)

# Execute the query using `client.query`
query_job = client.query(query, location="US")

Next we will create a list of all the distinct owners from the GBQ tables.

In [None]:
# Each result row holds a single column, card_no; collect it from every row.
owners = [record[0] for record in query_job]
    
    
        
    



From the list we will pull a random sample of 325 owners. 

In [None]:
# Draw a reproducible random sample of distinct owners.
# * random.sample draws without replacement. random.choices (used before)
#   draws with replacement and could pick the same owner more than once.
# * The owners are sorted first because SELECT DISTINCT returns rows in no
#   guaranteed order, so the seed alone would not make the sample repeatable.
# * The sample size is capped at the number of owners so random.sample
#   cannot raise ValueError.
SAMPLE_SIZE = 325
random.seed(20201015)  # start the random generator at the same spot
owner_sample = random.sample(sorted(owners), k=min(SAMPLE_SIZE, len(owners)))




Next we will take the query built in GBQ and use the random sample of owners to pull the data. There are two ways of joining the list of owners into the query: the cleaner option uses a single join over the list and is the one that runs, while the loop-based alternative is left commented out.

In [None]:
# Pull every transaction for the sampled owners with one IN (...) list.
# Loop-based alternative, kept for reference:
# query = """Select * From `affable-operand-291614.wedge_data.transArchive*` Where card_no in ("""
# for owner in owner_sample:
#     query += str(owner)+ ","
# query = query[:-1] + ")"

owner_list = ",".join(map(str, owner_sample))
query = f"Select * From `affable-operand-291614.wedge_data.transArchive*` Where card_no in ({owner_list})"

In [None]:
# Run the sampled-owner query; the next cells iterate query_job for its rows.
query_job = client.query(query, location="US")

In [None]:
# Canonical transArchive column names, in file order
# (50 columns; card_no, the owner number, is at index 45).
headers = [
    "datetime", "register_no", "emp_no", "trans_no", "upc",
    "description", "trans_type", "trans_subtype", "trans_status", "department",
    "quantity", "Scale", "cost", "unitPrice", "total",
    "regPrice", "altPrice", "tax", "taxexempt", "foodstamp",
    "wicable", "discount", "memDiscount", "discountable", "discounttype",
    "voided", "percentDiscount", "ItemQtty", "volDiscType", "volume",
    "VolSpecial", "mixMatch", "matched", "memType", "staff",
    "numflag", "itemstatus", "tenderstatus", "charflag", "varflag",
    "batchHeaderID", "local", "organic", "display", "receipt",
    "card_no", "store", "branch", "match_id", "trans_id",
]

In [None]:
# Write the sampled owners' transactions to owner_query.txt: the header row,
# then one CSV line per record.
# * csv.writer quotes any field that contains a comma (product descriptions
#   may), so it stays a single column.
# * NULL (None) values are written as empty fields rather than the text "None".
with open("owner_query.txt", "w", newline="", encoding="utf-8") as outfile:
    writer = csv.writer(outfile, lineterminator="\n")
    writer.writerow(headers)
    writer.writerows(query_job)
        
        
     

## Task 3

For this task we will create a new database, run three different queries against GBQ, write the results to txt files, and load those files into the new database.

In [3]:
# Create (or open) the SQLite database file Wedge_Task_3.db and get a cursor.
db = sqlite3.connect("Wedge_Task_3.db")
cur = db.cursor()  # without the (), cur was the bound method, not a cursor

In [4]:
# Sales by date by hour: for every hour of every day, total sales, the number
# of distinct transactions, and the net item count (voids/returns count -1).
query1 = (
    """SELECT (EXTRACT(date FROM datetime)) AS Date,
    (EXTRACT(hour FROM datetime)) AS Hour,
    SUM(total) AS Sales,
    COUNT(DISTINCT(Date(datetime) || register_no || emp_no || trans_no)) AS Transactions,
    SUM(CASE WHEN(trans_status = 'V' OR trans_status = 'R') THEN -1 ELSE 1 END) as Items
    FROM `affable-operand-291614.wedge_data.transArchive*` 
    WHERE card_no != 3
    AND department != 0
    AND department != 15
    AND trans_status != 'M'
    AND trans_status != 'C'
    AND trans_status != 'J'
    AND (trans_status = ''
    OR trans_status = ' '
    OR trans_status =  'V'
    OR trans_status = 'R')
    GROUP BY Date, Hour
    ORDER BY Date, Hour"""
)

# Send the query to BigQuery (US location).
results1 = client.query(query1, location="US")

# Save each result row as one comma-separated line (no header row).
with open('Sales_by_date_by_hour.txt', "w") as outfile:
    outfile.writelines(",".join(map(str, row)) + "\n" for row in results1)


In [5]:

# Load the first query's output (Sales_by_date_by_hour.txt) into the
# Wedge_Task_3 database as table Sales_by_Date_by_Hour, replacing any old copy.
input_file1 = "Sales_by_date_by_hour.txt"

db = sqlite3.connect("Wedge_Task_3.db")  # connect to the new database
cur = db.cursor()

cur.execute('''DROP TABLE IF EXISTS Sales_by_Date_by_Hour''')  # remove the table if it already exists

# Declared types set SQLite column affinity.
# * Items was TEXT, which stored the counts as strings and compared them
#   lexically (so '9' > '10').
# * Hour, Transactions and Items are whole numbers, so INTEGER.
# * Date holds a plain date, so DATE rather than TIMESTAMP.
cur.execute('''CREATE TABLE Sales_by_Date_by_Hour (
    Date DATE,
    Hour INTEGER,
    Sales REAL,
    Transactions INTEGER,
    Items INTEGER)''')

# Read with the platform default encoding, which matches the cell that wrote
# this file (it opened the file without an explicit encoding). Decoding it as
# Latin-1 garbled non-ASCII text on UTF-8 systems such as macOS.
with open(input_file1, 'r') as infile:
    rows = [
        # str(None) was written as "None"; store it as SQL NULL instead.
        [None if field == "None" else field for field in line.strip().split(',')]
        for line in infile
        if line.strip()  # skip blank lines
    ]

# the ? placeholders line up with the column list, one per value
cur.executemany('''
    INSERT INTO Sales_by_Date_by_Hour (Date, Hour, Sales, Transactions, Items)
    VALUES (?,?,?,?,?)''', rows)
db.commit()

In [15]:
# Second query of the Wedge data in GBQ: for each owner, year and month, total
# sales, distinct transactions and the net item count (voids/returns count -1).
query2 = (
    """SELECT card_no As Owner,
    (EXTRACT(year FROM datetime)) AS Year,
    (EXTRACT(month FROM datetime)) AS Month,
    SUM(total) AS Sales,
    COUNT(DISTINCT(Date(datetime) || register_no || emp_no || trans_no)) AS Transactions,
    SUM(CASE WHEN(trans_status = 'V' OR trans_status = 'R') THEN -1 ELSE 1 END) as Items
    FROM `affable-operand-291614.wedge_data.transArchive*` 
    WHERE card_no != 3
    AND department != 0
    AND department != 15
    AND trans_status != 'M'
    AND trans_status != 'C'
    AND trans_status != 'J'
    AND (trans_status = ''
    OR trans_status = ' '
    OR trans_status =  'V'
    OR trans_status = 'R')
    GROUP BY Owner, Year, Month
    ORDER BY Owner, Year, Month DESC"""
)

# Send the query to BigQuery (US location).
results2 = client.query(query2, location="US")

# Save the second query's rows as comma-separated lines (no header row).
with open('Sales_by_Owner_Date.txt', "w") as outfile:
    outfile.writelines(",".join(map(str, row)) + "\n" for row in results2)

In [16]:
# Load the second query's output (Sales_by_Owner_Date.txt) into the
# Wedge_Task_3 database as table Sales_by_Owner_Date, replacing any old copy.
input_file2 = "Sales_by_Owner_Date.txt"

db = sqlite3.connect("Wedge_Task_3.db")  # connect to the Wedge_Task_3 database
cur = db.cursor()

cur.execute('''DROP TABLE IF EXISTS Sales_by_Owner_Date''')  # remove the table if it already exists

# Declared types set SQLite column affinity.
# * Year and Month are plain numbers, not timestamps.
# * Items was TEXT, which stored the counts as strings and compared them
#   lexically (so '9' > '10').
cur.execute('''CREATE TABLE Sales_by_Owner_Date (
    Owner INTEGER,
    Year INTEGER,
    Month INTEGER,
    Sales REAL,
    Transactions INTEGER,
    Items INTEGER)''')

# Read with the platform default encoding, which matches the cell that wrote
# this file (it opened the file without an explicit encoding). Decoding it as
# Latin-1 garbled non-ASCII text on UTF-8 systems such as macOS.
with open(input_file2, 'r') as infile:
    rows = [
        # str(None) was written as "None"; store it as SQL NULL instead.
        [None if field == "None" else field for field in line.strip().split(',')]
        for line in infile
        if line.strip()  # skip blank lines
    ]

cur.executemany('''
    INSERT INTO Sales_by_Owner_Date (Owner, Year, Month, Sales, Transactions, Items)
    VALUES (?,?,?,?,?,?)''', rows)
db.commit()

In [19]:
# Third query: sales by product per year and month. Columns are UPC,
# description, department number and name (joined from wedge_department),
# year, month, sales, distinct transactions and net items (voids/returns -1).
query3 = (
    """SELECT Upc AS UPC,
    description AS Product,
    a.department AS Department,
    b.dept_name AS Dept_Name,
    (EXTRACT(year FROM datetime)) AS Year,
    (EXTRACT(month FROM datetime)) AS Month,
    SUM(total) AS Sales,
    COUNT(DISTINCT(Date(datetime) || register_no || emp_no || trans_no)) AS Transactions,
    SUM(CASE WHEN(trans_status = 'V' OR trans_status = 'R') THEN -1 ELSE 1 END) as Items
    FROM `wedge_data.transArchive*` a
    LEFT OUTER JOIN `wedge_data.wedge_department` b
    ON a.department = b.department
    WHERE card_no != 3
    AND a.department != 0
    AND a.department != 15
    AND trans_status != 'M'
    AND trans_status != 'C'
    AND trans_status != 'J'
    AND (trans_status = ''
    OR trans_status = ' '
    OR trans_status =  'V'
    OR trans_status = 'R')
    GROUP BY UPC, Product, Department, Dept_Name, Year, Month
    ORDER BY UPC, Product, Department, Dept_Name, Year, Month DESC"""
)

# Send the query to BigQuery (US location).
results3 = client.query(query3, location="US")

# Save the third query's rows as comma-separated lines (no header row).
with open('Sales_by_Product_by_Date.txt', 'w') as outfile:
    outfile.writelines(",".join(map(str, row)) + "\n" for row in results3)



In [20]:
# Load the third query's output (Sales_by_Product_by_Date.txt) into the
# Wedge_Task_3 database as table Sales_by_Product_by_Date, replacing any old copy.
input_file3 = "Sales_by_Product_by_Date.txt"

db = sqlite3.connect("Wedge_Task_3.db")  # connect to the Wedge_Task_3 database
cur = db.cursor()

cur.execute('''DROP TABLE IF EXISTS Sales_by_Product_by_Date''')  # remove the table if it already exists

# Declared types set SQLite column affinity.
# * "STRING" is not a SQLite type name; it gets NUMERIC affinity, so a UPC
#   such as '0000000004011' would be stored as the integer 4011. TEXT keeps
#   the values verbatim.
# * Year and Month are plain numbers.
# * Items was TEXT, which stored the counts as strings and compared them
#   lexically.
cur.execute('''CREATE TABLE Sales_by_Product_by_Date (
    UPC TEXT,
    Description TEXT,
    Department REAL,
    Dept_Name TEXT,
    Year INTEGER,
    Month INTEGER,
    Sales REAL,
    Transactions INTEGER,
    Items INTEGER)''')

n_columns = 9  # columns in the table / fields per line
rows = []
# Read with the platform default encoding, which matches the cell that wrote
# this file (it opened the file without an explicit encoding). Decoding it as
# Latin-1 garbled non-ASCII descriptions on UTF-8 systems such as macOS.
with open(input_file3, 'r') as infile:
    for line in infile:
        if not line.strip():
            continue  # skip blank lines
        fields = line.strip().split(',')
        # The writer did not quote fields, so a comma inside a product
        # description splits it apart and the insert would fail on the extra
        # values. Join the surplus pieces back into the Description column.
        # NOTE(review): assumes Description is the only field that can
        # contain commas (not Dept_Name); confirm against the data.
        extra = len(fields) - n_columns
        if extra > 0:
            fields[1:2 + extra] = [",".join(fields[1:2 + extra])]
        # str(None) was written as "None" (e.g. an unmatched Dept_Name); store NULL.
        rows.append([None if field == "None" else field for field in fields])

cur.executemany('''
    INSERT INTO Sales_by_Product_by_Date (UPC, Description, Department, Dept_Name, Year, Month, Sales, Transactions, Items)
    VALUES (?,?,?,?,?,?,?,?,?)''', rows)
db.commit()