# Gold Semantic Layer

This notebook transforms raw financial article data into a star schema with:
- **Fact Table**: Company event signals with metrics
- **Dimension Tables**: Companies and event types

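The pipeline is idempotent and can be safely re-run: each run overwrites the target tables instead of appending to them. Surrogate keys such as `event_id` are regenerated on every run, so do not treat them as stable across runs.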

## Configuration

In [None]:
# Catalog configuration: the catalog and namespaces the raw articles are read
# from and the gold-layer tables are written to. These values are interpolated
# into back-quoted table identifiers by the cells further down.
CATALOG_NAME = "stage-us-east-1-jngai-dev"
SOURCE_NAMESPACE = "financial_articles"
SOURCE_TABLE = "fmp_articles"
TARGET_NAMESPACE = "gold_semantic_layer"

# Table names written into TARGET_NAMESPACE: one fact table, two dimensions.
FACT_TABLE = "fact_company_event_signal"
DIM_COMPANY_TABLE = "dim_company"
DIM_EVENT_TYPE_TABLE = "dim_event_type"

## Startup Cells

In [None]:
import os
# Set the DataZone project, domain, environment and region identifiers as
# environment variables of this process.
os.environ.update({
    'DataZoneProjectId': '4hril53mejrp2f',
    'DataZoneDomainId': 'dzd-5w47wlphwxsdev',
    'DataZoneEnvironmentId': 'dcza2emsroy8br',
    'DataZoneDomainRegion': 'us-east-1',
})

# Module-level cache; filled in by the first call to _get_resource_metadata().
_resource_metadata = None


def _get_resource_metadata():
    """Return the DataZone resource metadata dict, building it on first use.

    The same dict object is returned on every call.
    """
    global _resource_metadata
    if _resource_metadata is not None:
        return _resource_metadata

    additional = {
        "DataZoneProjectId": "4hril53mejrp2f",
        "DataZoneDomainId": "dzd-5w47wlphwxsdev",
        "DataZoneEnvironmentId": "dcza2emsroy8br",
        "DataZoneDomainRegion": "us-east-1",
    }
    _resource_metadata = {"AdditionalMetadata": additional}
    return _resource_metadata


metadata = _get_resource_metadata()

In [None]:
from typing import Optional

def _set_logging(log_dir: str, log_file: str, log_name: Optional[str] = None):
    """Point a logger at a rotating log file, replacing its current handlers.

    Args:
        log_dir: Directory for the log file. If it cannot be created,
            "/tmp/kernels/" is used instead.
        log_file: File name of the log inside the chosen directory.
        log_name: Name of the logger to configure; the root logger is
            configured when this is omitted or empty.
    """
    import logging
    import os
    from logging.handlers import RotatingFileHandler

    # Prefer the requested directory; fall back to a temp location on failure.
    try:
        os.makedirs(log_dir, exist_ok=True)
    except Exception:
        log_dir = "/tmp/kernels/"

    os.makedirs(log_dir, exist_ok=True)
    destination = os.path.join(log_dir, log_file)

    if log_name:
        target = logging.getLogger(log_name)
    else:
        target = logging.getLogger()
    target.handlers = []  # discard whatever handlers were attached before
    target.setLevel(logging.INFO)

    # Rotate at 5 MiB, keeping five backups.
    handler = RotatingFileHandler(
        filename=destination,
        maxBytes=5 * 1024 * 1024,
        backupCount=5,
        encoding="utf-8",
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    target.addHandler(handler)
    target.info(f"Logging initialized for {log_name}.")

# Root logger writes to kernel.log; the named "metrics" logger writes to
# metrics.log in a separate directory.
_set_logging("/var/log/computeEnvironments/kernel/", "kernel.log")
_set_logging("/var/log/studio/data-notebook-kernel-server/", "metrics.log", "metrics")

In [None]:
import logging
from sagemaker_studio import ClientConfig, sqlutils, sparkutils, dataframeutils

# Create the `spark` handle used by the later cells, logging before and after.
# NOTE(review): sparkutils.init() presumably returns a SparkSession — confirm
# against the sagemaker_studio documentation.
logger = logging.getLogger(__name__)
logger.info("Initializing sparkutils")
spark = sparkutils.init()
logger.info("Finished initializing sparkutils")

In [None]:
def _reset_os_path():
    """Change the working directory to /home/sagemaker-user, logging the change.

    The current directory and the device/inode numbers of '.' and of
    /home/sagemaker-user are logged before and after the chdir. Any failure
    is logged with its traceback and swallowed; nothing is raised or returned.
    """
    import logging
    import os

    # Bind the logger before the try block so the except handler can always
    # reach it, whatever fails inside.
    logger = logging.getLogger(__name__)

    def _log_location(banner):
        # Log where we are and the identity of both directories.
        here = os.stat('.')
        home = os.stat('/home/sagemaker-user')
        logger.info(banner)
        logger.info("CWD: %s", os.getcwd())
        logger.info("stat('.'): %s %s", here.st_dev, here.st_ino)
        logger.info("stat('/home/sagemaker-user'): %s %s", home.st_dev, home.st_ino)

    try:
        _log_location("---------Before------")
        os.chdir("/home/sagemaker-user")
        _log_location("---------After------")
    except Exception as e:
        # Best effort: a failed reset must not abort notebook startup.
        logger.exception("Failed to reset working directory: %s", e)

# Run once at startup so the kernel's working directory is /home/sagemaker-user.
_reset_os_path()

## Import Libraries

In [None]:
from pyspark.sql import functions as F, DataFrame
from pyspark.sql.window import Window
import re

## Helper Functions

In [None]:
def classify_event_type(title: Optional[str]) -> str:
    """Classify an article into an event type from keywords in its title.

    Matching is case-insensitive and the first rule that matches wins, in
    this order: earnings, price_target, rating, product.

    Args:
        title: Article title. May be None (a null column value reaches a
            Spark UDF as None) or empty.

    Returns:
        One of "earnings", "price_target", "rating", "product" or "other".
        A missing or empty title is classified as "other".
    """
    # A null title would otherwise raise on .lower() and fail the Spark task.
    if not title:
        return "other"

    title_lower = title.lower()

    if "earnings" in title_lower or "quarter" in title_lower:
        return "earnings"
    if "price target" in title_lower:
        return "price_target"
    # "buy rating" already contains "rating", so one test covers both.
    if "rating" in title_lower:
        return "rating"
    if "product" in title_lower or "launch" in title_lower:
        return "product"
    return "other"

def extract_price_target_delta(content: Optional[str]) -> Optional[float]:
    """Extract a relative price-target change from article content.

    The first two dollar amounts found in the text are used: the first is
    treated as the target and the second as the reference price.
    NOTE(review): this ordering is a heuristic of the original code — confirm
    it matches how the source articles are written.

    Args:
        content: Article body. May be None (a null column value reaches a
            Spark UDF as None) or empty.

    Returns:
        (target - price) / price as a fraction (0.2 means +20%), or None when
        the content is missing, has fewer than two dollar amounts, or the
        reference price is zero.
    """
    # re.findall raises TypeError on None, which would fail the Spark task.
    if not content:
        return None

    matches = re.findall(r"\$(\d+\.?\d*)", content)
    if len(matches) < 2:
        return None

    try:
        target = float(matches[0])
        price = float(matches[1])
    except ValueError:
        return None

    # Guard the division explicitly instead of catching ZeroDivisionError.
    if price == 0:
        return None
    return (target - price) / price

# Register UDFs so the Python helpers can be applied to DataFrame columns.
# classify_event_type_udf is created without an explicit return type, while
# extract_price_target_delta_udf declares "double".
# NOTE(review): with no return type given, PySpark's udf presumably defaults
# to a string result — confirm.
classify_event_type_udf = F.udf(classify_event_type)
extract_price_target_delta_udf = F.udf(extract_price_target_delta, "double")

In [None]:
def write_table_idempotent(df: DataFrame, catalog: str, namespace: str, table_name: str) -> None:
    """Write a DataFrame to an Iceberg table, overwriting it if it already exists.

    If the table exists, its data is replaced with `overwritePartitions()`;
    otherwise the table is created with the Iceberg provider. Uses the
    module-level `spark` session and prints which path was taken.
    NOTE(review): `overwritePartitions()` only replaces partitions present in
    `df`; for an unpartitioned table this presumably replaces all rows —
    confirm for the target tables.

    Args:
        df: Spark DataFrame to write
        catalog: Catalog name
        namespace: Namespace/schema name
        table_name: Table name
    """
    full_table_name = f"`{catalog}`.`{namespace}`.`{table_name}`"

    # Use the back-quoted identifier for the existence check too: an unquoted
    # name part containing characters such as "-" (as in the configured
    # catalog name) is rejected by Spark's identifier parser.
    table_exists = spark.catalog.tableExists(full_table_name)

    if table_exists:
        # Overwrite existing data
        df.writeTo(full_table_name).overwritePartitions()
        print(f"✓ Overwrote table: {full_table_name}")
    else:
        # Create new table
        df.writeTo(full_table_name).using("iceberg").create()
        print(f"✓ Created table: {full_table_name}")

## Extract Data

In [None]:
# Read source data from the back-quoted catalog.namespace.table identifier
# built from the configuration constants.
source_table = f"`{CATALOG_NAME}`.`{SOURCE_NAMESPACE}`.`{SOURCE_TABLE}`"
df_raw = spark.read.table(source_table)

# count() runs a Spark job over the source table just to report its size.
print(f"Loaded {df_raw.count()} records from {source_table}")
df_raw.printSchema()

## Transform Data

In [None]:
# Base transformations: derive the event timestamp and date, exchange and
# ticker, a numeric article id, the event type and the price-target delta,
# then add placeholder sentiment/earnings columns.
df_transformed = (df_raw
    # Parse datetime
    .withColumn("event_datetime", F.to_timestamp("date"))
    .withColumn("event_date", F.to_date("event_datetime"))
    
    # Extract ticker and exchange
    # NOTE(review): assumes `tickers` looks like "EXCHANGE:TICKER"; a value
    # without ":" presumably leaves `ticker` null — confirm against the data.
    .withColumn("exchange", F.split("tickers", ":").getItem(0))
    .withColumn("ticker", F.split("tickers", ":").getItem(1))
    
    # Generate stable article ID from link
    # NOTE(review): F.hash presumably yields a 32-bit int, in which case the
    # modulo by 10**12 has no effect and distinct links can collide — confirm
    # this is acceptable before relying on article_id as a key.
    .withColumn("article_id", F.abs(F.hash("link")) % F.lit(10**12))
    
    # Classify event type
    .withColumn("event_type", classify_event_type_udf("title"))
    
    # Extract price target delta
    .withColumn("price_target_delta_pct", extract_price_target_delta_udf("content"))
    
    # Add placeholder columns: null earnings surprise, 0.0 sentiment score
    # and a "neutral" sentiment label on every row.
    .withColumn("earnings_surprise_pct", F.lit(None).cast("double"))
    .withColumn("sentiment_score", F.lit(0.0))
    .withColumn("sentiment_label", F.lit("neutral"))
)

# count() triggers evaluation of the whole transformation, including the UDFs.
print(f"Transformed {df_transformed.count()} records")

## Create Dimension Tables

In [None]:
# Dimension: Company
# One row per distinct (ticker, exchange) pair with a surrogate company_id.
# The key is row_number() over a total ordering of the distinct pairs, so it
# is identical every time this lazy DataFrame is evaluated (count, show, the
# fact-table join, the final write) and across re-runs on the same data.
# monotonically_increasing_id() depends on partitioning and gives no such
# guarantee, which could leave fact keys pointing at the wrong dimension row.
# The un-partitioned window pulls all rows into one partition, which is fine
# for a small dimension. The cast keeps the column type as long.
dim_company = (df_transformed
    .select("ticker", "exchange")
    .distinct()
    .withColumn(
        "company_id",
        F.row_number().over(Window.orderBy("ticker", "exchange")).cast("long"),
    )
    .select("company_id", "ticker", "exchange")
)

print(f"Created dim_company with {dim_company.count()} companies")
dim_company.show(5, truncate=False)

In [None]:
# Dimension: Event Type
# One row per distinct event_type with a surrogate event_type_id.
# The key is row_number() over the ordered event types, so it is identical
# every time this lazy DataFrame is evaluated (count, show, the fact-table
# join, the final write) and across re-runs on the same data.
# monotonically_increasing_id() depends on partitioning and gives no such
# guarantee. The cast keeps the column type as long.
dim_event_type = (df_transformed
    .select("event_type")
    .distinct()
    .withColumn(
        "event_type_id",
        F.row_number().over(Window.orderBy("event_type")).cast("long"),
    )
    .select("event_type_id", "event_type")
)

print(f"Created dim_event_type with {dim_event_type.count()} event types")
dim_event_type.show(truncate=False)

## Create Fact Table

In [None]:
# Fact: Company Event Signal
# One row per transformed article, carrying the surrogate keys of both
# dimensions plus the metric columns.
fact_company_event_signal = (df_transformed
    # Join with company dimension
    # Left join: events with no matching dimension row are kept with a null
    # company_id. NOTE(review): rows whose ticker or exchange is null will
    # presumably not match under this equality join — confirm that is intended.
    .join(dim_company, ["ticker", "exchange"], "left")
    
    # Join with event type dimension
    .join(dim_event_type, ["event_type"], "left")
    
    # Add event ID
    # NOTE(review): monotonically_increasing_id() is presumably not stable
    # across evaluations or runs, so event_id should not be used as a durable key.
    .withColumn("event_id", F.monotonically_increasing_id() + 1)
    
    # Select final columns ("site" is not derived above, so it comes straight
    # from the source data)
    .select(
        "event_id",
        "company_id",
        "event_type_id",
        "article_id",
        "event_date",
        "sentiment_score",
        "sentiment_label",
        "earnings_surprise_pct",
        "price_target_delta_pct",
        "site"
    )
)

print(f"Created fact_company_event_signal with {fact_company_event_signal.count()} events")
fact_company_event_signal.show(5, truncate=False)

## Data Quality Checks

In [None]:
# Data quality checks on the fact table. Each check runs its own count() and
# only prints a warning; none of them stops the pipeline.

# Check for null company_ids in fact table
null_company_ids = fact_company_event_signal.filter(F.col("company_id").isNull()).count()
if null_company_ids > 0:
    print(f"⚠️  Warning: {null_company_ids} events have null company_id")

# Check for null event_type_ids in fact table
null_event_type_ids = fact_company_event_signal.filter(F.col("event_type_id").isNull()).count()
if null_event_type_ids > 0:
    print(f"⚠️  Warning: {null_event_type_ids} events have null event_type_id")

# Check for duplicate event_ids (counts the event_id values that occur more
# than once, not the number of surplus rows)
duplicate_events = fact_company_event_signal.groupBy("event_id").count().filter(F.col("count") > 1).count()
if duplicate_events > 0:
    print(f"⚠️  Warning: {duplicate_events} duplicate event_ids found")

# Printed unconditionally, whether or not warnings were raised above.
print("✓ Data quality checks complete")

## Write to Gold Layer

In [None]:
# Write dimension tables first, then the fact table, all into
# CATALOG_NAME.TARGET_NAMESPACE.
write_table_idempotent(dim_company, CATALOG_NAME, TARGET_NAMESPACE, DIM_COMPANY_TABLE)
write_table_idempotent(dim_event_type, CATALOG_NAME, TARGET_NAMESPACE, DIM_EVENT_TYPE_TABLE)

# Write fact table
write_table_idempotent(fact_company_event_signal, CATALOG_NAME, TARGET_NAMESPACE, FACT_TABLE)

# Summary of what was written and where.
print("\n=== Gold Semantic Layer Created Successfully ===")
print(f"Catalog: {CATALOG_NAME}")
print(f"Namespace: {TARGET_NAMESPACE}")
print(f"Tables: {FACT_TABLE}, {DIM_COMPANY_TABLE}, {DIM_EVENT_TYPE_TABLE}")

## Verification Queries

In [None]:
# Verify table counts by reading each written table back from the catalog
# and printing its row count.
print("Table Counts:")
for table in [FACT_TABLE, DIM_COMPANY_TABLE, DIM_EVENT_TYPE_TABLE]:
    full_name = f"`{CATALOG_NAME}`.`{TARGET_NAMESPACE}`.`{table}`"
    count = spark.table(full_name).count()
    print(f"  {table}: {count} rows")

In [None]:
# Sample query: Top companies by event count
# Joins the written fact table to dim_company on company_id; because it is an
# inner join, events with a null company_id are not counted.
print("\nTop 10 Companies by Event Count:")
query = f"""
SELECT 
    c.ticker,
    c.exchange,
    COUNT(*) as event_count
FROM `{CATALOG_NAME}`.`{TARGET_NAMESPACE}`.`{FACT_TABLE}` f
JOIN `{CATALOG_NAME}`.`{TARGET_NAMESPACE}`.`{DIM_COMPANY_TABLE}` c
    ON f.company_id = c.company_id
GROUP BY c.ticker, c.exchange
ORDER BY event_count DESC
LIMIT 10
"""
spark.sql(query).show(truncate=False)

## Shutdown

In [None]:
# Stop spark session
# The session is looked up under the name "spark" in the IPython user
# namespace and stopped; no Spark cell can run after this one.
from IPython import get_ipython as _get_ipython
_get_ipython().user_ns["spark"].stop()