In [17]:
import csv
import io
import time
from datetime import datetime
import random
from google.cloud import bigquery, storage
from google.cloud import bigquery_connection_v1 as bq_connection
from google.api_core import exceptions
from google.iam.v1 import iam_policy_pb2, policy_pb2

# --- Configuration ---
# Project and dataset that every resource in this script is created in.
PROJECT_ID = "bq-sme-governance-build"
DATASET_ID = "sme_raw_layer"
# Single location used for the GCS bucket, the BigQuery dataset and the
# BigQuery connection created below.
LOCATION = "us-west1"
# ID of the BigQuery Connection resource (the part after ".../connections/").
CONNECTION_ID = "bq-gcs-lab-connection"

# GCS bucket that holds the external table data.
# !- UPDATE THIS to a globally unique name -!
BUCKET_NAME = f"{PROJECT_ID}-lab-data-source"
# GCS prefix under which the Iceberg 'orders' table stores its data and metadata.
ICEBERG_STORAGE_URI = f"gs://{BUCKET_NAME}/iceberg_metadata/orders"
# ---------------------

# Module-level API clients shared by all functions in this script.
bq_client = bigquery.Client(project=PROJECT_ID)
storage_client = storage.Client(project=PROJECT_ID)
connection_client = bq_connection.ConnectionServiceClient()

# --- Mock Data Generation ---

def get_mock_customers():
    """Return the hard-coded sample rows for the 'customers' table.

    Each row is a dict keyed by column name; a fresh list is built on
    every call.
    """
    columns = ("customer_id", "first_name", "last_name", "email", "join_date")
    records = (
        ("C1001", "Alice", "Smith", "alice@example.com", "2023-01-15"),
        ("C1002", "Bob", "Johnson", "bob@example.com", "2023-02-10"),
        ("C1003", "Charlie", "Brown", "charlie@example.com", "2023-03-05"),
        ("C1004", "David", "Lee", "david@example.com", "2023-04-20"),
        ("C1005", "Eve", "Davis", "eve@example.com", "2023-05-15"),
    )
    return [dict(zip(columns, record)) for record in records]

def get_mock_products():
    """Return the hard-coded sample rows for the 'products' table.

    The last row (P9999, category "UNKNOWN", negative price) is part of
    the sample set and is returned like any other row.
    """
    columns = ("product_id", "product_name", "category", "unit_price")
    records = (
        ("P2001", "Laptop", "Electronics", 1200.00),
        ("P2002", "Mouse", "Electronics", 25.50),
        ("P2003", "Coffee Mug", "Homeware", 15.00),
        ("P2004", "Notebook", "Stationery", 5.75),
        ("P9999", "Test Item", "UNKNOWN", -1.00),
    )
    return [dict(zip(columns, record)) for record in records]

def get_mock_orders():
    """Return the hard-coded sample rows for the 'orders' table as dicts.

    Order E106 refers to customer C9999, which does not appear in the
    mock customers data.
    """
    columns = ("order_id", "customer_id", "order_date", "status")
    records = (
        ("E101", "C1001", "2024-05-01", "Shipped"),
        ("E102", "C1002", "2024-05-03", "Processing"),
        ("E103", "C1001", "2024-05-04", "Shipped"),
        ("E104", "C1003", "2024-05-05", "Delivered"),
        ("E105", "C1004", "2024-05-06", "Shipped"),
        ("E106", "C9999", "2024-05-07", "Pending"),
    )
    return [dict(zip(columns, record)) for record in records]

def get_mock_order_items_csv():
    """Return the sample 'order_items' rows serialised as CSV text.

    The first line is the header row; rows are written with the default
    ``csv.writer`` dialect.
    """
    header = ("item_id", "order_id", "product_id", "quantity")
    line_items = (
        ("OI301", "E101", "P2001", 1),
        # NOTE(review): "OI32" breaks the OI3xx pattern of the other IDs --
        # confirm whether it is an intentional anomaly or a typo for "OI302".
        ("OI32", "E101", "P2002", 1),
        ("OI303", "E102", "P2003", 2),
        ("OI304", "E103", "P2004", 5),
        ("OI305", "E104", "P2001", 1),
        ("OI306", "E105", "P2003", 1),
        ("OI307", "E106", "P9999", 99),
    )
    with io.StringIO() as buffer:
        csv_writer = csv.writer(buffer)
        csv_writer.writerow(header)
        csv_writer.writerows(line_items)
        return buffer.getvalue()
# --- End of Mock Data Generation ---


# --- Cloud Resource Setup ---

def ensure_gcs_bucket_exists():
    """Return the bucket named BUCKET_NAME, creating it in LOCATION if missing.

    After creating a new bucket the function sleeps for five seconds
    before returning, to give the new bucket time to become visible.
    """
    print(f"Checking for GCS bucket: {BUCKET_NAME}...")
    try:
        existing = storage_client.get_bucket(BUCKET_NAME)
    except exceptions.NotFound:
        print("...bucket not found, creating new bucket.")
        created = storage_client.create_bucket(BUCKET_NAME, location=LOCATION)
        # Short pause to help with eventual consistency of the new bucket.
        print(f"...created bucket {created.name} in {created.location}. Waiting 5s...")
        time.sleep(5)
        return created
    else:
        print("...bucket already exists.")
        return existing

def ensure_bq_dataset_exists():
    """Create the BigQuery dataset DATASET_ID in LOCATION if it does not exist.

    The dataset reference is built with ``bigquery.DatasetReference``
    rather than the deprecated ``Client.dataset()`` helper; both resolve
    to ``PROJECT_ID.DATASET_ID``.

    Raises:
        Exception: Any API error other than ``NotFound`` on the lookup,
            and any error from the create call, propagates to the caller.
    """
    dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
    print(f"Checking for BigQuery dataset: {DATASET_ID}...")
    try:
        bq_client.get_dataset(dataset_ref)
        print("...dataset already exists.")
    except exceptions.NotFound:
        print("...dataset not found, creating new dataset.")
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = LOCATION
        # Report the location of the dataset the API actually created.
        created = bq_client.create_dataset(dataset, timeout=30)
        print(f"...created dataset in {created.location}.")

def get_or_create_bq_connection():
    """Get the CLOUD_RESOURCE BigQuery connection, creating it if missing.

    Returns:
        tuple: ``(connection_name, service_account_id)`` where
        ``connection_name`` is the ``name`` field of the connection
        returned by the API and ``service_account_id`` is the identity
        that must be granted access to the GCS bucket.

    Raises:
        ValueError: If the connection has an empty service account ID.
        Exception: Any error from looking up or creating the connection
            is re-raised after a troubleshooting hint is printed.
    """
    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}"
    connection_full_name = f"{parent}/connections/{CONNECTION_ID}"

    print(f"Checking for BQ Connection: {CONNECTION_ID}...")
    # The outer try wraps both the lookup and the creation so that a failure
    # in either path prints the same hint. An exception raised inside an
    # ``except`` handler is not caught by sibling handlers of the same try.
    try:
        try:
            connection = connection_client.get_connection(
                request=bq_connection.GetConnectionRequest(name=connection_full_name)
            )
            print("...connection already exists.")
        except exceptions.NotFound:
            print("...connection not found, creating new connection.")
            # Create a connection for GCS (CLOUD_RESOURCE)
            connection_request = bq_connection.CreateConnectionRequest(
                parent=parent,
                connection_id=CONNECTION_ID,
                connection=bq_connection.Connection(
                    cloud_resource=bq_connection.CloudResourceProperties()
                ),
            )
            connection = connection_client.create_connection(request=connection_request)
            print("...connection created. Waiting 10s for SA to be created...")
            # Give the connection's service account time to exist before it is used.
            time.sleep(10)
    except Exception:
        print("Error: Could not get or create BQ Connection.")
        print("Please ensure the BigQuery Connection API is enabled.")
        raise

    service_account_id = connection.cloud_resource.service_account_id
    if not service_account_id:
        print("...ERROR: Connection created but service_account_id is missing.")
        raise ValueError("Connection service account ID is empty")

    print(f"...Connection Service Account: {service_account_id}")
    return connection.name, service_account_id

def grant_gcs_permissions_to_sa(bucket_name, service_account_email, max_retries=5, delay_seconds=10):
    """
    Give the service account ``roles/storage.objectAdmin`` on the bucket.

    The bucket IAM policy is read, and the binding is appended only when
    it is not already present. Each failed attempt is followed by a
    ``delay_seconds`` pause, up to ``max_retries`` attempts; the error
    from the final attempt is raised to the caller. A ``BadRequest``
    whose message does not contain "does not exist" is raised at once.
    """
    print(f"Granting GCS permissions to: {service_account_email}...")

    try:
        target_bucket = storage_client.get_bucket(bucket_name)
    except Exception as err:
        print(f"Failed to get bucket {bucket_name}: {err}")
        raise

    role = "roles/storage.objectAdmin"
    member = f"serviceAccount:{service_account_email}"
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            policy = target_bucket.get_iam_policy(requested_policy_version=3)

            # Nothing to do when the role/member pair is already bound.
            already_bound = any(
                entry["role"] == role and member in entry["members"]
                for entry in policy.bindings
            )
            if already_bound:
                print("...IAM binding already exists.")
                return

            print("...IAM binding not found, adding it.")
            policy.bindings.append({"role": role, "members": {member}})
            target_bucket.set_iam_policy(policy)
            print(f"...Successfully granted role {role} on bucket {bucket_name}.")
            return

        except exceptions.BadRequest as err:
            # A "does not exist" message is treated as the service account
            # not having propagated yet; any other bad request is fatal.
            if "does not exist" not in str(err).lower():
                print(f"Error granting IAM permissions to service account: {err}")
                raise err

            print(f"...Attempt {attempt + 1} of {max_retries} failed: IAM service account not yet propagated.")
            if attempt >= final_attempt:
                print("...Max retries reached. IAM grant failed.")
                raise err
            print(f"...Waiting {delay_seconds}s for IAM propagation...")
            time.sleep(delay_seconds)
        except Exception as err:
            # Any other error is retried until the attempts run out.
            print(f"An unexpected error occurred during IAM grant (Attempt {attempt + 1}): {err}")
            if attempt >= final_attempt:
                raise err
            time.sleep(delay_seconds)


def upload_to_gcs(bucket, blob_name, data, content_type, max_retries=3, delay_seconds=5):
    """
    Write ``data`` to ``blob_name`` in ``bucket`` and return its gs:// URI.

    A NotFound error is retried up to ``max_retries`` times with a
    ``delay_seconds`` pause between attempts (the bucket may have only
    just been created); any other error is printed and raised at once.
    """
    print(f"Uploading {blob_name} to GCS bucket {bucket.name}...")
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            bucket.blob(blob_name).upload_from_string(data, content_type=content_type)
            print("...upload complete.")
            return f"gs://{bucket.name}/{blob_name}"
        except exceptions.NotFound as err:
            print(f"...Attempt {attempt + 1} of {max_retries} failed (404 NotFound).")
            if attempt >= final_attempt:
                print("...Max retries reached. Upload failed.")
                raise err
            print(f"...Waiting {delay_seconds}s for GCS to propagate bucket creation...")
            time.sleep(delay_seconds)
        except Exception as err:
            print(f"An unexpected error occurred during upload: {err}")
            raise err

# --- BigQuery Table Creation ---

def load_table_from_memory(table_id, data, schema):
    """Load a list of dicts into ``PROJECT_ID.DATASET_ID.table_id``.

    The load job uses ``WRITE_TRUNCATE``, so any existing rows in the
    table are replaced by ``data``.

    Args:
        table_id: Table name within DATASET_ID.
        data: List of JSON-serialisable row dicts.
        schema: List of ``bigquery.SchemaField`` describing the columns.

    Returns:
        bool: True when the load job finished, False when it failed.
        Failures are printed and not raised, so callers that ignore the
        return value keep running as before; callers that need to know
        the outcome can now check it.
    """
    full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_id}"
    print(f"Starting load job for table: {full_table_id}...")

    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    try:
        load_job = bq_client.load_table_from_json(
            data, full_table_id, job_config=job_config
        )
        # Block until the job completes; raises if the job failed.
        load_job.result()
    except Exception as e:
        print(f"Error loading table {full_table_id}: {e}")
        return False

    print("...load job finished.")
    return True

def create_and_load_external_iceberg_table(table_id, data, schema, connection_name, storage_uri):
    """Create (or replace) an Iceberg table via DDL and insert ``data`` into it.

    The DDL is ``CREATE OR REPLACE TABLE ... WITH CONNECTION ...
    OPTIONS(table_format = 'ICEBERG', file_format = 'PARQUET',
    storage_uri = ...)``, with the column list generated from ``schema``.
    Rows are then loaded with one parameterised ``INSERT`` statement, so
    values containing quotes cannot break the SQL and ``None`` is sent
    as NULL.

    Args:
        table_id: Table name within DATASET_ID.
        data: List of row dicts keyed by column name. Keys missing from
            a row are inserted as NULL. An empty list creates the table
            and skips the INSERT.
        schema: List of ``bigquery.SchemaField``; drives both the DDL
            columns and the INSERT column list.
        connection_name: Connection identifier placed in the
            ``WITH CONNECTION`` clause.
        storage_uri: GCS URI used as the table's ``storage_uri`` option.

    Raises:
        Exception: Any error from the DDL or INSERT job is printed and
            re-raised.
    """
    full_table_id_sql_safe = f"`{PROJECT_ID}`.`{DATASET_ID}`.`{table_id}`"
    print(f"Creating External Iceberg table: {full_table_id_sql_safe}...")

    # SchemaField accepts legacy type names; query parameters are declared
    # with the standard SQL names, so map the common legacy aliases.
    legacy_to_standard = {"INTEGER": "INT64", "FLOAT": "FLOAT64", "BOOLEAN": "BOOL"}

    try:
        # 1. Generate the DDL for the Iceberg table definition
        schema_sql = ", ".join(
            f"`{field.name}` {field.field_type}"
            + (" NOT NULL" if field.mode == "REQUIRED" else "")
            for field in schema
        )

        ddl_query = f"""
        CREATE OR REPLACE TABLE {full_table_id_sql_safe}
        (
            {schema_sql}
        )
        WITH CONNECTION `{connection_name}`
        OPTIONS(
            table_format = 'ICEBERG',
            file_format = 'PARQUET',
            storage_uri = '{storage_uri}'
        )
        """

        print("...executing DDL to create table definition:")

        # 2. Execute the DDL query to create the table structure
        bq_client.query(ddl_query).result()
        print("...External Iceberg table definition created.")

        # An INSERT with an empty VALUES list is invalid SQL.
        if not data:
            print("...no rows supplied, skipping INSERT.")
            return

        # 3. Load data using a parameterised INSERT INTO ... VALUES ... query
        print("...preparing INSERT query to load data.")
        query_parameters = []
        row_placeholders = []
        for row_index, row in enumerate(data):
            placeholders = []
            for col_index, field in enumerate(schema):
                param_name = f"r{row_index}_c{col_index}"
                param_type = legacy_to_standard.get(field.field_type, field.field_type)
                query_parameters.append(
                    bigquery.ScalarQueryParameter(param_name, param_type, row.get(field.name))
                )
                placeholders.append(f"@{param_name}")
            row_placeholders.append(f"({', '.join(placeholders)})")

        column_list = ", ".join(f"`{field.name}`" for field in schema)
        insert_query = f"""
        INSERT INTO {full_table_id_sql_safe}
        ({column_list})
        VALUES {', '.join(row_placeholders)}
        """

        print("...executing INSERT query.")
        job_config = bigquery.QueryJobConfig(query_parameters=query_parameters)
        # Wait for the insert to complete
        bq_client.query(insert_query, job_config=job_config).result()
        print("...data loaded into Iceberg table.")

    except Exception as e:
        print(f"Error creating or loading Iceberg table {full_table_id_sql_safe}: {e}")
        raise


def create_external_table(table_id, schema, gcs_uri, format="CSV"):
    """Create (or recreate) an external table backed by a CSV file in GCS.

    Any existing table with the same ID is deleted first, then the
    external table is created with the first row of the file skipped as
    a header.

    Args:
        table_id: Table name within DATASET_ID.
        schema: List of ``bigquery.SchemaField`` for the CSV columns.
        gcs_uri: ``gs://`` URI of the source file.
        format: External data format; only "CSV" is supported.

    Returns:
        bool: True when the table was created, False when the delete or
        create call failed. Failures are printed and not raised, so
        callers that ignore the return value keep running as before;
        callers that need to know the outcome can now check it.

    Raises:
        ValueError: If ``format`` is anything other than "CSV".
    """
    full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_id}"
    print(f"Creating external {format} table: {full_table_id}...")

    if format != "CSV":
        raise ValueError(f"Unsupported external format: {format}")

    external_config = bigquery.ExternalConfig("CSV")
    external_config.source_uris = [gcs_uri]
    external_config.schema = schema
    # The first row of the file is the header.
    external_config.csv_options.skip_leading_rows = 1

    table = bigquery.Table(full_table_id)
    table.external_data_configuration = external_config

    try:
        bq_client.delete_table(full_table_id, not_found_ok=True)
        print("...deleted existing table (if any).")
        bq_client.create_table(table)
    except Exception as e:
        print(f"Error creating external table {full_table_id}: {e}")
        return False

    print("...external table created.")
    return True
# --- Main Execution ---

def main():
    """Provision the lab resources and populate the four sample tables.

    Order of work: bucket, dataset, connection and IAM grant (any failure
    here prints a hint and stops), then the 'customers' and 'products'
    load jobs, the Iceberg 'orders' table, the 'order_items' CSV upload
    and its external table, and finally a printed summary.
    """
    print(f"Starting data setup for project: {PROJECT_ID}\n")

    # Prerequisites: nothing else can run without these resources.
    try:
        bucket = ensure_gcs_bucket_exists()
        ensure_bq_dataset_exists()
        connection_name, sa_email = get_or_create_bq_connection()
        grant_gcs_permissions_to_sa(bucket.name, sa_email)
    except Exception as err:
        print(f"Failed to create prerequisite cloud resources: {err}")
        print("Please check your permissions and project configuration.")
        return

    field = bigquery.SchemaField

    print("\n--- Step 1: Handling Internal Tables (Load Jobs) ---")

    # 'customers' (standard table)
    load_table_from_memory(
        "customers",
        get_mock_customers(),
        [
            field("customer_id", "STRING", "REQUIRED"),
            field("first_name", "STRING"),
            field("last_name", "STRING"),
            field("email", "STRING"),
            field("join_date", "DATE"),
        ],
    )

    # 'products' (standard table)
    load_table_from_memory(
        "products",
        get_mock_products(),
        [
            field("product_id", "STRING", "REQUIRED"),
            field("product_name", "STRING"),
            field("category", "STRING"),
            field("unit_price", "FLOAT"),
        ],
    )

    print("\n--- Step 2: Handling External & BigLake Tables ---")

    # 'orders' (Iceberg table, created through the connection)
    create_and_load_external_iceberg_table(
        "orders",
        get_mock_orders(),
        [
            field("order_id", "STRING", "REQUIRED"),
            field("customer_id", "STRING"),
            field("order_date", "DATE"),
            field("status", "STRING"),
        ],
        connection_name,
        ICEBERG_STORAGE_URI,
    )

    # 'order_items': upload the CSV, then point an external table at it.
    order_items_gcs_uri = upload_to_gcs(
        bucket,
        "raw/order_items/order_items.csv",
        get_mock_order_items_csv(),
        content_type="text/csv",
    )
    create_external_table(
        "order_items",
        [
            field("item_id", "STRING", "REQUIRED"),
            field("order_id", "STRING"),
            field("product_id", "STRING"),
            field("quantity", "INTEGER"),
        ],
        order_items_gcs_uri,
        format="CSV",
    )

    summary_lines = (
        "\n--- Setup Complete! ---",
        f"Project: {PROJECT_ID}",
        f"Dataset: {DATASET_ID}",
        "Tables created:",
        " - customers (Standard BQ Table)",
        " - products (Standard BQ Table)",
        " - orders (External Iceberg/BigLake Table)",
        " - order_items (External CSV Table)",
    )
    for line in summary_lines:
        print(line)

# --- Cleanup Function ---

def cleanup_resources():
    """
    Deletes all created resources for a clean teardown.
    WARNING: This is destructive and irreversible.

    Removes, in order: every blob in BUCKET_NAME and the bucket itself,
    the DATASET_ID dataset with its contents, and the CONNECTION_ID
    connection. Each step is best-effort: a failure is printed and the
    next step still runs, so nothing is raised to the caller.
    """
    print("\n--- STARTING RESOURCE CLEANUP ---")

    # 1. Delete the GCS Bucket and its contents
    print(f"Attempting to delete GCS Bucket: {BUCKET_NAME}...")
    try:
        bucket = storage_client.get_bucket(BUCKET_NAME)
        print("...deleting all blobs in bucket...")
        # This also removes the Iceberg metadata/data files.
        # list_blobs() takes no ``force`` argument; passing one raised a
        # TypeError that the handler below swallowed, so the bucket was
        # never emptied or deleted.
        blobs = list(bucket.list_blobs())
        for blob in blobs:
            try:
                blob.delete()
            except Exception as e:
                print(f"...warning: could not delete blob {blob.name}: {e}")
        print(f"...deleted {len(blobs)} blobs.")

        # Wait a moment for deletes to propagate
        time.sleep(2)

        bucket.delete(force=True)
        print(f"...bucket {BUCKET_NAME} deleted successfully.")
    except exceptions.NotFound:
        print(f"...bucket {BUCKET_NAME} not found, skipping.")
    except Exception as e:
        print(f"Error deleting bucket {BUCKET_NAME}: {e}")

    # 2. Delete the BigQuery Dataset and its contents
    # This will also delete the 'orders' table definition
    print(f"Attempting to delete BigQuery Dataset: {DATASET_ID}...")
    try:
        bq_client.delete_dataset(
            DATASET_ID, delete_contents=True, not_found_ok=True
        )
        print(f"...dataset {DATASET_ID} deleted successfully.")
    except exceptions.NotFound:
        print(f"...dataset {DATASET_ID} not found, skipping.")
    except Exception as e:
        print(f"Error deleting dataset {DATASET_ID}: {e}")

    # 3. Delete the BQ Connection
    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}"
    connection_full_name = f"{parent}/connections/{CONNECTION_ID}"
    print(f"Attempting to delete BQ Connection: {CONNECTION_ID}...")
    try:
        connection_client.delete_connection(
            request=bq_connection.DeleteConnectionRequest(name=connection_full_name)
        )
        print(f"...connection {CONNECTION_ID} deleted successfully.")
    except exceptions.NotFound:
        print(f"...connection {CONNECTION_ID} not found, skipping.")
    except Exception as e:
        print(f"Error deleting connection {CONNECTION_ID}: {e}")

    print("--- CLEANUP COMPLETE ---")


if __name__ == "__main__":
    # Provision the lab resources and load the sample tables.
    main()

    # Teardown is opt-in: uncomment the call below to delete the bucket,
    # dataset and connection created by main().
    #cleanup_resources()



Starting data setup for project: bq-sme-governance-build

Checking for GCS bucket: bq-sme-governance-build-lab-data-source...
...bucket already exists.
Checking for BigQuery dataset: sme_raw_layer...
...dataset not found, creating new dataset.
...created dataset in us-west1.
Checking for BQ Connection: bq-gcs-lab-connection...
...connection not found, creating new connection.
...connection created. Waiting 10s for SA to be created...
...Connection Service Account: bqcx-184517388310-8nih@gcp-sa-bigquery-condel.iam.gserviceaccount.com
Granting GCS permissions to: bqcx-184517388310-8nih@gcp-sa-bigquery-condel.iam.gserviceaccount.com...
...IAM binding not found, adding it.
...Successfully granted role roles/storage.objectAdmin on bucket bq-sme-governance-build-lab-data-source.

--- Step 1: Handling Internal Tables (Load Jobs) ---
Starting load job for table: bq-sme-governance-build.sme_raw_layer.customers...
...load job finished.
Starting load job for table: bq-sme-governance-build.sme_raw