# Gold semantic layer



## Startup cells

In [0]:
# Set environment variables for sagemaker_studio imports

import os
os.environ['DataZoneProjectId'] = '4hril53mejrp2f'
os.environ['DataZoneDomainId'] = 'dzd-5w47wlphwxsdev'
os.environ['DataZoneEnvironmentId'] = 'dcza2emsroy8br'
os.environ['DataZoneDomainRegion'] = 'us-east-1'

# create both a function and variable for metadata access
_resource_metadata = None

def _get_resource_metadata():
    global _resource_metadata
    if _resource_metadata is None:
        _resource_metadata = {
            "AdditionalMetadata": {
                "DataZoneProjectId": "4hril53mejrp2f",
                "DataZoneDomainId": "dzd-5w47wlphwxsdev",
                "DataZoneEnvironmentId": "dcza2emsroy8br",
                "DataZoneDomainRegion": "us-east-1",
            }
        }
    return _resource_metadata
metadata = _get_resource_metadata()

In [0]:
"""
Logging Configuration

Purpose:
--------
This sets up the logging framework for code executed in the user namespace.
"""

from typing import Optional


def _set_logging(log_dir: str, log_file: str, log_name: Optional[str] = None):
    import os
    import logging
    from logging.handlers import RotatingFileHandler

    level = logging.INFO
    max_bytes = 5 * 1024 * 1024
    backup_count = 5

    # fallback to /tmp dir on access, helpful for local dev setup
    try:
        os.makedirs(log_dir, exist_ok=True)
    except Exception:
        log_dir = "/tmp/kernels/"

    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, log_file)

    logger = logging.getLogger() if not log_name else logging.getLogger(log_name)
    logger.handlers = []
    logger.setLevel(level)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # Rotating file handler
    fh = RotatingFileHandler(filename=log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8")
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    logger.info(f"Logging initialized for {log_name}.")


_set_logging("/var/log/computeEnvironments/kernel/", "kernel.log")
_set_logging("/var/log/studio/data-notebook-kernel-server/", "metrics.log", "metrics")

In [0]:
import logging
from sagemaker_studio import ClientConfig, sqlutils, sparkutils, dataframeutils

logger = logging.getLogger(__name__)
logger.info("Initializing sparkutils")
spark = sparkutils.init()
logger.info("Finished initializing sparkutils")

In [0]:
def _reset_os_path():
    """
    Reset the process's working directory to handle mount timing issues.
    
    This function resolves a race condition where the Python process starts
    before the filesystem mount is complete, causing the process to reference
    old mount paths and inodes. By explicitly changing to the mounted directory
    (/home/sagemaker-user), we ensure the process uses the correct, up-to-date
    mount point.
    
    The function logs stat information (device ID and inode) before and after
    the directory change to verify that the working directory is properly
    updated to reference the new mount.
    
    Note:
        This is executed at module import time to ensure the fix is applied
        as early as possible in the kernel initialization process.
    """
    try:
        import os
        import logging

        logger = logging.getLogger(__name__)
        logger.info("---------Before------")
        logger.info("CWD: %s", os.getcwd())
        logger.info("stat('.'): %s %s", os.stat('.').st_dev, os.stat('.').st_ino)
        logger.info("stat('/home/sagemaker-user'): %s %s", os.stat('/home/sagemaker-user').st_dev, os.stat('/home/sagemaker-user').st_ino)

        os.chdir("/home/sagemaker-user")

        logger.info("---------After------")
        logger.info("CWD: %s", os.getcwd())
        logger.info("stat('.'): %s %s", os.stat('.').st_dev, os.stat('.').st_ino)
        logger.info("stat('/home/sagemaker-user'): %s %s", os.stat('/home/sagemaker-user').st_dev, os.stat('/home/sagemaker-user').st_ino)
    except Exception as e:
        logger.exception(f"Failed to reset working directory: {e}")

_reset_os_path()

## Notebook

In [0]:
import pandas as pd
import numpy as np
import re
from datetime import datetime

In [0]:
# Read data from S3 Tables using Spark
spark_df = spark.read.table("`stage-us-east-1-jngai-dev`.`financial_articles`.`fmp_articles`")

# Convert to pandas DataFrame
df = spark_df.toPandas()

In [0]:
gold_base = df.copy()

# Parse datetime
gold_base["event_datetime"] = pd.to_datetime(gold_base["date"])
gold_base["event_date"] = gold_base["event_datetime"].dt.date

# Normalize ticker / exchange
gold_base["exchange"] = gold_base["tickers"].str.split(":").str[0]
gold_base["ticker"] = gold_base["tickers"].str.split(":").str[1]

# Article ID (stable hash)
gold_base["article_id"] = (
    gold_base["link"]
    .astype(str)
    .apply(lambda x: abs(hash(x)) % (10 ** 12))
)

In [0]:
def classify_event_type(text):
    text = text.lower()

    if "earnings" in text or "quarter" in text:
        return "earnings"
    if "price target" in text:
        return "price_target"
    if "rating" in text or "buy rating" in text:
        return "rating"
    if "product" in text or "launch" in text:
        return "product"
    return "other"

gold_base["event_type"] = gold_base["title"].apply(classify_event_type)

In [0]:
def extract_price_target_delta(content):
    matches = re.findall(r"\$(\d+\.?\d*)", content)
    if len(matches) >= 2:
        target = float(matches[0])
        price = float(matches[1])
        return (target - price) / price
    return np.nan

gold_base["price_target_delta_pct"] = gold_base["content"].apply(extract_price_target_delta)

In [0]:
gold_base["earnings_surprise_pct"] = np.nan

# Placeholder until NLP sentiment is wired
gold_base["sentiment_score"] = 0.0
gold_base["sentiment_label"] = "neutral"

In [0]:
dim_company = (
    gold_base[["ticker", "exchange"]]
    .drop_duplicates()
    .reset_index(drop=True)
)

dim_company["company_id"] = dim_company.index + 1

In [0]:
dim_event_type = pd.DataFrame({
    "event_type": gold_base["event_type"].unique()
})

In [0]:
fact_company_event_signal = (
    gold_base
    .merge(dim_company, on=["ticker", "exchange"], how="left")
    .assign(event_id=lambda x: range(1, len(x) + 1))
    [[
        "event_id",
        "company_id",
        "article_id",
        "event_date",
        "event_type",
        "sentiment_score",
        "sentiment_label",
        "earnings_surprise_pct",
        "price_target_delta_pct",
        "site"
    ]]
)

In [0]:
print("FACT TABLE")
display(fact_company_event_signal)

print("DIM COMPANY")
display(dim_company)

print("DIM EVENT TYPE")
display(dim_event_type)

FACT TABLE


Unnamed: 0,event_id,company_id,article_id,event_date,event_type,sentiment_score,sentiment_label,earnings_surprise_pct,price_target_delta_pct,site
0,1,1,880318236553,2025-12-11,other,0.0,neutral,,0.0,Financial Modeling Prep
1,2,2,916234972820,2025-12-11,rating,0.0,neutral,,0.096494,Financial Modeling Prep
2,3,3,527658238417,2025-12-11,price_target,0.0,neutral,,-0.06,Financial Modeling Prep


DIM COMPANY


Unnamed: 0,ticker,exchange,company_id
0,CIEN,NYSE,1
1,ESLOY,OTC,2
2,GIS,NYSE,3


DIM EVENT TYPE


Unnamed: 0,event_type
0,other
1,rating
2,price_target


In [0]:
from sagemaker_studio import Project

# Initialize project
proj = Project()

# Define catalog and namespace
catalog_name = "stage-us-east-1-jngai-dev"
namespace = "gold_semantic_layer"

# Create full table names with backticks for proper identifier quoting
fact_table_name = f"`{catalog_name}`.`{namespace}`.`factcompanyeventsignal`"
dim_company_table_name = f"`{catalog_name}`.`{namespace}`.`dimensioncompany`"
dim_event_type_table_name = f"`{catalog_name}`.`{namespace}`.`dimensioneventtype`"

# 1. Create and write FactCompanyEventSignal table
# Extract schema from dataframe
fact_schema_ddl = ", ".join([f"{field.name} {field.dataType.simpleString()}" for field in spark.createDataFrame(fact_company_event_signal).schema.fields])

# Create table with schema
spark.sql(f"CREATE TABLE IF NOT EXISTS {fact_table_name} ({fact_schema_ddl}) USING iceberg")

# Convert pandas to Spark DataFrame and write data
spark.createDataFrame(fact_company_event_signal).writeTo(fact_table_name).append()

print(f"✓ Created table: {fact_table_name}")

# 2. Create and write DimensionCompany table
# Extract schema from dataframe
dim_company_schema_ddl = ", ".join([f"{field.name} {field.dataType.simpleString()}" for field in spark.createDataFrame(dim_company).schema.fields])

# Create table with schema
spark.sql(f"CREATE TABLE IF NOT EXISTS {dim_company_table_name} ({dim_company_schema_ddl}) USING iceberg")

# Convert pandas to Spark DataFrame and write data
spark.createDataFrame(dim_company).writeTo(dim_company_table_name).append()

print(f"✓ Created table: {dim_company_table_name}")

# 3. Create and write DimensionEventType table
# Extract schema from dataframe
dim_event_type_schema_ddl = ", ".join([f"{field.name} {field.dataType.simpleString()}" for field in spark.createDataFrame(dim_event_type).schema.fields])

# Create table with schema
spark.sql(f"CREATE TABLE IF NOT EXISTS {dim_event_type_table_name} ({dim_event_type_schema_ddl}) USING iceberg")

# Convert pandas to Spark DataFrame and write data
spark.createDataFrame(dim_event_type).writeTo(dim_event_type_table_name).append()

print(f"✓ Created table: {dim_event_type_table_name}")

print("\n=== All tables created successfully in S3 Tables (Apache Iceberg format) ===")
print(f"Catalog: {catalog_name}")
print(f"Namespace: {namespace}")
print(f"Tables: FactCompanyEventSignal, DimensionCompany, DimensionEventType")

✓ Created table: `stage-us-east-1-jngai-dev`.`gold_semantic_layer`.`factcompanyeventsignal`


✓ Created table: `stage-us-east-1-jngai-dev`.`gold_semantic_layer`.`dimensioncompany`


✓ Created table: `stage-us-east-1-jngai-dev`.`gold_semantic_layer`.`dimensioneventtype`

=== All tables created successfully in S3 Tables (Apache Iceberg format) ===
Catalog: stage-us-east-1-jngai-dev
Namespace: gold_semantic_layer
Tables: FactCompanyEventSignal, DimensionCompany, DimensionEventType


## Shutdown cells

In [0]:
"""
Stop spark session and associated Athena Spark session
"""

from IPython import get_ipython as _get_ipython
_get_ipython().user_ns["spark"].stop()