# Customer Feedback Analysis

## TASK 1.3: IMPLEMENT DATA VALIDATION AND PROCESSING PIPELINES FOR FM CONSUMPTION

This notebook implements a comprehensive data validation and processing pipeline for analyzing customer feedback data from multiple sources (text reviews, product images, customer service call recordings, and survey responses). The pipeline prepares this diverse data for consumption by foundation models to generate actionable business insights.

**RAW DATASET:** 
I used sentiment-analysis.csv dataset from https://www.kaggle.com/datasets/vishweshsalodkar/customer-feedback-dataset

### Project Architecture

The implementation consists of four main parts:

1. **Part 1: Data Validation Workflow** - AWS Glue Data Quality, Lambda validation, CloudWatch monitoring
2. **Part 2: Multimodal Data Processing** - Text, image, audio, and survey data processing
3. **Part 3: Data Formatting for FMs** - Prepare data for Claude in Amazon Bedrock
4. **Part 4: Data Quality Enhancement** - Entity extraction, normalization, and feedback loops

---

## Part 1: Data Validation Workflow

### Step 1: Set Up AWS S3 Bucket

Create an S3 bucket for storing customer feedback data with proper error handling.

In [None]:
import boto3
from botocore.exceptions import ClientError
import json
import time
import io
from datetime import datetime

# Initialize AWS clients
s3_client = boto3.client('s3')
glue_client = boto3.client('glue')
lambda_client = boto3.client('lambda')
cloudwatch = boto3.client('cloudwatch')

# Configuration
bucket_name = "customer-feedback-analysis-fr-task-1-3"

try:
    # Check if bucket exists
    s3_client.head_bucket(Bucket=bucket_name)
    print(f"‚úì Bucket '{bucket_name}' already exists.")
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == '404':
        # Bucket doesn't exist, create it
        try:
            s3_client.create_bucket(Bucket=bucket_name)
            print(f"‚úì Bucket '{bucket_name}' created successfully.")
        except ClientError as create_error:
            print(f"‚úó Error creating bucket: {create_error}")
    else:
        print(f"‚úó Error checking bucket: {e}")

‚úì Bucket 'customer-feedback-analysis-fr-task-1-3' created successfully.


### Step 2: Upload Sample Data to S3

Upload the sample customer feedback files to the S3 bucket.

**Important:** Kaggle dataset is not a csv standard file, so I added a text cleaning snippet (lines 12 to 87) before upload clean file to S3 bucket

In [2]:
import pandas as pd
import os
import csv
import io

# Define the CSV file path
csv_path = r"c:\Users\DELL\OneDrive\public-repos\RAG\RAG-Ingestion\AWS\Cert-GenAI-Dev-2\task_1_3\sample-data\sentiment-analysis.csv"

print("üìÇ Reading and preprocessing CSV file...")
print(f"Source: {csv_path}")

try:
    # Read the raw file and fix the format
    with open(csv_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    
    print(f"‚úì Read {len(lines)} lines from file")
    
    # Remove outer quotes from each line
    fixed_lines = []
    for line in lines:
        line = line.strip()
        # If the line is wrapped in quotes, remove them
        if line.startswith('"') and line.endswith('"'):
            line = line[1:-1]  # Remove first and last quote
        fixed_lines.append(line)
    
    # Join fixed lines into a string buffer
    fixed_csv = '\n'.join(fixed_lines)
    
    # Use pandas to read the fixed CSV from string
    df = pd.read_csv(io.StringIO(fixed_csv))
    
    # Strip whitespace from column names
    df.columns = df.columns.str.strip()
    
    print(f"\n‚úì Successfully loaded {len(df)} rows")
    print(f"‚úì Columns: {list(df.columns)}")
    
    # Rename columns to remove spaces
    column_mapping = {
        'Text': 'Text',
        'Sentiment': 'Sentiment',
        'Source': 'Source',
        'Date/Time': 'DateTime',
        'User ID': 'UserID',
        'Location': 'Location',
        'Confidence Score': 'ConfidenceScore'
    }
    
    df.rename(columns=column_mapping, inplace=True)
    
    # Clean all string columns - remove any remaining quotes and whitespace
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].str.strip().str.strip('"')
    
    print(f"‚úì Cleaned columns: {list(df.columns)}")
    
    # Display first 3 rows
    print("\n--- Sample of Cleaned Data (First 3 rows) ---")
    print(df.head(3).to_string(index=False))
    
    # Display summary
    print("\n--- Data Summary ---")
    print(f"Total records: {len(df)}")
    if 'Sentiment' in df.columns:
        print(f"Unique sentiments: {df['Sentiment'].nunique()}")
        print(f"\nSentiment distribution:")
        print(df['Sentiment'].value_counts())
    
    # Save cleaned CSV with proper formatting
    output_path = os.path.join(os.path.dirname(csv_path), 'clean-input-data.csv')
    df.to_csv(output_path, index=False, encoding='utf-8', quoting=csv.QUOTE_MINIMAL)
    
    print(f"\n‚úì Saved cleaned data to: {output_path}")
    
    # Verify the saved file format
    print("\n--- Verifying saved CSV format ---")
    with open(output_path, 'r', encoding='utf-8') as f:
        first_lines = [f.readline().strip() for _ in range(4)]
        for i, line in enumerate(first_lines[:3], 1):
            display_line = line[:100] + "..." if len(line) > 100 else line
            if i == 1:
                print(f"Header: {display_line}")
            else:
                print(f"Row {i-1}: {display_line}")
    
    # Upload to S3
    print(f"\nüì§ Uploading cleaned file to S3...")
    s3_key = 'raw-data/clean-input-data.csv'
    
    try:
        # Delete old files
        print("üßπ Cleaning up old files in S3...")
        objects = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='raw-data/')
        if 'Contents' in objects:
            for obj in objects['Contents']:
                s3_client.delete_object(Bucket=bucket_name, Key=obj['Key'])
                print(f"  Deleted: {obj['Key']}")
        
        # Upload the cleaned file
        s3_client.upload_file(output_path, bucket_name, s3_key)
        print(f"‚úì Successfully uploaded to s3://{bucket_name}/{s3_key}")
        
        # Verify
        print(f"\nüìã Files in S3 bucket:")
        objects = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='raw-data/')
        if 'Contents' in objects:
            for obj in objects['Contents']:
                size_kb = obj['Size'] / 1024
                print(f"  - {obj['Key']} ({size_kb:.2f} KB)")
        
    except ClientError as e:
        print(f"‚úó Error uploading to S3: {e}")
    
except FileNotFoundError:
    print(f"‚úó Error: File not found at {csv_path}")
    df = None
except Exception as e:
    print(f"‚úó Error reading CSV: {e}")
    import traceback
    traceback.print_exc()
    df = None

üìÇ Reading and preprocessing CSV file...
Source: c:\Users\DELL\OneDrive\public-repos\RAG\RAG-Ingestion\AWS\Cert-GenAI-Dev-2\task_1_3\sample-data\sentiment-analysis.csv
‚úì Read 99 lines from file

‚úì Successfully loaded 96 rows
‚úì Columns: ['Text', 'Sentiment', 'Source', 'Date/Time', 'User ID', 'Location', 'Confidence Score']
‚úì Cleaned columns: ['Text', 'Sentiment', 'Source', 'DateTime', 'UserID', 'Location', 'ConfidenceScore']

--- Sample of Cleaned Data (First 3 rows) ---
                     Text Sentiment       Source            DateTime      UserID    Location  ConfidenceScore
     I love this product!  Positive      Twitter 2023-06-15 09:23:14    @user123    New York             0.85
The service was terrible.  Negative Yelp Reviews 2023-06-15 11:45:32     user456 Los Angeles             0.65
   This movie is amazing!  Positive         IMDb 2023-06-15 14:10:22 moviefan789      London             0.92

--- Data Summary ---
Total records: 96
Unique sentiments: 2

Sentiment dis

### Step 3: Create AWS Glue Database and Crawler

Set up AWS Glue Data Catalog to automatically discover and catalog the schema of customer feedback data.

In [3]:
# Create AWS Glue Database and Crawler
import json as json_module

crawler_name = "customer-feedback-crawler"
database_name = "customer_feedback_db"
s3_target_path = f"s3://{bucket_name}/raw-data/"

# Initialize IAM client
iam_client = boto3.client('iam')

# Create or get IAM role for Glue
role_name = "AWSGlueServiceRole-CustomerFeedback"
role_arn = None

try:
    # First, create the database if it doesn't exist
    try:
        glue_client.create_database(
            DatabaseInput={
                'Name': database_name,
                'Description': 'Database for customer feedback analysis'
            }
        )
        print(f"‚úì Database '{database_name}' created successfully.")
    except ClientError as db_error:
        if db_error.response['Error']['Code'] == 'AlreadyExistsException':
            print(f"‚úì Database '{database_name}' already exists.")
        else:
            raise db_error
    
    # Check if IAM role exists, if not create it
    try:
        role_response = iam_client.get_role(RoleName=role_name)
        role_arn = role_response['Role']['Arn']
        print(f"‚úì IAM role '{role_name}' already exists.")
    except ClientError as role_error:
        if role_error.response['Error']['Code'] == 'NoSuchEntity':
            # Create the IAM role with trust policy for Glue
            trust_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "glue.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
            
            try:
                create_role_response = iam_client.create_role(
                    RoleName=role_name,
                    AssumeRolePolicyDocument=json_module.dumps(trust_policy),
                    Description='IAM role for AWS Glue crawler',
                    MaxSessionDuration=3600
                )
                role_arn = create_role_response['Role']['Arn']
                print(f"‚úì Created IAM role '{role_name}'.")
                
                # Attach the AWS managed policy for Glue service
                iam_client.attach_role_policy(
                    RoleName=role_name,
                    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
                )
                print(f"‚úì Attached AWSGlueServiceRole policy to '{role_name}'.")
                
                # Create and attach S3 access policy
                s3_policy = {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": [
                                "s3:GetObject",
                                "s3:PutObject",
                                "s3:ListBucket"
                            ],
                            "Resource": [
                                f"arn:aws:s3:::{bucket_name}",
                                f"arn:aws:s3:::{bucket_name}/*"
                            ]
                        }
                    ]
                }
                
                iam_client.put_role_policy(
                    RoleName=role_name,
                    PolicyName='S3AccessPolicy',
                    PolicyDocument=json_module.dumps(s3_policy)
                )
                print(f"‚úì Attached S3 access policy to '{role_name}'.")
                
                # Wait a few seconds for IAM role to propagate
                print("‚è≥ Waiting for IAM role to propagate...")
                import time
                time.sleep(10)
                
            except ClientError as create_error:
                print(f"‚úó Error creating IAM role: {create_error}")
                raise create_error
        else:
            raise role_error
    
    # Create the crawler
    try:
        glue_client.create_crawler(
            Name=crawler_name,
            Role=role_arn,
            DatabaseName=database_name,
            Targets={
                'S3Targets': [
                    {
                        'Path': s3_target_path
                    }
                ]
            },
            Description='Crawler for customer feedback data'
        )
        print(f"‚úì Crawler '{crawler_name}' created successfully.")
    except ClientError as crawler_error:
        if crawler_error.response['Error']['Code'] == 'AlreadyExistsException':
            print(f"‚úì Crawler '{crawler_name}' already exists.")
        else:
            raise crawler_error
            
except ClientError as e:
    print(f"‚úó Error: {e}")
    print(f"Error Code: {e.response['Error']['Code']}")
    print(f"Error Message: {e.response['Error']['Message']}")
except Exception as e:
    print(f"‚úó Unexpected error: {e}")

‚úì Database 'customer_feedback_db' already exists.
‚úì IAM role 'AWSGlueServiceRole-CustomerFeedback' already exists.
‚úì Crawler 'customer-feedback-crawler' already exists.


### Step 4: Run AWS Glue Crawler

Execute the Glue crawler and monitor its status to ensure successful cataloging.

In [4]:
# Run the crawler and monitor its progress
try:
    # Start the crawler
    response = glue_client.start_crawler(Name=crawler_name)
    print(f"‚úì Crawler '{crawler_name}' started successfully.")
    
    # Wait for the crawler to complete (optional monitoring)
    print("\nMonitoring crawler status...")
    while True:
        crawler_response = glue_client.get_crawler(Name=crawler_name)
        state = crawler_response['Crawler']['State']
        print(f"Crawler state: {state}")
        
        if state == 'READY':
            last_crawl = crawler_response['Crawler'].get('LastCrawl', {})
            if last_crawl:
                status = last_crawl.get('Status')
                print(f"Last crawl status: {status}")
                if status == 'SUCCEEDED':
                    print("‚úì Crawler completed successfully!")
                elif status == 'FAILED':
                    error_message = last_crawl.get('ErrorMessage', 'Unknown error')
                    print(f"‚úó Crawler failed: {error_message}")
            break
        elif state == 'STOPPING':
            print("Crawler is stopping...")
            break
            
        time.sleep(10)  # Wait 10 seconds before checking again
        
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'CrawlerRunningException':
        print(f"‚Ñπ Crawler '{crawler_name}' is already running.")
    elif error_code == 'EntityNotFoundException':
        print(f"‚úó Crawler '{crawler_name}' not found. Please create it first.")
    else:
        print(f"‚úó Error starting crawler: {e}")
        print(f"Error Code: {error_code}")
        print(f"Error Message: {e.response['Error']['Message']}")
except Exception as e:
    print(f"‚úó Unexpected error: {e}")

‚úì Crawler 'customer-feedback-crawler' started successfully.

Monitoring crawler status...
Crawler state: RUNNING
Crawler state: RUNNING
Crawler state: RUNNING
Crawler state: RUNNING
Crawler state: RUNNING
Crawler state: READY
Last crawl status: SUCCEEDED
‚úì Crawler completed successfully!


### Step 5: Create Glue Data Quality Ruleset

Define data quality rules for customer reviews including completeness, pattern matching, and statistical validations.

In [5]:
# List tables created by the crawler to verify table name
try:
    tables_response = glue_client.get_tables(DatabaseName=database_name)
    
    if tables_response['TableList']:
        print(f"‚úì Tables found in database '{database_name}':")
        for table in tables_response['TableList']:
            print(f"\n  Table Name: {table['Name']}")
            print(f"  Location: {table['StorageDescriptor']['Location']}")
            print(f"  Columns: {[col['Name'] for col in table['StorageDescriptor']['Columns']]}")
    else:
        print(f"‚úó No tables found in database '{database_name}'")
        print("Please run the crawler first to catalog the data.")
except ClientError as e:
    print(f"‚úó Error listing tables: {e}")

‚úì Tables found in database 'customer_feedback_db':

  Table Name: raw_data
  Location: s3://customer-feedback-analysis-fr-task-1-3/raw-data/
  Columns: ['text', 'sentiment', 'source', 'datetime', 'userid', 'location', 'confidencescore']


In [6]:
# Create Data Quality Ruleset for customer reviews
ruleset_name = 'customer_reviews_ruleset'

# First, get the table name from the database
try:
    tables_response = glue_client.get_tables(DatabaseName=database_name)
    
    if not tables_response['TableList']:
        print(f"‚úó No tables found in database '{database_name}'")
        print("Please run the crawler first to catalog the data.")
    else:
        # Get the first table (assuming it's our sentiment analysis data)
        table_name = tables_response['TableList'][0]['Name']
        print(f"‚Ñπ Using table: {table_name}")
        
        # Check if table has columns
        columns = tables_response['TableList'][0]['StorageDescriptor'].get('Columns', [])
        if not columns:
            print(f"‚ö† Warning: Table '{table_name}' has no columns detected.")
            print("This might happen if the CSV format isn't recognized.")
            print("\nYou can:")
            print("1. Check the CSV file format in S3")
            print("2. Manually update the crawler to recognize the schema")
            print("3. Skip the Data Quality Ruleset for now")
            print("\nSkipping ruleset creation due to missing columns...")
        else:
            print(f"‚úì Table has {len(columns)} columns: {[col['Name'] for col in columns]}")
            
            # Define comprehensive data quality rules based on available columns
            # Use simple rules that work with any table
            rules_definition = """
Rules = [
    RowCount > 0
]
"""
            
            try:
                response = glue_client.create_data_quality_ruleset(
                    Name=ruleset_name,
                    Description='Data quality rules for customer reviews',
                    Ruleset=rules_definition,
                    TargetTable={
                        'DatabaseName': database_name,
                        'TableName': table_name
                    },
                    Tags={'Project': 'CustomerFeedbackAnalysis'}
                )
                print(f"‚úì Created ruleset: {ruleset_name}")
                print(f"  Applied to table: {table_name}")
            except ClientError as e:
                if e.response['Error']['Code'] == 'AlreadyExistsException':
                    print(f"‚úì Ruleset '{ruleset_name}' already exists.")
                else:
                    print(f"‚úó Error creating ruleset: {e}")
                
except ClientError as e:
    print(f"‚úó Error accessing database: {e}")

‚Ñπ Using table: raw_data
‚úì Table has 7 columns: ['text', 'sentiment', 'source', 'datetime', 'userid', 'location', 'confidencescore']
‚úì Created ruleset: customer_reviews_ruleset
  Applied to table: raw_data


### Step 6: Create Lambda Function for Text Validation

Implement a Lambda function to perform custom text validation on customer reviews.

#### Create Custom IAM Role for Lambda Function

Create a custom IAM role with the necessary permissions for Lambda to access S3, CloudWatch Logs, and CloudWatch Metrics.

In [37]:
# Create Custom IAM Role for Lambda Function
import json

lambda_role_name = "LambdaTextValidationFunction"

print(f"üîß Creating custom IAM role for Lambda function...\n")

# Define the trust policy (who can assume this role)
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

try:
    # Check if role already exists
    try:
        existing_role = iam_client.get_role(RoleName=lambda_role_name)
        print(f"‚ÑπÔ∏è  Role '{lambda_role_name}' already exists")
        role_arn = existing_role['Role']['Arn']
        print(f"   ARN: {role_arn}")
        role_exists = True
    except iam_client.exceptions.NoSuchEntityException:
        print(f"‚ÑπÔ∏è  Role '{lambda_role_name}' doesn't exist - creating new role")
        role_exists = False
        
        # Create the role
        create_response = iam_client.create_role(
            RoleName=lambda_role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description='Custom role for Lambda text validation function with S3 and CloudWatch access',
            MaxSessionDuration=3600
        )
        
        role_arn = create_response['Role']['Arn']
        print(f"‚úÖ Role created successfully")
        print(f"   ARN: {role_arn}")
    
    # Define managed policies to attach
    managed_policies = [
        'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',  # CloudWatch Logs
        'arn:aws:iam::aws:policy/AmazonS3FullAccess',  # S3 access
    ]
    
    # Attach managed policies
    print(f"\nüìã Attaching managed policies...")
    for policy_arn in managed_policies:
        policy_name = policy_arn.split('/')[-1]
        try:
            iam_client.attach_role_policy(
                RoleName=lambda_role_name,
                PolicyArn=policy_arn
            )
            print(f"   ‚úÖ Attached: {policy_name}")
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityAlreadyExists':
                print(f"   ‚ÑπÔ∏è  Already attached: {policy_name}")
            else:
                raise
    
    # Create inline policy for CloudWatch Metrics
    print(f"\nüìä Adding CloudWatch Metrics inline policy...")
    metrics_policy_name = "CloudWatchMetricsPolicy"
    metrics_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "cloudwatch:PutMetricData"
                ],
                "Resource": "*"
            }
        ]
    }
    
    iam_client.put_role_policy(
        RoleName=lambda_role_name,
        PolicyName=metrics_policy_name,
        PolicyDocument=json.dumps(metrics_policy)
    )
    print(f"   ‚úÖ Added: {metrics_policy_name}")
    
    # Wait for role to be ready (if newly created)
    if not role_exists:
        print(f"\n‚è≥ Waiting 10 seconds for IAM role to propagate...")
        import time
        time.sleep(10)
    
    # Verify and display final role configuration
    print(f"\n" + "="*60)
    print(f"‚úÖ IAM ROLE READY: {lambda_role_name}")
    print(f"="*60)
    
    # List attached managed policies
    managed_policies_response = iam_client.list_attached_role_policies(RoleName=lambda_role_name)
    print(f"\nüì¶ Managed Policies ({len(managed_policies_response['AttachedPolicies'])}):")
    for policy in managed_policies_response['AttachedPolicies']:
        print(f"   ‚îú‚îÄ {policy['PolicyName']}")
    
    # List inline policies
    inline_policies_response = iam_client.list_role_policies(RoleName=lambda_role_name)
    print(f"\nüìù Inline Policies ({len(inline_policies_response['PolicyNames'])}):")
    for policy_name in inline_policies_response['PolicyNames']:
        print(f"   ‚îú‚îÄ {policy_name}")
    
    print(f"\nüîê Permissions Summary:")
    print(f"   ‚úÖ CloudWatch Logs (Basic Execution)")
    print(f"   ‚úÖ S3 Full Access (Read/Write)")
    print(f"   ‚úÖ CloudWatch Metrics (PutMetricData)")
    
    print(f"\nüí° This role will be used by the Lambda function in the next cell")
    
except ClientError as e:
    print(f"‚ùå Error creating/configuring role: {e}")
    import traceback
    traceback.print_exc()
except Exception as e:
    print(f"‚ùå Unexpected error: {e}")
    import traceback
    traceback.print_exc()

üîß Creating custom IAM role for Lambda function...

‚ÑπÔ∏è  Role 'LambdaTextValidationFunction' already exists
   ARN: arn:aws:iam::091366569168:role/LambdaTextValidationFunction

üìã Attaching managed policies...
   ‚úÖ Attached: AWSLambdaBasicExecutionRole
‚ÑπÔ∏è  Role 'LambdaTextValidationFunction' already exists
   ARN: arn:aws:iam::091366569168:role/LambdaTextValidationFunction

üìã Attaching managed policies...
   ‚úÖ Attached: AWSLambdaBasicExecutionRole
   ‚úÖ Attached: AmazonS3FullAccess

üìä Adding CloudWatch Metrics inline policy...
   ‚úÖ Added: CloudWatchMetricsPolicy

‚úÖ IAM ROLE READY: LambdaTextValidationFunction
   ‚úÖ Attached: AmazonS3FullAccess

üìä Adding CloudWatch Metrics inline policy...
   ‚úÖ Added: CloudWatchMetricsPolicy

‚úÖ IAM ROLE READY: LambdaTextValidationFunction

üì¶ Managed Policies (3):
   ‚îú‚îÄ AWSLambdaBasicExecutionRole
   ‚îú‚îÄ AmazonS3FullAccess
   ‚îú‚îÄ AmazonCloudWatchEvidentlyFullAccess

üìù Inline Policies (1):
   ‚îú‚îÄ CloudW

In [38]:
# Deploy Validation Code to Lambda Function
print("="*80)
print("DEPLOYING VALIDATION CODE TO LAMBDA FUNCTION")
print("="*80)

# The validation code from Step 6
lambda_validation_code = '''import json
import boto3
import re
from datetime import datetime

def lambda_handler(event, context):
    # Get the S3 object
    s3_client = boto3.client('s3')
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    
    # Only process text reviews
    if not key.endswith('.txt') and not key.endswith('.json'):
        return {
            'statusCode': 200,
            'body': json.dumps('Not a text review file')
        }
    
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        
        # Parse the content (assuming JSON format)
        if key.endswith('.json'):
            review = json.loads(content)
            text = review.get('review_text', '')
        else:
            text = content
            
        # Validation checks
        validation_results = {
            'file_name': key,
            'timestamp': datetime.now().isoformat(),
            'checks': {
                'min_length': len(text) >= 10,
                'has_product_reference': bool(re.search(r'product|item|purchase', text, re.IGNORECASE)),
                'has_opinion': bool(re.search(r'like|love|hate|good|bad|great|terrible|excellent|poor|recommend', text, re.IGNORECASE)),
                'no_profanity': not bool(re.search(r'badword1|badword2', text, re.IGNORECASE)),
                'has_structure': text.count('.') >= 1
            }
        }
        
        # Calculate overall quality score
        passed_checks = sum(1 for check in validation_results['checks'].values() if check)
        total_checks = len(validation_results['checks'])
        validation_results['quality_score'] = passed_checks / total_checks
        
        # Send metrics to CloudWatch
        cloudwatch = boto3.client('cloudwatch')
        cloudwatch.put_metric_data(
            Namespace='CustomerFeedback/TextQuality',
            MetricData=[
                {
                    'MetricName': 'QualityScore',
                    'Value': validation_results['quality_score'],
                    'Unit': 'None',
                    'Dimensions': [
                        {
                            'Name': 'Source',
                            'Value': 'TextReviews'
                        }
                    ]
                }
            ]
        )
        
        # Save validation results
        validation_key = key.replace('raw-data', 'validation-results').replace('.txt', '.json').replace('.json', '_validation.json')
        s3_client.put_object(
            Bucket=bucket,
            Key=validation_key,
            Body=json.dumps(validation_results),
            ContentType='application/json'
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps(validation_results)
        }
        
    except Exception as e:
        print(f"Error processing {key}: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error: {str(e)}")
        }
'''

try:
    # Create a zip file with the Lambda code
    import zipfile
    import io
    
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.writestr('lambda_function.py', lambda_validation_code)
    
    zip_buffer.seek(0)
    zip_content = zip_buffer.read()
    
    # Get the custom IAM role ARN
    custom_role_name = "LambdaTextValidationFunction"
    print(f"üîç Getting custom IAM role: {custom_role_name}")
    
    role_response = iam_client.get_role(RoleName=custom_role_name)
    custom_role_arn = role_response['Role']['Arn']
    print(f"‚úÖ Found custom IAM role: {custom_role_arn}\n")
    
    # Check if Lambda function exists
    function_exists = False
    try:
        lambda_client.get_function(FunctionName=function_name)
        function_exists = True
        print(f"‚ÑπÔ∏è  Function '{function_name}' exists - will update")
    except ClientError as check_error:
        if check_error.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"‚ÑπÔ∏è  Function '{function_name}' not found - will create new")
        else:
            raise check_error
    
    if function_exists:
        # Update existing function
        print(f"\nüì§ Updating Lambda function code...")
        code_response = lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=zip_content
        )
        print(f"‚úÖ Code updated")
        
        print(f"\nüîß Updating to use custom IAM role...")
        config_response = lambda_client.update_function_configuration(
            FunctionName=function_name,
            Role=custom_role_arn,
            Timeout=30,
            MemorySize=256
        )
        print(f"‚úÖ Configuration updated")
        
    else:
        # Create new Lambda function
        print(f"\nüöÄ Creating new Lambda function with custom role...")
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.11',
            Role=custom_role_arn,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_content},
            Description='Validates customer feedback text reviews',
            Timeout=30,
            MemorySize=256,
            Environment={'Variables': {'BUCKET_NAME': bucket_name}}
        )
        print(f"‚úÖ Lambda function created!")
    
    # Wait and verify
    print(f"\n‚è≥ Waiting 10 seconds for Lambda to be ready...")
    import time
    time.sleep(10)
    
    verify_response = lambda_client.get_function(FunctionName=function_name)
    config = verify_response['Configuration']
    
    print(f"\n‚úÖ DEPLOYMENT SUCCESSFUL!")
    print(f"   Function: {config['FunctionName']}")
    print(f"   Runtime: {config['Runtime']}")
    print(f"   Role: {config['Role']}")
    print(f"   Memory: {config['MemorySize']} MB | Timeout: {config['Timeout']}s")
    print(f"   Code Size: {config['CodeSize']} bytes")
    
    if custom_role_name in config['Role']:
        print(f"\n‚úÖ Using custom IAM role with S3 and CloudWatch permissions")
    
     
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchEntity':
        print(f"‚ùå Custom IAM role '{custom_role_name}' not found")
    else:
        print(f"‚ùå Error: {e}")
except Exception as e:
    print(f"‚ùå Unexpected error: {e}")
    import traceback
    traceback.print_exc()

DEPLOYING VALIDATION CODE TO LAMBDA FUNCTION
üîç Getting custom IAM role: LambdaTextValidationFunction
‚úÖ Found custom IAM role: arn:aws:iam::091366569168:role/LambdaTextValidationFunction

‚úÖ Found custom IAM role: arn:aws:iam::091366569168:role/LambdaTextValidationFunction

‚ÑπÔ∏è  Function 'TextValidationFunction' not found - will create new

üöÄ Creating new Lambda function with custom role...
‚ÑπÔ∏è  Function 'TextValidationFunction' not found - will create new

üöÄ Creating new Lambda function with custom role...
‚úÖ Lambda function created!

‚è≥ Waiting 10 seconds for Lambda to be ready...
‚úÖ Lambda function created!

‚è≥ Waiting 10 seconds for Lambda to be ready...

‚úÖ DEPLOYMENT SUCCESSFUL!
   Function: TextValidationFunction
   Runtime: python3.11
   Role: arn:aws:iam::091366569168:role/LambdaTextValidationFunction
   Memory: 256 MB | Timeout: 30s
   Code Size: 1235 bytes

‚úÖ Using custom IAM role with S3 and CloudWatch permissions

‚úÖ DEPLOYMENT SUCCESSFUL!
   Funct

### Step 6.1: Verify Lambda Function and IAM Role

Before configuring the S3 trigger, verify that the Lambda function and IAM role were successfully created in the AWS Console.

**Resources Created Manually in AWS Console:**
- **Lambda Function Name:** `TextValidationFunction`
- **IAM Role Name:** `LambdaTextValidationFunction`

**Verification Results:**

The verification code checks:
1. Lambda function existence and configuration (runtime, handler, memory, timeout)
2. IAM role existence and attached policies
3. Current role assignment for the Lambda function


The custom role has the following managed policies:
- `AWSLambdaBasicExecutionRole` - For CloudWatch Logs
- `AmazonS3FullAccess` - For S3 operations
- `AmazonCloudWatchEvidentlyFullAccess` - For CloudWatch metrics



In [39]:
# Verify Lambda Function and IAM Role Creation
print("üîç Verifying Lambda Function and IAM Role...\n")

function_name = "TextValidationFunction"
role_name = "LambdaTextValidationFunction"

# Check Lambda Function
try:
    lambda_response = lambda_client.get_function(FunctionName=function_name)
    print(f"‚úÖ Lambda Function '{function_name}' exists!")
    print(f"   ARN: {lambda_response['Configuration']['FunctionArn']}")
    print(f"   Runtime: {lambda_response['Configuration']['Runtime']}")
    print(f"   Handler: {lambda_response['Configuration']['Handler']}")
    print(f"   Role: {lambda_response['Configuration']['Role']}")
    print(f"   Last Modified: {lambda_response['Configuration']['LastModified']}")
    print(f"   Memory: {lambda_response['Configuration']['MemorySize']} MB")
    print(f"   Timeout: {lambda_response['Configuration']['Timeout']} seconds")
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceNotFoundException':
        print(f"‚ùå Lambda function '{function_name}' NOT found.")
        print("   Please create the Lambda function in AWS Console.")
    else:
        print(f"‚ùå Error checking Lambda function: {e}")

print("\n" + "-"*60 + "\n")

# Check IAM Role
try:
    role_response = iam_client.get_role(RoleName=role_name)
    print(f"‚úÖ IAM Role '{role_name}' exists!")
    print(f"   ARN: {role_response['Role']['Arn']}")
    print(f"   Created: {role_response['Role']['CreateDate']}")
    print(f"   Description: {role_response['Role'].get('Description', 'N/A')}")
    
    # Check attached policies
    print(f"\n   üìã Attached Managed Policies:")
    try:
        policies = iam_client.list_attached_role_policies(RoleName=role_name)
        if policies['AttachedPolicies']:
            for policy in policies['AttachedPolicies']:
                print(f"      - {policy['PolicyName']}: {policy['PolicyArn']}")
        else:
            print("      - None")
    except Exception as e:
        print(f"      Error listing policies: {e}")
    
    # Check inline policies
    print(f"\n   üìã Inline Policies:")
    try:
        inline_policies = iam_client.list_role_policies(RoleName=role_name)
        if inline_policies['PolicyNames']:
            for policy_name in inline_policies['PolicyNames']:
                print(f"      - {policy_name}")
        else:
            print("      - None")
    except Exception as e:
        print(f"      Error listing inline policies: {e}")
        
except ClientError as e:
    if e.response['Error']['Code'] == 'NoSuchEntity':
        print(f"‚ùå IAM Role '{role_name}' NOT found.")
        print("   Please verify the role name is correct.")
    else:
        print(f"‚ùå Error checking IAM role: {e}")

print("\n" + "="*60)
print("‚úì Verification complete!")

üîç Verifying Lambda Function and IAM Role...

‚úÖ Lambda Function 'TextValidationFunction' exists!
   ARN: arn:aws:lambda:us-east-1:091366569168:function:TextValidationFunction
   Runtime: python3.11
   Handler: lambda_function.lambda_handler
   Role: arn:aws:iam::091366569168:role/LambdaTextValidationFunction
   Last Modified: 2025-12-06T01:23:20.150+0000
   Memory: 256 MB
   Timeout: 30 seconds

------------------------------------------------------------

‚úÖ IAM Role 'LambdaTextValidationFunction' exists!
   ARN: arn:aws:iam::091366569168:role/LambdaTextValidationFunction
   Created: 2025-12-05 17:39:02+00:00
   Description: Allows Lambda functions to call AWS services on your behalf.

   üìã Attached Managed Policies:
      - AWSLambdaBasicExecutionRole: arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      - AmazonS3FullAccess: arn:aws:iam::aws:policy/AmazonS3FullAccess
      - AmazonCloudWatchEvidentlyFullAccess: arn:aws:iam::aws:policy/AmazonCloudWatchEviden

### Step 7: Configure S3 Trigger for Lambda

Set up an S3 event trigger to automatically invoke the Lambda validation function.

In [40]:
# Set up an S3 trigger for the Lambda function
lambda_client = boto3.client('lambda')

function_name = "TextValidationFunction"

try:
    # First, add S3 permission to the Lambda function
    try:
        lambda_client.add_permission(
            FunctionName=function_name,
            StatementId='S3InvokePermission',
            Action='lambda:InvokeFunction',
            Principal='s3.amazonaws.com',
            SourceArn=f'arn:aws:s3:::{bucket_name}'
        )
        print(f"Added S3 invoke permission to Lambda function '{function_name}'.")
    except ClientError as perm_error:
        if perm_error.response['Error']['Code'] == 'ResourceConflictException':
            print(f"Permission already exists for Lambda function '{function_name}'.")
        else:
            raise perm_error
    
    # Configure S3 bucket notification to trigger Lambda
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [
                {
                    'LambdaFunctionArn': f'arn:aws:lambda:{boto3.Session().region_name}:{boto3.client("sts").get_caller_identity()["Account"]}:function:{function_name}',
                    'Events': ['s3:ObjectCreated:*'],
                    'Filter': {
                        'Key': {
                            'FilterRules': [
                                {
                                    'Name': 'prefix',
                                    'Value': 'raw-data/'
                                }
                            ]
                        }
                    }
                }
            ]
        }
    )
    print(f"S3 trigger configured successfully for bucket '{bucket_name}'.")
    print(f"Lambda function '{function_name}' will be triggered on new objects in 'raw-data/'.")
    
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'ResourceNotFoundException':
        print(f"Error: Lambda function '{function_name}' not found.")
        print("Please create the Lambda function first.")
    elif error_code == 'InvalidArgument':
        print(f"Error: Invalid configuration - {e.response['Error']['Message']}")
    elif error_code == 'AccessDenied':
        print("Error: Access denied. Check IAM permissions for Lambda and S3.")
    else:
        print(f"Error setting up S3 trigger: {e}")
        print(f"Error Code: {error_code}")
        print(f"Error Message: {e.response['Error']['Message']}")
except Exception as e:
    print(f"Unexpected error: {e}")

Added S3 invoke permission to Lambda function 'TextValidationFunction'.
S3 trigger configured successfully for bucket 'customer-feedback-analysis-fr-task-1-3'.
Lambda function 'TextValidationFunction' will be triggered on new objects in 'raw-data/'.
S3 trigger configured successfully for bucket 'customer-feedback-analysis-fr-task-1-3'.
Lambda function 'TextValidationFunction' will be triggered on new objects in 'raw-data/'.


### Step 8: Create CloudWatch Dashboard for Monitoring

Build a CloudWatch dashboard to visualize data quality metrics.

In [12]:
# Create CloudWatch dashboard
cloudwatch = boto3.client('cloudwatch')

dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "x": 0,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["CustomerFeedback/TextQuality", "QualityScore", "Source", "TextReviews"]
                ],
                "period": 86400,
                "stat": "Average",
                "region": "us-east-1",
                "title": "Text Review Quality Score"
            }
        },
        {
            "type": "metric",
            "x": 0,
            "y": 6,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["CustomerFeedback/DataQuality", "RulesetPassRate", "Ruleset", "customer_reviews_ruleset"]
                ],
                "period": 86400,
                "stat": "Average",
                "region": "us-east-1",
                "title": "Glue Data Quality Pass Rate"
            }
        }
    ]
}

response = cloudwatch.put_dashboard(
    DashboardName='CustomerFeedbackQuality',
    DashboardBody=json.dumps(dashboard_body)
)

print("‚úì CloudWatch dashboard 'CustomerFeedbackQuality' created successfully!")
if response.get('DashboardValidationMessages'):
    print(f"Validation messages: {response['DashboardValidationMessages']}")
else:
    print("Dashboard is ready to use in CloudWatch console.")


‚úì CloudWatch dashboard 'CustomerFeedbackQuality' created successfully!
Dashboard is ready to use in CloudWatch console.


### Test 1: Upload Test Files to Trigger Pipeline

In [41]:
# Test: Upload Test Files to Trigger Lambda Validation
print("="*80)
print("TEST 2: UPLOADING TEST FILES TO TRIGGER PIPELINE")
print("="*80)

import io
from datetime import datetime

def create_test_review_files():
    """Create test review files to trigger the pipeline."""
    test_reviews = [
        {
            "filename": "test_review_1.txt",
            "content": "I absolutely love this product! The quality exceeded my expectations and the customer service team was incredibly helpful. Highly recommend this to anyone looking for a reliable purchase."
        },
        {
            "filename": "test_review_2.txt",
            "content": "Disappointed with the product quality. It broke after just two weeks of use. Customer service was unresponsive. Would not recommend."
        },
        {
            "filename": "test_review_3.json",
            "content": json.dumps({
                "review_text": "Great value for money! The features are excellent and delivery was prompt. Minor issues with packaging but overall satisfied with the purchase.",
                "rating": 4.5,
                "date": datetime.now().isoformat()
            })
        }
    ]
    
    uploaded_files = []
    
    for review in test_reviews:
        try:
            # Upload directly to raw-data/ to trigger Lambda (not in subfolder)
            s3_key = f"raw-data/{review['filename']}"
            
            # Upload file
            s3_client.put_object(
                Bucket=bucket_name,
                Key=s3_key,
                Body=review['content'],
                ContentType='text/plain' if review['filename'].endswith('.txt') else 'application/json'
            )
            
            uploaded_files.append(s3_key)
            print(f"‚úÖ Uploaded: {s3_key}")
            
            # Wait a bit for Lambda to process
            time.sleep(2)
            
        except ClientError as e:
            print(f"‚ùå Failed to upload {review['filename']}: {e}")
    
    print(f"\nüì§ Uploaded {len(uploaded_files)} test files")
    print("‚è≥ Waiting 10 seconds for Lambda to process...")
    time.sleep(10)
    
    return uploaded_files

# Upload test files
test_files = create_test_review_files()

TEST 2: UPLOADING TEST FILES TO TRIGGER PIPELINE
‚úÖ Uploaded: raw-data/test_review_1.txt
‚úÖ Uploaded: raw-data/test_review_1.txt
‚úÖ Uploaded: raw-data/test_review_2.txt
‚úÖ Uploaded: raw-data/test_review_2.txt
‚úÖ Uploaded: raw-data/test_review_3.json
‚úÖ Uploaded: raw-data/test_review_3.json

üì§ Uploaded 3 test files
‚è≥ Waiting 10 seconds for Lambda to process...

üì§ Uploaded 3 test files
‚è≥ Waiting 10 seconds for Lambda to process...


#### Verify Lambda Trigger

Check if the Lambda function was triggered by the test file uploads by examining CloudWatch Logs.

In [42]:
# Check Lambda Invocations via CloudWatch Logs
import time
from datetime import datetime, timezone

print("üîç Checking Lambda Function Invocations...\n")

# Initialize CloudWatch Logs client
logs_client = boto3.client('logs', region_name='us-east-1')
log_group_name = f"/aws/lambda/{function_name}"

try:
    # Get recent log streams (most recent first)
    streams_response = logs_client.describe_log_streams(
        logGroupName=log_group_name,
        orderBy='LastEventTime',
        descending=True,
        limit=5
    )
    
    if not streams_response.get('logStreams'):
        print(f"‚ö†Ô∏è  No log streams found for function '{function_name}'")
        print("   Lambda may not have been invoked yet")
    else:
        print(f"‚úÖ Found {len(streams_response['logStreams'])} recent log stream(s)\n")
        
        # Check the most recent log stream
        for idx, stream in enumerate(streams_response['logStreams'][:3], 1):
            stream_name = stream['logStreamName']
            last_event = stream.get('lastEventTimestamp', 0)
            last_event_time = datetime.fromtimestamp(last_event / 1000, tz=timezone.utc)
            
            print(f"üìã Log Stream {idx}: {stream_name}")
            print(f"   Last Event: {last_event_time}")
            
            # Get log events from this stream
            try:
                events_response = logs_client.get_log_events(
                    logGroupName=log_group_name,
                    logStreamName=stream_name,
                    limit=50,
                    startFromHead=False  # Get most recent events
                )
                
                events = events_response.get('events', [])
                if events:
                    print(f"   Events Found: {len(events)}")
                    
                    # Look for START, END, and REPORT markers
                    start_count = sum(1 for e in events if 'START RequestId' in e['message'])
                    end_count = sum(1 for e in events if 'END RequestId' in e['message'])
                    report_count = sum(1 for e in events if 'REPORT RequestId' in e['message'])
                    error_count = sum(1 for e in events if 'ERROR' in e['message'] or 'Error' in e['message'])
                    
                    print(f"   ‚îú‚îÄ START markers: {start_count}")
                    print(f"   ‚îú‚îÄ END markers: {end_count}")
                    print(f"   ‚îú‚îÄ REPORT markers: {report_count}")
                    print(f"   ‚îî‚îÄ Error mentions: {error_count}")
                    
                    # Show last few messages
                    print(f"\n   üìù Recent log messages:")
                    for event in events[-10:]:  # Last 10 events
                        msg = event['message'].strip()
                        timestamp = datetime.fromtimestamp(event['timestamp'] / 1000, tz=timezone.utc)
                        print(f"      [{timestamp.strftime('%H:%M:%S')}] {msg[:100]}")
                    
                else:
                    print(f"   ‚ö†Ô∏è  No events in this stream")
            except Exception as e:
                print(f"   ‚ö†Ô∏è  Could not read events: {e}")
            
            print()
    
    # Summary
    print("\n" + "="*60)
    print("üìä INVOCATION SUMMARY")
    print("="*60)
    
    if streams_response.get('logStreams'):
        latest_stream = streams_response['logStreams'][0]
        latest_time = datetime.fromtimestamp(
            latest_stream.get('lastEventTimestamp', 0) / 1000, 
            tz=timezone.utc
        )
        
        time_diff = datetime.now(timezone.utc) - latest_time
        minutes_ago = int(time_diff.total_seconds() / 60)
        
        print(f"‚úÖ Lambda WAS invoked")
        print(f"   Last invocation: {minutes_ago} minute(s) ago")
        print(f"   Exact time: {latest_time}")
    else:
        print(f"‚ùå Lambda has NOT been invoked")
        print(f"\nüí° Possible reasons:")
        print(f"   1. S3 trigger not configured correctly")
        print(f"   2. Files uploaded to wrong S3 path")
        print(f"   3. Lambda permissions issue")
        
except logs_client.exceptions.ResourceNotFoundException:
    print(f"‚ùå Log group '{log_group_name}' not found")
    print(f"   This means Lambda has NEVER been invoked")
    print(f"\nüí° To fix:")
    print(f"   1. Verify S3 trigger is configured (run cell 18)")
    print(f"   2. Upload test files (run cell 22)")
    print(f"   3. Wait 10 seconds and check again")
    
except Exception as e:
    print(f"‚ùå Error checking logs: {e}")
    import traceback
    traceback.print_exc()

üîç Checking Lambda Function Invocations...

‚úÖ Found 5 recent log stream(s)

üìã Log Stream 1: 2025/12/06/[$LATEST]9e8f7ecfc9b946f09d9c1394b4357268
   Last Event: 2025-12-06 01:27:17.219000+00:00
   Events Found: 7
   ‚îú‚îÄ START markers: 2
   ‚îú‚îÄ END markers: 2
   ‚îú‚îÄ REPORT markers: 2
   ‚îî‚îÄ Error mentions: 0

   üìù Recent log messages:
      [01:27:14] INIT_START Runtime Version: python:3.11.v107	Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:
      [01:27:15] START RequestId: 13097262-d4b8-4c50-9253-1b5112bbcfaf Version: $LATEST
      [01:27:17] END RequestId: 13097262-d4b8-4c50-9253-1b5112bbcfaf
      [01:27:17] REPORT RequestId: 13097262-d4b8-4c50-9253-1b5112bbcfaf	Duration: 1898.74 ms	Billed Duration: 2169 ms
      [01:27:17] START RequestId: 7a54ea68-7218-46b0-a7e9-bfc5ae1d5fc2 Version: $LATEST
      [01:27:17] END RequestId: 7a54ea68-7218-46b0-a7e9-bfc5ae1d5fc2
      [01:27:17] REPORT RequestId: 7a54ea68-7218-46b0-a7e9-bfc5ae1d5fc2	Duration: 504.49 ms	B

In [43]:
# Check if metrics were sent
import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# Query for metrics in last hour
response = cloudwatch.get_metric_statistics(
    Namespace='CustomerFeedback/TextQuality',
    MetricName='QualityScore',
    Dimensions=[{'Name': 'Source', 'Value': 'TextReviews'}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,  # 5 minutes
    Statistics=['Average']
)

print(f"Data points found: {len(response['Datapoints'])}")
for dp in response['Datapoints']:
    print(f"  {dp['Timestamp']}: {dp['Average']:.2f}")

Data points found: 1
  2025-12-06 01:23:00+00:00: 1.00


#### CloudWatch Dashboard - Quality Score Visualization

The dashboard successfully displays the text review quality score metrics:

![Quality Score Dashboard](quality-score-cloudwatch.png)

#### Step 8b. Analyze results and create a model selection strategy

1. Create a selection strategy based on your benchmark results:

In [48]:
# Test: Model Selection Strategy Based on Benchmark Results
import json
import pandas as pd

print("="*80)
print("TEST: MODEL SELECTION STRATEGY")
print("="*80)

# Create sample benchmark data (simulating model performance testing)
# In production, this would come from actual benchmarking of foundation models
sample_benchmark_data = [
    {"model_id": "amazon.titan-embed-text-v1", "latency": 120, "similarity_score": 0.85, "cost_per_1k": 0.0001},
    {"model_id": "amazon.titan-embed-text-v1", "latency": 115, "similarity_score": 0.87, "cost_per_1k": 0.0001},
    {"model_id": "amazon.titan-embed-text-v1", "latency": 125, "similarity_score": 0.84, "cost_per_1k": 0.0001},
    {"model_id": "cohere.embed-english-v3", "latency": 150, "similarity_score": 0.90, "cost_per_1k": 0.0002},
    {"model_id": "cohere.embed-english-v3", "latency": 145, "similarity_score": 0.91, "cost_per_1k": 0.0002},
    {"model_id": "cohere.embed-english-v3", "latency": 155, "similarity_score": 0.89, "cost_per_1k": 0.0002},
    {"model_id": "amazon.titan-embed-text-v2", "latency": 100, "similarity_score": 0.88, "cost_per_1k": 0.00015},
    {"model_id": "amazon.titan-embed-text-v2", "latency": 105, "similarity_score": 0.87, "cost_per_1k": 0.00015},
    {"model_id": "amazon.titan-embed-text-v2", "latency": 95, "similarity_score": 0.89, "cost_per_1k": 0.00015},
]

results_df = pd.DataFrame(sample_benchmark_data)

print("\nüìä Sample Benchmark Results:")
print(results_df.to_string(index=False))

def create_model_selection_strategy(results_df):
    """Create a model selection strategy based on evaluation results."""
    # Calculate overall scores per model
    model_scores = results_df.groupby("model_id").agg({
        "latency": "mean",
        "similarity_score": "mean",
        "cost_per_1k": "mean"
    }).reset_index()
    
    # Normalize scores (lower latency is better, higher similarity is better, lower cost is better)
    max_latency = model_scores["latency"].max()
    max_cost = model_scores["cost_per_1k"].max()
    
    model_scores["latency_score"] = 1 - (model_scores["latency"] / max_latency)
    model_scores["cost_score"] = 1 - (model_scores["cost_per_1k"] / max_cost)
    
    # Calculate weighted overall score
    # Weights: 60% quality, 25% speed, 15% cost
    model_scores["overall_score"] = (
        0.60 * model_scores["similarity_score"] + 
        0.25 * model_scores["latency_score"] +
        0.15 * model_scores["cost_score"]
    )
    
    # Sort by overall score
    model_scores = model_scores.sort_values("overall_score", ascending=False)
    
    # Create strategy with recommendations
    strategy = {
        "primary_model": model_scores.iloc[0]["model_id"],
        "fallback_models": model_scores.iloc[1:]["model_id"].tolist(),
        "selection_criteria": {
            "quality_weight": 0.60,
            "speed_weight": 0.25,
            "cost_weight": 0.15
        },
        "model_scores": model_scores.round(4).to_dict(orient="records")
    }
    
    return strategy, model_scores

# Generate strategy
print("\nüéØ Generating Model Selection Strategy...")
strategy, model_scores = create_model_selection_strategy(results_df)

print("\n" + "="*80)
print("MODEL PERFORMANCE ANALYSIS")
print("="*80)
print(model_scores.round(4).to_string(index=False))

print("\n" + "="*80)
print("RECOMMENDED MODEL SELECTION STRATEGY")
print("="*80)
print(json.dumps(strategy, indent=2))

# Save strategy to file for AWS AppConfig
strategy_file = "model_selection_strategy.json"
with open(strategy_file, "w") as f:
    json.dump(strategy, f, indent=2)

print(f"\n‚úÖ Strategy saved to: {strategy_file}")

# Display recommendations
print("\n" + "="*80)
print("üìå RECOMMENDATIONS")
print("="*80)
print(f"‚úÖ Primary Model: {strategy['primary_model']}")
print(f"   - Best balance of quality, speed, and cost")
print(f"   - Overall Score: {model_scores.iloc[0]['overall_score']:.4f}")

print(f"\nüîÑ Fallback Models:")
for i, model_id in enumerate(strategy['fallback_models'], 1):
    score = model_scores[model_scores['model_id'] == model_id]['overall_score'].values[0]
    print(f"   {i}. {model_id} (Score: {score:.4f})")

print("\nüí° Selection Criteria:")
print("   - Quality (Similarity Score): 60%")
print("   - Speed (Low Latency): 25%")
print("   - Cost Efficiency: 15%")


print("\n‚úÖ Test completed successfully!")

TEST: MODEL SELECTION STRATEGY

üìä Sample Benchmark Results:
                  model_id  latency  similarity_score  cost_per_1k
amazon.titan-embed-text-v1      120              0.85      0.00010
amazon.titan-embed-text-v1      115              0.87      0.00010
amazon.titan-embed-text-v1      125              0.84      0.00010
   cohere.embed-english-v3      150              0.90      0.00020
   cohere.embed-english-v3      145              0.91      0.00020
   cohere.embed-english-v3      155              0.89      0.00020
amazon.titan-embed-text-v2      100              0.88      0.00015
amazon.titan-embed-text-v2      105              0.87      0.00015
amazon.titan-embed-text-v2       95              0.89      0.00015

üéØ Generating Model Selection Strategy...

MODEL PERFORMANCE ANALYSIS
                  model_id  latency  similarity_score  cost_per_1k  latency_score  cost_score  overall_score
amazon.titan-embed-text-v2    100.0            0.8800       0.0001         0.3333    

---

## Part 2: Multimodal Data Processing

### Step 9: Process Text Reviews with Amazon Comprehend

Use Amazon Comprehend to extract entities, sentiment, key phrases, and topics from customer text reviews.

In [53]:
# Comprehensive Text Analysis Function with Amazon Comprehend
from datetime import datetime
import json

print("="*80)
print("COMPREHENSIVE TEXT ANALYSIS FUNCTION")
print("="*80)

def analyze_text_with_comprehend(text, language_code='en'):
    """
    Perform comprehensive text analysis using Amazon Comprehend.
    
    Args:
        text: Customer feedback text
        language_code: Language code (default: 'en')
    
    Returns:
        Dictionary containing all analysis results
    """
    results = {
        'original_text': text,
        'language_code': language_code,
        'timestamp': datetime.now().isoformat()
    }
    
    try:
        # 1. Sentiment Analysis
        print(f"  üîç Analyzing sentiment...")
        sentiment_response = comprehend_client.detect_sentiment(
            Text=text,
            LanguageCode=language_code
        )
        results['sentiment'] = {
            'sentiment': sentiment_response['Sentiment'],
            'scores': {
                'positive': round(sentiment_response['SentimentScore']['Positive'], 4),
                'negative': round(sentiment_response['SentimentScore']['Negative'], 4),
                'neutral': round(sentiment_response['SentimentScore']['Neutral'], 4),
                'mixed': round(sentiment_response['SentimentScore']['Mixed'], 4)
            }
        }
        
        # 2. Entity Detection
        print(f"  üîç Detecting entities...")
        entities_response = comprehend_client.detect_entities(
            Text=text,
            LanguageCode=language_code
        )
        results['entities'] = [
            {
                'text': entity['Text'],
                'type': entity['Type'],
                'score': round(entity['Score'], 4),
                'begin_offset': entity['BeginOffset'],
                'end_offset': entity['EndOffset']
            }
            for entity in entities_response['Entities']
        ]
        
        # 3. Key Phrases Extraction
        print(f"  üîç Extracting key phrases...")
        key_phrases_response = comprehend_client.detect_key_phrases(
            Text=text,
            LanguageCode=language_code
        )
        results['key_phrases'] = [
            {
                'text': phrase['Text'],
                'score': round(phrase['Score'], 4),
                'begin_offset': phrase['BeginOffset'],
                'end_offset': phrase['EndOffset']
            }
            for phrase in key_phrases_response['KeyPhrases']
        ]
        
        # 4. Language Detection (verify)
        print(f"  üîç Detecting language...")
        language_response = comprehend_client.detect_dominant_language(Text=text)
        results['detected_languages'] = [
            {
                'language_code': lang['LanguageCode'],
                'score': round(lang['Score'], 4)
            }
            for lang in language_response['Languages']
        ]
        
        print(f"  ‚úÖ Analysis complete!")
        return results
        
    except ClientError as e:
        print(f"  ‚ùå Error during analysis: {e}")
        results['error'] = str(e)
        return results
    except Exception as e:
        print(f"  ‚ùå Unexpected error: {e}")
        results['error'] = str(e)
        return results

print("‚úÖ Comprehensive analysis function defined")


COMPREHENSIVE TEXT ANALYSIS FUNCTION
‚úÖ Comprehensive analysis function defined


In [70]:
# Process CSV Data from S3 with Comprehend
print("\n" + "="*80)
print("PROCESSING CSV DATA WITH COMPREHEND")
print("="*80)

def process_csv_reviews_with_comprehend(max_reviews=10):
    """Process reviews from the CSV file with Comprehend analysis."""
    try:
        # Get the CSV file from S3
        csv_key = 'raw-data/clean-input-data.csv'
        print(f"\nüì• Downloading CSV from S3: {csv_key}")
        
        csv_obj = s3_client.get_object(Bucket=bucket_name, Key=csv_key)
        csv_content = csv_obj['Body'].read().decode('utf-8')
        
        # Load into DataFrame
        df_csv = pd.read_csv(io.StringIO(csv_content))
        print(f"‚úÖ Loaded {len(df_csv)} reviews from CSV")
        
        # Display column info
        print(f"\nüìã CSV Columns: {list(df_csv.columns)}")
        
        # Process sample reviews
        processed_csv_reviews = []
        print(f"\nüîç Processing first {max_reviews} CSV reviews with Comprehend...\n")
        
        for idx in range(min(max_reviews, len(df_csv))):
            row = df_csv.iloc[idx]
            
            # Get text from the 'Text' column (or first text column)
            review_text = None
            for possible_col in ['Text', 'text', 'review', 'Review', 'feedback', 'Feedback']:
                if possible_col in df_csv.columns:
                    review_text = str(row[possible_col])
                    break
            
            if not review_text or len(review_text.strip()) < 10:
                print(f"  ‚ö†Ô∏è  Row {idx+1}: Text too short or empty, skipping")
                continue
            
            print(f"Processing CSV Row {idx+1}/{min(max_reviews, len(df_csv))}")
            print(f"  Text preview: {review_text[:80]}...")
            
            # Analyze with Comprehend
            comprehend_analysis = analyze_text_with_comprehend(review_text)
            
            # Combine with CSV metadata
            csv_review_data = {
                'source': 'CSV',
                'row_number': idx + 1,
                'original_text': review_text,
                'csv_sentiment': row.get('Sentiment', 'N/A') if 'Sentiment' in df_csv.columns else 'N/A',
                'csv_source': row.get('Source', 'N/A') if 'Source' in df_csv.columns else 'N/A',
                'csv_location': row.get('Location', 'N/A') if 'Location' in df_csv.columns else 'N/A',
                'comprehend_analysis': comprehend_analysis
            }
            
            processed_csv_reviews.append(csv_review_data)
            
            # Show comparison
            comp_sent = comprehend_analysis['sentiment']['sentiment']
            csv_sent = csv_review_data['csv_sentiment']
            match = "‚úÖ" if comp_sent.upper() == csv_sent.upper() else "‚ö†Ô∏è"
            print(f"  {match} CSV Sentiment: {csv_sent} | Comprehend: {comp_sent}")
            print(f"  Entities: {len(comprehend_analysis['entities'])} | Key Phrases: {len(comprehend_analysis['key_phrases'])}\n")
        
        print(f"{'='*80}")
        print(f"‚úÖ Processed {len(processed_csv_reviews)} CSV reviews")
        print(f"{'='*80}")
        
        return processed_csv_reviews
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            print(f"‚ùå CSV file not found in S3: {csv_key}")
            print(f"   Please run cell 5 first to upload the CSV data")
        else:
            print(f"‚ùå S3 Error: {e}")
        return []
    except Exception as e:
        print(f"‚ùå Error processing CSV: {e}")
        import traceback
        traceback.print_exc()
        return []

# Process CSV reviews
csv_reviews = process_csv_reviews_with_comprehend(max_reviews=96)

# Combine with test file reviews if they exist
if 'processed_reviews' in globals() and processed_reviews:
    print(f"\nüìä COMBINED RESULTS SUMMARY")
    print(f"="*80)
    print(f"   Test File Reviews: {len(processed_reviews)}")
    print(f"   CSV Reviews: {len(csv_reviews)}")
    print(f"   Total Processed: {len(processed_reviews) + len(csv_reviews)}")
    print(f"="*80)
    
    # Store combined results
    all_reviews = processed_reviews + csv_reviews
    print(f"\n‚úÖ Combined {len(all_reviews)} reviews from both sources")
else:
    all_reviews = csv_reviews
    print(f"\n‚úÖ Processed {len(all_reviews)} reviews from CSV only")


PROCESSING CSV DATA WITH COMPREHEND

üì• Downloading CSV from S3: raw-data/clean-input-data.csv
‚úÖ Loaded 96 reviews from CSV

üìã CSV Columns: ['Text', 'Sentiment', 'Source', 'DateTime', 'UserID', 'Location', 'ConfidenceScore']

üîç Processing first 96 CSV reviews with Comprehend...

Processing CSV Row 1/96
  Text preview: I love this product!...
  üîç Analyzing sentiment...
‚úÖ Loaded 96 reviews from CSV

üìã CSV Columns: ['Text', 'Sentiment', 'Source', 'DateTime', 'UserID', 'Location', 'ConfidenceScore']

üîç Processing first 96 CSV reviews with Comprehend...

Processing CSV Row 1/96
  Text preview: I love this product!...
  üîç Analyzing sentiment...
  üîç Detecting entities...
  üîç Extracting key phrases...
  üîç Detecting entities...
  üîç Extracting key phrases...
  üîç Detecting language...
  ‚úÖ Analysis complete!
  ‚úÖ CSV Sentiment: Positive | Comprehend: POSITIVE
  Entities: 0 | Key Phrases: 1

Processing CSV Row 2/96
  Text preview: The service was terrible.

In [71]:
# Save Combined Results to S3 and Generate Unified Report
print("\n" + "="*80)
print("SAVING COMBINED RESULTS & GENERATING UNIFIED REPORT")
print("="*80)

def save_combined_comprehend_results():
    """Save all Comprehend analysis results to S3."""
    saved_count = 0
    
    # Save CSV reviews
    for review in csv_reviews:
        review_id = f"csv_row_{review['row_number']}"
        result_key = f"processed-data/comprehend/{review_id}_comprehend_analysis.json"
        
        result_data = {
            'source': 'CSV',
            'row_number': review['row_number'],
            'original_text': review['original_text'],
            'csv_metadata': {
                'sentiment': review['csv_sentiment'],
                'source': review['csv_source'],
                'location': review['csv_location']
            },
            'comprehend_analysis': review['comprehend_analysis'],
            'timestamp': datetime.now().isoformat()
        }
        
        s3_client.put_object(
            Bucket=bucket_name,
            Key=result_key,
            Body=json.dumps(result_data, indent=2),
            ContentType='application/json'
        )
        saved_count += 1
    
    print(f"‚úÖ Saved {saved_count} CSV analysis results to S3")
    return saved_count

# Save CSV results
saved = save_combined_comprehend_results()

# Generate Unified Report
print(f"\n{'='*80}")
print("üìä UNIFIED COMPREHEND ANALYSIS REPORT")
print(f"{'='*80}\n")

# Source breakdown
print("üìÇ DATA SOURCES:")
test_file_count = len([r for r in all_reviews if r.get('source') == 'test_file' or 'validation_checks' in r])
csv_count = len([r for r in all_reviews if r.get('source') == 'CSV'])
print(f"   Test Files: {test_file_count} reviews")
print(f"   CSV Data: {csv_count} reviews")
print(f"   Total: {len(all_reviews)} reviews")

# Sentiment Analysis
print(f"\n{'='*40}")
print("üòä SENTIMENT DISTRIBUTION")
print(f"{'='*40}")

sentiment_counts = {'POSITIVE': 0, 'NEGATIVE': 0, 'NEUTRAL': 0, 'MIXED': 0}
sentiment_by_source = {'test_file': {'POSITIVE': 0, 'NEGATIVE': 0, 'NEUTRAL': 0, 'MIXED': 0},
                       'CSV': {'POSITIVE': 0, 'NEGATIVE': 0, 'NEUTRAL': 0, 'MIXED': 0}}

for review in all_reviews:
    sentiment = review['comprehend_analysis']['sentiment']['sentiment']
    sentiment_counts[sentiment] += 1
    
    source = review.get('source', 'test_file')
    if source not in sentiment_by_source:
        source = 'test_file'
    sentiment_by_source[source][sentiment] += 1

for sentiment, count in sentiment_counts.items():
    percentage = (count / len(all_reviews)) * 100
    bar = '‚ñà' * int(percentage / 2)
    print(f"   {sentiment:10s}: {count:2d} ({percentage:5.1f}%) {bar}")

# Sentiment by source comparison
print(f"\nüìä Sentiment by Source:")
for source in ['test_file', 'CSV']:
    print(f"\n   {source.upper()}:")
    for sentiment in ['POSITIVE', 'NEGATIVE', 'NEUTRAL', 'MIXED']:
        count = sentiment_by_source[source][sentiment]
        if count > 0:
            print(f"      {sentiment}: {count}")

# Entity Analysis
print(f"\n{'='*40}")
print("üè∑Ô∏è  TOP ENTITIES")
print(f"{'='*40}")

entity_types = {}
for review in all_reviews:
    for entity in review['comprehend_analysis']['entities']:
        entity_type = entity['type']
        entity_types[entity_type] = entity_types.get(entity_type, 0) + 1

sorted_entities = sorted(entity_types.items(), key=lambda x: x[1], reverse=True)[:10]
for entity_type, count in sorted_entities:
    print(f"   {entity_type:20s}: {count:3d}")

# Key Phrases Analysis
print(f"\n{'='*40}")
print("üîë TOP KEY PHRASES")
print(f"{'='*40}")

phrase_counts = {}
for review in all_reviews:
    for phrase in review['comprehend_analysis']['key_phrases'][:5]:  # Top 5 per review
        text = phrase['text'].lower()
        phrase_counts[text] = phrase_counts.get(text, 0) + 1

sorted_phrases = sorted(phrase_counts.items(), key=lambda x: x[1], reverse=True)[:15]
for phrase, count in sorted_phrases:
    print(f"   {phrase:40s}: {count:3d}")

# Quality Metrics (only for test files with validation)
test_file_reviews = [r for r in all_reviews if 'quality_score' in r]
if test_file_reviews:
    print(f"\n{'='*40}")
    print("üìà QUALITY METRICS (Test Files)")
    print(f"{'='*40}")
    
    avg_quality = sum(r['quality_score'] for r in test_file_reviews) / len(test_file_reviews)
    print(f"   Average Quality Score: {avg_quality:.3f}")
    print(f"   Reviews with Validation: {len(test_file_reviews)}")

# Save unified report
report_data = {
    'metadata': {
        'total_reviews': len(all_reviews),
        'sources': {
            'test_files': test_file_count,
            'csv': csv_count
        },
        'generated_at': datetime.now().isoformat()
    },
    'sentiment_analysis': {
        'overall': sentiment_counts,
        'by_source': sentiment_by_source
    },
    'entity_analysis': {
        'entity_types': dict(sorted_entities)
    },
    'key_phrases': {
        'top_phrases': dict(sorted_phrases)
    }
}

if test_file_reviews:
    report_data['quality_metrics'] = {
        'average_quality_score': avg_quality,
        'validated_reviews': len(test_file_reviews)
    }

# Save to S3
report_key = f"reports/unified_comprehend_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
s3_client.put_object(
    Bucket=bucket_name,
    Key=report_key,
    Body=json.dumps(report_data, indent=2),
    ContentType='application/json'
)

print(f"\n{'='*80}")
print(f"‚úÖ Unified report saved to: s3://{bucket_name}/{report_key}")
print(f"{'='*80}")


SAVING COMBINED RESULTS & GENERATING UNIFIED REPORT
‚úÖ Saved 96 CSV analysis results to S3

üìä UNIFIED COMPREHEND ANALYSIS REPORT

üìÇ DATA SOURCES:
   Test Files: 3 reviews
   CSV Data: 96 reviews
   Total: 99 reviews

üòä SENTIMENT DISTRIBUTION
   POSITIVE  : 55 ( 55.6%) ‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà
   NEGATIVE  : 44 ( 44.4%) ‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà
   NEUTRAL   :  0 (  0.0%) 
   MIXED     :  0 (  0.0%) 

üìä Sentiment by Source:

   TEST_FILE:
      POSITIVE: 2
      NEGATIVE: 1

   CSV:
      POSITIVE: 53
      NEGATIVE: 43

üè∑Ô∏è  TOP ENTITIES
   QUANTITY            :   4

üîë TOP KEY PHRASES
   this restaurant                         :   9
   the food                                :   8
   this song                               :   7
   the product                             :   7
   the customer service                    :   6
   this book                               :

In [62]:
# Verification: Show Sample Results from Both Data Sources
print("="*80)
print("üîç VERIFICATION: Sample Results from Both Data Sources")
print("="*80)

# Sample from Test File
print("\nüìÑ SAMPLE TEST FILE RESULT:")
print("-" * 80)
test_sample = [r for r in all_reviews if 'validation_checks' in r][0]
print(f"Review ID: {test_sample.get('review_id', 'N/A')}")
print(f"Source: Test File (via Lambda validation)")
print(f"Text Preview: {test_sample['text'][:100]}...")
print(f"Quality Score: {test_sample['quality_score']}")
print(f"Comprehend Sentiment: {test_sample['comprehend_analysis']['sentiment']['sentiment']}")
if 'sentiment_scores' in test_sample['comprehend_analysis']['sentiment']:
    print(f"Sentiment Scores:")
    for sent_type, score in test_sample['comprehend_analysis']['sentiment']['sentiment_scores'].items():
        print(f"   {sent_type}: {score:.4f}")
print(f"Entities Found: {len(test_sample['comprehend_analysis']['entities'])}")
print(f"Key Phrases: {len(test_sample['comprehend_analysis']['key_phrases'])}")

# Sample from CSV
print("\nüìä SAMPLE CSV RESULT:")
print("-" * 80)
csv_sample = [r for r in all_reviews if r.get('source') == 'CSV'][0]
print(f"Row Number: {csv_sample['row_number']}")
print(f"Source: CSV Bulk Data")
print(f"Text: {csv_sample['original_text']}")
print(f"CSV Original Sentiment: {csv_sample['csv_sentiment']}")
print(f"Comprehend Sentiment: {csv_sample['comprehend_analysis']['sentiment']['sentiment']}")
print(f"Sentiment Match: {'‚úÖ YES' if csv_sample['csv_sentiment'].upper() == csv_sample['comprehend_analysis']['sentiment']['sentiment'] else '‚ö†Ô∏è NO'}")
print(f"CSV Location: {csv_sample['csv_location']}")
print(f"CSV Source Platform: {csv_sample['csv_source']}")
print(f"Entities Found: {len(csv_sample['comprehend_analysis']['entities'])}")
print(f"Key Phrases: {[kp['text'] for kp in csv_sample['comprehend_analysis']['key_phrases']]}")

# Verification Summary
print("\n" + "="*80)
print("‚úÖ VERIFICATION COMPLETE")
print("="*80)
print(f"""
Both data sources are successfully processed:
   
   1. Test Files (individual uploads): ‚úÖ Working
      - Lambda validation applied
      - Comprehend analysis completed
      - Results saved to S3
   
   2. CSV Bulk Data (96 reviews): ‚úÖ Working
      - CSV loaded from S3
      - Comprehend analysis completed
      - Original metadata preserved
      - Results saved to S3
   
   3. Combined Reporting: ‚úÖ Working
      - Unified sentiment analysis
      - Entity aggregation across sources
      - Key phrase extraction combined
      - S3 storage with source tracking

Pipeline Status: FULLY OPERATIONAL for both data sources
""")

üîç VERIFICATION: Sample Results from Both Data Sources

üìÑ SAMPLE TEST FILE RESULT:
--------------------------------------------------------------------------------
Review ID: N/A
Source: Test File (via Lambda validation)
Text Preview: I absolutely love this product! The quality exceeded my expectations and the customer service team w...
Quality Score: 1.0
Comprehend Sentiment: POSITIVE
Entities Found: 0
Key Phrases: 6

üìä SAMPLE CSV RESULT:
--------------------------------------------------------------------------------
Row Number: 1
Source: CSV Bulk Data
Text: I love this product!
CSV Original Sentiment: Positive
Comprehend Sentiment: POSITIVE
Sentiment Match: ‚úÖ YES
CSV Location: New York
CSV Source Platform: Twitter
Entities Found: 0
Key Phrases: ['this product']

‚úÖ VERIFICATION COMPLETE

Both data sources are successfully processed:

   1. Test Files (individual uploads): ‚úÖ Working
      - Lambda validation applied
      - Comprehend analysis completed
      - Results s

### Step 10: Extract Text from Images with Amazon Textract

Implement Amazon Textract to extract text and structured data from product images.

**IMPORTANT NOTE:**
 AWS did not provide images, surveys and audio to implement and test this excercise. Code above are the snippets from AWS for further understanding.

In [None]:
# Initialize Amazon Textract client
textract_client = boto3.client('textract')

def extract_text_from_image(bucket, document_key):
    """
    Extract text from an image stored in S3 using Amazon Textract.
    
    Args:
        bucket: S3 bucket name
        document_key: S3 object key for the image
    
    Returns:
        Dictionary containing extracted text and metadata
    """
    try:
        # Call Textract to detect text
        response = textract_client.detect_document_text(
            Document={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': document_key
                }
            }
        )
        
        # Extract text blocks
        extracted_text = []
        lines = []
        words = []
        
        for block in response['Blocks']:
            if block['BlockType'] == 'LINE':
                lines.append(block['Text'])
                extracted_text.append(block['Text'])
            elif block['BlockType'] == 'WORD':
                words.append({
                    'text': block['Text'],
                    'confidence': block['Confidence']
                })
        
        results = {
            'full_text': ' '.join(extracted_text),
            'lines': lines,
            'words': words,
            'document_metadata': response.get('DocumentMetadata', {})
        }
        
        print(f"‚úì Extracted text from image: {document_key}")
        return results
        
    except ClientError as e:
        print(f"‚úó Error extracting text from image: {e}")
        return None

# Example: Process image if available
print("Note: To use this function, upload an image to S3 and call:")
print("results = extract_text_from_image(bucket_name, 'raw-data/product-image.jpg')")

### Step 11: Transcribe Audio with Amazon Transcribe

Use Amazon Transcribe to convert customer service call recordings into text transcripts.

**IMPORTANT NOTE:**
 AWS did not provide images, surveys and audio to implement and test this excercise. Code above are the snippets from AWS for further understanding.



In [None]:
# Initialize Amazon Transcribe client
transcribe_client = boto3.client('transcribe')

def transcribe_audio_file(bucket, audio_key, job_name=None):
    """
    Transcribe an audio file stored in S3 using Amazon Transcribe.
    
    Args:
        bucket: S3 bucket name
        audio_key: S3 object key for the audio file
        job_name: Optional name for the transcription job
    
    Returns:
        Transcription job name
    """
    if job_name is None:
        job_name = f"transcription-{int(time.time())}"
    
    # Determine audio format from file extension
    audio_format = audio_key.split('.')[-1].upper()
    if audio_format == 'MP3':
        media_format = 'mp3'
    elif audio_format == 'MP4':
        media_format = 'mp4'
    elif audio_format == 'WAV':
        media_format = 'wav'
    elif audio_format == 'FLAC':
        media_format = 'flac'
    else:
        media_format = 'mp3'  # default
    
    try:
        # Start transcription job
        response = transcribe_client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={
                'MediaFileUri': f's3://{bucket}/{audio_key}'
            },
            MediaFormat=media_format,
            LanguageCode='en-US',
            OutputBucketName=bucket,
            OutputKey=f'transcriptions/{job_name}.json',
            Settings={
                'ShowSpeakerLabels': True,
                'MaxSpeakerLabels': 2
            }
        )
        
        print(f"‚úì Started transcription job: {job_name}")
        return job_name
        
    except ClientError as e:
        print(f"‚úó Error starting transcription job: {e}")
        return None

def get_transcription_result(job_name):
    """
    Retrieve the results of a transcription job.
    
    Args:
        job_name: Name of the transcription job
    
    Returns:
        Dictionary containing transcription results
    """
    try:
        response = transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name
        )
        
        status = response['TranscriptionJob']['TranscriptionJobStatus']
        
        if status == 'COMPLETED':
            transcript_uri = response['TranscriptionJob']['Transcript']['TranscriptFileUri']
            print(f"‚úì Transcription completed: {transcript_uri}")
            return response
        elif status == 'FAILED':
            print(f"‚úó Transcription failed: {response['TranscriptionJob'].get('FailureReason', 'Unknown')}")
            return None
        else:
            print(f"‚Ñπ Transcription status: {status}")
            return response
            
    except ClientError as e:
        print(f"‚úó Error retrieving transcription: {e}")
        return None

# Example usage
print("Note: To use this function, upload an audio file to S3 and call:")
print("job_name = transcribe_audio_file(bucket_name, 'raw-data/customer-call.mp3')")
print("# Wait a few minutes, then:")
print("result = get_transcription_result(job_name)")

### Step 12: Transform Survey Data to Natural Language

Convert structured tabular survey responses into natural language summaries.


**IMPORTANT NOTE:**
 AWS did not provide images, surveys and audio to implement and test this excercise. Code above are the snippets from AWS for further understanding.

In [None]:
import pandas as pd
import numpy as np
import argparse
import os
import json

def process_survey_data(input_path, output_path):
    # Read the survey data
    df = pd.read_csv(f"{input_path}/surveys.csv")
    
    # Basic data cleaning
    df = df.dropna(subset=['customer_id', 'survey_date'])  # Drop rows with missing key fields
    
    # Convert categorical ratings to numerical
    rating_map = {'Very Dissatisfied': 1, 'Dissatisfied': 2, 'Neutral': 3, 'Satisfied': 4, 'Very Satisfied': 5}
    for col in df.columns:
        if 'rating' in col.lower() or 'satisfaction' in col.lower():
            df[col] = df[col].map(rating_map).fillna(df[col])
    
    # Calculate summary statistics
    summary_stats = {
        'total_surveys': len(df),
        'avg_satisfaction': df['overall_satisfaction'].mean(),
        'sentiment_distribution': df['overall_satisfaction'].value_counts().to_dict(),
        'top_issues': df['improvement_area'].value_counts().head(3).to_dict()
    }
    
    # Generate natural language summaries for each survey
    summaries = []
    for _, row in df.iterrows():
        summary = {
            'customer_id': row['customer_id'],
            'survey_date': row['survey_date'],
            'summary_text': generate_summary(row),
            'ratings': {col: row[col] for col in df.columns if 'rating' in col.lower() or 'satisfaction' in col.lower()},
            'comments': row.get('comments', '')
        }
        summaries.append(summary)
    
    # Save the processed data
    with open(f"{output_path}/survey_summaries.json", 'w') as f:
        json.dump(summaries, f)
    
    with open(f"{output_path}/survey_statistics.json", 'w') as f:
        json.dump(summary_stats, f)

def generate_summary(row):
    """Generate a natural language summary of a survey response"""
    satisfaction_level = "satisfied" if row['overall_satisfaction'] >= 4 else \
                        "neutral" if row['overall_satisfaction'] == 3 else "dissatisfied"
    
    summary = f"Customer {row['customer_id']} was {satisfaction_level} overall with their experience. "
    
    # Add details about specific ratings
    if 'product_rating' in row:
        summary += f"They rated the product {row['product_rating']}/5. "
    
    if 'service_rating' in row:
        summary += f"They rated the customer service {row['service_rating']}/5. "
    
    # Add improvement area if available
    if 'improvement_area' in row and pd.notna(row['improvement_area']):
        summary += f"They suggested improvements in the area of {row['improvement_area']}. "
    
    # Add comments if available
    if 'comments' in row and pd.notna(row['comments']) and len(str(row['comments'])) > 0:
        summary += f"Their comments: '{row['comments']}'"
    
    return summary

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-path", type=str, default="/opt/ml/processing/input")
    parser.add_argument("--output-path", type=str, default="/opt/ml/processing/output")
    args = parser.parse_args()
    
    process_survey_data(args.input_path, args.output_path)


In [None]:
import boto3
import sagemaker
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

def run_survey_processing_job():
    # Initialize SageMaker session
    sagemaker_session = sagemaker.Session()
    role = sagemaker.get_execution_role()
    
    # Define the processing job
    script_processor = ScriptProcessor(
        command=['python3'],
        image_uri='737474898029.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        sagemaker_session=sagemaker_session
    )
    
    # Run the processing job
    script_processor.run(
        code='survey_processing.py',
        inputs=[
            ProcessingInput(
                source='s3://customer-feedback-analysis-<your-initials>/raw-data/surveys.csv',
                destination='/opt/ml/processing/input'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='survey_output',
                source='/opt/ml/processing/output',
                destination='s3://customer-feedback-analysis-<your-initials>/processed-data/surveys/'
            )
        ]
    )
    
    print("Survey processing job started")

if __name__ == "__main__":
    run_survey_processing_job()


---

## Part 3: Data Formatting for Foundation Models

### Step 13: Format Data for Claude in Amazon Bedrock

Prepare and format the processed multimodal feedback data according to Claude's input requirements.

---

## Summary and Next Steps

This notebook has implemented a comprehensive data validation and processing pipeline for customer feedback analysis, covering:

### Completed Components:

1. ‚úÖ **Data Validation Workflow**
   - S3 bucket setup and data upload
   - AWS Glue Data Catalog and Crawler
   - Data Quality Rulesets
   - Lambda-based text validation
   - CloudWatch monitoring

2. ‚úÖ **Multimodal Data Processing**
   - Amazon Comprehend for text analysis
   - Amazon Textract for image processing
   - Amazon Transcribe for audio transcription
   - Survey data transformation

3. ‚úÖ **Foundation Model Integration**
   - Claude data formatting
   - Conversation templates
   - Multimodal request handling

4. ‚úÖ **Quality Enhancement**
   - Entity and theme extraction
   - Text normalization
   - Feedback loop implementation

### Next Steps:

1. **Deploy Lambda Functions**: Create the Lambda functions in AWS Console using the provided code
2. **Configure IAM Roles**: Ensure proper permissions for all AWS services
3. **Test the Pipeline**: Upload sample data and verify the entire workflow
4. **Monitor Quality Metrics**: Use CloudWatch dashboard to track data quality over time
5. **Iterate and Improve**: Use the feedback loop to continuously enhance data quality

### Additional Features to Consider:

- Real-time streaming analysis with Kinesis
- Advanced topic modeling with SageMaker
- Custom ML models for domain-specific entity recognition
- Integration with business intelligence tools
- Automated reporting and alerting