# Data Lake Setup and Configuration

This notebook sets up the DuckDB data lake with S3 integration and performs initial configuration.

## Prerequisites
1. Install required packages: `pip install -r requirements.txt`
2. Copy `.env.example` to `.env` and configure your AWS credentials
3. Ensure you have access to the S3 bucket `vendor-data-s3`

In [2]:
# Import required libraries
import sys
import os
import pandas as pd
import numpy as np
import duckdb
import logging
from pathlib import Path

# Put the notebook's working directory on sys.path (only if it is not
# already listed) so the project-local packages below can be imported.
project_root = Path.cwd()
root_entry = str(project_root)
if root_entry not in sys.path:
    sys.path.append(root_entry)

# Project-local modules: configuration object plus the database / S3 helpers
# used by every later cell.
from config import config
from utils import db_manager, s3_manager

# Obtain the logger from the project configuration and mark the start of the run.
logger = config.setup_logging()
logger.info("Starting Data Lake Setup")

2025-06-18 13:24:44,399 - config.settings - INFO - Starting Data Lake Setup


## 1. Validate Configuration

In [3]:
# Validate configuration and print the key settings.
# On failure the friendly hint is printed and the exception is re-raised, so a
# "Restart & Run All" stops here instead of running every later cell against
# an invalid configuration.
try:
    config.validate_config()
    print("✅ Configuration validation passed")

    # Display key configuration settings
    print("\n📋 Configuration Summary:")
    print(f"S3 Bucket: {config.S3_BUCKET}")
    print(f"AWS Region: {config.AWS_DEFAULT_REGION}")
    print(f"DuckDB Database: {config.DUCKDB_DATABASE_PATH}")
    print(f"Memory Limit: {config.DUCKDB_MEMORY_LIMIT}")
    print(f"Threads: {config.DUCKDB_THREADS}")

except Exception as e:
    print(f"❌ Configuration validation failed: {e}")
    print("Please check your .env file and ensure all required variables are set")
    # Do not swallow the error: everything below depends on a valid config.
    raise

✅ Configuration validation passed

📋 Configuration Summary:
S3 Bucket: vendor-data-s3
AWS Region: us-east-1
DuckDB Database: ./data_lake.duckdb
Memory Limit: 8GB
Threads: 4


## 2. Initialize DuckDB Connection

In [4]:
# Open the DuckDB connection through the project's db_manager and run a
# trivial query as a smoke test.
# A failure is reported and then re-raised: without a connection `conn` is
# undefined and every later cell would fail with a less helpful error.
try:
    conn = db_manager.connect()
    print("✅ DuckDB connection established")

    # Test basic functionality (execute_query result is indexed like a DataFrame)
    test_result = db_manager.execute_query("SELECT 'Hello DuckDB!' as message")
    print(f"Test query result: {test_result['message'].iloc[0]}")

except Exception as e:
    print(f"❌ Failed to connect to DuckDB: {e}")
    # Stop "Run All" here rather than continuing without a database.
    raise

2025-06-18 13:24:56,367 - utils.database - INFO - S3 credentials configured
2025-06-18 13:24:56,368 - utils.database - INFO - DuckDB connection configured successfully
2025-06-18 13:24:56,377 - utils.database - INFO - Query executed successfully. Returned 1 rows


✅ DuckDB connection established
Test query result: Hello DuckDB!


## 3. Test S3 Connectivity

In [None]:
# Check S3 access two ways: through DuckDB's S3 integration and by listing
# objects under the ingestion prefix with the project's s3_manager helper.
print("🔗 Testing S3 connectivity...")

try:
    # DuckDB-side check; the helper's result is only used as a truthy/falsy flag.
    s3_test = db_manager.test_s3_connection()
    print("✅ DuckDB S3 integration working" if s3_test else "❌ DuckDB S3 integration failed")

    # Object listing under the ingestion prefix.
    files = s3_manager.list_files('LSEG/TRTH/LSE/ingestion/')

    if not files:
        print("❌ No files found in S3 bucket")
    else:
        print(f"✅ Found {len(files)} files in S3 bucket")
        print("\n📁 Sample files:")
        bytes_per_mb = 1024 * 1024
        # Only the first five entries are shown; each entry is read via its
        # 'key' and 'size' fields.
        for entry in files[:5]:
            print(f"  {entry['key']} ({entry['size'] / bytes_per_mb:.1f} MB)")

except Exception as e:
    print(f"❌ S3 connectivity test failed: {e}")

🔗 Testing S3 connectivity...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

## 4. Create Database Schema

In [None]:
# Create the layer schemas (bronze/silver/gold/staging/audit) and the audit
# tables. Every statement uses IF NOT EXISTS, so re-running the cell leaves
# existing objects untouched.
# NOTE: the helper-setup cell later in this notebook drops
# audit.data_ingestion_log and recreates it with a different column set.
print("🏗️ Creating database schema...")

schema_sql = """
-- Create schemas for different data lake layers
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;
CREATE SCHEMA IF NOT EXISTS staging;
CREATE SCHEMA IF NOT EXISTS audit;

-- Create audit tables for tracking data lineage
CREATE TABLE IF NOT EXISTS audit.data_ingestion_log (
    ingestion_id UUID DEFAULT gen_random_uuid(),
    source_path VARCHAR,
    file_size BIGINT,
    record_count BIGINT,
    ingestion_timestamp TIMESTAMP DEFAULT NOW(),
    data_date DATE,
    processing_status VARCHAR DEFAULT 'bronze',
    error_message VARCHAR,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Create pipeline state tracking
CREATE TABLE IF NOT EXISTS audit.pipeline_state (
    pipeline_name VARCHAR PRIMARY KEY,
    last_processed_date DATE,
    last_processed_file VARCHAR,
    processing_status VARCHAR,
    last_updated TIMESTAMP DEFAULT NOW(),
    error_message VARCHAR
);

-- Create data quality checks table
CREATE TABLE IF NOT EXISTS audit.data_quality_checks (
    check_id UUID DEFAULT gen_random_uuid(),
    check_name VARCHAR,
    table_name VARCHAR,
    check_timestamp TIMESTAMP DEFAULT NOW(),
    records_checked BIGINT,
    records_failed BIGINT,
    failure_rate DECIMAL(5,2),
    check_status VARCHAR,
    error_details VARCHAR
);
"""

try:
    db_manager.execute_sql(schema_sql)
    print("✅ Database schema created successfully")

    # List what the database now contains; each row is read via its
    # 'schema' and 'name' columns.
    tables = db_manager.list_tables()
    print(f"\n📊 Created {len(tables)} tables:")
    for row in tables.to_dict("records"):
        print(f"  {row['schema']}.{row['name']}")

except Exception as e:
    print(f"❌ Failed to create schema: {e}")

## 5. Initialize Pipeline State

In [None]:
# Seed (or reset) the state row of the 'lse_market_data' pipeline.
# The upsert makes the cell re-runnable: on a repeat run only
# processing_status and last_updated are refreshed; last_processed_date is
# left as it is.
print("🔄 Initializing pipeline state...")

init_pipeline_sql = """
INSERT INTO audit.pipeline_state (pipeline_name, last_processed_date, processing_status)
VALUES ('lse_market_data', '2023-08-31', 'ready')
ON CONFLICT (pipeline_name) DO UPDATE SET
    processing_status = 'ready',
    last_updated = NOW();
"""

try:
    db_manager.execute_sql(init_pipeline_sql)

    # Read the table back so the reader can see the resulting state.
    pipeline_state = db_manager.execute_query("SELECT * FROM audit.pipeline_state")

    print("✅ Pipeline state initialized")
    print("\n📊 Current pipeline state:")
    print(pipeline_state)

except Exception as e:
    print(f"❌ Failed to initialize pipeline state: {e}")

## 6. Test Data Access from S3

In [None]:
# Smoke-test reading the gzipped CSV files under config.INGESTION_PATH
# directly from S3 and report per-file row counts.
# NOTE(review): LIMIT 5 is applied after the GROUP BY, so this presumably
# still reads every file matched by the glob - confirm the cost is acceptable.
print("📖 Testing data access from S3...")

test_query = f"""
SELECT 
    filename,
    COUNT(*) as row_count
FROM read_csv('{config.INGESTION_PATH}/*/*.csv.gz', 
             AUTO_DETECT=true, 
             FILENAME=true,
             SAMPLE_SIZE=10000) 
GROUP BY filename
LIMIT 5
"""

try:
    sample_data = db_manager.execute_query(test_query)

    if sample_data.empty:
        print("❌ No data found in S3 files")
    else:
        print("✅ Successfully read data from S3")
        print(f"\n📊 Sample data from {len(sample_data)} files:")
        print(sample_data)

        # Sum of the row counts of the (at most five) files returned above.
        total_rows = sample_data['row_count'].sum()
        print(f"\nTotal rows in sample: {total_rows:,}")

except Exception as e:
    print(f"❌ Failed to read data from S3: {e}")
    print("Check your S3 credentials and file paths")

## 7. Data Schema Discovery

In [None]:
# Infer the column names and types of the ingestion CSV files.
# DESCRIBE over a LIMIT 0 subquery returns the auto-detected schema without
# returning any data rows.
print("🔍 Discovering data schema...")

schema_query = f"""
DESCRIBE (
    SELECT * FROM read_csv('{config.INGESTION_PATH}/*/*.csv.gz', 
                          AUTO_DETECT=true,
                          SAMPLE_SIZE=50000) 
    LIMIT 0
)
"""

try:
    schema_info = db_manager.execute_query(schema_query)

    print("✅ Schema discovery completed")
    print(f"\n📊 Discovered {len(schema_info)} columns:")
    print(schema_info)

    # Persist the result next to the notebook (path is relative to the
    # current working directory) so later notebooks can reuse it.
    schema_info.to_csv('discovered_schema.csv', index=False)
    print("\n💾 Schema saved to 'discovered_schema.csv'")

except Exception as e:
    print(f"❌ Schema discovery failed: {e}")

## 8. Performance Settings Optimization

In [6]:
# Apply DuckDB session settings for large analytical reads from S3 and echo
# the most relevant ones back.
print("⚡ Optimizing performance settings...")

# httpfs documents `http_timeout` in SECONDS for current DuckDB releases (this
# notebook runs on v1.3.1). The previous value 30000 looks like a millisecond
# figure and would let a stalled request hang for about 8.3 hours; 30 seconds
# matches the documented default.
# NOTE(review): if an older DuckDB that still interprets this setting in
# milliseconds is in use, adjust accordingly.
HTTP_TIMEOUT_SECONDS = 30

performance_sql = f"""
-- Set performance parameters
SET memory_limit='{config.DUCKDB_MEMORY_LIMIT}';
SET threads={config.DUCKDB_THREADS};
SET enable_progress_bar=1;

-- Set S3 specific optimizations
SET s3_region='{config.AWS_DEFAULT_REGION}';
SET enable_http_metadata_cache=true;
SET http_timeout={HTTP_TIMEOUT_SECONDS};

-- Optimize for analytical workloads
SET default_order='ASC';
SET enable_object_cache=true;
"""

try:
    db_manager.execute_sql(performance_sql)
    print("✅ Performance settings optimized")

    # Display current settings
    settings_query = """
    SELECT name, value, description 
    FROM duckdb_settings() 
    WHERE name IN ('memory_limit', 'threads', 'enable_progress_bar')
    """

    current_settings = db_manager.execute_query(settings_query)
    print("\n⚙️ Current settings:")
    print(current_settings)

except Exception as e:
    print(f"❌ Failed to optimize settings: {e}")

2025-06-18 16:19:34,805 - utils.database - INFO - SQL statement executed successfully
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
     SELECT name, value, description      FROM duckdb_settings()      WHERE name IN ('memory_limit', 'threads', 'enable_progress_bar')     
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         HTTPFS HTTP Stats         ││
││                                   ││
││  2025-06-18 16:19:34,826 - utils.database - INFO - Query executed successfully. Returned 3 rows


⚡ Optimizing performance settings...
✅ Performance settings optimized

⚙️ Current settings:
                  name    value  \
0  enable_progress_bar     true   
1         memory_limit  7.4 GiB   
2              threads        4   

                                         description  
0  Enables the progress bar, printing progress to...  
1        The maximum memory of the system (e.g. 1GB)  
2    The number of total threads used by the system.  


          in: 0 bytes            ││
││            out: 0 bytes           ││
││              #HEAD: 0             ││
││              #GET: 0              ││
││              #PUT: 0              ││
││              #POST: 0             ││
││             #DELETE: 0            ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.0122s             ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│           QUERY           │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           FILTER          │
│    ────────────────────   │
│  ((name = 'memory_limit') │
│  OR (name = 'threads') OR │
│          (name =          │
│  'enable_progress_bar'))  │
│                           │
│           3 Rows          │
│          (0.00s)          │

## 9. Create Helper Functions

In [11]:
# Create helper views and the ingestion-log table (DuckDB 1.3.1 compatible).
#
# WARNING: this cell is destructive. It drops audit.data_ingestion_log and
# recreates it with a smaller column set than the one defined in the
# schema-creation cell (section 4), so any rows already logged are lost and
# the sequence audit.log_seq restarts at 1 on every run.
#
# If the main setup fails, a fallback creates a simpler audit.simple_log table
# that needs no sequence.
print("🛠️ Creating helper functions...")

# Fixed approach - handle existing table gracefully
helper_setup_sql = """
-- Create audit schema
CREATE SCHEMA IF NOT EXISTS audit;

-- Drop and recreate the logging table to ensure correct schema
DROP TABLE IF EXISTS audit.data_ingestion_log;

CREATE TABLE audit.data_ingestion_log (
    log_id INTEGER PRIMARY KEY,
    source_path VARCHAR,
    record_count BIGINT,
    processing_status VARCHAR,
    error_message VARCHAR,
    created_at TIMESTAMP DEFAULT current_timestamp
);

-- Create a simple sequence for log IDs
DROP SEQUENCE IF EXISTS audit.log_seq;
CREATE SEQUENCE audit.log_seq START 1;

-- Create helper views (these work reliably)
CREATE OR REPLACE VIEW system_info AS
SELECT 
    current_setting('memory_limit') as memory_limit,
    current_setting('threads') as thread_count,
    current_timestamp as check_time;

CREATE OR REPLACE VIEW database_info AS
SELECT 
    current_database() as database_name,
    version() as duckdb_version,
    current_timestamp as current_time;

CREATE OR REPLACE VIEW audit.processing_summary AS
SELECT 
    processing_status,
    COUNT(*) as event_count,
    MAX(created_at) as last_occurrence
FROM audit.data_ingestion_log
GROUP BY processing_status;
"""

try:
    db_manager.execute_sql(helper_setup_sql)
    print("✅ Helper infrastructure created successfully")
    
    # Test logging with the correct schema
    log_sql = """
        INSERT INTO audit.data_ingestion_log 
        (log_id, source_path, record_count, processing_status, error_message)
        VALUES (nextval('audit.log_seq'), 'data_lake_initialization', 0, 'setup', 'Setup completed')
    """
    
    db_manager.execute_sql(log_sql)
    print("✅ Logging setup successful")
    
    # Test by querying the log
    test_result = db_manager.execute_query(
        "SELECT * FROM audit.data_ingestion_log ORDER BY created_at DESC LIMIT 1"
    )
    print(f"✅ Logging test successful: {test_result['processing_status'].iloc[0]} - {test_result['error_message'].iloc[0]}")
    
    # Test the views.
    # version() already returns a 'v'-prefixed string (e.g. 'v1.3.1'), so no
    # extra 'v' is added here - the old format printed 'vv1.3.1'.
    db_info = db_manager.execute_query("SELECT * FROM database_info")
    print(f"✅ Database info: {db_info['database_name'].iloc[0]} {db_info['duckdb_version'].iloc[0]}")
    
    print("\n💡 Helper Views Available:")
    print("  📊 System Info: SELECT * FROM system_info;")
    print("  📋 Processing Summary: SELECT * FROM audit.processing_summary;")
    print("  🔍 Database Info: SELECT * FROM database_info;")
    print("  📝 Log Event: INSERT INTO audit.data_ingestion_log (...) VALUES (...);")
    print("  📈 Table Stats: SELECT COUNT(*) FROM your_table;")
    
except Exception as e:
    print(f"❌ Failed to create helper functions: {e}")
    print("\n🔧 Alternative approach - simpler logging:")
    
    # Fallback: Create a simpler logging approach without sequences
    try:
        simple_sql = """
        DROP TABLE IF EXISTS audit.simple_log;
        CREATE TABLE audit.simple_log (
            source_path VARCHAR,
            record_count BIGINT,
            processing_status VARCHAR,
            error_message VARCHAR,
            created_at TIMESTAMP DEFAULT current_timestamp
        );
        """
        
        db_manager.execute_sql(simple_sql)
        
        # Test simple logging
        db_manager.execute_sql("""
            INSERT INTO audit.simple_log (source_path, record_count, processing_status, error_message)
            VALUES ('data_lake_initialization', 0, 'setup', 'Setup completed with simple logging')
        """)
        
        print("✅ Simple logging table created successfully")
        print("💡 Use: INSERT INTO audit.simple_log (...) VALUES (...) for logging")
        
    except Exception as fallback_error:
        print(f"❌ Fallback also failed: {fallback_error}")

2025-06-18 16:26:16,579 - utils.database - INFO - SQL statement executed successfully
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
         INSERT INTO audit.data_ingestion_log          (log_id, source_path, record_count, processing_status, error_message)         VALUES (nextval('audit.log_seq'), 'data_lake_initialization', 0, 'setup', 'Setup completed')     
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         HTTPFS HTTP Stats         ││
││                                   ││
││            in: 0 bytes            ││
││            out: 0 bytes           ││
││              #HEAD: 0             ││
││              #GET: 0              ││
││              #PUT: 0              ││
││              #POST: 0             ││
││             #DELETE: 0            ││
│└───────────────────────────────────┘│
└──

🛠️ Creating helper functions...
✅ Helper infrastructure created successfully
✅ Logging setup successful
✅ Logging test successful: setup - Setup completed
✅ Database info: data_lake vv1.3.1

💡 Helper Views Available:
  📊 System Info: SELECT * FROM system_info;
  📋 Processing Summary: SELECT * FROM audit.processing_summary;
  🔍 Database Info: SELECT * FROM database_info;
  📝 Log Event: INSERT INTO audit.data_ingestion_log (...) VALUES (...);
  📈 Table Stats: SELECT COUNT(*) FROM your_table;


## 10. Setup Summary

In [13]:
# Display setup summary.
# The schema and pipeline-state lines are derived from the database instead of
# being printed as unconditional successes, and the "completed successfully"
# banner is only shown when those checks pass.
print("📋 Data Lake Setup Summary")
print("=" * 50)

# Schemas the schema-creation cell (section 4) is expected to have produced.
expected_schemas = ("bronze", "silver", "gold", "staging", "audit")

try:
    # Get database info
    db_info = db_manager.execute_query("SELECT current_database() as db_name, version() as db_version")

    # Get table count
    tables = db_manager.list_tables()

    # Get pipeline state
    pipeline_state = db_manager.execute_query("SELECT * FROM audit.pipeline_state")

    # Look up which schemas actually exist in the current database.
    schema_rows = db_manager.execute_query(
        "SELECT DISTINCT schema_name FROM information_schema.schemata "
        "WHERE catalog_name = current_database()"
    )
    existing_schemas = set(schema_rows["schema_name"])
    missing_schemas = [name for name in expected_schemas if name not in existing_schemas]

    # The pipeline counts as initialized once audit.pipeline_state has a row.
    pipeline_ready = not pipeline_state.empty

    print(f"✅ DuckDB Version: {db_info.iloc[0]['db_version']}")
    print(f"✅ Database: {db_info.iloc[0]['db_name']}")
    print(f"✅ Tables Created: {len(tables)}")
    if missing_schemas:
        print(f"❌ Schemas missing: {', '.join(missing_schemas)}")
    else:
        print(f"✅ Schemas: {', '.join(expected_schemas)}")
    # S3 access is not re-tested here; see the S3 connectivity section.
    print("✅ S3 Integration: Configured")
    if pipeline_ready:
        print("✅ Pipeline State: Initialized")
    else:
        print("❌ Pipeline State: Not initialized (audit.pipeline_state is empty)")

    if missing_schemas or not pipeline_ready:
        print("\n⚠️ Data lake setup is incomplete - re-run the failed sections above")
    else:
        print("\n🎉 Data lake setup completed successfully!")
        print("\n📌 Next Steps:")
        print("1. Run '02_data_discovery.ipynb' to explore your data")
        print("2. Run '03_bronze_layer.ipynb' to create the bronze layer")
        print("3. Run '04_silver_layer.ipynb' to clean and transform data")
        print("4. Run '05_gold_layer.ipynb' to create analytics-ready data")

except Exception as e:
    print(f"❌ Error generating summary: {e}")

finally:
    # Clean up (optional - you might want to keep connection open)
    # db_manager.close()
    print("\n✨ Setup notebook completed")

┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
SELECT current_database() as db_name, version() as db_version
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         HTTPFS HTTP Stats         ││
││                                   ││
││            in: 0 bytes            ││
││            out: 0 bytes           ││
││              #HEAD: 0             ││
││              #GET: 0              ││
││              #PUT: 0              ││
││              #POST: 0             ││
││             #DELETE: 0            ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.0035s             ││
│└──────────────────────────────────────────────┘│
�2025-06-18 16

📋 Data Lake Setup Summary
✅ DuckDB Version: v1.3.1
✅ Database: data_lake
✅ Tables Created: 2
✅ Schemas: bronze, silver, gold, staging, audit
✅ S3 Integration: Configured
✅ Pipeline State: Initialized

🎉 Data lake setup completed successfully!

📌 Next Steps:
1. Run '02_data_discovery.ipynb' to explore your data
2. Run '03_bronze_layer.ipynb' to create the bronze layer
3. Run '04_silver_layer.ipynb' to clean and transform data
4. Run '05_gold_layer.ipynb' to create analytics-ready data

✨ Setup notebook completed
