# Wedge Task 1


In [1]:
import os
from zipfile import ZipFile
import io

# Do our imports for part 2 of uploading
from google.cloud import bigquery
from google.oauth2 import service_account
from pathlib import Path



In [2]:
# These first two values will be different on your machine. 
service_path = "c:\\Users\\logan\\DataScience\\"
service_file = 'beskoonlettuce-327919-af8cb2931ac8.json' # this is your authentication information  
gbq_proj_id = 'beskoonlettuce-327919'  # change this to your project_id
gbq_dataset_id = 'wedge_transactions' # and change this to your data set ID

private_key =service_path + service_file

In [3]:
# Get your credentials
credentials = service_account.Credentials.from_service_account_file(service_path + service_file)

# And create a client to talk to GBQ
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

The code listed below became my final approach for how to upload files, via Google Cloud Storage.

The first step involved uploading all the csv files to GSC through the UI.


In [4]:

file_name = os.listdir("WedgeFiles_Clean/clean-files/")
print(file_name[2])

transArchive_201004_201006_clean.csv


## Batch Loading begins

In [14]:

#Now we want to create our table.
#NOTE: change item of file_name for each file
current_file = file_name[53]
my_table = Path(current_file).stem
print(my_table)
table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table])

transArchive_201701_clean


In [15]:
def tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

In [16]:
if not tbl_exists(client, table_full_name) :
    table_ref = client.create_table(
        table = table_full_name
    )
else :
    table_ref = client.get_table(table_full_name)

Now we have an empty table. The code chunk below is schema to be imported.


In [17]:
table = client.get_table(table_ref)
print("Table {} contains {} columns".format(table_ref.table_id,len(table.schema)))

Table transArchive_201701_clean contains 0 columns


In [18]:
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION # This allows us to modify the table. 
]

In [19]:
job_config.schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

In [20]:
#generate correct URI
uri = "gs://lettucewedge_bucket/clean-files/" + current_file
print(uri)

gs://lettucewedge_bucket/clean-files/transArchive_201701_clean.csv


In [21]:

load_job = client.load_table_from_uri(
    uri, table, job_config=job_config
) #Make API request

load_job.result()  # Waits for table load to complete.
print(
        "Loaded {} rows into {}:{}.".format(
            load_job.output_rows, 'wedge_example', table_ref.table_id
    )
)

# Checks the updated length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))


Loaded 936740 rows into wedge_example:transArchive_201701_clean.
Table transArchive_201701_clean now contains 50 columns.


Need to go back to batch loading to complete all uploads. After 53, finished!