# OLI Torus XAPI ETL Pipeline - Bulk Processing

This notebook provides an interface to trigger bulk processing of historical XAPI data using AWS Lambda.

## Prerequisites

1. AWS credentials configured (via AWS CLI, environment variables, or IAM roles)
2. Lambda functions deployed in your target environment
3. Appropriate permissions to invoke Lambda functions
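
The first two prerequisites can be checked programmatically before running the rest of the notebook. The following is a minimal sketch, not part of the pipeline: `check_prerequisites` is a hypothetical helper that takes boto3-style clients as arguments and relies on the STS `get_caller_identity` and Lambda `get_function` calls.

```python
def check_prerequisites(sts_client, lambda_client, function_name):
    """Return a list of (check, ok, detail) tuples for the prerequisites above.

    Hypothetical helper: pass boto3 clients, for example
    boto3.client('sts') and boto3.client('lambda').
    """
    checks = []

    # 1. Credentials: get_caller_identity fails if none are configured
    try:
        identity = sts_client.get_caller_identity()
        checks.append(('credentials', True, identity.get('Arn', 'unknown')))
    except Exception as e:
        checks.append(('credentials', False, str(e)))

    # 2. Deployment: get_function fails if the function cannot be found or read
    try:
        lambda_client.get_function(FunctionName=function_name)
        checks.append(('function_deployed', True, function_name))
    except Exception as e:
        checks.append(('function_deployed', False, str(e)))

    return checks
```

Invoke permission (item 3) is not covered here; it is only confirmed once an actual invocation, such as the health check later in this notebook, succeeds.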

## Setup

### Quick Setup - Install Requirements

Run the cell below to install all required dependencies for this notebook:

In [None]:
# Install the notebook-specific requirements (includes pandas and visualization tools).
# %pip installs into the environment of the running kernel.
%pip install -r notebook-requirements.txt

In [None]:
import boto3
import json
import pandas as pd
from datetime import datetime, timedelta
import time

# Configure AWS region and environment
AWS_REGION = 'us-east-1'  # Change to your region
ENVIRONMENT = 'dev'  # Change to 'staging' or 'prod' as needed

# Initialize AWS clients
lambda_client = boto3.client('lambda', region_name=AWS_REGION)
s3_client = boto3.client('s3', region_name=AWS_REGION)

# Unified Lambda function name (adjust based on your deployment)
XAPI_ETL_FUNCTION = f'xapi-etl-processor-{ENVIRONMENT}'

print(f"Configured for {ENVIRONMENT} environment in {AWS_REGION}")
print(f"Unified XAPI ETL Function: {XAPI_ETL_FUNCTION}")

In [None]:
# S3 Configuration
S3_XAPI_BUCKET = 'torus-xapi-test'  # Replace with your actual S3 bucket name

print(f"S3 Bucket: {S3_XAPI_BUCKET}")
print("⚠️  Make sure to update S3_XAPI_BUCKET with your actual bucket name!")

In [None]:
import sys

# Execution Mode Configuration
EXECUTION_MODE = 'local'  # 'local' runs the handler in this notebook; 'lambda' invokes the deployed function

# For local execution, we'll import the required modules
if EXECUTION_MODE == 'local':
    import os

    # Add the current working directory to the path so local modules can be imported
    current_dir = os.getcwd()
    if current_dir not in sys.path:
        sys.path.insert(0, current_dir)

    # Import local modules
    try:
        from lambda_function import lambda_handler, health_check
        from common import get_config
        from clickhouse_client import ClickHouseClient
        print(f"✅ Local modules imported successfully")
    except ImportError as e:
        print(f"❌ Failed to import local modules: {e}")
        print("Make sure you're running this notebook from the xapi-etl-processor directory")
        print("Exiting due to failed local module imports.")
        sys.exit(1)

print(f"🔧 Execution mode: {EXECUTION_MODE.upper()}")
if EXECUTION_MODE == 'lambda':
    print(f"   Using Lambda function: {XAPI_ETL_FUNCTION}")
else:
    print(f"   Running locally with imported modules")

## Execution Modes

This notebook supports two execution modes:

### Lambda Mode
- **EXECUTION_MODE = 'lambda'**
- Invokes the deployed AWS Lambda function remotely
- Requires AWS credentials and deployed Lambda function
- Best for production use and processing large datasets
- Supports true asynchronous processing

### Local Mode
- **EXECUTION_MODE = 'local'**
- Runs the Lambda function code locally in this notebook
- Requires local modules (lambda_function.py, common.py, clickhouse_client.py)
- Good for testing and development
- All operations run synchronously in the notebook

**To switch modes:** Change the `EXECUTION_MODE` variable in the cell above and re-run that cell.
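
The helper functions in this notebook treat any value other than `'local'` as Lambda mode, so a typo such as `'Local'` would silently invoke the deployed function. A small guard can catch that early. This is a sketch only; `VALID_MODES` and `validate_execution_mode` are hypothetical names, not part of the pipeline.

```python
VALID_MODES = ('lambda', 'local')

def validate_execution_mode(mode):
    """Return the mode unchanged, or raise ValueError if it is not recognized."""
    if mode not in VALID_MODES:
        raise ValueError(
            f"EXECUTION_MODE must be one of {VALID_MODES}, got {mode!r}"
        )
    return mode
```

Calling `validate_execution_mode(EXECUTION_MODE)` right after setting the variable makes a misspelled mode fail immediately instead of at the first invocation.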

## Helper Functions

In [None]:
def execute_function(payload):
    """Execute function either locally or via Lambda based on EXECUTION_MODE"""
    if EXECUTION_MODE == 'local':
        return execute_local(payload)
    else:
        return invoke_lambda_sync(payload)

def execute_function_async(payload):
    """Execute function asynchronously - Lambda only or simulate locally"""
    if EXECUTION_MODE == 'local':
        # For local execution, run synchronously but indicate it's "async"
        result = execute_local(payload)
        if result['success']:
            result['async_mode'] = True
            result['request_id'] = f"local-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        return result
    else:
        return invoke_lambda_async(payload)

def execute_local(payload):
    """Execute the Lambda function locally"""
    try:
        # Create a mock context object
        class MockContext:
            def __init__(self):
                self.function_name = "xapi-etl-processor-local"
                self.function_version = "1"
                self.aws_request_id = f"local-{datetime.now().strftime('%Y%m%d%H%M%S')}"

        context = MockContext()

        # Call the lambda_handler function directly
        result = lambda_handler(payload, context)

        return {
            'success': True,
            'status_code': result.get('statusCode', 200),
            'result': result,
            'execution_mode': 'local'
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'execution_mode': 'local'
        }

def invoke_lambda_async(payload):
    """Invoke the unified Lambda function asynchronously"""
    try:
        response = lambda_client.invoke(
            FunctionName=XAPI_ETL_FUNCTION,
            InvocationType='Event',  # Async invocation
            Payload=json.dumps(payload)
        )
        return {
            'success': True,
            'status_code': response['StatusCode'],
            'request_id': response['ResponseMetadata']['RequestId'],
            'execution_mode': 'lambda'
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'execution_mode': 'lambda'
        }

def invoke_lambda_sync(payload):
    """Invoke the unified Lambda function synchronously"""
    try:
        response = lambda_client.invoke(
            FunctionName=XAPI_ETL_FUNCTION,
            InvocationType='RequestResponse',  # Sync invocation
            Payload=json.dumps(payload)
        )

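        result = json.loads(response['Payload'].read().decode())
        # A handler exception still returns HTTP 200; Lambda flags it with 'FunctionError'
        if response.get('FunctionError'):
            error = result.get('errorMessage') if isinstance(result, dict) else None
            return {
                'success': False,
                'error': error or str(result),
                'execution_mode': 'lambda'
            }
        return {
            'success': True,
            'status_code': response['StatusCode'],
            'result': result,
            'execution_mode': 'lambda'
        }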
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'execution_mode': 'lambda'
        }

def check_function_health():
    """Check if the function is healthy (Lambda or local)"""
    payload = {'health_check': True}
    return execute_function(payload)

print("Helper functions loaded")

## 1. Health Checks

First, let's verify that our Lambda functions and ClickHouse are healthy:

In [None]:
# Check XAPI ETL processor health
print("Checking XAPI ETL processor health...")
health = check_function_health()
print(json.dumps(health, indent=2))

## 2. Dry Run - Explore Available Data

Let's do a dry run to see what data is available for processing:

In [None]:
# Configuration for dry run
dry_run_payload = {
    'mode': 'bulk',
    's3_bucket': S3_XAPI_BUCKET,
    's3_prefix': 'section/',  # Adjust based on your S3 structure
    'start_date': '2024-01-01',  # Adjust date range as needed
    'end_date': '2024-12-31',
    'dry_run': True
}

print("Running dry run to explore available data...")
dry_run_result = execute_function(dry_run_payload)

if dry_run_result['success']:
    result_body = dry_run_result['result']['body']
    if isinstance(result_body, str):
        result_data = json.loads(result_body)
    else:
        result_data = result_body

    print(f"Found {result_data.get('files_found', 0)} files to process")
    print(f"Total files: {result_data.get('total_files', 0)}")

    if 'files' in result_data:
        print("\nSample files:")
        for i, file in enumerate(result_data['files'][:5], 1):
            print(f"  {i}. {file}")
else:
    print(f"Dry run failed: {dry_run_result.get('error', 'Unknown error')}")
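
The dry-run cell has to handle a `body` that may arrive either as a JSON string or as an already-decoded dict, and the single-file test later repeats the same logic. One way to factor that out is sketched below. `extract_body` is a hypothetical helper and assumes the result shape returned by `execute_function` in this notebook.

```python
import json

def extract_body(execution_result):
    """Return the decoded 'body' of a successful execution result.

    Hypothetical helper; raises RuntimeError if the execution failed.
    """
    if not execution_result.get('success'):
        raise RuntimeError(execution_result.get('error', 'Unknown error'))
    body = execution_result['result'].get('body', {})
    # The body may be a JSON string or an already-decoded object
    return json.loads(body) if isinstance(body, str) else body
```

With this in place, the cell above could reduce to `result_data = extract_body(dry_run_result)` inside a `try`/`except RuntimeError` block.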

In [None]:
# Optional: Clear existing section data before reprocessing
# Set clear_section_data = True and run this cell only if you want to start fresh for this section

clear_section_data = False  # Set to True to enable clearing (this deletes data!)
target_section_id = '145'   # Make sure this matches the section_id used in step 3 below

if clear_section_data:
    print(f"⚠️  WARNING: This will delete ALL existing data for section {target_section_id}")

    if EXECUTION_MODE == 'local':
        try:
            # For local execution, directly use ClickHouse client
            from clickhouse_client import ClickHouseClient
            clickhouse_client = ClickHouseClient()

            # Delete all events for this section across all tables
            tables = ['video_events', 'activity_attempt_events', 'page_attempt_events',
                     'page_viewed_events', 'part_attempt_events']

            total_deleted = 0
            for table in tables:
                try:
                    query = f"DELETE FROM {clickhouse_client.database}.{table} WHERE section_id = {target_section_id}"
                    response = clickhouse_client._execute_query(query)
                    print(f"✅ Cleared {table} for section {target_section_id}")
                except Exception as e:
                    print(f"⚠️  Table {table} may not exist or is empty: {str(e)}")

            print(f"✅ Section {target_section_id} data cleared - you can now set force_reprocess=False")

        except Exception as e:
            print(f"❌ Failed to clear section data: {str(e)}")
    else:
        print("❌ Data clearing only available in local mode")
        print("   For Lambda mode, use force_reprocess=True instead")
else:
    print("💡 Section data clearing disabled")
    print("   Set clear_section_data = True above to enable section data clearing")

## 3. Process Specific Section Data

Process historical data for a specific course section:

In [None]:
# Configure section-specific processing
section_id = '145'  # Replace with actual section ID
start_date = '2023-01-01'  # Adjust as needed
end_date = '2025-12-31'    # Adjust as needed

section_payload = {
    'mode': 'bulk',
    's3_bucket': S3_XAPI_BUCKET,
    'section_id': section_id,
    'start_date': start_date,
    'end_date': end_date,
    'force_reprocess': True  # True reprocesses data that was already loaded; set to False to skip it
}

print(f"Processing section {section_id} data from {start_date} to {end_date}...")
print(f"⚠️  Force reprocess enabled - will reprocess existing data")
if EXECUTION_MODE == 'local':
    print("This will run locally (synchronously).")
else:
    print("This will be an asynchronous operation.")

section_result = execute_function_async(section_payload)

if section_result['success']:
    print(f"✅ Successfully triggered processing for section {section_id}")
    print(f"Request ID: {section_result['request_id']}")
    if EXECUTION_MODE == 'lambda':
        print("Check CloudWatch logs for processing status.")
    else:
        print("Local execution completed.")
else:
    print(f"❌ Failed to trigger processing: {section_result.get('error', 'Unknown error')}")
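
Because the payload is sent as-is, a mistyped date only surfaces once the function runs, which in Lambda mode means digging through CloudWatch logs. A small builder can validate the range up front. This is a sketch under assumptions: `build_bulk_payload` is a hypothetical helper, and the `YYYY-MM-DD` format is inferred from the examples in this notebook rather than from the function's documented contract.

```python
from datetime import datetime

def build_bulk_payload(bucket, section_id, start_date, end_date, force_reprocess=False):
    """Build a bulk-mode payload after validating the date range (hypothetical helper)."""
    # strptime raises ValueError for malformed or impossible dates such as '2024-02-30'
    start = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    if start > end:
        raise ValueError(f"start_date {start_date} is after end_date {end_date}")
    return {
        'mode': 'bulk',
        's3_bucket': bucket,
        'section_id': str(section_id),
        'start_date': start_date,
        'end_date': end_date,
        'force_reprocess': force_reprocess,
    }
```

For example, `build_bulk_payload(S3_XAPI_BUCKET, '145', '2023-01-01', '2025-12-31', force_reprocess=True)` produces the same dict as `section_payload` above.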

## 4. Bulk Process Multiple Sections

Process data for multiple sections (useful for large-scale historical data loading):

In [None]:
# List of section IDs to process
section_ids = ['123', '456', '789']  # Replace with actual section IDs
date_range = {
    'start_date': '2024-01-01',
    'end_date': '2024-12-31'
}

print(f"Processing {len(section_ids)} sections...")

results = []
for i, section_id in enumerate(section_ids, 1):
    payload = {
        'mode': 'bulk',
        's3_bucket': S3_XAPI_BUCKET,
        'section_id': section_id,
        **date_range,
        'force_reprocess': False
    }

    print(f"  {i}/{len(section_ids)}: Triggering processing for section {section_id}...")

    result = execute_function_async(payload)
    results.append({
        'section_id': section_id,
        'success': result['success'],
        'request_id': result.get('request_id'),
        'error': result.get('error')
    })

    # Small delay to avoid overwhelming Lambda (not needed for local execution)
    if EXECUTION_MODE == 'lambda':
        time.sleep(1)

# Summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful

print(f"\n📊 Summary:")
print(f"  ✅ Successfully triggered: {successful}")
print(f"  ❌ Failed: {failed}")

if failed > 0:
    print("\nFailed sections:")
    for result in results:
        if not result['success']:
            print(f"  - Section {result['section_id']}: {result['error']}")
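
Sections that fail to trigger (for example because of throttling) can be retried instead of being re-run by hand. The sketch below retries with exponential backoff. `retry_failed` and its `trigger` callback are hypothetical, and the function assumes the `results` entries have the shape built in the loop above.

```python
import time

def retry_failed(results, trigger, max_attempts=3, base_delay=1.0):
    """Re-trigger failed sections with exponential backoff (hypothetical helper).

    `trigger(section_id)` must return a dict with 'success' and, optionally,
    'request_id' and 'error'. Entries in `results` are updated in place.
    """
    for entry in results:
        attempt = 0
        while not entry['success'] and attempt < max_attempts:
            time.sleep(base_delay * (2 ** attempt))  # base_delay, then 2x, 4x, ...
            outcome = trigger(entry['section_id'])
            entry.update(
                success=outcome['success'],
                request_id=outcome.get('request_id'),
                error=outcome.get('error'),
            )
            attempt += 1
    return results
```

A possible call is `retry_failed(results, lambda sid: execute_function_async({'mode': 'bulk', 's3_bucket': S3_XAPI_BUCKET, 'section_id': sid, **date_range, 'force_reprocess': False}))`. Note that in Lambda mode `success` only means the asynchronous invocation was accepted, so this retries trigger failures, not processing failures.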

## 5. Process All Available Data

Process all available XAPI data (use with caution for large datasets):

In [None]:
# ⚠️ WARNING: This will process ALL available data. Use carefully!
process_all = False  # Set to True to enable

if process_all:
    all_data_payload = {
        'mode': 'bulk',
        's3_bucket': S3_XAPI_BUCKET,
        's3_prefix': 'section/',
        'start_date': '2024-01-01',  # Adjust as needed
        'end_date': '2024-12-31',    # Adjust as needed
        'force_reprocess': False
    }

    print("⚠️  Processing ALL available data...")
    if EXECUTION_MODE == 'local':
        print("This will run locally (synchronously).")
    else:
        print("This is an asynchronous operation that may take a long time.")

    all_result = execute_function_async(all_data_payload)

    if all_result['success']:
        print(f"✅ Successfully triggered bulk processing")
        print(f"Request ID: {all_result['request_id']}")
        if EXECUTION_MODE == 'lambda':
            print("Monitor CloudWatch logs for progress.")
        else:
            print("Local execution completed.")
    else:
        print(f"❌ Failed to trigger bulk processing: {all_result.get('error')}")
else:
    print("Bulk processing disabled. Set process_all = True to enable.")
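
A single Lambda invocation can run for at most 15 minutes, so a full-year range over a large bucket may not finish in one call. One option is to split the range into smaller windows and trigger each one separately. The sketch below assumes the function treats `start_date` and `end_date` as inclusive, which this notebook does not confirm; `split_date_range` is a hypothetical helper.

```python
from datetime import date, timedelta

def split_date_range(start_date, end_date, days_per_chunk=30):
    """Split an inclusive YYYY-MM-DD range into consecutive inclusive chunks."""
    if days_per_chunk < 1:
        raise ValueError("days_per_chunk must be at least 1")
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    chunks = []
    while start <= end:
        # Each chunk covers days_per_chunk days, except possibly the last one
        chunk_end = min(start + timedelta(days=days_per_chunk - 1), end)
        chunks.append((start.isoformat(), chunk_end.isoformat()))
        start = chunk_end + timedelta(days=1)
    return chunks
```

Each `(start, end)` pair can then replace the `start_date` and `end_date` values in a copy of `all_data_payload` before calling `execute_function_async`.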

## 6. Test Single File Processing

Test processing of a single JSONL file:

In [None]:
# Test with a specific file
test_bucket = 'your-xapi-bucket'  # Replace with your S3 bucket
test_key = 'section/123/video/2024-01-01T12-00-00.000Z_test-bundle.jsonl'  # Replace with actual file

test_payload = {
    'bucket': test_bucket,
    'key': test_key
}

print(f"Testing single file processing: s3://{test_bucket}/{test_key}")

test_result = execute_function(test_payload)

if test_result['success']:
    result_body = test_result['result']['body']
    if isinstance(result_body, str):
        result_data = json.loads(result_body)
    else:
        result_data = result_body

    print("✅ Single file processing completed")
    print(json.dumps(result_data, indent=2))
else:
    print(f"❌ Single file processing failed: {test_result.get('error')}")

## 7. Monitoring and Troubleshooting

Check Lambda function logs and status:

In [None]:
# Check recent Lambda invocations (requires CloudWatch Logs access)
import boto3
from datetime import datetime, timedelta

logs_client = boto3.client('logs', region_name=AWS_REGION)

def get_recent_lambda_logs(function_name, hours=1):
    """Get recent logs for a Lambda function"""
    log_group = f'/aws/lambda/{function_name}'

    try:
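        # Use local naive time: .timestamp() interprets naive datetimes as local time,
        # so utcnow() would shift the query window by the machine's UTC offset
        end_time = datetime.now()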
        start_time = end_time - timedelta(hours=hours)

        response = logs_client.filter_log_events(
            logGroupName=log_group,
            startTime=int(start_time.timestamp() * 1000),
            endTime=int(end_time.timestamp() * 1000),
            limit=100
        )

        return response.get('events', [])
    except Exception as e:
        print(f"Error getting logs for {function_name}: {str(e)}")
        return []

# Get recent logs for XAPI ETL processor
print(f"Recent logs for {XAPI_ETL_FUNCTION}:")
etl_logs = get_recent_lambda_logs(XAPI_ETL_FUNCTION)
for event in etl_logs[-5:]:  # Show last 5 log events
    timestamp = datetime.fromtimestamp(event['timestamp'] / 1000)
    print(f"[{timestamp}] {event['message']}")
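
The cell above makes a single `filter_log_events` call capped at 100 events, so for a busy function the returned page may not reach the most recent entries. The sketch below follows `nextToken` across pages and optionally applies a `filterPattern`. `fetch_all_log_events` is a hypothetical helper that expects a boto3 CloudWatch Logs client.

```python
def fetch_all_log_events(logs_client, log_group, start_ms, end_ms,
                         filter_pattern=None, max_events=1000):
    """Collect log events across pages by following nextToken.

    Hypothetical helper: logs_client is a boto3 CloudWatch Logs client,
    start_ms and end_ms are epoch milliseconds.
    """
    kwargs = {
        'logGroupName': log_group,
        'startTime': start_ms,
        'endTime': end_ms,
    }
    if filter_pattern:
        kwargs['filterPattern'] = filter_pattern  # e.g. 'ERROR'

    events = []
    while len(events) < max_events:
        response = logs_client.filter_log_events(**kwargs)
        events.extend(response.get('events', []))
        token = response.get('nextToken')
        if not token:
            break  # no more pages
        kwargs['nextToken'] = token
    return events[:max_events]
```

Passing `filter_pattern='ERROR'` narrows the output to matching lines, which is usually the quickest way to find why a bulk run failed.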

## 8. ClickHouse Data Verification

If you have direct access to ClickHouse, you can verify the data was loaded correctly:

In [None]:
# Note: This requires direct ClickHouse access or a separate verification Lambda.
# Sample code for direct verification; adjust the connection details below before running.

import requests

# ClickHouse connection details
CLICKHOUSE_HOST = 'localhost'
CLICKHOUSE_PORT = 8123
CLICKHOUSE_USER = 'default'
CLICKHOUSE_PASSWORD = 'clickhouse'
CLICKHOUSE_DATABASE = 'oli_analytics_dev'

def query_clickhouse(query):
    url = f"http://{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}"
    headers = {
        'Content-Type': 'text/plain',
        'X-ClickHouse-User': CLICKHOUSE_USER,
        'X-ClickHouse-Key': CLICKHOUSE_PASSWORD
    }

    response = requests.post(url, data=query, headers=headers)
    if response.status_code == 200:
        return response.text
    else:
        return f"Error: {response.status_code} - {response.text}"

# Check total event count
total_events_query = f"SELECT COUNT(*) FROM {CLICKHOUSE_DATABASE}.video_events"
total_events = query_clickhouse(total_events_query)
print(f"Total video events in ClickHouse: {total_events}")

# Check events by section
section_query = f'''
SELECT section_id, COUNT(*) as event_count
FROM {CLICKHOUSE_DATABASE}.video_events
GROUP BY section_id
ORDER BY event_count DESC
LIMIT 10
'''
section_stats = query_clickhouse(section_query)
print(f"\nEvents by section (top 10):\n{section_stats}")
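
`query_clickhouse` returns the raw response text, which for a query without a `FORMAT` clause is tab-separated over the ClickHouse HTTP interface. To compare counts programmatically, the per-section output can be parsed into tuples. `parse_section_counts` is a hypothetical helper and assumes exactly two tab-separated columns per line, as in the section query above.

```python
def parse_section_counts(tsv_text):
    """Parse 'section_id<TAB>event_count' lines into a list of (str, int) tuples."""
    rows = []
    for line in tsv_text.strip().splitlines():
        section_id, count = line.split('\t')
        rows.append((section_id, int(count)))
    return rows
```

Since `query_clickhouse` returns a string starting with `Error:` on failure, that case should be checked before parsing; otherwise the split raises `ValueError`.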