In [None]:
# Import required libraries
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from IPython.display import display
import boto3

print("Libraries imported successfully!")

In [None]:
# Function to drop and recreate tables
def setup_databases():
    """Create the FDMS source/target databases and (re)build their tables.

    Progress is reported on stdout. The steps are:

    1. Connect to the ``postgres`` database and create ``fdms_source`` and
       ``fdms_target`` if ``pg_database`` does not list them yet.
    2. In ``fdms_source``: drop and recreate ``departments``, ``employees``
       and ``employee_documents``, then insert the sample rows.
    3. In ``fdms_target``: drop and recreate ``document_metadata``.

    The tables are dropped unconditionally, so existing rows are lost.

    Returns:
        None. Any exception is caught; its message, type and traceback are
        printed instead of being propagated to the caller.
    """
    # closing() is used because psycopg2's own ``with connection:`` block only
    # ends the transaction; it does not close the connection.
    from contextlib import closing

    def _connect(dbname):
        """Open an autocommit connection to *dbname* on the local server."""
        conn = psycopg2.connect(
            dbname=dbname,
            user="espinshalo",
            host="localhost",
            port="5432"
        )
        try:
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        except Exception:
            # Do not leak the socket if the connection cannot be configured.
            conn.close()
            raise
        return conn

    try:
        print("Starting database setup...")

        # First, connect to the postgres database to create our databases.
        # Autocommit matters here: PostgreSQL refuses CREATE DATABASE inside
        # a transaction block.
        print("Connecting to postgres database...")
        with closing(_connect("postgres")) as conn, closing(conn.cursor()) as cur:
            for label, dbname in (("source", "fdms_source"), ("target", "fdms_target")):
                print(f"Creating {label} database if it doesn't exist...")
                cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (dbname,))
                if not cur.fetchone():
                    # dbname comes from the constant tuple above, never from
                    # user input, so interpolating the identifier is safe.
                    cur.execute(f'CREATE DATABASE {dbname}')
                    print(f"Created {label} database")
        print("Databases created successfully")

        # Now connect to source database
        print("Connecting to source database...")
        with closing(_connect("fdms_source")) as source_conn, \
                closing(source_conn.cursor()) as source_cur:

            # Drop existing tables if they exist in source database.
            # CASCADE also removes the foreign keys that point at them.
            print("Dropping existing tables in source database...")
            source_cur.execute("""
            DROP TABLE IF EXISTS employee_documents CASCADE;
            DROP TABLE IF EXISTS employees CASCADE;
            DROP TABLE IF EXISTS departments CASCADE;
            DROP TABLE IF EXISTS document_metadata CASCADE;
        """)
            print("Existing tables dropped from source database successfully!")

            # Tables are created parent-first so the REFERENCES clauses resolve.
            print("Creating departments table...")
            source_cur.execute("""
        CREATE TABLE departments (
            department_id SERIAL PRIMARY KEY,
            department_name VARCHAR(100) NOT NULL
        );
        """)

            print("Creating employees table...")
            source_cur.execute("""
        CREATE TABLE employees (
            employee_id SERIAL PRIMARY KEY,
            first_name VARCHAR(50) NOT NULL,
            last_name VARCHAR(50) NOT NULL,
            email VARCHAR(100) NOT NULL UNIQUE,
            department_id INTEGER REFERENCES departments(department_id)
        );
        """)

            print("Creating employee_documents table...")
            source_cur.execute("""
        CREATE TABLE employee_documents (
            document_id SERIAL PRIMARY KEY,
            employee_id INTEGER REFERENCES employees(employee_id),
            document_type VARCHAR(50) NOT NULL,
            file_path VARCHAR(255) NOT NULL,
            upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """)

            # Sample rows. The hard-coded ids (1, 2, 3) rely on the SERIAL
            # sequences starting at 1 in the freshly created tables.
            print("Inserting sample data into departments...")
            source_cur.execute("""
        INSERT INTO departments (department_name) VALUES 
            ('HR'),
            ('Engineering'),
            ('Finance');
        """)

            print("Inserting sample data into employees...")
            source_cur.execute("""
        INSERT INTO employees (first_name, last_name, email, department_id) VALUES 
            ('John', 'Doe', 'john.doe@example.com', 1),
            ('Jane', 'Smith', 'jane.smith@example.com', 2),
            ('Bob', 'Johnson', 'bob.johnson@example.com', 3);
        """)

            print("Inserting sample data into employee_documents...")
            source_cur.execute("""
        INSERT INTO employee_documents (employee_id, document_type, file_path) VALUES 
            (1, 'PASSPORT', '/tmp/docs/passport_1.pdf'),
            (1, 'CONTRACT', '/tmp/docs/contract_1.pdf'),
            (2, 'VISA', '/tmp/docs/visa_2.pdf'),
            (2, 'PAYSLIP', '/tmp/docs/payslip_2.pdf'),
            (3, 'CONTRACT', '/tmp/docs/contract_3.pdf');
        """)
        print("Source database setup completed")

        # Connect to target database
        print("Connecting to target database...")
        with closing(_connect("fdms_target")) as target_conn, \
                closing(target_conn.cursor()) as target_cur:

            # Drop existing document_metadata table if it exists in target database
            print("Dropping existing document_metadata table in target database...")
            target_cur.execute("""
            DROP TABLE IF EXISTS document_metadata CASCADE;
        """)

            # Create document_metadata table in target database
            print("Creating document_metadata table in target database...")
            target_cur.execute("""
        CREATE TABLE document_metadata (
            id SERIAL PRIMARY KEY,
            employee_id INTEGER NOT NULL,
            first_name VARCHAR(50) NOT NULL,
            last_name VARCHAR(50) NOT NULL,
            email VARCHAR(100) NOT NULL,
            department_name VARCHAR(100) NOT NULL,
            document_type VARCHAR(50) NOT NULL,
            document_category VARCHAR(50) NOT NULL,
            file_path VARCHAR(255) NOT NULL,
            upload_date TIMESTAMP NOT NULL,
            processed_date TIMESTAMP NOT NULL,
            status VARCHAR(20) NOT NULL,
            document_id VARCHAR(100) NOT NULL UNIQUE,
            s3_path VARCHAR(255)
        );
        """)
        print("Target database setup completed")

        print("Database setup completed successfully!")

    except Exception as e:
        # Notebook-style reporting: show everything, raise nothing.
        print(f"Error during database setup: {str(e)}")
        print(f"Error type: {type(e)}")
        import traceback
        print("Full traceback:")
        print(traceback.format_exc())

# Run the setup function
setup_databases()

In [None]:
def test_connections():
    """Check that both FDMS databases accept a connection.

    Opens a connection to ``fdms_source`` and then to ``fdms_target``,
    printing a confirmation after each one and after both are closed.

    Returns:
        None. Any exception is caught and its message is printed.
    """
    # closing() guarantees close() runs even when a later step fails; the
    # original left the source connection open if the target connect raised.
    from contextlib import closing

    try:
        # Test source database connection
        with closing(psycopg2.connect(
            dbname='fdms_source',
            user='espinshalo',
            host='localhost',
            port='5432'
        )):
            print("Successfully connected to source database")

            # Test target database connection
            with closing(psycopg2.connect(
                dbname='fdms_target',
                user='espinshalo',
                host='localhost',
                port='5432'
            )):
                print("Successfully connected to target database")

        # Both with-blocks have exited, so both connections are closed here.
        print("All connections closed successfully")

    except Exception as e:
        print(f"Error during connection test: {str(e)}")

# Run the connection test
test_connections()

In [None]:
def extract_data():
    """Read three tables from ``fdms_source`` into pandas DataFrames.

    Runs ``SELECT *`` against ``departments``, ``document_metadata`` and
    ``employee_documents`` in that order, printing a row count and a
    ``head()`` preview for each.

    Returns:
        tuple: ``(departments_df, document_metadata_df,
        employee_documents_df)`` on success. If anything fails, the error
        message, type and traceback are printed and
        ``(None, None, None)`` is returned.
    """
    source_engine = None
    try:
        print("Starting data extraction...")

        # Create SQLAlchemy engine for source database
        print("Creating database engine...")
        source_engine = create_engine('postgresql://espinshalo@localhost:5432/fdms_source')

        # Test connection: opening and immediately releasing one connection
        # surfaces connectivity problems before any query is issued.
        print("Testing connection...")
        with source_engine.connect():
            print("Successfully connected to database")

        print("Extracting departments data...")
        departments_df = pd.read_sql('SELECT * FROM departments', source_engine)
        print(f"Extracted {len(departments_df)} departments")
        print("Departments data preview:")
        print(departments_df.head())

        # NOTE(review): setup_databases() in this notebook creates
        # document_metadata only in fdms_target (and drops it from
        # fdms_source), so this query presumably fails against a freshly
        # set-up source database - confirm which table/database was intended.
        print("\nExtracting document metadata...")
        document_metadata_df = pd.read_sql('SELECT * FROM document_metadata', source_engine)
        print(f"Extracted {len(document_metadata_df)} document metadata records")
        print("Document metadata preview:")
        print(document_metadata_df.head())

        print("\nExtracting employee documents...")
        employee_documents_df = pd.read_sql('SELECT * FROM employee_documents', source_engine)
        print(f"Extracted {len(employee_documents_df)} employee documents")
        print("Employee documents preview:")
        print(employee_documents_df.head())

        return departments_df, document_metadata_df, employee_documents_df

    except Exception as e:
        print(f"Error during data extraction: {str(e)}")
        print(f"Error type: {type(e)}")
        import traceback
        print("Full traceback:")
        print(traceback.format_exc())
        return None, None, None

    finally:
        # Release the engine's pooled connections on every exit path; the
        # returned DataFrames are already fully loaded in memory.
        if source_engine is not None:
            source_engine.dispose()

# Run the extraction
print("Starting extraction process...")
departments_df, document_metadata_df, employee_documents_df = extract_data()
print("Extraction process completed.")

In [None]:
def transform_documents(df, now=None):
    """Return a copy of *df* shaped like the target table's data columns.

    Adds ``document_category``, ``document_id``, ``s3_path``,
    ``processed_date`` and ``status`` and reorders the columns. The input
    frame is not modified.

    Args:
        df: DataFrame with at least the columns ``employee_id``,
            ``first_name``, ``last_name``, ``email``, ``department_name``,
            ``document_type``, ``file_path`` and ``upload_date``.
        now: Timestamp used for ``processed_date`` and for the suffix of
            every ``document_id``. Defaults to ``datetime.now()``, taken
            once so all rows of one run share the same value.

    Returns:
        A new DataFrame with exactly the columns listed in
        ``target_columns`` below, in that order.

    Raises:
        KeyError: if *df* lacks one of the required input columns.
    """
    required = [
        'employee_id', 'first_name', 'last_name', 'email', 'department_name',
        'document_type', 'file_path', 'upload_date'
    ]
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f"df is missing required columns: {missing}")

    if now is None:
        now = datetime.now()
    stamp = now.strftime('%Y%m%d%H%M%S')

    out = df.copy()

    # Add document category based on document type; anything not listed
    # (including missing values) falls back to 'OTHER'.
    categories = {
        'PASSPORT': 'IDENTIFICATION',
        'VISA': 'IDENTIFICATION',
        'CONTRACT': 'EMPLOYMENT',
        'PAYSLIP': 'EMPLOYMENT',
    }
    out['document_category'] = out['document_type'].map(categories).fillna('OTHER')

    # Generate document IDs. Column-wise concatenation (rather than a
    # row-wise apply) also works when the frame has no rows.
    doc_type = out['document_type'].astype(str)
    out['document_id'] = 'DOC_' + out['employee_id'].astype(str) + '_' + doc_type + '_' + stamp

    # Generate S3 paths
    out['s3_path'] = (
        's3://fdms-documents/' + out['department_name'].astype(str) + '/'
        + doc_type + '/' + out['document_id'] + '.pdf'
    )

    # Add processed date and status
    out['processed_date'] = now
    out['status'] = 'PENDING'

    # Reorder columns to match target table structure
    target_columns = [
        'employee_id', 'first_name', 'last_name', 'email', 'department_name',
        'document_type', 'document_category', 'file_path', 'upload_date',
        'processed_date', 'status', 'document_id', 's3_path'
    ]
    return out[target_columns]


# First, let's check if df exists and has data
print("Checking if df exists and has data...")
print("df exists:", 'df' in locals())
if 'df' in locals():
    print("Number of rows in df:", len(df))
    print("\nFirst few rows of df:")
    display(df.head())

    # Now let's do the transformation. It only runs when df exists, so a
    # missing df is reported below instead of crashing with a NameError.
    print("\nStarting data transformation...")
    df = transform_documents(df)

    print("\nData transformation completed!")
    print(f"Number of transformed records: {len(df)}")
    print("\nTransformed data preview:")
    display(df)
else:
    print("df variable not found. Please run Cell 4 first.")