# RDS Database Integration

Store and retrieve law analysis results in Amazon RDS. The cells below target MySQL; PostgreSQL alternatives are included as commented-out code.

## 1. Install Database Driver

Choose and install the appropriate driver for your database type:

In [6]:
# For MySQL:
!pip install pymysql

# For PostgreSQL (uncomment if needed):
# !pip install psycopg2-binary



## 2. Database Configuration

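Replace these with your actual RDS credentials. Never commit real passwords to a shared notebook or repository; load them from environment variables or AWS Secrets Manager instead: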

In [13]:
import json
from datetime import datetime

# RDS Database Configuration
#
# Settings are read from environment variables so that credentials never have
# to be stored in the notebook itself:
#   RDS_HOST, RDS_PORT, RDS_DATABASE, RDS_USER, RDS_PASSWORD
# The non-secret settings fall back to the defaults below; the password has no
# default and must be supplied through RDS_PASSWORD.
# NOTE(review): a password was previously hard-coded in this cell. Treat it as
# compromised and rotate it.
import os

DB_CONFIG = {
    # e.g., mydb.c9akciq32.us-west-2.rds.amazonaws.com
    'host': os.environ.get('RDS_HOST', 'database-2.cl2uwq0cma4d.us-west-2.rds.amazonaws.com'),
    # 5432 for PostgreSQL, 3306 for MySQL; the driver expects an int
    'port': int(os.environ.get('RDS_PORT', '3306')),
    'database': os.environ.get('RDS_DATABASE', 'database-2'),
    'user': os.environ.get('RDS_USER', 'datathon-user-2'),
    'password': os.environ.get('RDS_PASSWORD', ''),
}

print("✓ Database configuration set")
print(f"Host: {DB_CONFIG['host']}")
print(f"Database: {DB_CONFIG['database']}")
if not DB_CONFIG['password']:
    # Fail loudly here rather than with an opaque authentication error later.
    print("⚠ RDS_PASSWORD is not set; connections will fail until it is provided")

✓ Database configuration set
Host: database-2.cl2uwq0cma4d.us-west-2.rds.amazonaws.com
Database: database-2


## 3. Connection Functions

In [None]:
# For MySQL
import pymysql

def get_mysql_connection():
    """Open and return a new PyMySQL connection built from ``DB_CONFIG``.

    The connection uses ``DictCursor`` as its cursor class, so cursors
    created from it yield rows as dictionaries. The caller is responsible
    for closing the returned connection.
    """
    # Pick out exactly the settings the driver needs, in a fixed order.
    connect_kwargs = {
        setting: DB_CONFIG[setting]
        for setting in ('host', 'port', 'database', 'user', 'password')
    }
    return pymysql.connect(
        cursorclass=pymysql.cursors.DictCursor,
        **connect_kwargs,
    )

# Connectivity smoke test: open a connection, report success, and close it
# again. Any failure is reported instead of raised so the notebook keeps running.
try:
    conn = get_mysql_connection()
    print("✓ Successfully connected to MySQL RDS")
    conn.close()
except Exception as exc:
    print(f"⚠ Connection error: {exc}")

⚠ Connection error: (2003, "Can't connect to MySQL server on 'database-2.cl2uwq0cma4d.us-west-2.rds.amazonaws.com' (timed out)")


In [None]:
# Alternative: For PostgreSQL (uncomment if using PostgreSQL)
# import psycopg2
# from psycopg2.extras import RealDictCursor

# def get_postgres_connection():
#     """Create a connection to PostgreSQL RDS"""
#     conn = psycopg2.connect(
#         host=DB_CONFIG['host'],
#         port=DB_CONFIG['port'],
#         database=DB_CONFIG['database'],
#         user=DB_CONFIG['user'],
#         password=DB_CONFIG['password']
#     )
#     return conn

# # Test connection
# try:
#     conn = get_postgres_connection()
#     print("✓ Successfully connected to PostgreSQL RDS")
#     conn.close()
# except Exception as e:
#     print(f"⚠ Connection error: {e}")

## 4. Create Database Table

In [None]:
def create_analysis_table():
    """Create the ``law_analysis`` table if it does not already exist.

    Opens a new connection with ``get_mysql_connection()``, executes a
    ``CREATE TABLE IF NOT EXISTS`` statement, commits, and prints a
    confirmation. The cursor and connection are always closed, even when
    the statement fails.

    Raises:
        Exception: Any error raised while connecting or executing the
            statement is propagated to the caller after cleanup.
    """
    # MySQL syntax
    create_table_query = """
    CREATE TABLE IF NOT EXISTS law_analysis (
        id INT AUTO_INCREMENT PRIMARY KEY,
        law_text TEXT NOT NULL,
        analysis_result JSON,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        s3_input_key VARCHAR(500),
        endpoint_name VARCHAR(255)
    );
    """

    # For PostgreSQL, use this instead:
    # CREATE TABLE IF NOT EXISTS law_analysis (
    #     id SERIAL PRIMARY KEY,
    #     law_text TEXT NOT NULL,
    #     analysis_result JSONB,
    #     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    #     s3_input_key VARCHAR(500),
    #     endpoint_name VARCHAR(255)
    # );

    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(create_table_query)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the DDL failed.
        conn.close()

    print("✓ Table 'law_analysis' created successfully")

# Create the table. Running this cell again is harmless: the statement uses
# CREATE TABLE IF NOT EXISTS, so an existing table is left untouched.
create_analysis_table()

## 5. Write to RDS

In [None]:
def write_analysis_to_rds(law_text, analysis_result, s3_input_key=None, endpoint_name="huggingface-demo-endpoint"):
    """
    Write one law analysis result to the ``law_analysis`` table.

    Args:
        law_text: The original law text
        analysis_result: The analysis result from SageMaker. A dict or list
            is serialized with ``json.dumps``; any other value (for example
            an already-encoded JSON string, or None) is passed through as is.
        s3_input_key: Optional S3 key where the law document is stored
        endpoint_name: Name of the SageMaker endpoint used

    Returns:
        The ID of the inserted record

    Raises:
        Exception: Any error raised while serializing, connecting, or
            inserting is propagated. The connection is always closed, and
            nothing is committed when the insert fails.
    """
    # Serialize before connecting so a non-serializable result fails fast
    # without opening a connection.
    if isinstance(analysis_result, (dict, list)):
        analysis_json = json.dumps(analysis_result)
    else:
        analysis_json = analysis_result

    # MySQL syntax
    insert_query = """
    INSERT INTO law_analysis (law_text, analysis_result, s3_input_key, endpoint_name)
    VALUES (%s, %s, %s, %s);
    """

    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(insert_query, (law_text, analysis_json, s3_input_key, endpoint_name))
            record_id = cursor.lastrowid  # MySQL uses lastrowid instead of RETURNING
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Closing without a commit discards the pending insert on failure.
        conn.close()

    print(f"✓ Analysis saved to RDS with ID: {record_id}")
    return record_id

In [None]:
# Example usage:
# Sample inputs for write_analysis_to_rds(): the law text is a plain string and
# the result is a dict, which the function serializes to JSON before inserting.
example_law = "Banks must increase capital reserves by 15% by Q2 2026"
example_result = {"analysis": "This regulation will impact banking sector liquidity..."}

# Uncomment to test (this inserts a real row into the law_analysis table):
# record_id = write_analysis_to_rds(example_law, example_result)
# print(f"Record ID: {record_id}")

## 6. Read from RDS

In [None]:
def read_analysis_from_rds(record_id=None, limit=10):
    """
    Read analysis records from the ``law_analysis`` table.

    Args:
        record_id: Optional specific record ID to fetch. Only ``None`` means
            "not given"; any other value, including 0, is looked up by ID.
        limit: Number of most recent records to fetch if no ID is specified

    Returns:
        List of records as dictionaries. The list is empty when nothing
        matches, and holds at most one record when ``record_id`` is given.

    Raises:
        Exception: Any error raised while connecting or querying is
            propagated. The connection is always closed.
    """
    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()  # MySQL cursor is already DictCursor from connection
        try:
            if record_id is not None:
                cursor.execute("SELECT * FROM law_analysis WHERE id = %s;", (record_id,))
                row = cursor.fetchone()
                return [row] if row else []

            # created_at has limited resolution, so rows written together can
            # share a timestamp; id breaks the tie to keep ordering stable.
            cursor.execute(
                "SELECT * FROM law_analysis ORDER BY created_at DESC, id DESC LIMIT %s;",
                (limit,),
            )
            # Normalize to a list so callers always get the documented type.
            return list(cursor.fetchall())
        finally:
            cursor.close()
    finally:
        conn.close()

In [None]:
# Example: Get latest 5 analyses
recent_analyses = read_analysis_from_rds(limit=5)

# One print call per record; sep="\n" puts each field on its own line.
for analysis in recent_analyses:
    print(
        f"\nID: {analysis['id']}",
        f"Created: {analysis['created_at']}",
        f"Law: {analysis['law_text'][:100]}...",
        "---",
        sep="\n",
    )

## 7. Search and Query Functions

In [None]:
def search_analyses(search_term, limit=10):
    """
    Search for analyses whose law text contains the given term.

    The term is matched literally: the LIKE wildcards ``%`` and ``_`` (and
    the escape character ``\\``) are escaped, so searching for "15%" finds
    only texts that contain the characters "15%".

    Args:
        search_term: Text to search for in law_text
        limit: Maximum number of results to return

    Returns:
        List of matching records (id, law_text, created_at, endpoint_name),
        newest first

    Raises:
        Exception: Any error raised while connecting or querying is
            propagated. The connection is always closed.
    """
    # Escape the backslash first so the escapes added below are not doubled.
    # Backslash is MySQL's default LIKE escape character.
    literal_term = (
        str(search_term)
        .replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    like_pattern = f'%{literal_term}%'

    # MySQL syntax with LIKE (case-insensitive by default in MySQL)
    # id breaks ties between rows that share a created_at timestamp.
    query = """
    SELECT id, law_text, created_at, endpoint_name
    FROM law_analysis
    WHERE law_text LIKE %s
    ORDER BY created_at DESC, id DESC
    LIMIT %s;
    """

    # For PostgreSQL use ILIKE instead of LIKE

    conn = get_mysql_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (like_pattern, limit))
            # Normalize to a list so callers always get the documented type.
            return list(cursor.fetchall())
        finally:
            cursor.close()
    finally:
        conn.close()

In [None]:
# Example: Find all analyses related to "capital reserves"
results = search_analyses("capital reserves")

print(f"Found {len(results)} matching records:\n")
# One print call per record; sep="\n" puts each field on its own line.
for record in results:
    print(
        f"ID: {record['id']}",
        f"Text: {record['law_text'][:100]}...",
        "---",
        sep="\n",
    )

## 8. Batch Operations

In [None]:
def batch_write_analyses(analyses_list):
    """
    Write multiple analyses to RDS in one transaction.

    Either every record is committed or, on any error, the whole batch is
    rolled back and the error is re-raised.

    Args:
        analyses_list: List of tuples (law_text, analysis_result, s3_key, endpoint_name).
            A dict or list ``analysis_result`` is serialized with
            ``json.dumps``; other values are passed through unchanged.
            An empty (or None) batch returns immediately without opening
            a database connection.

    Returns:
        List of record IDs, in insertion order

    Raises:
        Exception: Any error raised while connecting or inserting is
            propagated after the rollback. The connection is always closed.
    """
    # Built once: the statement is identical for every row.
    insert_query = """
    INSERT INTO law_analysis (law_text, analysis_result, s3_input_key, endpoint_name)
    VALUES (%s, %s, %s, %s);
    """
    record_ids = []

    # Nothing to insert: skip the network round trip entirely.
    if not analyses_list:
        print(f"✓ Successfully wrote {len(record_ids)} records to RDS")
        return record_ids

    conn = get_mysql_connection()
    try:
        # Create the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            for law_text, analysis_result, s3_key, endpoint_name in analyses_list:
                if isinstance(analysis_result, (dict, list)):
                    analysis_json = json.dumps(analysis_result)
                else:
                    analysis_json = analysis_result

                cursor.execute(insert_query, (law_text, analysis_json, s3_key, endpoint_name))
                record_ids.append(cursor.lastrowid)  # MySQL uses lastrowid

            conn.commit()
            print(f"✓ Successfully wrote {len(record_ids)} records to RDS")
        except Exception as e:
            conn.rollback()
            print(f"⚠ Error during batch write: {e}")
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

    return record_ids

In [None]:
# Example batch write:
# Each entry is a (law_text, analysis_result, s3_key, endpoint_name) tuple, the
# shape batch_write_analyses() unpacks. s3_key is None here because these sample
# laws have no associated S3 document.
batch_data = [
    ("Law 1: Banking regulations", {"impact": "high"}, None, "endpoint-1"),
    ("Law 2: ESG reporting requirements", {"impact": "medium"}, None, "endpoint-1"),
    ("Law 3: Digital payment compliance", {"impact": "low"}, None, "endpoint-1"),
]

# Uncomment to test (this inserts real rows into the law_analysis table):
# record_ids = batch_write_analyses(batch_data)
# print(f"Created records: {record_ids}")

## 9. Integration with SageMaker and S3

Use these functions alongside the SageMaker endpoint calls from `simple_api_call.ipynb` to persist each analysis result after it is generated.

In [None]:
# Example: After analyzing with SageMaker, store in RDS

# 1. Get your analysis result from SageMaker (from simple_api_call.ipynb)
# result = analyze_law(law_text)

# 2. Store in RDS
# record_id = write_analysis_to_rds(
#     law_text=law_text,
#     analysis_result=result,
#     s3_input_key="s3://my-bucket/laws/document.txt",
#     endpoint_name="huggingface-demo-endpoint"
# )

# 3. Later, retrieve the analysis
# stored_analysis = read_analysis_from_rds(record_id=record_id)
# print(stored_analysis)