In [0]:
# Notebook parameters: text widgets naming the catalog, schema and volume, each with a default.
for _widget_name, _widget_default in (
    ("uc_catalog", "main"),
    ("uc_schema", "mfg_agent_bricks_demo"),
    ("uc_volume", "finance_unstructured_data"),
):
    dbutils.widgets.text(_widget_name, _widget_default)

# Read the current widget values back, in the same order the widgets were registered.
UC_CATALOG, UC_SCHEMA, UC_VOLUME = (
    dbutils.widgets.get(_widget_name)
    for _widget_name in ("uc_catalog", "uc_schema", "uc_volume")
)

In [0]:
def _quote_identifier(name: str) -> str:
    """
    Return `name` as a backtick-delimited SQL identifier.

    Widget values are free text, so a name containing characters that are not
    valid in a bare identifier (a hyphen, for example) would otherwise produce
    a statement that fails to parse. A value that already arrives wrapped in
    backticks is unwrapped first so it is not quoted twice.
    """
    if len(name) >= 2 and name[0] == "`" and name[-1] == "`":
        name = name[1:-1].replace("``", "`")
    # A literal backtick inside a delimited identifier is written as two backticks.
    return "`" + name.replace("`", "``") + "`"


_catalog_sql = _quote_identifier(UC_CATALOG)
_schema_sql = _quote_identifier(UC_SCHEMA)
_volume_sql = _quote_identifier(UC_VOLUME)

# Create the target schema and volume if missing, then make them the session defaults
# so the unqualified table names used afterwards resolve to this catalog and schema.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {_catalog_sql}.{_schema_sql}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {_catalog_sql}.{_schema_sql}.{_volume_sql}")
spark.sql(f"USE CATALOG {_catalog_sql}")
spark.sql(f"USE SCHEMA {_schema_sql}")

### Copy Finance data into new volume

In [0]:
# src_base = "/Volumes/main/fins_agent_bricks_demo/magnificent_seven_unstructured_data"
# dst_base = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}"

# folders = ["10k", "10q", "annual_reports", "call_transcripts", "earning_releases"]
# # Copy each folder recursively
# for f in folders:
#     dbutils.fs.cp(f"{src_base}/{f}", f"{dst_base}/{f}", recurse=True)

### Build synthetic UC Tables

In [0]:
%pip install Faker

In [0]:
# Run in a Databricks Python notebook
# %pip install Faker

from faker import Faker
from datetime import date, timedelta
import random

# Faker generator instance. NOTE(review): not referenced by any cell visible here - confirm it is still needed.
fake = Faker()
# Seed the stdlib generator so the random.* draws that follow come out in the same sequence each run.
random.seed(42)

# Canonical dimensions shared by every synthetic table
skus = ["ACC-GPU", "SOC-CPU", "MCU-EDGE"]
families = {
    "ACC-GPU": "Accelerator",
    "SOC-CPU": "Compute",
    "MCU-EDGE": "Embedded",
}
regions = ["NA", "EMEA", "APJ"]
dcs = ["RNO", "DFW", "AMS"]
# (supplier_id, supplier_name)
suppliers = [
    ("SUP1", "Atlas Substrates"),
    ("SUP2", "Beta Foundry"),
    ("SUP3", "Cobalt Assembly"),
    ("SUP4", "Delta Memory"),
]
# (distributor_id, distributor_name, region, channel)
dists = [
    ("D1", "GlobalTech NA", "NA", "dist"),
    ("D2", "EuroParts EMEA", "EMEA", "dist"),
    ("D3", "AsiaLogic APJ", "APJ", "dist"),
]

# Calendar: one entry per day from `start` up to, but not including, today.
start = date(2025, 1, 1)
days_since = (date.today() - start).days
days = [start + timedelta(days=offset) for offset in range(days_since)]

def _overwrite_table(rows, schema, table):
    """Build a DataFrame from `rows` using the DDL `schema` string and overwrite table `table` with it."""
    frame = spark.createDataFrame(rows, schema)
    frame.write.mode("overwrite").saveAsTable(table)


def _supplier_rows():
    """One (id, name, tier, country) tuple per supplier; tier is drawn before country."""
    rows = []
    for supplier_id, supplier_name in suppliers:
        tier = random.choice(["1", "2"])
        country = random.choice(["US", "DE", "TW", "MX"])
        rows.append((supplier_id, supplier_name, tier, country))
    return rows


def _inventory_rows():
    """One snapshot per (dc, sku, day) with a random on-hand count and a fixed safety stock of 150."""
    rows = []
    for dc_id in dcs:
        for sku_code in skus:
            for as_of in days:
                rows.append((dc_id, sku_code, as_of, random.randint(120, 420), 150))
    return rows


def _forecast_rows():
    """One forecast per (sku, day); region, DC and units are drawn at random, in that order."""
    rows = []
    for sku_code in skus:
        for demand_day in days:
            region = random.choice(regions)
            dc_id = random.choice(dcs)
            rows.append((sku_code, region, dc_id, demand_day, random.randint(20, 120)))
    return rows


def _supply_plan_rows():
    """One inbound PO per SKU on every third day, with an ETA 5-18 days after the ship date."""
    rows = []
    for ship_day in days[::3]:
        for sku_code in skus:
            supplier_id = random.choice(suppliers)[0]
            dest_dc = random.choice(dcs)
            quantity = random.randint(50, 220)
            transit = timedelta(random.randint(5, 18))
            rows.append((
                sku_code,
                supplier_id,
                f"PO_{supplier_id}_{sku_code}_{ship_day}",
                dest_dc,
                ship_day,
                ship_day + transit,
                quantity,
            ))
    return rows


def _product_rows():
    """One product-master row per SKU: family, launch date and price tier."""
    launch_by_sku = {"ACC-GPU": date(2023, 10, 1), "SOC-CPU": date(2020, 6, 1)}
    fallback_launch = date(2024, 9, 1)
    premium_skus = {"ACC-GPU", "MCU-EDGE"}
    return [
        (
            sku_code,
            families[sku_code],
            launch_by_sku.get(sku_code, fallback_launch),
            "premium" if sku_code in premium_skus else "standard",
        )
        for sku_code in skus
    ]


def _cogs_rows():
    """One unit-COGS row per SKU, effective from `start`."""
    rows = []
    for sku_code in skus:
        rows.append((sku_code, start, float(random.randint(8000, 22000))))
    return rows


def _sales_order_rows():
    """At most one order per (day, sku); a drawn quantity of 0 means no order for that pair."""
    rows = []
    for order_day in days:
        for sku_code in skus:
            quantity = random.randint(0, 30)
            if not quantity:
                continue
            distributor_id = random.choice(dists)[0]
            unit_price = random.randint(15000, 35000)
            region = random.choice(regions)
            rows.append((
                f"SO_{sku_code}_{order_day}",
                order_day,
                sku_code,
                distributor_id,
                region,
                quantity,
                float(unit_price),
            ))
    return rows


# Tables are generated and written in this fixed order; each builder consumes draws from the
# shared random generator, so reordering these statements would change the generated values.

# Supply Chain: suppliers
_overwrite_table(
    _supplier_rows(),
    "supplier_id string, supplier_name string, tier string, country string",
    "suppliers",
)

# Supply Chain: inventory positions
inv = _inventory_rows()
_overwrite_table(
    inv,
    "dc_id string, sku string, as_of_date date, on_hand_units int, safety_stock int",
    "inventory_positions",
)

# Supply Chain: demand forecast daily
fc = _forecast_rows()
_overwrite_table(
    fc,
    "sku string, region string, dc_id string, demand_date date, forecast_units int",
    "demand_forecast_daily",
)

# Supply Chain: supply plan inbound
sp = _supply_plan_rows()
_overwrite_table(
    sp,
    "sku string, supplier_id string, po_id string, dc_id string, ship_date date, eta_date date, inbound_units int",
    "supply_plan_inbound",
)

# Finance: product master
_overwrite_table(
    _product_rows(),
    "sku string, product_family string, launch_date date, price_tier string",
    "product_master",
)

# Finance: distributors
_overwrite_table(
    dists,
    "distributor_id string, distributor_name string, region string, channel string",
    "distributors",
)

# Finance: COGS reference
cogs = _cogs_rows()
_overwrite_table(
    cogs,
    "sku string, effective_date date, unit_cogs double",
    "cogs_reference",
)

# Finance: sales orders
so = _sales_order_rows()
_overwrite_table(
    so,
    "order_id string, order_date date, sku string, distributor_id string, region string, units int, unit_price double",
    "sales_orders",
)


### Create a Delta table of PDF-like contract texts

In [0]:
# Rows of narrative text that mimic supplier and distributor agreements are collected here
docs = []


def rand_bool(p=0.7):
    """Return True when a single draw from random.random() falls below `p`, else False."""
    draw = random.random()
    return draw < p

def _supplier_msa_row(supplier_id):
    """Return one contract row for `supplier_id`: a Markdown supply agreement with randomly drawn terms."""
    # The draws are made in a fixed sequence; with a seeded generator that sequence
    # decides which value each term receives, so keep the order as is.
    incoterm = random.choice(["FOB","CIF","DAP","EXW"])
    lead_days = random.randint(7,35)
    # Fractions are rounded to 3 decimals, then shown as whole percents (int() truncates).
    sla_pct = int(round(random.uniform(0.90,0.98),3)*100)
    may_expedite = rand_bool()
    expedite_fee_pct = int(round(random.uniform(0.02,0.08),3)*100)
    late_penalty_pct = int(round(random.uniform(0.01,0.05),3)*100)
    min_order_qty = random.choice([10,25,50,100])
    payment_terms = random.choice(["Net 30","Net 45","Net 60"])
    governing_law = random.choice(["Delaware","California","New York","Texas"])
    heading = f"Supplier Master Supply Agreement – {supplier_id}"
    markdown_body = f"""
# {heading}
Effective: 2025-01-01  |  Expires: 2026-12-31

- Incoterms: {incoterm}
- Standard Lead Time (days): {lead_days}
- On-time Delivery SLA: {sla_pct}%
- Expedite Allowed: {may_expedite} (Fee: {expedite_fee_pct}%)
- Late Delivery Penalty: {late_penalty_pct}%
- Minimum Order Quantity (MOQ): {min_order_qty}
- Payment Terms: {payment_terms}
- Governing Law: {governing_law}

Service Levels apply to all Purchase Orders; deviations require written approval and may incur fees or penalties as specified.
"""
    return (
        f"DOC_{supplier_id}_MSA",
        "supplier",
        supplier_id,
        "MSA",
        heading,
        markdown_body,
        "markdown",
        date(2025,1,1),
        date(2026,12,31),
    )


# One Master Supply Agreement per supplier
for sid, _ in suppliers:
    docs.append(_supplier_msa_row(sid))

def _distributor_agreement_row(distributor_id):
    """Return one contract row for `distributor_id`: an HTML distributor agreement with randomly drawn terms."""
    # The draws are made in a fixed sequence; with a seeded generator that sequence
    # decides which value each term receives, so keep the order as is.
    # Fractions are rounded to 3 decimals, then shown as whole percents (int() truncates).
    base_discount_pct = int(round(random.uniform(0.05,0.18),3)*100)
    rebate_pct = int(round(random.uniform(0.01,0.05),3)*100)
    mdf_pct = int(round(random.uniform(0.00,0.03),3)*100)
    return_window_days = random.choice([15,30,45])
    quarterly_commit = random.choice([50,100,200,400])
    special_terms = random.choice(["Quarterly promo on ACC-GPU in NA","No special terms","Extended RMA support on MCU-EDGE"])
    heading = f"Distributor Agreement – {distributor_id}"
    html_body = f"""
<h1>{heading}</h1>
<p>Effective: 2025-01-01 | Expires: 2026-12-31</p>
<ul>
  <li>Base Discount: {base_discount_pct}%</li>
  <li>Rebate: {rebate_pct}%</li>
  <li>MDF: {mdf_pct}%</li>
  <li>Return Window: {return_window_days} days</li>
  <li>Minimum Quarterly Commit: {quarterly_commit} units</li>
  <li>Special Terms: {special_terms}</li>
</ul>
<p>Discounts subject to quarterly true-up and compliance with program guidelines.</p>
"""
    return (
        f"DOC_{distributor_id}_DA",
        "distributor",
        distributor_id,
        "DA",
        heading,
        html_body,
        "html",
        date(2025,1,1),
        date(2026,12,31),
    )


# One Distributor Agreement per distributor
for did, _, _, _ in dists:
    docs.append(_distributor_agreement_row(did))

# Write every collected agreement row to the contract_texts table, overwriting what is there.
_contract_schema = """
  doc_id string, entity_type string, entity_id string, doc_type string,
  title string, text string, format string, effective_date date, expiry_date date
"""
_contracts_df = spark.createDataFrame(docs, _contract_schema)
_contracts_df.write.mode("overwrite").saveAsTable("contract_texts")


### Update table- and column-level metadata

In [0]:
def _sql_quote(s: str) -> str:
    # Escape single quotes for SQL literals
    return (s or "").replace("'", "''")

def set_table_and_column_comments(table_name: str, table_comment: str, column_comments: dict):
    """
    Attach a description to a table and to each listed column.

    table_name: optionally fully-qualified UC name, e.g., catalog.schema.table; it is
        placed into the SQL text as given, so the caller quotes any part that needs it
    table_comment: table-level description
    column_comments: dict of {column_name: description}; each description is passed through str()

    Runs one SQL statement for the table, then one per column in dict order.
    """
    # Table comment
    spark.sql(f"COMMENT ON TABLE {table_name} IS '{_sql_quote(table_comment)}'")
    # Column comments: ALTER TABLE ... ALTER COLUMN ... COMMENT is understood by open-source
    # Spark and by older Databricks runtimes, whereas COMMENT ON COLUMN needs a recent runtime.
    for col, comment in column_comments.items():
        # A backtick inside a delimited identifier is written as two backticks.
        quoted_col = "`" + str(col).replace("`", "``") + "`"
        spark.sql(
            f"ALTER TABLE {table_name} ALTER COLUMN {quoted_col} COMMENT '{_sql_quote(str(comment))}'"
        )


In [0]:
# Catalog documentation keyed by table name: (table comment, {column name: column comment}).
# Dicts keep insertion order, so tables and columns are commented in the order written here.
_TABLE_DOCS = {
    "suppliers": (
        "Supplier master data for semiconductor supply chain, including tier and country of origin.",
        {
            "supplier_id": "Stable supplier identifier used across supply, finance, and contracts.",
            "supplier_name": "Human-readable supplier legal or trade name.",
            "tier": "Supplier tier classification for risk and sourcing (e.g., 1=primary, 2=secondary).",
            "country": "Primary country associated with supply origin or contracting entity.",
        },
    ),
    "inventory_positions": (
        "Daily inventory snapshots by distribution center and SKU with safety stock targets.",
        {
            "dc_id": "Distribution center identifier where stock is held.",
            "sku": "Stock-keeping unit code for the semiconductor product.",
            "as_of_date": "Inventory snapshot date at end of business.",
            "on_hand_units": "Physical on-hand inventory units available at the DC on the snapshot date.",
            "safety_stock": "Required minimum inventory level to buffer demand and supply variability.",
        },
    ),
    "demand_forecast_daily": (
        "Daily demand forecast by SKU, region, and DC used for reconciliation and stockout risk.",
        {
            "sku": "Stock-keeping unit code for the semiconductor product.",
            "region": "Sales or planning region aligned to commercial reporting (e.g., NA, EMEA, APJ).",
            "dc_id": "Distribution center identifier associated with forecast fulfillment.",
            "demand_date": "Planned demand date at daily granularity.",
            "forecast_units": "Forecasted units for the given SKU, region, DC, and date.",
        },
    ),
    "supply_plan_inbound": (
        "Planned inbound supply by PO, ETA, and destination DC for reconciliation against demand.",
        {
            "sku": "Stock-keeping unit code for the semiconductor product.",
            "supplier_id": "Identifier of the supplier shipping the inbound units.",
            "po_id": "Purchase order identifier for the inbound supply line.",
            "dc_id": "Destination distribution center for the inbound shipment.",
            "ship_date": "Date the shipment departed from the supplier or origin.",
            "eta_date": "Estimated date the shipment will arrive at the destination DC.",
            "inbound_units": "Units planned to arrive on the ETA for the specified SKU and DC.",
        },
    ),
    "product_master": (
        "Product dimension with family, launch date, and price tier for analytics and reporting.",
        {
            "sku": "Stock-keeping unit code for the semiconductor product.",
            "product_family": "Product family or series categorization (e.g., Accelerator, Compute, Embedded).",
            "launch_date": "First commercial availability date for the SKU.",
            "price_tier": "Relative pricing tier classification (e.g., premium, standard).",
        },
    ),
    "distributors": (
        "Distributor dimension including region and channel for sales order attribution.",
        {
            "distributor_id": "Stable distributor identifier used in sales orders and agreements.",
            "distributor_name": "Human-readable distributor or channel partner name.",
            "region": "Primary region of operation for the distributor.",
            "channel": "Sales channel classification, typically dist for distributor.",
        },
    ),
    "cogs_reference": (
        "Reference table for unit COGS by SKU and effective date used to compute margin.",
        {
            "sku": "Stock-keeping unit code for the semiconductor product.",
            "effective_date": "Date from which the referenced unit COGS becomes applicable.",
            "unit_cogs": "Per-unit cost of goods sold for the SKU effective from the given date.",
        },
    ),
    "sales_orders": (
        "Line-level sales orders with units and realized unit price for revenue calculations.",
        {
            "order_id": "Sales order identifier unique at the line or header level.",
            "order_date": "Booking or fulfillment date associated with the sales order line.",
            "sku": "Stock-keeping unit code for the product sold.",
            "distributor_id": "Identifier of the distributor or channel partner for the order.",
            "region": "Commercial region associated with the order’s destination or customer.",
            "units": "Units sold on the order line.",
            "unit_price": "Realized price per unit for the order line’s SKU.",
        },
    ),
    "contract_texts": (
        "Narrative agreement content (Markdown/HTML) mimicking supplier and distributor terms.",
        {
            "doc_id": "Unique identifier for the agreement-like document.",
            "entity_type": "Type of entity the document references, e.g., supplier or distributor.",
            "entity_id": "Identifier of the supplier or distributor linked to this document.",
            "doc_type": "Agreement subtype, e.g., MSA or DA.",
            "title": "Document title for display in apps or search results.",
            "text": "Raw document content in Markdown or HTML format.",
            "format": "Serialization format of text, e.g., markdown or html.",
            "effective_date": "Date on which the agreement terms become effective.",
            "expiry_date": "Date on which the agreement terms expire absent renewal.",
        },
    ),
}

for _table, (_table_comment, _column_comments) in _TABLE_DOCS.items():
    set_table_and_column_comments(_table, _table_comment, _column_comments)
