## Python Google BigQuery Example

This notebook is a continuation of `Python GBQ Example.ipynb`. That notebook uploaded data to Google BigQuery (GBQ) tables that we had created manually. This version creates a couple of tables from Python (based on the same schema) and fills them with data.


In [None]:
# Do our imports for the code
from google.cloud import bigquery
from google.oauth2 import service_account

In [None]:
# These values will be different on your machine.
service_path = "C:\\users\\jchan\\dropbox\\teaching\\"
service_file = 'UMT-MSBA-7b4265df0ca4.json' # this is your authentication information  
gbq_proj_id = 'umt-msba'  # change this to your project_id
gbq_dataset_id = 'wedge_example' # and change this to your data set ID

private_key = service_path + service_file  # full path to the key file

In [None]:
# Get your credentials
credentials = service_account.Credentials.from_service_account_file(private_key)

# And create a client to talk to GBQ
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

At this point, our `client` variable holds a connection to the project. This client is similar to a file handle: it allows you to "talk" to the project.

---

## Creating a Table

Let's create a couple of tables based on our Wedge table schema. First, put in a name for your table.

In [None]:
my_table = "chandler_wedge_test"

table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table])

Now we'll test to see if that table exists and, if it doesn't, create it as an empty table. The client doesn't have a dedicated function for testing whether a table exists, so we'll write our own, which I found on StackOverflow [here](https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists).

In [None]:
def tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

In [None]:
if not tbl_exists(client, table_full_name):
    # Create an empty table (no schema yet) at the fully qualified name
    table_ref = client.create_table(table_full_name)
else:
    table_ref = client.get_table(table_full_name)
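
The check-then-create pattern above can also be collapsed into a single call. This is a minimal sketch, assuming your installed version of `google-cloud-bigquery` supports the `exists_ok` argument to `client.create_table`; `get_or_create_table` is a hypothetical helper name, not part of the library.

```python
def get_or_create_table(client, table_id):
    """Return the table at `table_id`, creating an empty one if needed.

    `client` is expected to be a `bigquery.Client`. With `exists_ok=True`,
    an "already exists" error is ignored and the existing table is returned.
    """
    return client.create_table(table_id, exists_ok=True)
```

With the variables from this notebook, the call would be `table_ref = get_or_create_table(client, table_full_name)`. Keeping `tbl_exists` around is still useful when you want to branch on whether the table was there.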

At this point, if the table was just created, it is empty and doesn't even have a schema.

In [None]:
table = client.get_table(table_ref)
print("Table {} contains {} columns".format(table_ref.table_id,len(table.schema)))

In [None]:
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION # This allows us to modify the table. 
]

This next cell has a massive set of SchemaField additions. I built this programmatically, so that you can just copy and paste it when you need it. But it's worth reading it *and* comparing it to the `wedge_transaction_schema.json` to see if it makes sense. 

In [None]:
job_config.schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
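
For reference, here is one way a list like the one above could be generated rather than typed. It is only a sketch, and it assumes the schema file is a JSON list of objects with `name`, `type`, and an optional `mode` key (the usual BigQuery schema layout). I have not confirmed that `wedge_transaction_schema.json` looks exactly like that, and `schema_field_lines` is a hypothetical helper name.

```python
import json


def schema_field_lines(schema_json):
    """Turn a BigQuery-style JSON schema string into SchemaField source lines."""
    fields = json.loads(schema_json)
    lines = []
    for field in fields:
        # Assumption: a missing "mode" means the column is NULLABLE
        mode = field.get("mode", "NULLABLE")
        lines.append(
            'bigquery.SchemaField("{}", "{}", mode="{}"),'.format(
                field["name"], field["type"], mode
            )
        )
    return lines


# A two-column example in the assumed layout
example = (
    '[{"name": "datetime", "type": "TIMESTAMP", "mode": "NULLABLE"},'
    ' {"name": "upc", "type": "STRING"}]'
)
for line in schema_field_lines(example):
    print(line)
```

To run it on a real file, read the file's text with `open(...).read()` and pass that string in, then paste the printed lines into the `job_config.schema = [...]` cell.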

This next cell does the actual loading.

In [None]:
with open("clean_transArchive_201307_201309_small.csv", "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  # Must match the destination dataset location.
        job_config=job_config,
    )  # API request

And now we'll see how it did. This kind of code, that doesn't do the actual work but helps you monitor what's going on, is invaluable.

In [None]:
job.result()  # Waits for table load to complete.
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, 'wedge_example', table_ref.table_id
    )
)

# Checks the updated length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))
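
If you find yourself repeating this monitoring code, it can be wrapped in a small function. The sketch below assumes `job` behaves like a BigQuery load job (with `errors` and `output_rows` attributes) and `table` like a table object (with `table_id` and `schema`); `summarize_load` is a hypothetical name.

```python
def summarize_load(job, table):
    """Build a one-line status report for a finished load job."""
    if job.errors:
        # Assumption: each error is a dict with a "message" key
        return "Load into {} failed with {} error(s); first: {}".format(
            table.table_id, len(job.errors), job.errors[0].get("message")
        )
    return "Loaded {} rows into {} ({} columns).".format(
        job.output_rows, table.table_id, len(table.schema)
    )
```

Because `job.result()` raises an exception when a load fails, the error branch is mostly useful inside an `except` block, where you still want a readable message about what went wrong.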

Okay, that allowed us to load 10K rows from the small version. Let's clean it out and load a larger version.

First, the clean out.

In [None]:
query_text = "".join(['DELETE FROM `', table_full_name, '` WHERE 1=1'])
# GBQ requires a WHERE clause in a DELETE statement

job_config = bigquery.QueryJobConfig()

query_job = client.query(
    query_text,
    location="US",
    job_config=job_config,
)  # API request - starts the query

query_job.result()  # Waits for the query to finish
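
It is reassuring to confirm how many rows the `DELETE` actually removed. The sketch below wraps the same statement in a hypothetical helper, `delete_all_rows`, and assumes the query job exposes `num_dml_affected_rows`, as the BigQuery query job does for DML statements.

```python
def delete_all_rows(client, table_id, location="US"):
    """Delete every row from `table_id` and return the number removed."""
    # The always-true WHERE clause is there because GBQ requires one
    query_text = "DELETE FROM `{}` WHERE 1=1".format(table_id)
    query_job = client.query(query_text, location=location)
    query_job.result()  # Waits for the query to finish
    return query_job.num_dml_affected_rows
```

A call such as `print(delete_all_rows(client, table_full_name))` should report the same row count that the earlier load printed.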

Now that we've deleted the rows, let's fill the table up again, this time with a slightly larger file.

In [None]:
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.skip_leading_rows = 1 # Need to skip the header row here.

with open("small_trans_file.csv", "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  # Must match the destination dataset location.
        job_config=job_config,
    )  # API request
    
job.result()  # Waits for table load to complete.
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, 'wedge_example', table_ref.table_id
    )
)

# Checks the updated length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

In [None]:
# Let's query to see how many owners and records we now have loaded in

qry = """SELECT
              COUNT(DISTINCT card_no) as Owners,
              COUNT(*) as Records
         FROM  
             `umt-msba.wedge_example.chandler_wedge_test`"""

# And we execute queries with `client.query`
query_job = client.query(
    qry,
    location="US",
)

for row in query_job :
    print(f"We have {row[0]} owners and {row[1]} total records.")
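
The query above hard-codes the table name. A small variation, sketched here with a hypothetical helper `count_owners_and_records`, builds the query from the table ID and reads the columns by name instead of by position. It assumes the rows returned by `result()` can be indexed by column name, which BigQuery rows allow.

```python
def count_owners_and_records(client, table_id, location="US"):
    """Return (distinct owners, total records) for the given table."""
    qry = """SELECT
                 COUNT(DISTINCT card_no) AS Owners,
                 COUNT(*) AS Records
             FROM `{}`""".format(table_id)
    rows = list(client.query(qry, location=location).result())
    row = rows[0]  # An aggregate query like this returns exactly one row
    return row["Owners"], row["Records"]
```

Calling `count_owners_and_records(client, table_full_name)` keeps the query in sync with whatever table name you set earlier in the notebook.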


And now, some tasks for you:

1. Using the console (in your browser), determine how many days of data we have in our new table. 
1. Use Python code to create a _new_ table with the Wedge schema. 
1. Try to upload one of the files from `WedgeZipOfZips_Small.zip`. Invariably you're going to get some errors. See if you can figure out what's going wrong. 
1. Try to upload a different file that's formatted differently (different delimiter, different presence of headers, etc.) If that one throws errors, can you make sense of them? 
1. Using whatever mechanism you want (Python, Excel, find-and-replace), try to get one of those smaller transaction files loaded into GBQ. 
1. Your next goal: load the full version of this file into a new table in GBQ using Python. Bonus goal: load a couple into their own tables.
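
For the tasks about differently formatted files, it can help to inspect a file before configuring the load job. The sketch below uses the standard library's `csv.Sniffer` to guess the delimiter and whether there is a header row. The sniffer is a heuristic, so treat its answer as a hint and still look at the file yourself. `sniff_csv` and the sample text are made up for illustration.

```python
import csv


def sniff_csv(sample, candidates=",;\t|"):
    """Guess the delimiter and header presence from a text sample."""
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(sample, delimiters=candidates)
    has_header = sniffer.has_header(sample)
    return dialect.delimiter, has_header


# Made-up sample: semicolon-delimited with a header row
sample = "card_no;total;upc\n1;2.5;3\n4;5.5;6\n"
delimiter, has_header = sniff_csv(sample)
print(repr(delimiter), has_header)
```

The results map onto the load job settings: `job_config.field_delimiter = delimiter` and `job_config.skip_leading_rows = 1 if has_header else 0`. For a real file, read the first few thousand characters (for example `open(path).read(4096)`) and pass that in as the sample.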