## STAGE 3: S3 VECTORS (NEW)


- The Stage 1 fact table is the authoritative, consolidated fact table: a pure data-engineering result.
- The Stage 2 meta table remains authoritative for both the INFO layer and the embeddings (ML) layer. Stage 2 consists of the lean table plus the meta fact table.
- The Stage 3 S3 Vectors table is a search-index projection.


#### Code Author: Joel Markapudi

---


**Part 1: Simple Config Validation**
- Purpose: Verify that the S3 Vectors paths and local cache paths are correctly configured.
- Complexity: Low. It only queries the config and prints the results.

**Part 2: Stage 3 Table Preparation Pipeline**
- Purpose: Join Stage 2 meta + embeddings → create the S3 Vectors table (Stage 3).
- Complexity: High. Multi-step ETL with validation, hashing, and S3 upload.


### Execution flow:
1. `cache_stage2_meta_table()`: ensure Stage 2 is available
2. `cache_embeddings_table()`: ensure embeddings are available
3. `build_s3vectors_stage3()`: transform + join → Stage 3
4. `initialize_s3vectors_table()`: upload to S3 (if INIT=True)
5. `cache_s3vectors_table()`: download/cache locally

```
├─ key (int64 surrogate)
├─ embedding (1024-d float32 vector)
├─ metadata_filterable (20 bytes: cik_int, report_year, section_id, ...)
├─ metadata_non_filterable (30 bytes: embedding_id, context_hint)
├─ Purpose: Optimized for semantic search with AWS-managed ANN
```
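As a rough illustration of the layout above, the sketch below splits one Stage 3 row into a key, a vector, and two metadata groups. The column names come from the Stage 3 table; the `to_vector_record` helper, the shape of the returned dict, the choice of which columns count as filterable, and every value in `example_row` are assumptions made for illustration, not the pipeline's actual code or data.

```python
from typing import Any

# Assumed grouping, for illustration only; the real split lives in the pipeline code.
FILTERABLE = ("cik_int", "report_year", "section_name", "sic", "sentence_pos")
NON_FILTERABLE = ("sentenceID", "embedding_id", "section_sentence_count")


def to_vector_record(row: dict[str, Any]) -> dict[str, Any]:
    """Split one Stage 3 row into key, vector, and metadata groups (hypothetical helper)."""
    return {
        "key": row["sentenceID_numsurrogate"],
        "embedding": [float(x) for x in row["embedding"]],
        "metadata_filterable": {col: row[col] for col in FILTERABLE},
        "metadata_non_filterable": {col: row[col] for col in NON_FILTERABLE},
    }


# Placeholder row: a 4-d vector stands in for the real 1024-d embedding.
example_row = {
    "sentenceID_numsurrogate": 123,
    "sentenceID": "placeholder_sentence_id",
    "embedding": [0.1, 0.2, 0.3, 0.4],
    "cik_int": 1,
    "report_year": 2020,
    "section_name": "placeholder_section",
    "sic": "0000",
    "sentence_pos": 0,
    "embedding_id": "placeholder_embedding_id",
    "section_sentence_count": 1,
}

record = to_vector_record(example_row)
print(sorted(record))
```

Keeping the two metadata groups separate in the record makes it easy to see which fields a search filter could use and which are carried along only for lookup.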

```
1. build_s3vectors_stage3(meta_df, vectors_df, provider)
   └─> Core transformation logic
   
2. cache_s3vectors_table(config, provider, force_recache)
   └─> Download from S3, cache locally
   
3. initialize_s3vectors_table(config, provider, df_stage3, force_reinit)
   └─> Upload to S3, handle overwrites
   
4. validate_s3vectors_schema(df, expected_dims)
   └─> Schema validation helper
```
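`validate_s3vectors_schema(df, expected_dims)` is only named above, so the following is a minimal sketch of what such a check might do. It is written against plain Python dicts rather than a Polars DataFrame so that it runs without dependencies. The function name `validate_rows`, its return format, and the sample rows are hypothetical; the column list is taken from the Stage 3 table.

```python
REQUIRED_COLUMNS = [
    "sentenceID_numsurrogate", "sentenceID", "embedding", "cik_int",
    "report_year", "section_name", "sic", "sentence_pos",
    "embedding_id", "section_sentence_count",
]


def validate_rows(rows: list[dict], expected_dims: int) -> list[str]:
    """Return a list of readable problems; an empty list means every row passed."""
    problems = []
    for i, row in enumerate(rows):
        missing = [col for col in REQUIRED_COLUMNS if col not in row]
        if missing:
            problems.append(f"row {i}: missing columns {missing}")
            continue
        dims = len(row["embedding"])
        if dims != expected_dims:
            problems.append(f"row {i}: embedding has {dims} dims, expected {expected_dims}")
    return problems


# Placeholder rows: one well-formed, one with a short embedding.
good_row = {col: 0 for col in REQUIRED_COLUMNS}
good_row["embedding"] = [0.0] * 4
bad_row = dict(good_row, embedding=[0.0] * 3)

print(validate_rows([good_row], expected_dims=4))
print(validate_rows([good_row, bad_row], expected_dims=4))
```

Returning a list of problems instead of raising on the first one lets a caller report every schema issue in a single pass before deciding whether to upload.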


## Quick call for `validate_s3vectors_config` method.

In [1]:
from stage3_config_validation import validate_s3vectors_config

validate_s3vectors_config()

[DEBUG] ✓ Found ModelPipeline via file path: D:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline
[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
CONFIG VALIDATION - S3 VECTORS PATHS

✓ Base path: ML_EMBED_ASSETS/S3_VECTORS_STAGING

✓ Provider: cohere_1024d
  S3 Path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet
  Dimensions: 1024d

✓ Provider: titan_1024d
  S3 Path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/titan_1024d/finrag_embeddings_s3vectors_titan_1024d.parquet
  Dimensions: 1024d

AUTO-DETECTION TEST (provider=None)
  Default model: cohere_embed_v4
  Auto-detected path: ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet
  Auto-detected dims: 1024d

LOCAL CACHE PATH VALIDATION

✓ Provider: cohere_1024d
  Cache: D:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\stage3_s3vectors\cohere_1024d\finrag

{'base_path': 'ML_EMBED_ASSETS/S3_VECTORS_STAGING',
 'providers': {'cohere_1024d': {'s3_path': 'ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_cohere_1024d.parquet',
   'dimensions': 1024,
   'cache_path': 'D:\\JoelDesktop folds_24\\NEU FALL2025\\MLops IE7374 Project\\FinSights\\ModelPipeline\\finrag_ml_tg1\\data_cache\\stage3_s3vectors\\cohere_1024d\\finrag_embeddings_s3vectors_cohere_1024d.parquet',
   'cache_exists': True},
  'titan_1024d': {'s3_path': 'ML_EMBED_ASSETS/S3_VECTORS_STAGING/titan_1024d/finrag_embeddings_s3vectors_titan_1024d.parquet',
   'dimensions': 1024,
   'cache_path': 'D:\\JoelDesktop folds_24\\NEU FALL2025\\MLops IE7374 Project\\FinSights\\ModelPipeline\\finrag_ml_tg1\\data_cache\\stage3_s3vectors\\titan_1024d\\finrag_embeddings_s3vectors_titan_1024d.parquet',
   'cache_exists': False}},
 'auto_detection': {'default_model': 'cohere_embed_v4',
  'path': 'ML_EMBED_ASSETS/S3_VECTORS_STAGING/cohere_1024d/finrag_embeddings_s3vectors_coher

In [4]:
import mmh3
# Test basic functionality
test_hash = mmh3.hash64("test_sentence_id", signed=True)[0]
print(f"✓ mmh3 installed successfully")
print(f"  Test hash: {test_hash}")
print(f"  Type: {type(test_hash)}")

✓ mmh3 installed successfully
  Test hash: -7804031895798801076
  Type: <class 'int'>
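The same call can be wrapped in a small helper that turns any string ID into a signed 64-bit surrogate key. This is a sketch, not the pipeline's code: the name `surrogate_key` is hypothetical, and the `hashlib` branch is a swapped-in stand-in that only exists so the snippet still runs where `mmh3` is not installed. The stand-in is not MurmurHash3 and produces different values.

```python
import hashlib

try:
    import mmh3  # third-party package, as used in the cell above
except ImportError:
    mmh3 = None  # fall back to the stand-in below


def surrogate_key(sentence_id: str) -> int:
    """Map a string ID to a signed 64-bit integer (hypothetical helper)."""
    if mmh3 is not None:
        # hash64 returns the two signed 64-bit halves of the 128-bit hash; keep the first.
        return mmh3.hash64(sentence_id, signed=True)[0]
    # Stand-in only (NOT MurmurHash3): 8 bytes of BLAKE2b read as a signed integer.
    digest = hashlib.blake2b(sentence_id.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big", signed=True)


key = surrogate_key("test_sentence_id")
print(key)
```

Either branch is deterministic, so rebuilding the table from the same `sentenceID` values yields the same keys, provided the same hash function is used on every run.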


### Decisions for S3 Vector table:

```
# Stage 2 → Stage 3 mapping (CONFIRMED):
✓ sentenceID           → sentenceID
✓ cik_int              → cik_int
✓ report_year          → report_year (exists, not 'year')
✓ section_name         → section_name (exists)
✓ sic                  → sic
✓ section_sentence_count → section_sentence_count

# Embeddings table:
✓ embedding_id         → embedding_id
✓ embedding            → embedding

# Derived:
→ sentenceID_numsurrogate (mmh3 hash)
→ sentence_pos (extract from sentenceID with fallback)
```
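The "extract from sentenceID with fallback" step could look roughly like the sketch below. It assumes, purely for illustration, that the position is a trailing integer after the last underscore of the ID; the real `sentenceID` format and the pipeline's extraction rule may differ. The helper name `extract_sentence_pos` and the sample IDs are hypothetical. The `-1` fallback matches the failure marker that the validation cell later checks for.

```python
import re

# Assumed ID shape: "<anything>_<integer>"; adjust the pattern to the real format.
_TRAILING_INT = re.compile(r"_(\d+)$")


def extract_sentence_pos(sentence_id: str, fallback: int = -1) -> int:
    """Return the trailing integer of an ID, or `fallback` when there is none."""
    match = _TRAILING_INT.search(sentence_id)
    return int(match.group(1)) if match else fallback


print(extract_sentence_pos("doc_section_90"))    # 90
print(extract_sentence_pos("no_position_here"))  # -1
```

Using a sentinel instead of raising keeps the join from failing on a handful of malformed IDs, while still letting a later check count how many rows fell back.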

--- 
## TEST: Build Stage 3 Table Locally (No S3 Upload)
### Validates streaming join + sink works without kernel crash
---

In [2]:


from s3vectors_table_preparation import S3VectorsTablePipeline

pipeline = S3VectorsTablePipeline(
    provider="cohere_1024d",      
    build_table=True,              # ✓ Build Stage 3 locally
    upload_to_s3=False,            # ✗ Skip S3 upload for now
    force_overwrite=True           
)

# Execute - only builds table, skips upload
summary = pipeline.run()

# Verify results
print(f"\n{'='*70}")
print(f"TEST RESULTS - Local Build Only")
print(f"{'='*70}")
print(f"  Rows built:      {summary['row_count']:,}")
print(f"  Dimensions:      {summary['dimensions']}d")
print(f"  Cache location:  {summary['cache_path']}")
print(f"  Built locally:   {summary['built_locally']}")
print(f"  Uploaded to S3:  {summary['uploaded_to_s3']}")
print(f"{'='*70}")
print(f"\n✓ If no kernel crash → streaming pipeline works!")
print(f"  Next: Set upload_to_s3=True to complete pipeline")

[DEBUG] ✓ Found ModelPipeline via file path: D:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline
[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
S3 VECTORS PIPELINE (Stage 3)
Provider: cohere_1024d
Model: cohere.embed-v4:0 (1024d)

[Stage 3 Build - cohere_1024d]
  Meta path: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\meta_embeds\finrag_fact_sentences_meta_embeds.parquet
  Embeddings path: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\embeddings\cohere_1024d\finrag_embeddings_cohere_1024d.parquet

[Validating Dimensions]
  ✓ Dimension check: 1024d (validated from embeddings file)

[Building Lazy Pipeline]
  ✓ Lazy meta scan (7 columns selected)
  ✓ Lazy vectors scan (2 columns selected)
  ✓ Lazy join configured (inner join on sentenceID)
  ✓ Transformations configured:
    - mmh3 hash (sentenceID_numsurrogate)
    - sentence

### Recorded run time: 24-26 seconds.

In [3]:
# ============================================================================
# S3 VECTORS PIPELINE (Stage 3) - Execution Parameters
# Build Stage 3 Table (Meta + Embeddings → S3 Vectors Format)
# - Stage 3 is a JOIN operation (cheap, ~1-2 min)
# - Depends on Stage 1 + Stage 2 (upstream changes)
# - No complex merge logic needed (just rebuild)
# - NOTE: ALWAYS does a simple rebuild, instead of tracking complexity with deltas.
# ============================================================================


from s3vectors_table_preparation import S3VectorsTablePipeline

pipeline = S3VectorsTablePipeline(
    provider="cohere_1024d",      # Which embedding provider
    build_table=True,              # Build Stage 3 locally
    upload_to_s3=True,             # Upload to S3 after building
    force_overwrite=True           # Overwrite existing S3 table
)

summary = pipeline.run()              # execute: builds table, validates schema, uploads to S3

print(f"\n✓ Stage 3 Complete:")
print(f"  Rows: {summary['row_count']:,}")
print(f"  Dimensions validated: {summary['dimensions']}d")
print(f"  S3 location: {summary['s3_uri']}")

[DEBUG] ✓ Found ModelPipeline via file path: D:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline
[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
S3 VECTORS PIPELINE (Stage 3)
Provider: cohere_1024d
Model: cohere.embed-v4:0 (1024d)

[Stage 3 Build - cohere_1024d]
  Meta path: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\meta_embeds\finrag_fact_sentences_meta_embeds.parquet
  Embeddings path: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\embeddings\cohere_1024d\finrag_embeddings_cohere_1024d.parquet

[Validating Dimensions]
  ✓ Dimension check: 1024d (validated from embeddings file)

[Building Lazy Pipeline]
  ✓ Lazy meta scan (7 columns selected)
  ✓ Lazy vectors scan (2 columns selected)
  ✓ Lazy join configured (inner join on sentenceID)
  ✓ Transformations configured:
    - mmh3 hash (sentenceID_numsurrogate)
    - sentence

In [None]:
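# Quick validation cells 1-4.
import polars as pl

# `config` is the pipeline's config object, assumed to be loaded earlier in the session.
cache_path = config.get_s3vectors_cache_path("cohere_1024d")
df_check = pl.read_parquet(cache_path)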

print(f"Rows: {len(df_check):,}")
print(f"Columns: {df_check.columns}")
print(f"\nSample:")
print(df_check.head(3))

# Check for -1 sentence_pos (extraction failures)
failed = df_check.filter(pl.col('sentence_pos') == -1).height
print(f"\nSentence position extraction failures: {failed}")

Rows: 203,076
Columns: ['sentenceID_numsurrogate', 'sentenceID', 'embedding', 'cik_int', 'report_year', 'section_name', 'sic', 'sentence_pos', 'embedding_id', 'section_sentence_count']

Sample:
shape: (3, 10)
┌────────────┬────────────┬────────────┬─────────┬───┬──────┬────────────┬────────────┬────────────┐
│ sentenceID ┆ sentenceID ┆ embedding  ┆ cik_int ┆ … ┆ sic  ┆ sentence_p ┆ embedding_ ┆ section_se │
│ _numsurrog ┆ ---        ┆ ---        ┆ ---     ┆   ┆ ---  ┆ os         ┆ id         ┆ ntence_cou │
│ ate        ┆ str        ┆ list[f32]  ┆ i32     ┆   ┆ str  ┆ ---        ┆ ---        ┆ nt         │
│ ---        ┆            ┆            ┆         ┆   ┆      ┆ i16        ┆ str        ┆ ---        │
│ i64        ┆            ┆            ┆         ┆   ┆      ┆            ┆            ┆ u32        │
╞════════════╪════════════╪════════════╪═════════╪═══╪══════╪════════════╪════════════╪════════════╡
│ -610014736 ┆ 0001403161 ┆ [0.025757, ┆ 1403161 ┆ … ┆ 7389 ┆ 90         ┆ bedrock_c

In [4]:
# pl.count() is deprecated since Polars 0.20.5; pl.len() is the replacement.
df_check.group_by('cik_int').agg(pl.len().alias('n')).sort('n', descending=True)


cik_int,n
i32,u32
1276520,20442
1273813,17425
813762,17326
890926,15033
814585,12532
…,…
200406,5405
909832,5030
1018724,4903
1065280,4861


In [5]:
# pl.count() is deprecated since Polars 0.20.5; pl.len() is the replacement.
df_check.group_by('report_year').agg(pl.len().alias('n')).sort('report_year')


report_year,n
i64,u32
2015,11471
2016,39615
2017,35033
2018,37853
2019,37034
2020,42070


In [6]:
# Check embedding dimensions
sample_embedding = df_check['embedding'][0]
print(f"Embedding length: {len(sample_embedding)}")
print(f"Embedding type: {type(sample_embedding)}")

Embedding length: 1024
Embedding type: <class 'polars.series.series.Series'>


In [8]:
# Verify no hash collisions
unique_hashes = df_check['sentenceID_numsurrogate'].n_unique()
total_rows = len(df_check)

print(f"Total rows: {total_rows:,}")
print(f"Unique hashes: {unique_hashes:,}")
print(f"Collisions: {total_rows - unique_hashes}")

Total rows: 203,076
Unique hashes: 203,076
Collisions: 0
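Zero collisions is what the birthday bound predicts at this scale. Assuming the hash behaves like a uniform random function over 64 bits (an idealization, not a property proved here), the chance of at least one collision among n keys is approximately 1 - exp(-n(n-1)/2^65). The sketch below evaluates that for the row count above.

```python
import math

n = 203_076      # rows in the Stage 3 table
space = 2 ** 64  # number of distinct 64-bit hash values

# Expected number of colliding pairs under a uniform-hash assumption.
expected_pairs = n * (n - 1) / 2 / space

# P(at least one collision) ~= 1 - exp(-expected_pairs); expm1 keeps precision for tiny values.
p_collision = -math.expm1(-expected_pairs)

print(f"Approximate collision probability: {p_collision:.3e}")
```

The result is on the order of one in a billion, so the explicit `n_unique()` check is a cheap safeguard rather than something expected to fire at this table size.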


## Post-run notes

- `sentenceID`, `embedding_id`, `section_sentence_count`
- `sentenceID_numsurrogate`, `embedding`, `cik_int`, `report_year`, `section_name`, `sic`, `sentence_pos`

- `sentenceID_numsurrogate` → vector ID (primary key)
- `embedding` → vector values (1024-d array)
