## STAGE 3: S3 VECTORS (NEW)
```
├─ key (int64 surrogate)
├─ embedding (1024-d float32 vector)
├─ metadata_filterable (20 bytes: cik_int, report_year, section_id, ...)
├─ metadata_non_filterable (30 bytes: embedding_id, context_hint)
└─ Purpose: Optimized for semantic search with AWS-managed ANN
```

- The Stage 1 fact table is the authoritative, consolidated fact table: a pure data-engineering result.
- The Stage 2 meta table remains authoritative for both the INFO layer and the embeddings (ML) layer. Stage 2 consists of the lean table plus the meta fact table.
- The Stage 3 S3 Vectors table is a search-index projection built from Stage 2 and the embeddings table.



### Execution flow:
1. `cache_stage2_meta_table()`: ensure Stage 2 is available
2. `cache_embeddings_table()`: ensure embeddings are available
3. `build_s3vectors_stage3()`: transform + join → Stage 3
4. `initialize_s3vectors_table()`: upload to S3 (if `UPLOAD_TO_S3=True`)
5. `cache_s3vectors_table()`: download/cache locally (currently disabled)



```
1. build_s3vectors_stage3(meta_df, vectors_df, provider, config)
   └─> Core transformation logic
   
2. cache_s3vectors_table(config, provider, force_recache)
   └─> Download from S3, cache locally
   
3. initialize_s3vectors_table(config, provider, df_stage3, force_reinit)
   └─> Upload to S3, handle overwrites
   
4. validate_s3vectors_schema(df, expected_dims)
   └─> Schema validation helper (currently inlined as STEP 5 of build_s3vectors_stage3)
```
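
The overwrite handling in `initialize_s3vectors_table` comes down to a small decision table over two booleans: whether the object already exists, and whether `force_reinit` is set. A minimal sketch of that branching, in which `plan_upload` and the action names are hypothetical and no S3 call is made:

```python
def plan_upload(exists: bool, force_reinit: bool) -> list[str]:
    """Return the ordered actions for a Stage 3 upload (illustrative only)."""
    if exists and not force_reinit:
        return ["skip"]                 # keep the existing object untouched
    if exists and force_reinit:
        return ["delete", "upload"]     # recreate the object
    return ["upload"]                   # nothing there yet, force flag is irrelevant


for exists in (False, True):
    for force in (False, True):
        print(f"exists={exists!s:5} force_reinit={force!s:5} -> {plan_upload(exists, force)}")
```

Keeping the decision separate from the side effects makes the three outcomes easy to test without touching a bucket.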


In [1]:
# PATH VALIDATION: Validate S3 vectors paths from configuration

import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent / 'loaders'))

from ml_config_loader import MLConfig

config = MLConfig()

print("="*70)
print("CONFIG VALIDATION - S3 VECTORS PATHS")
print("="*70)

# Test 1: Base path
print(f"\n✓ Base path: {config.s3vectors_base_path}")

# Test 2: Provider-specific paths
for provider in config.s3vectors_providers:
    s3_path = config.s3vectors_path(provider)
    dims = config.s3vectors_dimensions(provider)
    print(f"\n✓ Provider: {provider}")
    print(f"  S3 Path: {s3_path}")
    print(f"  Dimensions: {dims}d")

# Test 3: Auto-detection (None provider)
print(f"\n{'='*70}")
print("AUTO-DETECTION TEST (provider=None)")
print("="*70)
print(f"  Default model: {config.bedrock_default_model_key}")
print(f"  Auto-detected path: {config.s3vectors_path(None)}")
print(f"  Auto-detected dims: {config.s3vectors_dimensions(None)}d")

# Test 4: Local cache paths
print(f"\n{'='*70}")
print("LOCAL CACHE PATH VALIDATION")
print("="*70)

for provider in config.s3vectors_providers:
    cache_path = config.get_s3vectors_cache_path(provider)
    print(f"\n✓ Provider: {provider}")
    print(f"  Cache: {cache_path}")
    print(f"  Exists: {cache_path.exists()}")

print(f"\n{'='*70}")
print("✓ Phase 1 Config Validation Complete")
print("="*70)


[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
CONFIG VALIDATION - S3 VECTORS PATHS

✓ Base path: ML_EMBED_ASSETS/S3_VECTORS_STAGING

✓ Provider: cohere_1024d
  S3 Path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet
  Dimensions: 1024d

✓ Provider: titan_1024d
  S3 Path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/titan_1024d/finrag_embeddings_s3vectors_titan_1024d.parquet
  Dimensions: 1024d

AUTO-DETECTION TEST (provider=None)
  Default model: cohere_embed_v4
  Auto-detected path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet
  Auto-detected dims: 1024d

LOCAL CACHE PATH VALIDATION

✓ Provider: cohere_1024d
  Cache: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\stage3_s3vectors\cohere_1024d\finrag_embeddings_s3vectors_cohere_1024d.parquet
  Exists: False

✓ Provider: titan_1024d
  Cache: d:\JoelDesktop folds_24\NEU FALL20
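
The printed S3 paths follow a regular pattern: base path, then provider folder, then a provider-suffixed Parquet filename. The sketch below reproduces that pattern for illustration. It is inferred from the output above rather than from the source of `MLConfig`, and `s3vectors_key` is a hypothetical name.

```python
from pathlib import PurePosixPath

# Assumed layout, inferred from the printed paths (not from MLConfig's source).
BASE_PATH = "ML_EMBED_ASSETS/S3_VECTORS_STAGING"


def s3vectors_key(provider: str, base_path: str = BASE_PATH) -> str:
    """Build the S3 object key for a provider's Stage 3 Parquet file."""
    filename = f"finrag_embeddings_s3vectors_{provider}.parquet"
    # PurePosixPath keeps forward slashes even when run on Windows.
    return str(PurePosixPath(base_path) / provider / filename)


for provider in ("cohere_1024d", "titan_1024d"):
    print(s3vectors_key(provider))
```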

In [None]:
import mmh3

# Test basic functionality
test_hash = mmh3.hash64("test_sentence_id", signed=True)[0]
print(f"✓ mmh3 installed successfully")
print(f"  Test hash: {test_hash}")
print(f"  Type: {type(test_hash)}")

✓ mmh3 installed successfully
  Test hash: -7804031895798801076
  Type: <class 'int'>
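
Because the hash becomes the vector key, it helps to know which IDs collide, not only how many. A minimal sketch, assuming the hash function is passed in as a callable: `find_collisions` is a hypothetical helper, and the demo uses string length as a deliberately weak stand-in hash so that a collision is visible without `mmh3`.

```python
from collections import defaultdict
from typing import Callable, Iterable


def find_collisions(ids: Iterable[str], hash_fn: Callable[[str], int]) -> dict[int, list[str]]:
    """Group distinct IDs by hash value and keep only groups with more than one ID."""
    buckets: dict[int, set[str]] = defaultdict(set)
    for sid in ids:
        buckets[hash_fn(sid)].add(sid)
    return {h: sorted(group) for h, group in buckets.items() if len(group) > 1}


# Stand-in hash: len(). In the pipeline, hash_fn would be something like
#   lambda s: mmh3.hash64(s, signed=True)[0]
sample_ids = ["a_1", "b_2", "long_id_3"]   # hypothetical IDs
print(find_collisions(sample_ids, len))    # {3: ['a_1', 'b_2']}
```

Grouping distinct IDs means a repeated `sentenceID` is not reported as a collision, whereas a row-count check per hash value would flag duplicated rows as well.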


### Decisions for S3 Vectors table:

```
# Stage 2 → Stage 3 mapping (CONFIRMED):
✓ sentenceID           → sentenceID
✓ cik_int              → cik_int
✓ report_year          → report_year (exists, not 'year')
✓ section_name         → section_name (exists)
✓ sic                  → sic
✓ section_sentence_count → section_sentence_count

# Embeddings table:
✓ embedding_id         → embedding_id
✓ embedding            → embedding

# Derived:
→ sentenceID_numsurrogate (mmh3 hash)
→ sentence_pos (extract from sentenceID with fallback)
```
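
The `sentence_pos` fallback rule can be stated as a plain-Python reference function, which is handy for unit tests. This is a sketch of the intent only: the IDs are hypothetical, `sentence_position` is not part of the pipeline, and edge cases such as signs or whitespace may be handled differently by the Polars cast.

```python
import re

INT16_MIN, INT16_MAX = -32768, 32767
_INT_RE = re.compile(r"-?[0-9]+")


def sentence_position(sentence_id: str, sentinel: int = -1) -> int:
    """Return the trailing numeric segment of an ID, or the sentinel on failure."""
    last = sentence_id.split("_")[-1]
    if not _INT_RE.fullmatch(last):
        return sentinel                     # non-numeric tail
    value = int(last)
    if not INT16_MIN <= value <= INT16_MAX:
        return sentinel                     # would not fit in int16
    return value


# Hypothetical IDs, used only to exercise each branch.
for sid in ["0001_10K_2019_section_1A_90", "0001_10K_2019_section_1A", "x_99999"]:
    print(sid, "->", sentence_position(sid))
```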

In [10]:
# ============================================================================
# S3 VECTORS PIPELINE (Stage 3) - Execution Parameters

# - Stage 3 is a JOIN operation (cheap, ~1-2 min)
# - Depends on Stage 1 + Stage 2 (upstream changes)
# - No complex merge logic needed (just rebuild)
# ============================================================================

# Build Stage 3 locally
BUILD_S3VECTORS_TABLE = True      # Create Stage 3 from Stage 2 + Embeddings

# S3 Table Initialization
UPLOAD_TO_S3 = True          # Upload Stage 3 to S3
FORCE_OVERWRITE_S3 = True    # Overwrite existing S3 table

# Local caching from S3 is disabled: Stage 3 is always rebuilt from Stage 2 + embeddings.
# CACHE_S3VECTORS_LOCALLY = False   # Download from S3 and cache
# FORCE_RECACHE_S3VECTORS = False   # Re-download even if cached

# Provider Selection
S3VECTORS_PROVIDER = "cohere_1024d"  # 'cohere_1024d' or 'titan_1024d'

# ============================================================================
# IMPORTS
# ============================================================================
import sys
from pathlib import Path

import mmh3
import polars as pl
from botocore.exceptions import ClientError

sys.path.append(str(Path.cwd().parent / 'loaders'))

from ml_config_loader import MLConfig

# Helper functions (from DATA PREP)

def check_s3_exists(s3_client, bucket, s3_key):
    """Check if S3 object exists"""
    try:
        s3_client.head_object(Bucket=bucket, Key=s3_key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        else:
            raise

def _egress_cost_usd(bytes_count: int) -> float:
    """Estimate S3 egress cost in USD, assuming a flat $0.09 per GiB"""
    gib = bytes_count / (1024 * 1024 * 1024)
    return gib * 0.09

# ============================================================================

# Sentence position extraction cases:
# 1. Standard: sentenceID ends with a numeric segment → extract it
# 2. Malformed: no numeric ending → default to -1 (flag for investigation)

def extract_sentence_position(column_name):
    """
    Build a Polars expression that extracts the sentence position from a
    sentenceID column, with a fallback.
    
    Args:
        column_name: Name of the sentenceID column (a string, not a Series)
    
    Logic:
    - Split by '_', take last segment
    - If numeric → cast to int16
    - If non-numeric → return -1 (sentinel value)
    
    Sentinel value choice: -1 vs 99 vs NULL
    - -1: Clear indicator of extraction failure, sorts first
    - 99: Ambiguous (could be real position)
    - NULL: Causes filtering issues in S3 Vectors
    
    Recommendation: Use -1
    """
    return (
        pl.col(column_name)
        .str.split('_')
        .list.last()
        .cast(pl.Int16, strict=False)  # strict=False → NULL on cast failure
        .fill_null(-1)                  # NULL → -1 sentinel
    )



def build_s3vectors_stage3(meta_df, vectors_df, provider, config):
    """
    Transform Stage 2 (meta) + Embeddings → Stage 3 (S3 Vectors ready)
    
    Args:
        meta_df: Stage 2 meta table (34 cols; 469,252 rows in the recorded run)
        vectors_df: Embeddings table (3 cols; 203,076 rows in the recorded run)
        provider: 'cohere_1024d' or 'titan_1024d'
        config: MLConfig instance
    
    Returns:
        DataFrame with 10 columns, ready for S3 Vectors ingestion
    
    Schema:
        sentenceID_numsurrogate (int64)
        sentenceID (varchar)
        embedding (f32[1024])
        cik_int (int32)
        report_year (int64 in the recorded run)
        section_name (varchar)
        sic (varchar)
        sentence_pos (int16)
        embedding_id (varchar)
        section_sentence_count (uint32 in the recorded run)
    """
    
    print(f"\n[Stage 3 Build - {provider}]")
    print(f"  Input Meta: {len(meta_df):,} rows × {len(meta_df.columns)} cols")
    print(f"  Input Vectors: {len(vectors_df):,} rows × {len(vectors_df.columns)} cols")
    
    # STEP 1: Inner Join (only embedded sentences)
    df_joined = (
        meta_df
        .join(vectors_df, on='sentenceID', how='inner')
        .filter(pl.col('embedding_id').is_not_null())  # Safety filter
    )
    
    print(f"  After join: {len(df_joined):,} rows")
    
    if len(df_joined) == 0:
        raise ValueError(f"No embeddings found for provider {provider}. Check join keys.")
    
    # STEP 2: Derive sentenceID_numsurrogate (mmh3 hash)
    print(f"  Computing mmh3 hashes...")
    
    # Convert sentenceID to hash using mmh3
    sentence_ids = df_joined['sentenceID'].to_list()
    hashes = [mmh3.hash64(sid, signed=True)[0] for sid in sentence_ids]
    
    df_joined = df_joined.with_columns([
        pl.Series('sentenceID_numsurrogate', hashes, dtype=pl.Int64)
    ])
    
    # Validate hash uniqueness
    # pl.len() replaces pl.count(), which is deprecated since Polars 0.20.5
    hash_counts = df_joined.group_by('sentenceID_numsurrogate').agg(pl.len().alias('n'))
    collisions = hash_counts.filter(pl.col('n') > 1)
    
    if len(collisions) > 0:
        print(f"  ⚠️  WARNING: {len(collisions)} hash collisions detected")
        print(f"     Collision rate: {len(collisions)/len(df_joined)*100:.4f}%")
        # Note: At 1M scale, collisions are extremely rare with mmh3
    else:
        print(f"  ✓ Hash uniqueness validated (0 collisions)")
    
    # STEP 3: Extract sentence_pos with fallback (reuses the helper defined above)
    df_joined = df_joined.with_columns([
        extract_sentence_position('sentenceID').alias('sentence_pos')
    ])
    
    # Diagnostic: Check how many failed extraction
    failed_extractions = df_joined.filter(pl.col('sentence_pos') == -1).height
    if failed_extractions > 0:
        print(f"  ⚠️  {failed_extractions} sentences with position extraction failure (set to -1)")
    
    # STEP 4: Select & Order Final Columns
    df_stage3 = df_joined.select([
        # Primary keys
        'sentenceID_numsurrogate',
        'sentenceID',
        
        # Embedding
        'embedding',
        
        # Filterable metadata
        'cik_int',
        'report_year',
        'section_name',
        'sic',
        'sentence_pos',
        
        # Non-filterable metadata
        'embedding_id',
        'section_sentence_count'
    ])
    
    # STEP 5: Validate Schema (checks the embedding length of the first row only)
    expected_dims = config.s3vectors_dimensions(provider)
    actual_dims = df_stage3['embedding'].list.len()[0]
    
    if actual_dims != expected_dims:
        raise ValueError(
            f"Dimension mismatch for {provider}:\n"
            f"  Expected: {expected_dims}d (from config)\n"
            f"  Actual: {actual_dims}d (in embedding column)"
        )
    
    print(f"\n  ✓ Stage 3 Complete:")
    print(f"    Rows: {len(df_stage3):,}")
    print(f"    Columns: {len(df_stage3.columns)}")
    print(f"    Dimensions: {actual_dims}d (validated)")
    
    return df_stage3



def initialize_s3vectors_table(config, provider, df_stage3, force_reinit=False):
    """
    Upload Stage 3 table to S3 (provider-specific)
    
    Args:
        config: MLConfig instance
        provider: 'cohere_1024d' or 'titan_1024d'
        df_stage3: Transformed DataFrame
        force_reinit: If True, overwrite existing S3 table
    """
    
    s3_client = config.get_s3_client()
    s3_key = config.s3vectors_path(provider)
    s3_uri = f"s3://{config.bucket}/{s3_key}"
    
    # Check if exists
    exists = check_s3_exists(s3_client, config.bucket, s3_key)
    
    if exists and not force_reinit:
        print(f"\n[S3 Vectors Table - Already Exists]")
        print(f"  Provider: {provider}")
        print(f"  Location: {s3_uri}")
        print(f"  Set FORCE_OVERWRITE_S3=True to recreate")
        return
    
    elif exists and force_reinit:
        print(f"\n[S3 Vectors Table - Recreating]")
        print(f"  Provider: {provider}")
        s3_client.delete_object(Bucket=config.bucket, Key=s3_key)
        print(f"  ✓ Deleted existing")
    
    # Upload to S3
    print(f"\n[S3 Vectors Table - Creating]")
    print(f"  Provider: {provider}")
    print(f"  Destination: {s3_uri}")
    print(f"  Rows: {len(df_stage3):,}")
    print(f"  Dimensions: {config.s3vectors_dimensions(provider)}d")
    
    df_stage3.write_parquet(
        s3_uri,
        storage_options=config.get_storage_options(),
        compression='zstd'
    )
    
    print(f"  ✓ Upload complete (Cost: $0.00 ingress)")
    
    
    
# Not called for now: Stage 3 is rebuilt on every run instead of being cached from S3.
def cache_s3vectors_table(config, provider, force_recache=False):
    """
    Download and cache Stage 3 table locally
    
    Args:
        config: MLConfig instance
        provider: 'cohere_1024d' or 'titan_1024d'
        force_recache: If True, re-download even if cached
    
    Returns:
        DataFrame loaded from cache or S3
    """
    
    cache_path = config.get_s3vectors_cache_path(provider)
    
    # Check local cache first
    if not force_recache and cache_path.exists():
        print(f"\n[S3 Vectors Table - Using Cache]")
        print(f"  Provider: {provider}")
        print(f"  Location: {cache_path.name}")
        df = pl.read_parquet(cache_path)
        print(f"  Loaded: {len(df):,} rows × {len(df.columns)} columns (Cost: $0.00)")
        return df
    
    # Download from S3
    print(f"\n[S3 Vectors Table - Downloading from S3]")
    print(f"  Provider: {provider}")
    
    s3_key = config.s3vectors_path(provider)
    s3_uri = f"s3://{config.bucket}/{s3_key}"
    
    s3_client = config.get_s3_client()
    
    # Check if exists on S3
    if not check_s3_exists(s3_client, config.bucket, s3_key):
        raise FileNotFoundError(
            f"S3 Vectors table not found for {provider}!\n"
            f"  Expected: {s3_uri}\n"
            f"  Run with UPLOAD_TO_S3=True to create"
        )
    
    response = s3_client.head_object(Bucket=config.bucket, Key=s3_key)
    file_size_mb = response['ContentLength'] / 1024 / 1024
    egress_cost = _egress_cost_usd(response['ContentLength'])
    
    print(f"  Source: {s3_uri}")
    print(f"  Size: {file_size_mb:.1f} MB")
    
    df = pl.read_parquet(s3_uri, storage_options=config.get_storage_options())
    print(f"  Downloaded: {len(df):,} rows (Cost: ${egress_cost:.4f} egress)")
    
    # Cache for future use
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(cache_path, compression='zstd')
    print(f"  ✓ Cached to: {cache_path}")
    
    return df


# ============================================================================
# MAIN EXECUTION
# ============================================================================

config = MLConfig()

print("="*70)
print("S3 VECTORS PIPELINE (Stage 3)")
print("="*70)
print(f"Provider: {S3VECTORS_PROVIDER}")
print(f"Model: {config.bedrock_model_id} ({config.s3vectors_dimensions(S3VECTORS_PROVIDER)}d)")

df_stage3 = None

# ============================================================================
# TASK 1: Build Stage 3 Table Locally
# ============================================================================

if BUILD_S3VECTORS_TABLE:
    # Load dependencies
    print(f"\n[Loading Dependencies]")
    
    # Stage 2 meta table
    meta_cache = Path.cwd().parent / 'data_cache' / 'meta_embeds' / 'finrag_fact_sentences_meta_embeds.parquet'
    if not meta_cache.exists():
        raise FileNotFoundError(f"Stage 2 meta not cached: {meta_cache}")
    meta_df = pl.read_parquet(meta_cache)
    print(f"  ✓ Stage 2 Meta: {len(meta_df):,} rows")
    
    # Embeddings table
    emb_filename = Path(config.embeddings_path(S3VECTORS_PROVIDER)).name
    emb_cache = Path.cwd().parent / 'data_cache' / 'embeddings' / S3VECTORS_PROVIDER / emb_filename
    if not emb_cache.exists():
        raise FileNotFoundError(f"Embeddings not cached: {emb_cache}")
    vectors_df = pl.read_parquet(emb_cache)
    print(f"  ✓ Embeddings: {len(vectors_df):,} rows")
    
    # Build Stage 3
    df_stage3 = build_s3vectors_stage3(meta_df, vectors_df, S3VECTORS_PROVIDER, config)
    
    # Cache locally
    cache_path = config.get_s3vectors_cache_path(S3VECTORS_PROVIDER)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    df_stage3.write_parquet(cache_path, compression='zstd')
    print(f"\n  ✓ Cached locally: {cache_path}")

# ============================================================================
# TASK 2: Initialize S3 Table
# ============================================================================

if UPLOAD_TO_S3:
    if df_stage3 is None:
        # Load from local cache
        cache_path = config.get_s3vectors_cache_path(S3VECTORS_PROVIDER)
        if not cache_path.exists():
            raise FileNotFoundError("Stage 3 not built. Run with BUILD_S3VECTORS_TABLE=True first.")
        df_stage3 = pl.read_parquet(cache_path)
    
    initialize_s3vectors_table(config, S3VECTORS_PROVIDER, df_stage3, force_reinit=FORCE_OVERWRITE_S3)

# ============================================================================
# TASK 3: Cache from S3 (disabled: Stage 3 is always rebuilt locally)
# ============================================================================

# if CACHE_S3VECTORS_LOCALLY:
#     df_cached = cache_s3vectors_table(config, S3VECTORS_PROVIDER, force_recache=FORCE_RECACHE_S3VECTORS)

# ============================================================================
# SUMMARY
# ============================================================================

print(f"\n{'='*70}")
print(f"✓ S3 VECTORS PIPELINE COMPLETE")
print(f"{'='*70}")
print(f"\nActions completed:")
if BUILD_S3VECTORS_TABLE:
    print(f"  ✓ Stage 3 built locally ({len(df_stage3):,} rows)")
if UPLOAD_TO_S3:
    print(f"  ✓ Uploaded to S3")

print("="*70)

[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
S3 VECTORS PIPELINE (Stage 3)
Provider: cohere_1024d
Model: cohere.embed-v4:0 (1024d)

[Loading Dependencies]
  ✓ Stage 2 Meta: 469,252 rows
  ✓ Embeddings: 203,076 rows

[Stage 3 Build - cohere_1024d]
  Input Meta: 469,252 rows × 34 cols
  Input Vectors: 203,076 rows × 3 cols
  After join: 203,076 rows
  Computing mmh3 hashes...


(Deprecated in version 0.20.5)
  hash_counts = df_joined.group_by('sentenceID_numsurrogate').agg(pl.count().alias('n'))


  ✓ Hash uniqueness validated (0 collisions)

  ✓ Stage 3 Complete:
    Rows: 203,076
    Columns: 10
    Dimensions: 1024d (validated)

  ✓ Cached locally: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\stage3_s3vectors\cohere_1024d\finrag_embeddings_s3vectors_cohere_1024d.parquet

[S3 Vectors Table - Recreating]
  Provider: cohere_1024d
  ✓ Deleted existing

[S3 Vectors Table - Creating]
  Provider: cohere_1024d
  Destination: s3://sentence-data-ingestion/ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet
  Rows: 203,076
  Dimensions: 1024d
  ✓ Upload complete (Cost: $0.00 ingress)

✓ S3 VECTORS PIPELINE COMPLETE

Actions completed:
  ✓ Stage 3 built locally (203,076 rows)
  ✓ Uploaded to S3


In [None]:
## Quick Validation Cells 1 - 4. 

cache_path = config.get_s3vectors_cache_path("cohere_1024d")
df_check = pl.read_parquet(cache_path)

print(f"Rows: {len(df_check):,}")
print(f"Columns: {df_check.columns}")
print(f"\nSample:")
print(df_check.head(3))

# Check for -1 sentence_pos (extraction failures)
failed = df_check.filter(pl.col('sentence_pos') == -1).height
print(f"\nSentence position extraction failures: {failed}")

Rows: 203,076
Columns: ['sentenceID_numsurrogate', 'sentenceID', 'embedding', 'cik_int', 'report_year', 'section_name', 'sic', 'sentence_pos', 'embedding_id', 'section_sentence_count']

Sample:
shape: (3, 10)
┌────────────┬────────────┬────────────┬─────────┬───┬──────┬────────────┬────────────┬────────────┐
│ sentenceID ┆ sentenceID ┆ embedding  ┆ cik_int ┆ … ┆ sic  ┆ sentence_p ┆ embedding_ ┆ section_se │
│ _numsurrog ┆ ---        ┆ ---        ┆ ---     ┆   ┆ ---  ┆ os         ┆ id         ┆ ntence_cou │
│ ate        ┆ str        ┆ list[f32]  ┆ i32     ┆   ┆ str  ┆ ---        ┆ ---        ┆ nt         │
│ ---        ┆            ┆            ┆         ┆   ┆      ┆ i16        ┆ str        ┆ ---        │
│ i64        ┆            ┆            ┆         ┆   ┆      ┆            ┆            ┆ u32        │
╞════════════╪════════════╪════════════╪═════════╪═══╪══════╪════════════╪════════════╪════════════╡
│ -610014736 ┆ 0001403161 ┆ [0.025757, ┆ 1403161 ┆ … ┆ 7389 ┆ 90         ┆ bedrock_c

In [4]:
df_check.group_by('cik_int').agg(pl.len().alias('n')).sort('n', descending=True)  # pl.len() replaces deprecated pl.count()

(Deprecated in version 0.20.5)
  df_check.group_by('cik_int').agg(pl.count().alias('n')).sort('n', descending=True)


cik_int,n
i32,u32
1276520,20442
1273813,17425
813762,17326
890926,15033
814585,12532
…,…
200406,5405
909832,5030
1018724,4903
1065280,4861


In [5]:
df_check.group_by('report_year').agg(pl.len().alias('n')).sort('report_year')  # pl.len() replaces deprecated pl.count()

(Deprecated in version 0.20.5)
  df_check.group_by('report_year').agg(pl.count().alias('n')).sort('report_year')


report_year,n
i64,u32
2015,11471
2016,39615
2017,35033
2018,37853
2019,37034
2020,42070


In [6]:
# Check embedding dimensions
sample_embedding = df_check['embedding'][0]
print(f"Embedding length: {len(sample_embedding)}")
print(f"Embedding type: {type(sample_embedding)}")

Embedding length: 1024
Embedding type: <class 'polars.series.series.Series'>
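
The check above looks at a single row, and STEP 5 of the build likewise reads only the first row's length. A minimal sketch of a whole-column check, written in plain Python over any iterable of vectors: `embedding_length_report` is a hypothetical helper, and the demo uses 4-d vectors instead of 1024-d to stay readable.

```python
from collections import Counter
from typing import Iterable, Sequence


def embedding_length_report(vectors: Iterable[Sequence[float]], expected_dims: int) -> Counter:
    """Count embeddings per length and raise if any length differs from expected_dims."""
    lengths = Counter(len(v) for v in vectors)
    bad = {n: c for n, c in lengths.items() if n != expected_dims}
    if bad:
        raise ValueError(f"Expected {expected_dims}d embeddings, found lengths: {bad}")
    return lengths


# Tiny hypothetical vectors, all of the expected length.
good = [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]]
print(embedding_length_report(good, expected_dims=4))   # Counter({4: 2})
```

On the full table, the same idea is probably better expressed column-wise with the `.list.len()` accessor that the build step already uses, rather than by materializing Python lists.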


In [8]:
# Verify no hash collisions
unique_hashes = df_check['sentenceID_numsurrogate'].n_unique()
total_rows = len(df_check)

print(f"Total rows: {total_rows:,}")
print(f"Unique hashes: {unique_hashes:,}")
print(f"Collisions: {total_rows - unique_hashes}")

Total rows: 203,076
Unique hashes: 203,076
Collisions: 0
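
Zero collisions is what the birthday bound predicts for a 64-bit hash at this scale. A rough estimate, assuming the hash output is uniformly distributed over 2^64 values (an idealization, not a measured property of `mmh3`):

```python
import math


def collision_probability(n_keys: int, hash_bits: int = 64) -> float:
    """Birthday-bound approximation of P(at least one collision) among n_keys uniform hashes."""
    space = 2 ** hash_bits
    pairs = n_keys * (n_keys - 1) / 2
    # 1 - exp(-pairs / space), computed with expm1 to stay accurate for tiny values
    return -math.expm1(-pairs / space)


for n in (203_076, 1_000_000):
    print(f"{n:>9,} keys -> p ~ {collision_probability(n):.2e}")
```

For the 203,076 rows above the probability is on the order of 1e-9, and at one million rows it is still on the order of 1e-8.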


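## Post-run notes

- sentenceID plus the non-filterable metadata columns: ['sentenceID', 'embedding_id', 'section_sentence_count']
- Surrogate key, embedding, and the filterable metadata columns: ['sentenceID_numsurrogate', 'embedding', 'cik_int', 'report_year', 'section_name', 'sic', 'sentence_pos']
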
- sentenceID_numsurrogate → Vector ID (primary key)
- embedding → Vector values (1024d array)
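
The column roles listed above can be sketched as a per-row mapping into a key / values / metadata record. The record layout, the `row_to_vector_record` name, and the sample values are all hypothetical. In particular, serializing the surrogate key as text is an assumption, and no S3 Vectors API call is shown.

```python
from typing import Any

# Column roles as grouped in the notes above.
FILTERABLE = ["cik_int", "report_year", "section_name", "sic", "sentence_pos"]
NON_FILTERABLE = ["sentenceID", "embedding_id", "section_sentence_count"]


def row_to_vector_record(row: dict[str, Any]) -> dict[str, Any]:
    """Map one Stage 3 row to a key / values / metadata record (hypothetical layout)."""
    return {
        "key": str(row["sentenceID_numsurrogate"]),   # assumption: key serialized as text
        "values": [float(x) for x in row["embedding"]],
        "metadata": {name: row[name] for name in FILTERABLE + NON_FILTERABLE},
    }


# Made-up row with a 2-d embedding, for illustration only.
sample_row = {
    "sentenceID_numsurrogate": -1234567890123,
    "sentenceID": "doc_0_90",
    "embedding": [0.25, 0.5],
    "cik_int": 1403161,
    "report_year": 2019,
    "section_name": "ITEM_1A",
    "sic": "7389",
    "sentence_pos": 90,
    "embedding_id": "emb_0001",
    "section_sentence_count": 120,
}
print(row_to_vector_record(sample_row)["key"])
```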
