In [1]:
import os
import io
from google.cloud import bigquery
from google.oauth2 import service_account

In [2]:
# These first two values will be different on your machine. 
service_path = '/Users/steviodong/Desktop/Applied Data Analytics/TWP/'
service_file = 'wedge-328901-b7c6a35e5da2.json' # change this to your authentication information  
gbq_proj_id = 'wedge-328901' # change this to your project. 
gbq_dataset_id = 'wedgeclean' # and change this to your data set ID

# And this should stay the same. 
private_key =service_path + service_file



In [3]:
# Identify path to the clean Wedge files and establish a list to store the files
clean_files = "/Users/steviodong/Desktop/Applied Data Analytics/TWP/"
clean_wedge_files = os.listdir(clean_files)

In [4]:
# Now we pass in our credentials so that Python has permission to access our project.
credentials = service_account.Credentials.from_service_account_file(service_file)
# And finally we establish our connection
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)


In [5]:
# store the name of my GBQ table into this python instance
my_table = "transArchive_clean"

table_full_name = ".".join([gbq_proj_id,gbq_dataset_id,my_table])

In [6]:
# test for the connection to the GBQ, create it an empty table if connection fails
def tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

if not tbl_exists(client, table_full_name) :
    table_ref = client.create_table(
        table = table_full_name
    )
else :
    table_ref = client.get_table(table_full_name)

In [7]:
# Check if the table exists in GBQ 
table = client.get_table(table_ref)
print("Table {} contains {} columns".format(table_ref.table_id,len(table.schema)))

Table transArchive_clean contains 50 columns


In [8]:
# Connect to the new table in GBQ and prepare it to have a schema uploaded
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION 
]

In [9]:
# Prepare the schema for GBQ 
job_config.schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

In [10]:
# Upload the csv file to GBQ 
with open("/Users/steviodong/Desktop/Applied Data Analytics/TWP/clean-files/transArchive_201701_clean.csv", "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  
        job_config=job_config,
    )  

In [11]:
# show the result of the upload process
job.result()  
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, 'wedgeclean', table_ref.table_id
    )
)

# Checks the length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

Loaded 936740 rows into wedgeclean:transArchive_clean.
Table transArchive_clean now contains 50 columns.


In [12]:
# Delete the test file
query_text ="".join(['DELETE FROM `',table_full_name,'` WHERE 1=1'])

job_config = bigquery.QueryJobConfig()

query_job = client.query(
    query_text,
    location="US",
    job_config=job_config,
)  # API request - starts the query

# Waits for the query to finish
query_job.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7fc53f36fc40>

In [13]:
# Runs the big job of uploading all of the wedge data into GBQ
# NOTE: This takes a while, 2-4 hours depending on your connection speed

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.skip_leading_rows = 1 # Skips the header row here.

In [15]:
for filename in os.listdir("/Users/steviodong/Desktop/Applied Data Analytics/TWP/"):
    if filename.endswith(".csv"):
        with open("clean-files/" + filename, "rb") as source_file:
            job = client.load_table_from_file(
                source_file,
                table_ref,
                location="US",  # Must match the destination dataset location.
                job_config=job_config,
            )  # API request

job.result()  # Waits for table load to complete.
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, 'wedgeclean', table_ref.table_id
    )
)

# Checks the updated length of the schema
table = client.get_table(table)
print("Table {} now contains {} columns.".format(table_ref.table_id, len(table.schema)))

Loaded 936740 rows into wedgeclean:transArchive_clean.
Table transArchive_clean now contains 50 columns.
