In [2]:
#!/usr/bin/env python3
"""
Test script for the fixed single-file table insertion logic
"""
import sys
import os
sys.path.append('.')

from database.connection import DatabaseConnection
from datetime import datetime
import boto3

def check_table_structure(bucket='cpi-uk-us-datascience-stage',
                          key='auxiliary-data/reference-data/reference-db/institution/data.parquet',
                          region_name='us-east-1'):
    """Check that the institution table's single Parquet data file exists in S3.

    Issues a HEAD request for the object and prints its size and last-modified
    time when it is present.

    Args:
        bucket: S3 bucket holding the reference database.
        key: Object key of the institution table's data file.
        region_name: AWS region used to create the S3 client.

    Returns:
        True if the HEAD request succeeds, False otherwise.
    """
    print("Checking table structure in S3...")

    s3_client = boto3.client('s3', region_name=region_name)

    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
    except Exception as e:
        # botocore's ClientError exposes the S3 error code in e.response. A 404 means
        # the object is absent; anything else (403, bad credentials, network) means the
        # check could not be performed, which must not be reported as "not found".
        err_response = getattr(e, 'response', None)
        error_code = (
            err_response.get('Error', {}).get('Code')
            if isinstance(err_response, dict) else None
        )
        if error_code in ('404', 'NoSuchKey', 'NotFound'):
            print(f"Institution table file not found: {e}")
        else:
            print(f"Could not check institution table file: {e}")
        return False

    print(f"Institution table file exists: s3://{bucket}/{key}")
    print(f"  Size: {response['ContentLength']} bytes")
    print(f"  Last modified: {response['LastModified']}")
    return True

def read_current_data(limit=5):
    """Fetch and print a small sample of rows from the institution table.

    Args:
        limit: Maximum number of rows to request from
            ``DatabaseConnection.get_table_data``. Defaults to 5.

    Returns:
        The DataFrame returned by ``get_table_data`` (possibly empty), or None
        if reading raised or no frame was returned.
    """
    print("\nReading current institution data...")

    try:
        df = DatabaseConnection.get_table_data('institution', limit=limit)
    except Exception as e:
        print(f"Error reading data: {e}")
        return None

    if df is None:
        print("Error reading data: no DataFrame returned")
        return None

    # The fetch is capped at `limit`, so this is a sample size, not the table's row count.
    print(f"Sample rows fetched (limit {limit}): {len(df)}")

    if df.empty:
        print("No data found")
        return df

    print("Sample records:")
    # Show only the preferred columns that exist. Selecting a missing column would
    # raise KeyError and make main() treat a successful read as a failure.
    preferred = ['institution_cpi', 'institution_type_layer1', 'country_sub']
    present = [col for col in preferred if col in df.columns]
    print((df[present] if present else df).to_string(index=False))
    return df

def test_single_insert(wait_seconds=3):
    """Insert one uniquely named institution and verify it can be read back.

    Args:
        wait_seconds: Pause between the insert and the verification reads.
            Defaults to 3.

    Returns:
        True if ``execute_insert`` reported success, the table's row count
        grew, and a query for the new ``institution_cpi`` returns a row.
        False otherwise, including when any database call raises. The error is
        printed rather than propagated so the remaining tests in ``main`` still run.
    """
    import time

    print("\n" + "="*50)
    print("TESTING SINGLE INSERT")
    print("="*50)

    # The timestamp suffix makes the name unique, so the lookup below matches only this run.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    test_data = {
        'institution_cpi': f'Test Institution {timestamp}',
        'institution_type_layer1': 'Private',
        'institution_type_layer2': 'Corporation',
        'institution_type_layer3': 'Corporate',
        'country_sub': 'United States',
        'country_parent': 'United States',
        'last_verified': now.year,
    }

    print(f"Inserting: {test_data['institution_cpi']}")

    try:
        before_df = DatabaseConnection.get_table_data('institution')
        before_count = 0 if before_df is None else len(before_df)
        print(f"Rows before insert: {before_count}")

        if not DatabaseConnection.execute_insert('institution', test_data):
            print("Insert operation failed")
            return False
        print("Insert operation completed successfully")

        print(f"Waiting {wait_seconds} seconds for consistency...")
        time.sleep(wait_seconds)

        after_df = DatabaseConnection.get_table_data('institution')
        after_count = 0 if after_df is None else len(after_df)
        print(f"Rows after insert: {after_count}")

        if after_count <= before_count:
            print("Row count did not increase")
            return False
        print(f"Success! Added {after_count - before_count} row(s)")

        # Double any single quotes so the string literal stays well-formed SQL.
        escaped_name = test_data['institution_cpi'].replace("'", "''")
        search_query = f"""
            SELECT * FROM institution
            WHERE institution_cpi = '{escaped_name}'
            """
        result = DatabaseConnection.execute_query(search_query)
    except Exception as e:
        print(f"Single insert test raised an error: {e}")
        return False

    if result is None or result.empty:
        print("Record not found when searching specifically")
        return False

    print("Record found in table:")
    print(result.to_string(index=False))
    return True

def test_bulk_insert(wait_seconds=3):
    """Bulk-insert two uniquely named institutions and verify both can be read back.

    The two records do not share the same keys: the first omits
    ``institution_type_layer3``.

    Args:
        wait_seconds: Pause between the insert and the verification reads.
            Defaults to 3.

    Returns:
        True if ``bulk_insert`` reported success, the row count grew by at
        least the number of records, and every record is found by its
        ``institution_cpi``. False otherwise, including when any database call
        raises. The error is printed rather than propagated so ``main`` can
        still print its summary.
    """
    import time

    print("\n" + "="*50)
    print("TESTING BULK INSERT")
    print("="*50)

    # The timestamp suffix makes the names unique, so the lookups below match only this run.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")

    test_data_list = [
        {
            'institution_cpi': f'Bulk Test Institution 1 {timestamp}',
            'institution_type_layer1': 'Public',
            'institution_type_layer2': 'Government',
            'country_sub': 'Canada',
            'country_parent': 'Canada',
            'last_verified': now.year,
        },
        {
            'institution_cpi': f'Bulk Test Institution 2 {timestamp}',
            'institution_type_layer1': 'Private',
            'institution_type_layer2': 'Funds',
            'institution_type_layer3': 'Venture Capital Fund',
            'country_sub': 'United Kingdom',
            'country_parent': 'United Kingdom',
            'last_verified': now.year,
        },
    ]

    print(f"Inserting {len(test_data_list)} records")

    try:
        before_df = DatabaseConnection.get_table_data('institution')
        before_count = 0 if before_df is None else len(before_df)
        print(f"Rows before bulk insert: {before_count}")

        if not DatabaseConnection.bulk_insert('institution', test_data_list):
            print("Bulk insert operation failed")
            return False
        print("Bulk insert operation completed successfully")

        print(f"Waiting {wait_seconds} seconds for consistency...")
        time.sleep(wait_seconds)

        after_df = DatabaseConnection.get_table_data('institution')
        after_count = 0 if after_df is None else len(after_df)
        print(f"Rows after bulk insert: {after_count}")

        if after_count < before_count + len(test_data_list):
            print("Expected row count increase not found")
            return False
        print(f"Success! Added {after_count - before_count} row(s)")

        # A row-count increase alone could come from other writers, so confirm each
        # record from this run is queryable by its unique name.
        missing = []
        for record in test_data_list:
            # Double any single quotes so the string literal stays well-formed SQL.
            escaped_name = record['institution_cpi'].replace("'", "''")
            result = DatabaseConnection.execute_query(
                f"SELECT * FROM institution WHERE institution_cpi = '{escaped_name}'"
            )
            if result is None or result.empty:
                missing.append(record['institution_cpi'])
    except Exception as e:
        print(f"Bulk insert test raised an error: {e}")
        return False

    if missing:
        print(f"Records not found when searching specifically: {missing}")
        return False

    print("All bulk records found in table")
    return True

def main():
    """Run the S3 and read checks, then both insert tests, and print a summary.

    The structure check and the initial read act as gates: if either fails,
    a failure line is printed and the insert tests are skipped. Otherwise the
    single-insert test runs first, then the bulk-insert test.
    """
    print("TESTING SINGLE-FILE TABLE INSERTION")
    print("=" * 60)

    if not check_table_structure():
        print("❌ Table structure check failed")
        return

    # read_current_data() signals a read error with None; an empty frame still counts as readable.
    if read_current_data() is None:
        print("❌ Could not read current data")
        return

    # Tuple elements are evaluated left to right, so single runs before bulk.
    outcome = (bool(test_single_insert()), bool(test_bulk_insert()))

    rule = "=" * 60
    for header_line in ("\n" + rule, "SUMMARY", rule):
        print(header_line)

    # Lines to print for each (single_passed, bulk_passed) combination.
    summary_lines = {
        (True, True): (
            "🎉 ALL TESTS PASSED!",
            "Your insert logic is now working correctly.",
        ),
        (True, False): ("✅ Single insert works, ❌ bulk insert failed",),
        (False, True): ("❌ Single insert failed, ✅ bulk insert works",),
        (False, False): (
            "❌ Both tests failed",
            "Check the error messages above for debugging",
        ),
    }
    for summary_line in summary_lines[outcome]:
        print(summary_line)


# Run when executed as a script or as a notebook cell.
if __name__ == "__main__":
    main()

TESTING FIXED INSERTION LOGIC

Checking table location...
Institution table expects data at: s3://cpi-uk-us-datascience-stage/auxiliary-data/reference-data/reference-db/data/institution/
Testing single record insert...
Inserting: Test Institution 20251016_120631
✅ Wrote to S3: s3://cpi-uk-us-datascience-stage/auxiliary-data/reference-data/reference-db/data/institution/institution_20251016_120634_038522.parquet
✅ Refreshed metadata for table institution
✅ Insert reported success
Verifying insert...


2025-10-16 12:06:39.756 
  command:

    streamlit run /opt/homebrew/Caskroom/miniforge/base/envs/cpi-data/lib/python3.10/site-packages/ipykernel_launcher.py [ARGUMENTS]


❌ Verification failed: Record 'Test Institution 20251016_120631' not found in institution
❌ Insert failed verification

❌ FAILED: Insert still not working
