In [3]:
# System-level dependencies
!apt-get update
!apt-get install -y postgresql postgresql-contrib python3-dev libpq-dev

# Python packages
!pip install pandas numpy sqlalchemy psutil mimesis dask "dask[dataframe]" tqdm psycopg2-binary

Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
libpq-dev is already the newest version (14.15-0ubuntu0.22.04.1).
libpq-dev set to manually installed.
python3-dev is already the newest version (3.1

In [12]:
import os
import csv
import time
import psutil
import threading
import pandas as pd
import numpy as np
from datetime import datetime, date
from typing import List, Dict
from sqlalchemy import create_engine, text
from multiprocessing import Pool
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from mimesis.locales import Locale
from mimesis.schema import Fieldset
import tempfile
import io

# Docker setup for PostgreSQL
def setup_postgres_colab():
    """Setup PostgreSQL in Google Colab"""
    print("Setting up PostgreSQL in Google Colab...")

    # Install PostgreSQL
    !apt-get update
    !apt-get install -y postgresql postgresql-contrib

    # Start PostgreSQL service
    !service postgresql start

    # Configure PostgreSQL to accept connections
    !sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'password';"
    !sudo -u postgres psql -c "CREATE DATABASE employees;"

    # Update pg_hba.conf to allow local connections
    !echo "host all all 0.0.0.0/0 md5" | sudo tee -a /etc/postgresql/*/main/pg_hba.conf

    # Update postgresql.conf to listen on all addresses
    !echo "listen_addresses = '*'" | sudo tee -a /etc/postgresql/*/main/postgresql.conf

    # Restart PostgreSQL to apply changes
    !service postgresql restart

    # Wait for PostgreSQL to be ready
    connection_string = "postgresql://postgres:password@localhost:5432/employees"
    engine = create_engine(connection_string)

    max_attempts = 30
    attempt = 0
    while attempt < max_attempts:
        try:
            print(f"Attempting to connect to database... (Attempt {attempt + 1}/{max_attempts})")
            with engine.connect() as connection:
                connection.execute(text("SELECT 1"))
            print("Successfully connected to PostgreSQL!")
            return connection_string
        except Exception as e:
            print(f"Connection attempt failed: {str(e)}")
            attempt += 1
            time.sleep(2)

    raise Exception("Failed to connect to PostgreSQL after maximum attempts")

# Database schema setup
def setup_database(engine):
    """Create SCD Type 2 table schema"""
    with engine.connect() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS employees (
                employee_id INTEGER,
                name VARCHAR(100),
                email VARCHAR(100),
                address TEXT,
                phone VARCHAR(50),
                date_of_birth DATE,
                gender VARCHAR(10),
                company VARCHAR(100),
                position VARCHAR(100),
                salary DECIMAL(10,2),
                retired VARCHAR(3),
                valid_from TIMESTAMP,
                valid_to TIMESTAMP,
                is_current BOOLEAN,
                PRIMARY KEY (employee_id, valid_from)
            )
        """))
        conn.commit()

def generate_data(row_count: int) -> pd.DataFrame:
    """Generate synthetic data using Mimesis Fieldset"""
    fieldset = Fieldset(locale=Locale.EN)

    # Generate all fields at once using Fieldset
    employee_ids = list(range(row_count))
    names = fieldset("full_name", i=row_count)
    emails = fieldset("email", i=row_count)
    addresses = fieldset("address", i=row_count)
    phones = fieldset("telephone", i=row_count)
    dates = [str(date.isoformat()) for date in fieldset("date", start=1950, end=2005, i=row_count)]
    genders = np.random.choice(["Male", "Female"], size=row_count).tolist()
    cities = fieldset("city", i=row_count)
    positions = fieldset("occupation", i=row_count)
    salaries = np.round(np.random.uniform(30000, 200000, row_count), 2).tolist()
    retired = np.random.choice(["Yes", "No"], size=row_count).tolist()

    # Create records using list comprehension with zip
    records = [
        {
            "employee_id": emp_id,
            "name": name,
            "email": email,
            "address": address,
            "phone": phone,
            "date_of_birth": dob,
            "gender": gender,
            "company": f"{city} Corp",
            "position": position,
            "salary": salary,
            "retired": retired_status
        }
        for emp_id, name, email, address, phone, dob, gender, city, position, salary, retired_status
        in zip(employee_ids, names, emails, addresses, phones, dates, genders,
               cities, positions, salaries, retired)
    ]

    return pd.DataFrame(records)

def identify_changes(new_df: pd.DataFrame, engine) -> pd.DataFrame:
    """Compare new data with existing records and identify changes"""
    current_records = pd.read_sql(
        """
        SELECT * FROM employees
        WHERE is_current = true
        """,
        engine
    )

    if len(current_records) > 0:
        merged = new_df.merge(
            current_records,
            on='employee_id',
            how='left',
            suffixes=('_new', '_current')
        )

        changed_mask = (
            (merged['name_new'] != merged['name_current']) |
            (merged['email_new'] != merged['email_current']) |
            (merged['address_new'] != merged['address_current']) |
            (merged['phone_new'] != merged['phone_current']) |
            (merged['position_new'] != merged['position_current']) |
            (merged['salary_new'] != merged['salary_current'])
        )

        new_mask = merged['name_current'].isna()

        new_df['change_type'] = 'no_change'
        new_df.loc[new_mask, 'change_type'] = 'insert'
        new_df.loc[changed_mask & ~new_mask, 'change_type'] = 'update'
    else:
        new_df['change_type'] = 'insert'

    return new_df

def apply_scd2_changes(df: pd.DataFrame, engine) -> pd.DataFrame:
    """Apply SCD Type 2 changes to the data"""
    current_timestamp = datetime.now().isoformat()

    df['valid_from'] = current_timestamp
    df['valid_to'] = '9999-12-31 23:59:59'
    df['is_current'] = True

    updates = df[df['change_type'] == 'update']
    if not updates.empty:
        with engine.begin() as conn:
            employee_ids = tuple(updates['employee_id'].tolist())
            conn.execute(
                text("""
                    UPDATE employees
                    SET valid_to = :valid_to,
                        is_current = FALSE
                    WHERE employee_id IN :employee_ids
                    AND is_current = TRUE
                """),
                {
                    "valid_to": current_timestamp,
                    "employee_ids": employee_ids
                }
            )

    return df.drop(columns=['change_type'])

def truncate_table(engine):
    """Truncate the employees table"""
    with engine.connect() as conn:
        conn.execute(text("TRUNCATE TABLE employees"))
        conn.commit()
    print("Table truncated successfully")

# Resource monitoring
def monitor_resources(interval, stats):
    """Monitor CPU and memory usage"""
    while not stats['stop']:
        stats['cpu'].append(psutil.cpu_percent(interval=None))
        stats['memory'].append(psutil.virtual_memory().percent)
        time.sleep(interval)

def print_resource_stats(stats):
    """Print resource usage statistics"""
    print("\nResource Usage Statistics:")
    print(f"Average CPU Usage: {sum(stats['cpu']) / len(stats['cpu']):.2f}%")
    print(f"Max CPU Usage: {max(stats['cpu']):.2f}%")
    print(f"Min CPU Usage: {min(stats['cpu']):.2f}%")
    print(f"Average Memory Usage: {sum(stats['memory']) / len(stats['memory']):.2f}%")
    print(f"Max Memory Usage: {max(stats['memory']):.2f}%")
    print(f"Min Memory Usage: {min(stats['memory']):.2f}%")

def monitor_performance(func):
    """Decorator to monitor performance of loading methods"""
    def wrapper(*args, **kwargs):
        stats = {'cpu': [], 'memory': [], 'stop': False}

        # Start monitoring thread
        monitor_thread = threading.Thread(target=monitor_resources, args=(1, stats))
        monitor_thread.start()

        try:
            # Execute the loading function
            start_time = time.time()
            func(*args, **kwargs)
            duration = time.time() - start_time

            # Stop monitoring
            stats['stop'] = True
            monitor_thread.join()

            # Calculate resource statistics
            resource_stats = {
                'duration': duration,
                'avg_cpu': sum(stats['cpu']) / len(stats['cpu']) if stats['cpu'] else 0,
                'max_cpu': max(stats['cpu']) if stats['cpu'] else 0,
                'avg_memory': sum(stats['memory']) / len(stats['memory']) if stats['memory'] else 0,
                'max_memory': max(stats['memory']) if stats['memory'] else 0
            }

            print_resource_stats(stats)
            return duration, resource_stats

        except Exception as e:
            stats['stop'] = True
            monitor_thread.join()
            raise e

    return wrapper

# Update loading methods with performance monitoring
@monitor_performance
def load_row_by_row(df: pd.DataFrame, engine):
    """Load data row by row with SCD Type 2"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)

    with engine.begin() as conn:
        for _, row in df.iterrows():
            conn.execute(
                text("""
                    INSERT INTO employees
                    VALUES (:employee_id, :name, :email, :address, :phone,
                           :date_of_birth, :gender, :company, :position,
                           :salary, :retired, :valid_from, :valid_to, :is_current)
                """),
                row.to_dict()
            )

    duration = time.time() - start_time
    return duration

# Update loading methods with performance monitoring
@monitor_performance
def load_bulk_pandas(df: pd.DataFrame, engine):
    """Load data using pandas bulk insert"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)
    df.to_sql('employees', engine, if_exists='append', index=False, method='multi', chunksize=1000)

    duration = time.time() - start_time
    return duration

# Update loading methods with performance monitoring
@monitor_performance
def load_streaming_chunks(df: pd.DataFrame, engine, chunk_size=1000):
    """Load data in chunks"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)

    for chunk_start in range(0, len(df), chunk_size):
        chunk = df.iloc[chunk_start:chunk_start + chunk_size]
        chunk.to_sql('employees', engine, if_exists='append', index=False, method='multi')

    duration = time.time() - start_time
    return duration

def parallel_worker(chunk_data):
    """Worker function for parallel processing"""
    engine = create_engine("postgresql://postgres:password@localhost:5432/postgres")
    chunk_data.to_sql('employees', engine, if_exists='append', index=False, method='multi')

# Update loading methods with performance monitoring
@monitor_performance
def load_parallel(df: pd.DataFrame, engine, num_processes=4):
    """Load data using parallel processing"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)

    chunks = np.array_split(df, num_processes)
    with Pool(num_processes) as pool:
        pool.map(parallel_worker, chunks)

    duration = time.time() - start_time
    return duration

# Update loading methods with performance monitoring
@monitor_performance
def load_dask(df: pd.DataFrame, engine, npartitions=4):
    """Load data using Dask"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)

    ddf = dd.from_pandas(df, npartitions=npartitions)
    with ProgressBar():
        for partition in ddf.partitions:
            partition.compute().to_sql('employees', engine, if_exists='append', index=False)

    duration = time.time() - start_time
    return duration

# Update loading methods with performance monitoring
@monitor_performance
def load_postgres_copy(df: pd.DataFrame, engine):
    """Load data using PostgreSQL COPY command"""
    start_time = time.time()

    df = identify_changes(df, engine)
    df = apply_scd2_changes(df, engine)

    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        df.to_csv(f, index=False, header=False, sep='\t')
        temp_file_path = f.name

    try:
        with engine.connect().execution_options(autocommit=True) as conn:
            with open(temp_file_path, 'r') as f:
                cursor = conn.connection.cursor()
                cursor.copy_from(
                    f,
                    'employees',
                    sep='\t',
                    columns=df.columns.tolist()
                )
    finally:
        os.remove(temp_file_path)

    duration = time.time() - start_time
    return duration

def main():
    # 1. Setup Database
    print("\n=== 1. Setting up PostgreSQL Database ===")
    connection_string = setup_postgres_colab()
    engine = create_engine(connection_string)
    setup_database(engine)

    # Rest of the function remains the same...

    # 2. Create Initial Load File
    print("\n=== 2. Creating Initial Load File ===")
    initial_df = generate_data(100)
    print(f"Generated {len(initial_df)} records for initial load")

    # 3. Create Subsequent Load File
    print("\n=== 3. Creating Subsequent Load File ===")
    update_df = generate_data(50)
    print(f"Generated {len(update_df)} records for subsequent load")

    # Define the loading methods
    methods = [
        (load_row_by_row, "Row-by-row"),
        (load_bulk_pandas, "Bulk Pandas"),
        (load_streaming_chunks, "Streaming Chunks"),
        (load_parallel, "Parallel Processing"),
        (load_dask, "Dask"),
        (load_postgres_copy, "PostgreSQL COPY")
    ]

    results = []

    # Execute methods sequentially
    for idx, (method, name) in enumerate(methods, start=1):
        print(f"\n=== Method {idx}: {name} ===")

        try:
            # 4. Load Initial File
            print(f"\nLoading initial file...")
            initial_duration, initial_stats = method(initial_df, engine)

            # Get count after initial load
            with engine.connect() as conn:
                initial_count = conn.execute(text("SELECT COUNT(*) FROM employees")).scalar()

            # 5. Load Subsequent File
            print(f"\nLoading subsequent file...")
            update_duration, update_stats = method(update_df, engine)

            # Get final count
            with engine.connect() as conn:
                final_count = conn.execute(text("SELECT COUNT(*) FROM employees")).scalar()

            results.append({
                'Method': name,
                'Initial Load Time': f"{initial_duration:.2f}s",
                'Initial Records': initial_count,
                'Initial Avg CPU': f"{initial_stats['avg_cpu']:.1f}%",
                'Initial Max CPU': f"{initial_stats['max_cpu']:.1f}%",
                'Initial Avg Memory': f"{initial_stats['avg_memory']:.1f}%",
                'Initial Max Memory': f"{initial_stats['max_memory']:.1f}%",
                'Update Load Time': f"{update_duration:.2f}s",
                'Update Records': final_count - initial_count,
                'Update Avg CPU': f"{update_stats['avg_cpu']:.1f}%",
                'Update Max CPU': f"{update_stats['max_cpu']:.1f}%",
                'Update Avg Memory': f"{update_stats['avg_memory']:.1f}%",
                'Update Max Memory': f"{update_stats['max_memory']:.1f}%",
                'Final Records': final_count,
                'Total Time': f"{(initial_duration + update_duration):.2f}s"
            })

            print(f"\nMethod {idx} Results:")
            print(f"Initial Load: {initial_duration:.2f}s ({initial_count} records)")
            print(f"Initial Load Resource Usage:")
            print(f"  Avg CPU: {initial_stats['avg_cpu']:.1f}%, Max CPU: {initial_stats['max_cpu']:.1f}%")
            print(f"  Avg Memory: {initial_stats['avg_memory']:.1f}%, Max Memory: {initial_stats['max_memory']:.1f}%")
            print(f"\nUpdate Load: {update_duration:.2f}s ({final_count - initial_count} records)")
            print(f"Update Load Resource Usage:")
            print(f"  Avg CPU: {update_stats['avg_cpu']:.1f}%, Max CPU: {update_stats['max_cpu']:.1f}%")
            print(f"  Avg Memory: {update_stats['avg_memory']:.1f}%, Max Memory: {update_stats['max_memory']:.1f}%")

            # 6. Truncate table (if not the last method)
            if idx < len(methods):
                print(f"\nTruncating table for next method...")
                truncate_table(engine)

        except Exception as e:
            print(f"Error in {name} method: {str(e)}")
            results.append({
                'Method': name,
                'Initial Load Time': 'Failed',
                'Initial Records': 'Failed',
                'Initial Avg CPU': 'Failed',
                'Initial Max CPU': 'Failed',
                'Initial Avg Memory': 'Failed',
                'Initial Max Memory': 'Failed',
                'Update Load Time': 'Failed',
                'Update Records': 'Failed',
                'Update Avg CPU': 'Failed',
                'Update Max CPU': 'Failed',
                'Update Avg Memory': 'Failed',
                'Update Max Memory': 'Failed',
                'Final Records': 'Failed',
                'Total Time': 'Failed'
            })

    # Print final results
    print("\nFinal Results:")
    print("=" * 140)
    headers = [
        'Method', 'Initial Load Time', 'Initial Records', 'Initial Avg CPU', 'Initial Max Memory',
        'Update Load Time', 'Update Records', 'Update Avg CPU', 'Update Max Memory',
        'Total Time'
    ]
    row_format = "{:<20} {:<20} {:<15} {:<15} {:<20} {:<20} {:<15} {:<15} {:<20} {:<15}"
    print(row_format.format(*headers))
    print("-" * 140)
    for result in results:
        print(row_format.format(
            result['Method'],
            result['Initial Load Time'],
            str(result['Initial Records']),
            result['Initial Avg CPU'],
            result['Initial Max Memory'],
            result['Update Load Time'],
            str(result['Update Records']),
            result['Update Avg CPU'],
            result['Update Max Memory'],
            result['Total Time']
        ))

if __name__ == "__main__":
    main()


=== 1. Setting up PostgreSQL Database ===
Setting up PostgreSQL in Google Colab...
Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:6 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-con

  return bound(*args, **kwds)



Resource Usage Statistics:
Average CPU Usage: 0.20%
Max CPU Usage: 0.20%
Min CPU Usage: 0.20%
Average Memory Usage: 1.90%
Max Memory Usage: 1.90%
Min Memory Usage: 1.90%

Loading subsequent file...


  return bound(*args, **kwds)



Resource Usage Statistics:
Average CPU Usage: 0.60%
Max CPU Usage: 0.60%
Min CPU Usage: 0.60%
Average Memory Usage: 1.90%
Max Memory Usage: 1.90%
Min Memory Usage: 1.90%

Method 4 Results:
Initial Load: 0.17s (0 records)
Initial Load Resource Usage:
  Avg CPU: 0.2%, Max CPU: 0.2%
  Avg Memory: 1.9%, Max Memory: 1.9%

Update Load: 0.16s (0 records)
Update Load Resource Usage:
  Avg CPU: 0.6%, Max CPU: 0.6%
  Avg Memory: 1.9%, Max Memory: 1.9%

Truncating table for next method...
Table truncated successfully

=== Method 5: Dask ===

Loading initial file...
[########################################] | 100% Completed | 101.79 ms
[########################################] | 100% Completed | 101.35 ms
[########################################] | 100% Completed | 101.49 ms
[########################################] | 100% Completed | 101.47 ms

Resource Usage Statistics:
Average CPU Usage: 0.50%
Max CPU Usage: 0.50%
Min CPU Usage: 0.50%
Average Memory Usage: 1.90%
Max Memory Usage: 1.90%
Min