## Upload Data to GCS and Load Data to BigQuery

### Step 1 - Install Libraries

In [None]:
!pip install gcloud google-cloud-storage google-cloud-bigquery

### Step 2 - Import Libraries and Set Up Google Service Account

In [None]:
from google.api_core.exceptions import NotFound
from google.cloud import storage
from google.cloud import bigquery

# Path to the service-account key file handed to the
# `from_service_account_json` constructors of the Storage and BigQuery clients.
GCP_SERVICE_ACCOUNT_JSON = "secret/etl-training-427511-8a5bde222176.json"

### Step 3 - Upload CSV to GCS

In [None]:
# Name of the Cloud Storage bucket that receives the file.
bucket_name = "muic-etl"
# Local path of the CSV file to upload.
file_path = "generated/exam_grades-from-db-tf.csv"
# Object name (path inside the bucket) the file is stored under.
destination_path = "data/exam_grades-from-db-tf.csv"

In [None]:
# Build a Cloud Storage client from the service-account key file.
storage_client = storage.Client.from_service_account_json(
    GCP_SERVICE_ACCOUNT_JSON
)

# Resolve the destination object inside the bucket, then push the local file.
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_path)
blob.upload_from_filename(file_path)

print(f"File {file_path} uploaded to {destination_path} in bucket {bucket_name}.")

### Step 4 - Load Data to BigQuery (Autodetect Schema)

In [None]:
# Dataset that holds the tables created in the following steps.
dataset_id = "exam"

# Location of the uploaded CSV, expressed as a gs:// URI for the load jobs.
source_uri = f"gs://{bucket_name}/{destination_path}"

# BigQuery client authenticated with the same service-account key file.
client = bigquery.Client.from_service_account_json(
    GCP_SERVICE_ACCOUNT_JSON
)

In [None]:
# Look the dataset up first and create it only when it does not exist yet.
# The result is stored in `dataset_ref` and used below to build table references.
try:
    dataset_ref = client.get_dataset(dataset_id)
except NotFound:
    # Creating a dataset requires the bigquery.datasets.create permission.
    dataset_ref = client.create_dataset(dataset_id)

In [None]:
# Delete the table if it already exists so the next load starts from scratch.
table_autodetect_id = "exam_grades_autodetect"

table_autodetect_ref = dataset_ref.table(table_autodetect_id)
try:
    table_autodetect = client.get_table(table_autodetect_ref)
except NotFound:
    # Nothing to clean up: the table has not been created yet.
    pass
else:
    print(f"Table {table_autodetect_id} contains {table_autodetect.num_rows} rows.")

    print(f"Deleting table {table_autodetect_id}...")
    # not_found_ok keeps the cleanup best-effort if the table disappears
    # between the lookup and the delete.
    client.delete_table(table_autodetect_ref, not_found_ok=True)

In [None]:
# Configure a CSV load that lets BigQuery infer the schema from the data.
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    # Skip the first row of the file (presumably the CSV header).
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)

# Make an API request.
load_job = client.load_table_from_uri(
    source_uri, table_autodetect_ref, job_config=job_config
)

# Waits for the job to complete.
load_job.result()

# Re-fetch the table to report its row count after the load.
destination_table = client.get_table(table_autodetect_ref)
print(f"Loaded {destination_table.num_rows} rows.")

### Step 5 - Load Data to BigQuery (Custom Schema)

In [None]:
# Delete the table if it already exists so the next load starts from scratch.
table_custom_id = "exam_grades_custom"

table_custom_ref = dataset_ref.table(table_custom_id)
try:
    table_custom = client.get_table(table_custom_ref)
except NotFound:
    # Nothing to clean up: the table has not been created yet.
    pass
else:
    print(f"Table {table_custom_id} contains {table_custom.num_rows} rows.")

    print(f"Deleting table {table_custom_id}...")
    # not_found_ok keeps the cleanup best-effort if the table disappears
    # between the lookup and the delete.
    client.delete_table(table_custom_ref, not_found_ok=True)

In [None]:
# https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-csv
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type

# Configure a CSV load with an explicit schema instead of autodetection.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("year", "INTEGER"),
        bigquery.SchemaField("term", "INTEGER"),
        bigquery.SchemaField("gender", "STRING"),
        bigquery.SchemaField("exam1", "NUMERIC"),
        bigquery.SchemaField("exam2", "NUMERIC"),
        bigquery.SchemaField("exam3", "NUMERIC"),
        bigquery.SchemaField("course_grade", "NUMERIC"),
    ],
    # Skip the first row of the file (presumably the CSV header).
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)

# Make an API request.
load_job = client.load_table_from_uri(
    source_uri, table_custom_ref, job_config=job_config
)

# Waits for the job to complete.
load_job.result()

# Re-fetch the table to report its row count after the load.
destination_table = client.get_table(table_custom_ref)
print(f"Loaded {destination_table.num_rows} rows.")

### Step 6 - Load Data to BigQuery (Truncate Before Update)

In [None]:
# Configure a CSV load that replaces the table's existing data rather than
# appending to it.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    # Skip the first row of the file (presumably the CSV header).
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)

# Make an API request.
load_job = client.load_table_from_uri(
    source_uri, table_custom_ref, job_config=job_config
)

# Waits for the job to complete.
load_job.result()

# Re-fetch the table to report its row count after the load.
destination_table = client.get_table(table_custom_ref)
print(f"Loaded {destination_table.num_rows} rows.")