### Load CSV data to BigQuery Staging Tables

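> This script runs daily and loads every `<table>_YYYY_MM_DD.csv` file in data/ into the BigQuery staging dataset.<br>
> After a file is loaded successfully (or found to be empty), it is moved to data_backup/; files that fail to load are left in data/ to be retried on the next run.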

In [1]:
import os
import re
import sys
import shutil
import logging
import pandas as pd
from datetime import datetime
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

# ------------------------------------------------------------------------------
# Paths and Settings
# ------------------------------------------------------------------------------
# NOTE: these paths are relative to the current working directory (not to this
# file), so the script must be launched from a directory whose parent contains
# data/.
csv_folder = "../data"            # incoming dated CSV files
backup_folder = "../data_backup"  # processed and empty CSVs are moved here
log_folder = "../logs"            # daily log files are written here
project_id = "olist-ecommerce-454812"  # or "horace-integration-453108"
staging_dataset = "olist_data_staging"

# Create the backup and log folders if they do not exist yet. exist_ok=True
# makes this idempotent and avoids the check-then-create race of pairing
# os.path.exists() with os.makedirs().
os.makedirs(backup_folder, exist_ok=True)
os.makedirs(log_folder, exist_ok=True)

# ------------------------------------------------------------------------------
# Configure Logging
# ------------------------------------------------------------------------------
# One log file per calendar day (csv_to_staging_YYYY_MM_DD.log inside
# log_folder); repeated runs on the same day append to the same file, since
# basicConfig opens the file in its default "a" mode.
log_date = datetime.today().strftime('%Y_%m_%d')
log_filename = os.path.join(log_folder, f"csv_to_staging_{log_date}.log")

# force=True (Python 3.8+) removes any handlers already attached to the root
# logger before configuring. Without it, basicConfig is a silent no-op when the
# root logger is already configured -- e.g. when this cell is re-run in the
# same notebook kernel -- and nothing would be written to log_filename.
logging.basicConfig(
    filename=log_filename,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True,
)

# ------------------------------------------------------------------------------
# BigQuery Client + Connectivity Check
# ------------------------------------------------------------------------------
# Fail fast: if the client cannot talk to BigQuery there is no point scanning
# the CSV folder, so log the failure and exit with status 1.
try:
    bq_client = bigquery.Client(project=project_id)
    # list_datasets() returns a lazy iterator; consuming it forces a real API
    # round-trip. One result is enough to prove connectivity, so avoid paging
    # through every dataset in the project.
    _ = list(bq_client.list_datasets(max_results=1))
    logging.info("Connected to BigQuery successfully.")
except Exception as e:
    # logging.exception logs at ERROR level and also records the traceback,
    # which is usually needed to diagnose auth/network failures.
    logging.exception(f"Failed to connect to BigQuery: {e}")
    sys.exit(1)

# ------------------------------------------------------------------------------
# Regex to match CSV files like: <filename>_YYYY_MM_DD.csv
# ------------------------------------------------------------------------------
# Group 1 captures <filename>, which is used as the BigQuery table name. It must
# be non-empty: "_2024_01_31.csv" would otherwise yield "" and the invalid table
# ID "<project>.<dataset>.". Because the group is greedy, only the LAST date
# suffix is stripped ("a_b_2024_01_31.csv" -> "a_b").
pattern = re.compile(r"^(.+)_\d{4}_\d{2}_\d{2}\.csv$")

# ------------------------------------------------------------------------------
# Process Each CSV File
# ------------------------------------------------------------------------------
def _move_to_backup(file_name, src_path, description):
    """Move *src_path* into backup_folder as *file_name*, logging the outcome.

    *description* is the text used in the log messages (e.g. "empty file x.csv").
    Failures are logged rather than raised so one unmovable file does not stop
    the rest of the run.
    """
    try:
        shutil.move(src_path, os.path.join(backup_folder, file_name))
        logging.info(f"Moved {description} to backup folder.")
    except OSError as move_err:  # shutil.Error is a subclass of OSError
        logging.error(f"Error moving {description} to backup folder: {move_err}")


# Sorted so runs are deterministic: os.listdir order is arbitrary, and because
# the date suffix is zero-padded YYYY_MM_DD, files for the same table are
# loaded oldest-first.
for file in sorted(os.listdir(csv_folder)):
    file_path = os.path.join(csv_folder, file)
    match = pattern.match(file)
    if not match:
        # Not a <table>_YYYY_MM_DD.csv file; leave it untouched.
        continue

    base_table_name = match.group(1)
    logging.info(f"Processing file: {file} -> Target table: {base_table_name}")

    # Read CSV. A 0-byte file makes read_csv raise EmptyDataError instead of
    # returning an empty frame; treat it like a header-only file (moved to
    # backup below) rather than leaving it in csv_folder to fail on every run.
    try:
        df = pd.read_csv(file_path)
        logging.info(f"Read CSV with shape: {df.shape}")
    except pd.errors.EmptyDataError:
        df = pd.DataFrame()
    except Exception as e:
        # Unreadable file: keep it in csv_folder so it can be fixed and retried.
        logging.error(f"Error reading {file}: {e}")
        continue

    # Nothing to load, but still move the file out of csv_folder so it is not
    # picked up again on the next run.
    if df.empty:
        logging.info(f"File {file} is empty. Skipping ingestion and moving to backup.")
        _move_to_backup(file, file_path, f"empty file {file}")
        continue

    # Stamp every row with the load time. NOTE(review): datetime.now() is a
    # naive local-time value; confirm the staging column type (DATETIME vs
    # TIMESTAMP) before switching to a timezone-aware value, since that may
    # change the inferred BigQuery type and break appends to existing tables.
    df['last_updated'] = datetime.now()

    table_id = f"{project_id}.{staging_dataset}.{base_table_name}"

    # New table -> WRITE_TRUNCATE (the load job creates it); existing table ->
    # WRITE_APPEND. Any other lookup failure (permissions, transient API error,
    # malformed table ID) is logged and the file is left in place for the next
    # run, instead of propagating and aborting all remaining files.
    try:
        bq_client.get_table(table_id)
    except NotFound:
        write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        logging.info(f"BigQuery table {table_id} does not exist; it will be created.")
    except Exception as e:
        logging.error(f"Error checking BigQuery table {table_id}: {e}")
        continue
    else:
        write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        logging.info(f"BigQuery table {table_id} exists; appending data.")

    job_config = bigquery.LoadJobConfig(
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        write_disposition=write_disposition,
        autodetect=True,
    )

    # Load and block until the job finishes. On failure the CSV stays in
    # csv_folder so it is retried on the next run.
    try:
        load_job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
        load_job.result()
        logging.info(f"Data successfully loaded into {table_id}.")
    except Exception as e:
        logging.error(f"Error loading data into BigQuery table {table_id}: {e}")
        continue

    # Only archive the file once its rows are safely in BigQuery.
    _move_to_backup(file, file_path, file)

logging.info("CSV import process completed.")
