## Task 1 of Wedge project


In [1]:
# Do our imports for the code
import os
import csv
from google.cloud import bigquery
from google.oauth2 import service_account

# this creates a path to where the source (large, cleaned) files reside locally
path_to_files = "C:\\Users\\Craig\\Documents\\000000000 UofM fall 2021\\BMIS670 Chandler AppliedDataAnalytics\\Wedge project\\clean_full\\"
clean_wedge_files = os.listdir(path_to_files) # assigning  to object 'clean_wedge_files'


In [2]:
clean_wedge_files

['transArchive_201204_201206_clean.csv',
 'transArchive_201204_201206_inactive_clean.csv',
 'transArchive_201207_201209_clean.csv',
 'transArchive_201207_201209_inactive_clean.csv',
 'transArchive_201210_201212_clean.csv',
 'transArchive_201210_201212_inactive_clean.csv',
 'transArchive_201301_201303_clean.csv',
 'transArchive_201301_201303_inactive_clean.csv',
 'transArchive_201304_201306_clean.csv',
 'transArchive_201304_201306_inactive_clean.csv',
 'transArchive_201307_201309_clean.csv',
 'transArchive_201307_201309_inactive_clean.csv',
 'transArchive_201310_201312_clean.csv',
 'transArchive_201310_201312_inactive_clean.csv',
 'transArchive_201401_201403_clean.csv',
 'transArchive_201401_201403_inactive_clean.csv',
 'transArchive_201404_201406_clean.csv',
 'transArchive_201404_201406_inactive_clean.csv',
 'transArchive_201407_201409_clean.csv',
 'transArchive_201407_201409_inactive_clean.csv',
 'transArchive_201410_201412_clean.csv',
 'transArchive_201410_201412_inactive_clean.csv',

In [3]:
# Identify the path to the JSON that links us with Google Big Query (GBQ) project, dataset 
service_path = "C:\\Users\\Craig\\Documents\\000000000 UofM fall 2021\\BMIS670 Chandler AppliedDataAnalytics\\Wedge project\\"
service_file = 'wedge-project-fall2021-92691-f3182a53adb6.json' # this is your authentication information  
gbq_proj_id = 'wedge-project-fall2021-92691'  # change this to your project_id
gbq_dataset_id = 'Wedge_FULL' # and change this to your data set ID
#gbq_dataset_id = 'wedge_clean_full' # and change this to your data set ID
private_key = service_path + service_file

In [4]:
# Get your credentials
credentials = service_account.Credentials.from_service_account_file(service_path + service_file)

# And create a client to talk to GBQ
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

At this point, our `client` variable holds a connection to the project. This is client is similar to a file handle--it allows you to "talk" to the project.  

---

## Creating a Table

Let's create a couple of tables based on our Wedge table schema. First, put in a name for your table.

In [5]:
my_table = "wedge-project-fall2021-92691-f3182a53adb6"

table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table])

In [6]:
table_full_name

'wedge-project-fall2021-92691.Wedge_FULL.wedge-project-fall2021-92691-f3182a53adb6'

Determine if our table exists, and if not... create it (see stack overflow's https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists).

In [7]:
def tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

In [8]:
if not tbl_exists(client, table_full_name) :
    table_ref = client.create_table(
        table = table_full_name
    )
else :
    table_ref = client.get_table(table_full_name)

At this point our table is empty and doesn't even have a schema. 

In [9]:
table = client.get_table(table_ref)
print("Table {} contains {} columns".format(table_ref.table_id,len(table.schema)))

Table wedge-project-fall2021-92691-f3182a53adb6 contains 0 columns


In [10]:
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION # This allows us to modify the table. 
]

### Create Schema

This next cell has a massive set of SchemaField additions. I built this programmatically, so that you can just copy and paste it when you need it. But it's worth reading it *and* comparing it to the `wedge_transaction_schema.json` to see if it makes sense. 

In [11]:
job_config.schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 0

## Upload the source data to GBQ

In [12]:
from timeit import default_timer as timer # to establish access to 'process timer'
start = timer()  # begin timer for this code block (will return #seconds elapsed)

for file in clean_wedge_files:
    my_table, junk = file.split(".")
    table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table])
    if not tbl_exists(client, table_full_name) :
        table_ref =client.create_table(
            table = table_full_name
        )
    else :
        table_ref = client.get_table(table_full_name)
    table = client.get_table(table_ref)
    print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

    with open(path_to_files + file, "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            table_ref,
            location="US",  # Must match the destination dataset location.
            job_config=job_config,
        )  # API request

    job.result()  # Waits for table load to complete.
    print(
        "Loaded {} rows into {}:{}.".format(
            job.output_rows, 'wedge_example', table_ref.table_id
        )
    ) 

end = timer() # stops the code-block timer process after all iterations of all loops
print(end - start)  # displays total seconds elapsed in this code block process    

Table transArchive_201204_201206_clean now contains 0 columns.
Loaded 3083546 rows into wedge_example:transArchive_201204_201206_clean.
Table transArchive_201204_201206_inactive_clean now contains 0 columns.
Loaded 237990 rows into wedge_example:transArchive_201204_201206_inactive_clean.
Table transArchive_201207_201209_clean now contains 0 columns.
Loaded 2925608 rows into wedge_example:transArchive_201207_201209_clean.
Table transArchive_201207_201209_inactive_clean now contains 0 columns.
Loaded 190877 rows into wedge_example:transArchive_201207_201209_inactive_clean.
Table transArchive_201210_201212_clean now contains 0 columns.
Loaded 2893637 rows into wedge_example:transArchive_201210_201212_clean.
Table transArchive_201210_201212_inactive_clean now contains 0 columns.
Loaded 162988 rows into wedge_example:transArchive_201210_201212_inactive_clean.
Table transArchive_201301_201303_clean now contains 0 columns.
Loaded 2903987 rows into wedge_example:transArchive_201301_201303_clea

In [13]:

# Checks the updated length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

Table transArchive_201701_clean now contains 50 columns.
