Data Engineer OnBoard - Week 2 
# GCS and BigQuery
Angus Tu
2025.09.17

----
# BigQuery
BigQuery can be accessed and operated through several interfaces, including the `bq` command-line tool, BigQuery SQL, and Python (e.g., together with `pandas`).



## Set Permission 
You need the <u>BigQuery Job User</u> and <u>BigQuery Data Editor</u> roles, plus the <u>storage.buckets.create</u> and <u>storage.buckets.list</u> permissions. The commands below additionally grant the <u>BigQuery User</u> role.

In [None]:
# Activate the `default` gcloud configuration, bind the three BigQuery roles to
# the personal service account, then activate the `sa` configuration.
PROJECT_ID="tw-rd-data-angus-tu"
SA_MEMBER="serviceAccount:angus-personal@tw-rd-data-angus-tu.iam.gserviceaccount.com"

gcloud config configurations activate default
for ROLE in roles/bigquery.jobUser roles/bigquery.dataEditor roles/bigquery.user; do
    gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
        --member="${SA_MEMBER}" \
        --role="${ROLE}"
done
gcloud config configurations activate sa

## Create a Dataset

In [None]:
-- Create the travel_demo dataset in asia-east1 unless it already exists.
CREATE SCHEMA IF NOT EXISTS `tw-rd-data-angus-tu.travel_demo`
OPTIONS (
  location = "asia-east1"
);

## Create a (Partition/Clustering) Table

### 1. Create an empty table, then load a pandas DataFrame into it from Python.

> Create an empty partitioned and clustered table via BigQuery SQL

In [None]:
-- Empty order table, partitioned by the date column and clustered by MID.
CREATE TABLE IF NOT EXISTS `travel_demo.order`
(
  MID  STRING,
  PID  STRING,
  date DATE
)
PARTITION BY date
CLUSTER BY MID

> [Sample Data] ORDER 

In [12]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from tabulate import tabulate

def funcOrder(n, seed=None):
    """Generate ``n`` synthetic order rows.

    Parameters
    ----------
    n : int
        Number of rows to generate.
    seed : int, optional
        When given, a dedicated ``np.random.RandomState(seed)`` is used, so the
        same seed yields the same rows (for the same "today"). When omitted,
        NumPy's global random state is used, exactly as before.

    Returns
    -------
    pandas.DataFrame
        Columns ``MID`` ("M" followed by an 8-digit number), ``PID`` ("P"
        followed by a number in 100..999) and ``date`` (a random day between
        2023-01-01 and today, both inclusive).
    """
    # The module-level functions and RandomState expose the same randint API.
    rng = np.random if seed is None else np.random.RandomState(seed)

    mids = np.char.add("M", rng.randint(10000000, 100000000, size=n).astype(str))
    pids = np.char.add("P", rng.randint(100, 1000, size=n).astype(str))

    start_date = np.datetime64('2023-01-01')
    end_date = np.datetime64('today')
    delta_days = (end_date - start_date).astype(int)
    # randint's upper bound is exclusive, so +1 keeps today eligible.
    random_days = rng.randint(0, delta_days + 1, size=n)
    dates = start_date + random_days.astype('timedelta64[D]')

    return pd.DataFrame({'MID': mids, 'PID': pids, 'date': dates})

# Build 100,000 sample orders and preview the first five rows as a text grid.
order_df = funcOrder(100000)
print(
    tabulate(
        order_df.head(5),
        headers='keys',
        tablefmt='grid',
        showindex=False,
    )
)


+-----------+-------+---------------------+
| MID       | PID   | date                |
| M33430978 | P498  | 2025-02-22 00:00:00 |
+-----------+-------+---------------------+
| M36302512 | P277  | 2024-12-11 00:00:00 |
+-----------+-------+---------------------+
| M27964407 | P403  | 2023-10-01 00:00:00 |
+-----------+-------+---------------------+
| M69658690 | P486  | 2024-10-30 00:00:00 |
+-----------+-------+---------------------+
| M50233372 | P324  | 2025-07-17 00:00:00 |
+-----------+-------+---------------------+


> Import data from Python

In [14]:
from google.cloud import bigquery

BQ = bigquery.Client()
table_ref = "travel_demo.order"

# Declare the target schema explicitly and replace whatever rows the table holds.
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_TRUNCATE"
job_config.schema = [
    bigquery.SchemaField(column, bq_type)
    for column, bq_type in (("MID", "STRING"), ("PID", "STRING"), ("date", "DATE"))
]

# Submit the load job and wait for it; the finished job is the cell's output.
job = BQ.load_table_from_dataframe(order_df, table_ref, job_config=job_config)
job.result()

LoadJob<project=tw-rd-data-angus-tu, location=asia-east1, id=bfb1aa8b-ab13-4508-a95f-9cdab00b51f3>

### 2. Import a CSV via GCS

> [Sample Data] PROD

In [5]:
# One product row per PID from P100 to P999, each with a random amount that is
# a multiple of 100 between 100 and 99,900.
pids = ["P" + str(i) for i in range(100, 1000)]
n = len(pids)
amounts = 100 * np.random.randint(1, 1000, size=n)
prod_df = pd.DataFrame({"PID": pids, "amount": amounts})

# Write the products to a local CSV (no index column) and preview five rows.
prod_df.to_csv("travel_demo_prod.csv", index=False)
print(
    tabulate(
        prod_df.head(5),
        headers='keys',
        tablefmt='grid',
        showindex=False,
    )
)

+-------+----------+
| PID   |   amount |
| P100  |    70500 |
+-------+----------+
| P101  |    60200 |
+-------+----------+
| P102  |    35400 |
+-------+----------+
| P103  |    92400 |
+-------+----------+
| P104  |    57500 |
+-------+----------+


> Upload file to GCS

In [8]:
def upload_file(client, bucket_name, source_file_name, sink_file_name, sink_dir = ""):
    """Upload a local file to a GCS bucket and return its ``gs://`` URI.

    Parameters
    ----------
    client
        Object exposing ``bucket(name)``, e.g. a ``google.cloud.storage.Client``.
    bucket_name : str
        Name of the destination bucket.
    source_file_name : str
        Path of the local file to upload.
    sink_file_name : str
        Object name (without directory prefix) to create in the bucket.
    sink_dir : str, optional
        Directory-like prefix inside the bucket. Leading and trailing slashes
        are ignored; an empty, ``None`` or slash-only value uploads to the
        bucket root.

    Returns
    -------
    str
        ``gs://<bucket_name>/<object name>`` of the uploaded object.
    """
    bucket = client.bucket(bucket_name)

    # Strip slashes on both ends so the object name never starts with "/" and
    # the returned URI never contains "//" after the bucket name.
    prefix = sink_dir.strip("/") if sink_dir else ""
    blob_name = f"{prefix}/{sink_file_name}" if prefix else sink_file_name

    blob = bucket.blob(blob_name)
    blob.upload_from_filename(source_file_name)

    return f"gs://{bucket_name}/{blob_name}"

In [9]:
from google.cloud import storage
# Upload the local product CSV to test/prod.csv in the demo bucket; the
# returned gs:// URI is the cell's output.
GCS = storage.Client()
upload_file(
    client=GCS,
    bucket_name="tw-rd-data-angus-tu-travel-demo1",
    source_file_name="travel_demo_prod.csv",
    sink_file_name="prod.csv",
    sink_dir="test",
)

'gs://tw-rd-data-angus-tu-travel-demo1/test/prod.csv'

> Load via BQ SQL

In [None]:
-- Replace the contents of travel_demo.prod with the CSV stored in GCS,
-- skipping its header row.
LOAD DATA OVERWRITE `travel_demo.prod`
(
  PID    STRING,
  amount INT64
)
FROM FILES (
  skip_leading_rows = 1,
  uris = ['gs://tw-rd-data-angus-tu-travel-demo1/test/prod.csv'],
  format = 'CSV'
);

> Load via Python API

In [11]:
# Load the product CSV from GCS into travel_demo.prod2 through the Python API.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("PID", "STRING"),
        # Spelled like the CSV header and the SQL-loaded `prod` table.
        bigquery.SchemaField("amount", "INT64"),
    ],
    # The first CSV row is the header, not data.
    skip_leading_rows=1,
    source_format=bigquery.SourceFormat.CSV,
    # Overwrite on every run (the counterpart of LOAD DATA OVERWRITE) so that
    # re-running the cell does not append duplicate rows.
    write_disposition="WRITE_TRUNCATE",
)
uri = 'gs://tw-rd-data-angus-tu-travel-demo1/test/prod.csv'

# Submit the load job and wait for it; the finished job is the cell's output.
job = BQ.load_table_from_uri(uri, "travel_demo.prod2", job_config=job_config)
job.result()

LoadJob<project=tw-rd-data-angus-tu, location=asia-east1, id=fbf936d8-e63a-4443-8dff-a0ce0608d333>

## Create a View

In [None]:
-- Orders dated from 7 days ago through today.
-- IF NOT EXISTS keeps the statement re-runnable, like the dataset and table DDL.
CREATE VIEW IF NOT EXISTS `travel_demo.order_in_7_days`
AS
SELECT MID, PID, date
FROM `travel_demo.order`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)



In [17]:
# Pull up to 1,000 rows from the view into pandas and preview the first five.
view_df = (
    BQ
    .query("select * from `travel_demo.order_in_7_days` limit 1000")
    .to_dataframe()
)
print(
    tabulate(
        view_df.head(5),
        headers='keys',
        tablefmt='grid',
        showindex=False,
    )
)

+-----------+-------+------------+
| MID       | PID   | date       |
| M22375521 | P112  | 2025-09-16 |
+-----------+-------+------------+
| M61134922 | P114  | 2025-09-16 |
+-----------+-------+------------+
| M28510495 | P117  | 2025-09-16 |
+-----------+-------+------------+
| M39955645 | P118  | 2025-09-16 |
+-----------+-------+------------+
| M19579881 | P130  | 2025-09-16 |
+-----------+-------+------------+
