# Database Vectorization Demo

This notebook demonstrates how to use the Cocoon Database Vectorization module to:

1. Process CSV files (the NAICS, DEFRA and ecoinvent emission-factor databases, plus a synthetic product catalogue)
2. Generate vector embeddings using AWS Bedrock
3. Store results in S3 or local storage
4. Track progress and handle errors

This is a practical, production-ready example.


In [1]:
# Import necessary libraries
import os
import sys
import pandas as pd
import time
from dotenv import load_dotenv

# Add the parent of the notebook's working directory to the Python path so that
# the local `cocoon` package can be imported without installing it.
# A notebook has no real __file__, so the working directory is used directly
# (the previous abspath("__file__") trick resolved to the same directory).
current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
# Only insert when it is not already first, so re-running this cell does not
# pile up duplicate entries while the local checkout keeps import priority.
if not sys.path or sys.path[0] != parent_dir:
    sys.path.insert(0, parent_dir)
print(f"Added {parent_dir} to Python path")

# Load environment variables
load_dotenv()

# Import the new DatabaseVectorizerService
from cocoon.services.database_vectorizer import DatabaseVectorizerService

print("✅ Successfully imported DatabaseVectorizerService")

Added /Users/sagarthacker/Tracera/repos/ml/cocoon to Python path
✅ Successfully imported DatabaseVectorizerService


## 2. Configuration

In [2]:
# Absolute locations for artefacts, both anchored at the notebook's working directory.
current_dir = os.getcwd()
output_dir = os.path.join(current_dir, "output")
temp_dir = os.path.join(current_dir, "temp")

# exist_ok=True makes this cell safe to re-run when the folders already exist.
os.makedirs(output_dir, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)

print("Created output directory: {}".format(output_dir))
print("Created temp directory: {}".format(temp_dir))

Created output directory: /Users/sagarthacker/Tracera/repos/ml/cocoon/demo/output
Created temp directory: /Users/sagarthacker/Tracera/repos/ml/cocoon/demo/temp


In [3]:
# For each source database, the name of the CSV column whose text gets embedded.
# The per-database sections below look up their `target_column` here.
database_to_text_column = dict(
    naics="2017 NAICS Title",
    defra="EF Name",
    ecoinvent="activity_name",
)

In [4]:
def print_config_pretty(config):
    """Print a VectorizationPipelineConfig as an indented, human-readable report.

    The ``input``, ``processing``, ``vectorization`` and ``output`` attributes of
    ``config`` are each dumped with ``model_dump()`` and printed as one titled
    section. Values that are themselves dicts are expanded one level deeper.
    """
    def emit_section(title, values, indent=2):
        # One "key: value" line per entry; dict values get their own sub-list.
        pad = " " * indent
        nested_pad = " " * (indent + 2)
        print(f"{title}:")
        for key, value in values.items():
            if not isinstance(value, dict):
                print(pad + f"{key}: {value}")
                continue
            print(pad + f"{key}:")
            for nested_key, nested_value in value.items():
                print(nested_pad + f"{nested_key}: {nested_value}")

    rule = "=" * 35
    sections = (
        ("Input", "input"),
        ("Processing", "processing"),
        ("Vectorization", "vectorization"),
        ("Output", "output"),
    )

    print("Vectorization Pipeline Configuration")
    print(rule)
    for position, (title, attribute) in enumerate(sections):
        if position:
            # Blank line between sections, none before the first or after the last.
            print()
        emit_section(title, getattr(config, attribute).model_dump())
    print(rule)

## 3. NAICS

In [24]:
# Which database this section vectorizes (key into database_to_text_column).
database = "naics"

# S3 locations, assembled from environment variables.
# NOTE: os.getenv returns None for an unset variable, which is rendered as the
# text "None" inside the path (and an unset OUTPUT_PREFIX raises AttributeError).
naics_input_file_path = "s3://{}/{}".format(
    os.getenv('READ_FILE_S3_BUCKET'),
    os.getenv('NAICS_DB_S3_PREFIX'),
)
target_column = database_to_text_column[database]
# The result still carries the {embed_model_name} placeholder that the
# model-specific cells fill in with .format(embed_model_name=...).
naics_output_path_prefix = "s3://{}/{}/{}".format(
    os.getenv('SAVE_RESULT_S3_BUCKET'),
    os.getenv('OUTPUT_PREFIX').format(database=database),
    os.getenv('OUTPUT_FILENAME'),
)

# No extra columns are carried into the output; results are written as CSV.
metadata_columns = []
output_format = "csv"

# Text-processing and batching options passed to process_file.
deduplicate_text = True
text_cleaning = True
min_text_length = 1
batch_size = 50

### Titan-V2

In [25]:
# Select the embedding model for this run. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_v2"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v2:0"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
naics_output_path = naics_output_path_prefix.format(embed_model_name=embed_model_name)

In [26]:
# Echo the resolved S3 locations before starting the long-running job.
print("Input file path: {}".format(naics_input_file_path))
print("Output file path: {}".format(naics_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/SupplyChainGHGEmissionFactors_v1.3.0_NAICS_CO2e_USD2022.csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/naics/titan_v2_embeddings_output.csv


In [27]:
# Build the service from its default configuration (region, embedding model and
# batch size are reported below).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the long-running job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v2:0
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1024


In [28]:
print_config_pretty(service.config)


Vectorization Pipeline Configuration
Input:
  file_path: placeholder.csv
  file_type: csv
  csv_delimiter: ,
  csv_encoding: utf-8
  excel_sheet_name: None
  excel_engine: openpyxl

Processing:
  deduplicate_text: True
  preserve_original_indices: True
  filter_empty_rows: True
  text_cleaning: True
  text_cleaning_options:
    lowercase: True
    remove_punctuation: False
    remove_numbers: False
    remove_special_chars: False
    normalize_whitespace: True

Vectorization:
  embedding_model_config:
    model_id: amazon.titan-embed-text-v2:0
    aws_region_name: eu-central-1
  target_column: text
  metadata_columns: []
  batch_size: 100

Output:
  output_path: output.parquet
  output_format: parquet
  compression: snappy
  include_metadata: True


In [29]:
# Vectorize the NAICS titles read from S3 and write the embeddings back to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=naics_input_file_path,
        target_column=target_column,
        output_path=naics_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 185.50 seconds!
   📊 Status: success
   📈 Rows processed: 1016
   🎯 Unique texts: 1016
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/naics/titan_v2_embeddings_output.csv


### Titan-G1

In [30]:
# Switch to the Titan G1 embedding model. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_g1"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v1"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
naics_output_path = naics_output_path_prefix.format(embed_model_name=embed_model_name)

In [31]:
# Echo the resolved S3 locations before starting the long-running job.
print("Input file path: {}".format(naics_input_file_path))
print("Output file path: {}".format(naics_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/SupplyChainGHGEmissionFactors_v1.3.0_NAICS_CO2e_USD2022.csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/naics/titan_g1_embeddings_output.csv


In [32]:
# Build the service from its default configuration, which is driven by
# environment variables (AWS region, embedding model, etc.).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the long-running job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v1
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1536


In [33]:
print_config_pretty(service.config)

Vectorization Pipeline Configuration
Input:
  file_path: placeholder.csv
  file_type: csv
  csv_delimiter: ,
  csv_encoding: utf-8
  excel_sheet_name: None
  excel_engine: openpyxl

Processing:
  deduplicate_text: True
  preserve_original_indices: True
  filter_empty_rows: True
  text_cleaning: True
  text_cleaning_options:
    lowercase: True
    remove_punctuation: False
    remove_numbers: False
    remove_special_chars: False
    normalize_whitespace: True

Vectorization:
  embedding_model_config:
    model_id: amazon.titan-embed-text-v1
    aws_region_name: eu-central-1
  target_column: text
  metadata_columns: []
  batch_size: 100

Output:
  output_path: output.parquet
  output_format: parquet
  compression: snappy
  include_metadata: True


In [34]:
# Vectorize the NAICS titles with Titan G1, reading from and writing to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=naics_input_file_path,
        target_column=target_column,
        output_path=naics_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 182.22 seconds!
   📊 Status: success
   📈 Rows processed: 1016
   🎯 Unique texts: 1016
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/naics/titan_g1_embeddings_output.csv


## 4. DEFRA

In [5]:
# Which database this section vectorizes (key into database_to_text_column).
database = "defra"

# S3 locations, assembled from environment variables.
# NOTE: os.getenv returns None for an unset variable, which is rendered as the
# text "None" inside the path (and an unset OUTPUT_PREFIX raises AttributeError).
defra_input_file_path = "s3://{}/{}".format(
    os.getenv('READ_FILE_S3_BUCKET'),
    os.getenv('DEFRA_DB_S3_PREFIX'),
)
target_column = database_to_text_column[database]
# The result still carries the {embed_model_name} placeholder that the
# model-specific cells fill in with .format(embed_model_name=...).
defra_output_path_prefix = "s3://{}/{}/{}".format(
    os.getenv('SAVE_RESULT_S3_BUCKET'),
    os.getenv('OUTPUT_PREFIX').format(database=database),
    os.getenv('OUTPUT_FILENAME'),
)

# No extra columns are carried into the output; results are written as CSV.
metadata_columns = []
output_format = "csv"

# Text-processing and batching options passed to process_file.
deduplicate_text = True
text_cleaning = True
min_text_length = 1
batch_size = 50

### Titan-V2

In [6]:
# Select the embedding model for this run. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_v2"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v2:0"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
defra_output_path = defra_output_path_prefix.format(embed_model_name=embed_model_name)

In [7]:
# Echo the resolved S3 locations before starting the job.
print("Input file path: {}".format(defra_input_file_path))
print("Output file path: {}".format(defra_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/20250308_DEFRA_S3_Emission_Factors_Database(Cat_1_&_2).csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/defra/titan_v2_embeddings_output.csv


In [8]:
# Build the service from its default configuration, which is driven by
# environment variables (AWS region, embedding model, etc.).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v2:0
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1024


In [9]:
# Vectorize the DEFRA emission-factor names read from S3 and write the embeddings back to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=defra_input_file_path,
        target_column=target_column,
        output_path=defra_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 26.89 seconds!
   📊 Status: success
   📈 Rows processed: 134
   🎯 Unique texts: 134
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/defra/titan_v2_embeddings_output.csv


### Titan-G1

In [10]:
# Switch to the Titan G1 embedding model. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_g1"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v1"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
defra_output_path = defra_output_path_prefix.format(embed_model_name=embed_model_name)

In [11]:
# Echo the resolved S3 locations before starting the job.
print("Input file path: {}".format(defra_input_file_path))
print("Output file path: {}".format(defra_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/20250308_DEFRA_S3_Emission_Factors_Database(Cat_1_&_2).csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/defra/titan_g1_embeddings_output.csv


In [12]:
# Build the service from its default configuration, which is driven by
# environment variables (AWS region, embedding model, etc.).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v1
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1536


In [14]:
# Vectorize the DEFRA emission-factor names with Titan G1, reading from and writing to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=defra_input_file_path,
        target_column=target_column,
        output_path=defra_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 27.21 seconds!
   📊 Status: success
   📈 Rows processed: 134
   🎯 Unique texts: 134
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/defra/titan_g1_embeddings_output.csv


## 5. Ecoinvent

In [15]:
# Which database this section vectorizes (key into database_to_text_column).
database = "ecoinvent"

# S3 locations, assembled from environment variables.
# NOTE: os.getenv returns None for an unset variable, which is rendered as the
# text "None" inside the path (and an unset OUTPUT_PREFIX raises AttributeError).
ecoinvent_input_file_path = "s3://{}/{}".format(
    os.getenv('READ_FILE_S3_BUCKET'),
    os.getenv('ECOINVENT_DB_S3_PREFIX'),
)
target_column = database_to_text_column[database]
# The result still carries the {embed_model_name} placeholder that the
# model-specific cells fill in with .format(embed_model_name=...).
ecoinvent_output_path_prefix = "s3://{}/{}/{}".format(
    os.getenv('SAVE_RESULT_S3_BUCKET'),
    os.getenv('OUTPUT_PREFIX').format(database=database),
    os.getenv('OUTPUT_FILENAME'),
)

# No extra columns are carried into the output; results are written as CSV.
metadata_columns = []
output_format = "csv"

# Text-processing and batching options passed to process_file.
deduplicate_text = True
text_cleaning = True
min_text_length = 1
batch_size = 50

### Titan-V2

In [16]:
# Select the embedding model for this run. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_v2"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v2:0"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
ecoinvent_output_path = ecoinvent_output_path_prefix.format(embed_model_name=embed_model_name)

In [17]:
# Echo the resolved S3 locations before starting the long-running job.
print("Input file path: {}".format(ecoinvent_input_file_path))
print("Output file path: {}".format(ecoinvent_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/ecoinvent_impact_indicators.csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/ecoinvent/titan_v2_embeddings_output.csv


In [18]:
# Build the service from its default configuration, which is driven by
# environment variables (AWS region, embedding model, etc.).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the long-running job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v2:0
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1024


In [19]:
# Vectorize the ecoinvent activity names read from S3 and write the embeddings back to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=ecoinvent_input_file_path,
        target_column=target_column,
        output_path=ecoinvent_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 1698.44 seconds!
   📊 Status: success
   📈 Rows processed: 9835
   🎯 Unique texts: 9835
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/ecoinvent/titan_v2_embeddings_output.csv


### Titan-G1

In [20]:
# Switch to the Titan G1 embedding model. The service created in the following
# cells reports EMBEDDING_MODEL_ID as its default model, so this must run first.
embed_model_name = "titan_g1"
os.environ["EMBEDDING_MODEL_ID"] = "amazon.titan-embed-text-v1"

# Fill the {embed_model_name} placeholder to get the model-specific output path.
ecoinvent_output_path = ecoinvent_output_path_prefix.format(embed_model_name=embed_model_name)

In [21]:
# Echo the resolved S3 locations before starting the long-running job.
print("Input file path: {}".format(ecoinvent_input_file_path))
print("Output file path: {}".format(ecoinvent_output_path))

Input file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/raw/csv/2025/08/13/ecoinvent_impact_indicators.csv
Output file path: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/ecoinvent/titan_g1_embeddings_output.csv


In [22]:
# Build the service from its default configuration, which is driven by
# environment variables (AWS region, embedding model, etc.).
service = DatabaseVectorizerService()

print("✅ Service initialized with default configuration")
vectorization_defaults = service.config.vectorization
model_defaults = vectorization_defaults.embedding_model_config
print(f"Default region: {model_defaults['aws_region_name']}")
print(f"Default model: {model_defaults['model_id']}")
print(f"Default batch size: {vectorization_defaults.batch_size}")

# Run the service's health check before starting the long-running job.
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] != 'healthy':
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")
else:
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")

✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v1
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1536


In [23]:
# Vectorize the ecoinvent activity names with Titan G1, reading from and writing to S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path=ecoinvent_input_file_path,
        target_column=target_column,
        output_path=ecoinvent_output_path,
        metadata_columns=metadata_columns,
        output_format=output_format,
        # Additional processing options
        deduplicate_text=deduplicate_text,
        text_cleaning=text_cleaning,
        min_text_length=min_text_length,
        batch_size=batch_size
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 1712.76 seconds!
   📊 Status: success
   📈 Rows processed: 9835
   🎯 Unique texts: 9835
   📁 Output saved to: s3://tracera-ml-prod-data/projects/emission_factor_matching/org_data/common/features/v1.0.0/ecoinvent/titan_g1_embeddings_output.csv


## End

## Sample CSV Data - Demo

The remaining sections are a self-contained walkthrough on synthetic data. We start by creating a sample CSV file with product data to demonstrate the vectorization process.


In [None]:
def create_sample_csv(num_products=100, output_path="sample_products.csv", add_duplicates=True):
    """Generate a deterministic synthetic product catalogue and save it as CSV.

    Args:
        num_products: Number of base products to generate (ids 1..num_products).
        output_path: Where the CSV is written (without the DataFrame index).
        add_duplicates: When True, every fifth base product is cloned with a new
            product_id and a price raised by 5.0, so that some titles repeat.

    Returns:
        The pandas DataFrame that was written, with the columns product_id,
        product_title, category and price.
    """
    category_cycle = [
        "Electronics", "Clothing", "Home & Kitchen", "Sports & Outdoors",
        "Beauty & Personal Care", "Toys & Games", "Books", "Office Products"
    ]
    shirt_sizes = ['S', 'M', 'L', 'XL']

    def _title_for(number, category):
        # Three categories get a more descriptive title; the rest are generic.
        if category == "Electronics":
            return f"Wireless Bluetooth Headphones - Model X{number}"
        if category == "Clothing":
            return f"Premium Cotton T-Shirt - Size {shirt_sizes[number % 4]}"
        if category == "Home & Kitchen":
            return f"Stainless Steel Cookware Set - {number % 10 + 1} Piece"
        return f"Product {number} - {category} Item"

    def _row_for(number):
        # Categories are assigned round-robin by product number.
        category = category_cycle[number % len(category_cycle)]
        return {
            "product_id": number,
            "product_title": _title_for(number, category),
            "category": category,
            "price": round((number % 50) + 9.99, 2)
        }

    products = [_row_for(number) for number in range(1, num_products + 1)]

    if add_duplicates:
        # Clone 20% of the base products: rows 0, 5, 10, ... (evenly spaced, not random).
        num_duplicates = num_products // 5
        for offset in range(num_duplicates):
            clone = dict(products[offset * 5])
            clone["product_id"] = num_products + offset + 1
            # Same title, different price, so only the text is duplicated.
            clone["price"] = round(clone["price"] + 5.0, 2)
            products.append(clone)

        print(f"Added {num_duplicates} duplicate product titles with different attributes")

    df = pd.DataFrame(products)
    df.to_csv(output_path, index=False)

    print(f"Created sample CSV with {len(products)} products at {output_path}")
    return df

# Create a sample CSV with 100 products plus duplicates
sample_df = create_sample_csv(num_products=100)
sample_df.head()


Added 20 duplicate product titles with different attributes
Created sample CSV with 120 products at sample_products.csv


Unnamed: 0,product_id,product_title,category,price
0,1,Premium Cotton T-Shirt - Size M,Clothing,10.99
1,2,Stainless Steel Cookware Set - 3 Piece,Home & Kitchen,11.99
2,3,Product 3 - Sports & Outdoors Item,Sports & Outdoors,12.99
3,4,Product 4 - Beauty & Personal Care Item,Beauty & Personal Care,13.99
4,5,Product 5 - Toys & Games Item,Toys & Games,14.99


In [None]:
# Duplicates
sample_df[sample_df.duplicated(subset=["product_title"])].head()

Unnamed: 0,product_id,product_title,category,price
8,9,Premium Cotton T-Shirt - Size M,Clothing,18.99
16,17,Premium Cotton T-Shirt - Size M,Clothing,26.99
24,25,Premium Cotton T-Shirt - Size M,Clothing,34.99
32,33,Premium Cotton T-Shirt - Size M,Clothing,42.99
40,41,Premium Cotton T-Shirt - Size M,Clothing,50.99


In [None]:
# Unique product titles
sample_df.product_title.nunique()

80

### Initialize the Service

The service can be initialized with default configuration or with a custom configuration.


In [14]:
# Option 1: Initialize with default configuration
# This will use environment variables for AWS region, embedding model, etc.
service = DatabaseVectorizerService()

# The defaults are read through `service.config`, the same attribute used by
# the NAICS / DEFRA / ecoinvent sections of this notebook (this cell previously
# used `service.default_config`).
print("✅ Service initialized with default configuration")
print(f"Default region: {service.config.vectorization.embedding_model_config['aws_region_name']}")
print(f"Default model: {service.config.vectorization.embedding_model_config['model_id']}")
print(f"Default batch size: {service.config.vectorization.batch_size}")

# Check service health
health = service.health_check()
print(f"\n🏥 Service health check: {health['status']}")
if health['status'] == 'healthy':
    print(f"   ✅ Embeddings available: {health['embeddings_available']}")
    print(f"   ✅ Test embedding dimensions: {health['test_embedding_dimensions']}")
else:
    print(f"   ❌ Error: {health.get('error', 'Unknown error')}")


✅ Service initialized with default configuration
Default region: eu-central-1
Default model: amazon.titan-embed-text-v1
Default batch size: 100

🏥 Service health check: healthy
   ✅ Embeddings available: True
   ✅ Test embedding dimensions: 1536


### Process File with Local Storage

The service provides a simple `process_file` method that handles the entire vectorization pipeline.


In [6]:
# Vectorize the synthetic sample CSV and store the result as a local parquet file.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with DatabaseVectorizerService...")
start_time = time.time()

try:
    # Process the file - this creates an immutable config for this operation
    result = service.process_file(
        input_file_path="sample_products.csv",
        target_column="product_title",
        output_path=os.path.join(output_dir, "service_demo_output.parquet"),
        metadata_columns=["product_id", "category", "price"],
        output_format="parquet",
        # Additional processing options
        deduplicate_text=True,
        text_cleaning=True,
        min_text_length=5,
        batch_size=50
    )

    processing_time = time.time() - start_time

    if result.get('status') != 'success':
        # Failure reported through the result dict rather than an exception.
        print(f"❌ Processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")
    else:
        print(f"✅ Processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 Output saved to: {result.get('output_file', 'N/A')}")

        # Display the output
        if 'output_file' in result:
            output_file = result['output_file']
            if os.path.exists(output_file):
                output_df = pd.read_parquet(output_file)
                print(f"\n📋 Output preview ({len(output_df)} rows):")
                display(output_df.head())

                print(f"\n🔍 Column information:")
                for col in output_df.columns:
                    print(f"   - {col}: {output_df[col].dtype}")
            else:
                print(f"⚠️  Output file not found at: {output_file}")

except Exception as e:
    print(f"❌ Error during processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with DatabaseVectorizerService...
✅ Processing completed in 14.97 seconds!
   📊 Status: success
   📈 Rows processed: 80
   🎯 Unique texts: 80
   📁 Output saved to: /Users/sagarthacker/Tracera/repos/ml/cocoon/demo/output/service_demo_output.parquet

📋 Output preview (80 rows):


Unnamed: 0,label,embedding,index_ids,product_id,category,price
0,Premium Cotton T-Shirt - Size M,"[-0.1533203125, 0.439453125, 0.07421875, 0.679...","[0, 8, 16, 24, 32, 40, 48, 56, 64, 72, 80, 88,...",1,Clothing,10.99
1,Product 100 - Beauty & Personal Care Item,"[0.52734375, 0.5234375, 0.2734375, 0.51171875,...",[99],100,Beauty & Personal Care,9.99
2,Product 11 - Sports & Outdoors Item,"[0.61328125, 0.40625, -0.267578125, 0.11279296...","[10, 102]",11,Sports & Outdoors,20.99
3,Product 12 - Beauty & Personal Care Item,"[0.5390625, 0.390625, 0.1416015625, 0.05346679...",[11],12,Beauty & Personal Care,21.99
4,Product 13 - Toys & Games Item,"[0.4609375, 0.138671875, -0.462890625, 0.09082...",[12],13,Toys & Games,22.99



🔍 Column information:
   - label: object
   - embedding: object
   - index_ids: object
   - product_id: int64
   - category: object
   - price: float64


### Process File with S3 Storage

The same service can be used to process files and store results in S3.


In [7]:
# Vectorize the synthetic sample CSV and store the result in S3.
# process_file can report a failure by *returning* a dict with status "error"
# (with error_message / error_type), so the status is checked before success
# is announced; raised exceptions are still caught and summarised.
print("🚀 Processing sample CSV with S3 storage...")
start_time = time.time()

try:
    # S3 output path (plain string: there is nothing to interpolate)
    s3_output_path = "s3://esgflo-ml-test-scope3/test/database_vectorization_demo/service_demo_output.csv"

    # Process the file with S3 storage
    result = service.process_file(
        input_file_path="sample_products.csv",
        target_column="product_title",
        output_path=s3_output_path,
        metadata_columns=["product_id", "category", "price"],
        output_format="csv",  # CSV format for S3
        # Processing options
        deduplicate_text=True,
        text_cleaning=True,
        batch_size=25
    )

    processing_time = time.time() - start_time

    if result.get('status') == 'success':
        print(f"✅ S3 processing completed in {processing_time:.2f} seconds!")
        print(f"   📊 Status: {result['status']}")
        print(f"   📈 Rows processed: {result.get('rows_processed', 'N/A')}")
        print(f"   🎯 Unique texts: {result.get('unique_texts', 'N/A')}")
        print(f"   📁 S3 output: {result.get('output_file', 'N/A')}")

        # The service automatically detects S3 paths and uses S3 storage
        print(f"\n🌟 Successfully saved results to S3!")
        print(f"   📍 S3 location: {result.get('output_file')}")
    else:
        # Failure reported through the result dict rather than an exception.
        print(f"❌ S3 processing failed after {processing_time:.2f} seconds")
        print(f"   📊 Status: {result.get('status', 'N/A')}")
        print(f"   Error type: {result.get('error_type', 'Unknown')}")
        print(f"   Error: {result.get('error_message', 'Unknown error')}")

except Exception as e:
    print(f"❌ Error during S3 processing: {str(e)}")
    print(f"   Error type: {type(e).__name__}")


🚀 Processing sample CSV with S3 storage...
✅ S3 processing completed in 16.62 seconds!
   📊 Status: success
   📈 Rows processed: 80
   🎯 Unique texts: 80
   📁 S3 output: s3://esgflo-ml-test-scope3/test/database_vectorization_demo/service_demo_output.csv

🌟 Successfully saved results to S3!
   📍 S3 location: s3://esgflo-ml-test-scope3/test/database_vectorization_demo/service_demo_output.csv


In [10]:
result

{'status': 'error',
 'error_message': "Unrecognized compression type: snappy\nValid compression types are ['infer', None, 'bz2', 'gzip', 'tar', 'xz', 'zip', 'zstd']",
 'error_type': 'ValueError',
 'input_file': 'sample_products.csv',
 'target_column': 'product_title',
 'output_path': 's3://esgflo-ml-test-scope3/test/database_vectorization_demo/service_demo_output.csv'}

### Initialize Service with Custom Configuration

You can also initialize the service with a custom configuration for more control.


In [8]:
# Create a custom configuration
from cocoon.core.config.models import (
    VectorizationPipelineConfig, FileInputConfig, ProcessingConfig,
    VectorizationConfig, OutputConfig
)

# Create custom configuration with specific settings
custom_config = VectorizationPipelineConfig(
    input=FileInputConfig(
        file_path="placeholder.csv",
        file_type="csv",
        csv_delimiter=",",
        csv_encoding="utf-8"
    ),
    processing=ProcessingConfig(
        deduplicate_text=True,
        preserve_original_indices=True,
        text_cleaning=True,
        min_text_length=10,  # Longer minimum text length
        max_text_length=200  # Maximum text length
    ),
    vectorization=VectorizationConfig(
        embedding_model_config={
            "model_id": "amazon.titan-embed-text-v1",
            "aws_region_name": "eu-central-1"
        },
        target_column="text",
        metadata_columns=[],
        batch_size=20  # Smaller batch size
    ),
    output=OutputConfig(
        output_path="output.parquet",
        output_format="parquet",
        compression="gzip",  # Different compression
        include_metadata=True
    )
)

# Initialize service with custom config
custom_service = DatabaseVectorizerService(custom_config)

# Read the settings back through `.config`, the attribute the NAICS / DEFRA /
# ecoinvent sections of this notebook use (this cell previously used
# `.default_config`).
print("✅ Custom service initialized")
print(f"   🔧 Min text length: {custom_service.config.processing.min_text_length}")
print(f"   🔧 Max text length: {custom_service.config.processing.max_text_length}")
print(f"   🔧 Batch size: {custom_service.config.vectorization.batch_size}")
print(f"   🔧 Compression: {custom_service.config.output.compression}")


✅ Custom service initialized
   🔧 Min text length: 10
   🔧 Max text length: 200
   🔧 Batch size: 20
   🔧 Compression: gzip


### Error Handling and Statistics

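The service provides comprehensive error handling and statistics. For the failures demonstrated below (a missing input file and a missing target column), `process_file` does not raise; it returns a result dictionary whose `status` is `"error"`, together with `error_type` and `error_message` fields.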


In [9]:
# Demonstrate error handling. For both failures below the call returns a result
# dict with status "error" instead of raising, so no try/except is used here.
# NOTE(review): `service` is presumably the DatabaseVectorizerService instance
# created in an earlier cell — confirm it exists on a fresh Restart & Run All.

# Case 1: the input file does not exist
print("🧪 Testing error handling with non-existent file...")

error_result = service.process_file(
    input_file_path="non_existent_file.csv",
    target_column="text_column",
    output_path="error_output.parquet"
)

print("📋 Error handling result:")
print(f"   📊 Status: {error_result['status']}")
print(f"   ❌ Error type: {error_result.get('error_type', 'N/A')}")
print(f"   📝 Error message: {error_result.get('error_message', 'N/A')}")
print(f"   📁 Input file: {error_result.get('input_file', 'N/A')}")

# Case 2: the file exists but the requested target column does not
print("\n🧪 Testing error handling with invalid column name...")

invalid_column_result = service.process_file(
    input_file_path="sample_products.csv",
    target_column="non_existent_column",
    output_path="invalid_output.parquet"
)

# Truncate long messages, but only add the ellipsis when something was actually
# cut off; previously "..." was appended even to short, complete messages.
invalid_column_message = invalid_column_result.get('error_message', 'N/A')
if len(invalid_column_message) > 100:
    invalid_column_message = invalid_column_message[:100] + "..."

print("📋 Invalid column result:")
print(f"   📊 Status: {invalid_column_result['status']}")
print(f"   ❌ Error type: {invalid_column_result.get('error_type', 'N/A')}")
print(f"   📝 Error message: {invalid_column_message}")


Error processing non_existent_file.csv: CSV file not found: non_existent_file.csv
Error processing sample_products.csv: Target column 'non_existent_column' not found in input file


🧪 Testing error handling with non-existent file...
📋 Error handling result:
   📊 Status: error
   ❌ Error type: FileNotFoundError
   📝 Error message: CSV file not found: non_existent_file.csv
   📁 Input file: non_existent_file.csv

🧪 Testing error handling with invalid column name...
📋 Invalid column result:
   📊 Status: error
   ❌ Error type: ValueError
   📝 Error message: Target column 'non_existent_column' not found in input file...
