In [None]:
# Install BigQuery client library (run this cell once)
%pip install google-cloud-bigquery --quiet
%pip install bigquery-magics --quiet
%pip install db-dtypes --quiet
# Import libraries
from google.cloud import bigquery
import pandas as pd
%load_ext bigquery_magics
import db_dtypes
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "salesforce-465614-2cf9e37da64b.json"
from google.cloud import bigquery
# Initialize BigQuery client
client = bigquery.Client()

<b>Truncate Stage tables before loading from CSVs</b>

In [None]:
%%bqsql
-- Remove all rows from each stage table ahead of the CSV load.
TRUNCATE TABLE `salesforce-465614.cust_analytics.stage_transactions`;
TRUNCATE TABLE `salesforce-465614.cust_analytics.stage_customers`;
TRUNCATE TABLE `salesforce-465614.cust_analytics.stage_products`;
TRUNCATE TABLE `salesforce-465614.cust_analytics.stage_stores`;

Query is running:   0%|          |

<b>Entry in ETL Job Monitoring table</b>

In [None]:
from datetime import datetime, timezone
from google.cloud import bigquery

# Set monitoring table and module name
monitoring_table = "salesforce-465614.cust_analytics.etl_job_monitoring"
module_name = "File to Stage Layer"

# Read the clock exactly once, as a timezone-aware UTC value:
#  * datetime.utcnow() is deprecated and returns a naive datetime;
#  * deriving the date from the same reading guarantees job_run_date and
#    start_time can never fall on different days around midnight.
# The string form now carries an explicit "+00:00" offset, which BigQuery's
# TIMESTAMP() accepts and which denotes the same instant as before.
start_time = datetime.now(timezone.utc)
job_run_date = start_time.date()

# Insert the "Running" record; later cells locate this row again through
# (job_run_date, module_name, start_time).
insert_start = f"""
INSERT INTO `{monitoring_table}` (job_run_date, module_name, start_time, status)
VALUES (DATE('{job_run_date}'), '{module_name}', TIMESTAMP('{start_time}'), 'Running')
"""
client.query(insert_start).result()

  job_run_date = datetime.utcnow().date()
  start_time = datetime.utcnow()


<google.cloud.bigquery.table._EmptyRowIterator at 0x24807336720>

<b>Loading Stage Tables</b>

In [None]:
from datetime import datetime, timezone
from google.cloud import bigquery

# Initialize BigQuery client
client = bigquery.Client()
# Define dataset and the directory holding the source CSV files
dataset_id = "cust_analytics"
file_path = "v2/"
# Mapping of source CSV file names to stage table names
file_table_map = {
    "Transactions.csv": "stage_transactions",
    "Customer.csv": "stage_customers",
    "Product.csv": "stage_products",
    "Store.csv": "stage_stores"
}

# The load settings are identical for every file, so build them once.
# WRITE_TRUNCATE replaces any existing table contents on each load.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)

# The whole load runs inside the try block. Previously only the final
# "Success" UPDATE was guarded, so a failing load skipped the handler and left
# the monitoring row stuck in 'Running'.
# Relies on monitoring_table, module_name, job_run_date and start_time set by
# the "Entry in ETL Job Monitoring table" cell.
try:
    # Load each CSV into its respective BigQuery table
    for file_name, table_name in file_table_map.items():
        table_id = f"{client.project}.{dataset_id}.{table_name}"
        full_file_path = os.path.join(file_path, file_name)
        with open(full_file_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        print(f"Loaded {file_name} to {table_id}")

    # Every file loaded: close the monitoring row as successful.
    end_time = datetime.now(timezone.utc)
    update_success = f"""
    UPDATE `{monitoring_table}`
    SET end_time = TIMESTAMP('{end_time}'), status = 'Success'
    WHERE job_run_date = DATE('{job_run_date}') AND module_name = '{module_name}' AND start_time = TIMESTAMP('{start_time}')
    """
    client.query(update_success).result()
except Exception as e:
    # Update status as Failed, then re-raise so the notebook run stops here.
    fail_time = datetime.now(timezone.utc)
    update_failed = f"""
    UPDATE `{monitoring_table}`
    SET end_time = TIMESTAMP('{fail_time}'), status = 'Failed'
    WHERE job_run_date = DATE('{job_run_date}') AND module_name = '{module_name}' AND start_time = TIMESTAMP('{start_time}')
    """
    client.query(update_failed).result()
    print("ETL job failed:", e)
    raise

Loaded Transactions.csv to salesforce-465614.cust_analytics.stage_transactions
Loaded Customer.csv to salesforce-465614.cust_analytics.stage_customers
Loaded Product.csv to salesforce-465614.cust_analytics.stage_products
Loaded Store.csv to salesforce-465614.cust_analytics.stage_stores


<b>Data Cleaning before loading to Persistent Tables</b>

In [None]:
# Stage tables to profile for missing values
stage_tables = [
    "stage_customers",
    "stage_products",
    "stage_transactions",
    "stage_stores"
]

# For each table, pull all rows into a DataFrame and count the non-null
# entries per column; the result maps table name -> Series of counts.
not_null_counts_dict = {
    table: (
        client.query(f"SELECT * FROM `{client.project}.{dataset_id}.{table}`")
        .to_dataframe()
        .notnull()
        .sum()
    )
    for table in stage_tables
}

# Print one report per table, each followed by a 40-dash rule
for table, counts in not_null_counts_dict.items():
    print(f"Not-null counts for {table}:")
    print(counts)
    print("-" * 40)



Not-null counts for stage_customers:
customer_id        2000
first_name         2000
last_name          2000
gender             2000
age                2000
signup_date        2000
loyalty_program    2000
email              2000
city               2000
state              2000
country            2000
dtype: int64
----------------------------------------
Not-null counts for stage_products:
product_id      200
product_name    200
category        200
sub_category    200
price           200
dtype: int64
----------------------------------------
Not-null counts for stage_transactions:
Transaction_id    20000
Customer_ID       20000
Product_id        20000
Store_Location    20000
Purchase_Date     20000
Product_Type      20000
Unit_Price        20000
Quantity          20000
Total_Price       20000
Payment_Method    20000
Channel           20000
Order_Status      20000
Loyalty_Member    20000
Age               20000
Gender            20000
City              20000
dtype: int64
------------------

In [None]:
# Remove rows that are exact duplicates (identical in every column) from each
# stage table by rewriting the table with its distinct rows.

stage_tables = [
    "stage_customers",
    "stage_products",
    "stage_transactions",
    "stage_stores"
]

for table in stage_tables:
    table_ref = f"{client.project}.{dataset_id}.{table}"
    # SELECT DISTINCT * keeps one copy of every fully identical row, the same
    # result the previous ROW_NUMBER() OVER (PARTITION BY <all columns>)
    # query aimed for, without its two failure modes:
    #  * column names were interpolated unquoted, so a name that is a
    #    reserved word would break the statement;
    #  * BigQuery does not allow floating-point expressions in a window
    #    PARTITION BY, and autodetected price columns are likely FLOAT64.
    # It also removes the extra schema lookup per table.
    delete_duplicates_query = f"""
    CREATE OR REPLACE TABLE `{table_ref}` AS
    SELECT DISTINCT *
    FROM `{table_ref}`
    """
    client.query(delete_duplicates_query).result()
    print(f"Duplicates removed from {table}")