In [None]:
import psycopg2
import time
from concurrent.futures import ThreadPoolExecutor

# Налаштування підключення до БД
DB_CONFIG = {
    'host': 'localhost',
    'database': 'testdb',
    'user': 'postgres',
    'password': 'postgres'
}

NUM_THREADS = 10
ITERATIONS_PER_THREAD = 10000
EXPECTED_RESULT = NUM_THREADS * ITERATIONS_PER_THREAD

def create_database():
    """Створення бази даних якщо вона не існує"""
    # Підключаємось до системної бази postgres
    conn = psycopg2.connect(
        host=DB_CONFIG['host'],
        database='postgres',
        user=DB_CONFIG['user'],
        password=DB_CONFIG['password']
    )
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    
    # Перевіряємо чи існує база даних
    cursor.execute(
        "SELECT 1 FROM pg_database WHERE datname = %s",
        (DB_CONFIG['database'],)
    )
    exists = cursor.fetchone()
    
    if not exists:
        cursor.execute(f"CREATE DATABASE {DB_CONFIG['database']}")
        print(f"База даних '{DB_CONFIG['database']}' створена")
    else:
        print(f"База даних '{DB_CONFIG['database']}' вже існує")
    
    cursor.close()
    conn.close()

def create_table():
    """Створення таблиці user_counter"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    cursor.execute("DROP TABLE IF EXISTS user_counter")
    cursor.execute("""
        CREATE TABLE user_counter (
            user_id INTEGER PRIMARY KEY,
            counter INTEGER NOT NULL DEFAULT 0,
            version INTEGER NOT NULL DEFAULT 0
        )
    """)
    cursor.execute("INSERT INTO user_counter (user_id, counter, version) VALUES (1, 0, 0)")
    conn.commit()
    cursor.close()
    conn.close()
    print("Таблицю user_counter створено та ініціалізовано")

def reset_counter():
    """Скидання каунтера до 0"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    cursor.execute("UPDATE user_counter SET counter = 0, version = 0 WHERE user_id = 1")
    conn.commit()
    cursor.close()
    conn.close()

def get_counter_value():
    """Отримання поточного значення каунтера"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    cursor.execute("SELECT counter FROM user_counter WHERE user_id = 1")
    result = cursor.fetchone()[0]
    cursor.close()
    conn.close()
    return result

# Варіант 1: Lost-update (реалізація що втрачатиме значення)
def lost_update_worker(thread_id):
    """Робочий потік для lost-update підходу"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    for i in range(ITERATIONS_PER_THREAD):
        cursor.execute("SELECT counter FROM user_counter WHERE user_id = 1")
        counter = cursor.fetchone()[0]
        counter = counter + 1
        cursor.execute("UPDATE user_counter SET counter = %s WHERE user_id = %s", (counter, 1))
        conn.commit()
    
    cursor.close()
    conn.close()

def test_lost_update():
    print("\n" + "="*70)
    print("ВАРІАНТ 1: Lost-update (з втратою значень)")
    print("="*70)
    
    reset_counter()
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        futures = [executor.submit(lost_update_worker, i) for i in range(NUM_THREADS)]
        for future in futures:
            future.result()
    
    elapsed_time = time.time() - start_time
    final_value = get_counter_value()
    
    print(f"Час виконання: {elapsed_time:.2f} секунд")
    print(f"Очікуване значення: {EXPECTED_RESULT}")
    print(f"Фактичне значення: {final_value}")
    print(f"Втрачено: {EXPECTED_RESULT - final_value}")
    print(f"Успішність: {(final_value / EXPECTED_RESULT * 100):.2f}%")

# Варіант 2: Serializable update
def serializable_update_worker(thread_id):
    """Робочий потік для serializable підходу"""
    conn = psycopg2.connect(**DB_CONFIG)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
    cursor = conn.cursor()
    
    for i in range(ITERATIONS_PER_THREAD):
        while True:
            try:
                cursor.execute("SELECT counter FROM user_counter WHERE user_id = 1")
                counter = cursor.fetchone()[0]
                counter = counter + 1
                cursor.execute("UPDATE user_counter SET counter = %s WHERE user_id = %s", (counter, 1))
                conn.commit()
                break  # Успішно виконано
            except psycopg2.extensions.TransactionRollbackError:
                # Конфлікт серіалізації - повторюємо транзакцію
                conn.rollback()
                continue
    
    cursor.close()
    conn.close()

def test_serializable_update():
    """Тест варіанту 2: Serializable update"""
    print("\n" + "="*70)
    print("ВАРІАНТ 2: Serializable update (з retry логікою)")
    print("="*70)
    
    reset_counter()
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        futures = [executor.submit(serializable_update_worker, i) for i in range(NUM_THREADS)]
        for future in futures:
            future.result()
    
    elapsed_time = time.time() - start_time
    final_value = get_counter_value()
    
    print(f"Час виконання: {elapsed_time:.2f} секунд")
    print(f"Очікуване значення: {EXPECTED_RESULT}")
    print(f"Фактичне значення: {final_value}")
    print(f"Коректність: {'ПРАВИЛЬНО' if final_value == EXPECTED_RESULT else 'ПОМИЛКА'}")

# Варіант 3: In-place update
def in_place_update_worker(thread_id):
    """Робочий потік для in-place update підходу"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    for i in range(ITERATIONS_PER_THREAD):
        cursor.execute("UPDATE user_counter SET counter = counter + 1 WHERE user_id = %s", (1,))
        conn.commit()
    
    cursor.close()
    conn.close()

def test_in_place_update():
    print("\n" + "="*70)
    print("ВАРІАНТ 3: In-place update")
    print("="*70)
    
    reset_counter()
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        futures = [executor.submit(in_place_update_worker, i) for i in range(NUM_THREADS)]
        for future in futures:
            future.result()
    
    elapsed_time = time.time() - start_time
    final_value = get_counter_value()
    
    print(f"Час виконання: {elapsed_time:.2f} секунд")
    print(f"Очікуване значення: {EXPECTED_RESULT}")
    print(f"Фактичне значення: {final_value}")
    print(f"Коректність: {'ПРАВИЛЬНО' if final_value == EXPECTED_RESULT else 'ПОМИЛКА'}")

# Варіант 4: Row-level locking
def row_level_locking_worker(thread_id):
    """Робочий потік для row-level locking підходу"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    for i in range(ITERATIONS_PER_THREAD):
        cursor.execute("SELECT counter FROM user_counter WHERE user_id = 1 FOR UPDATE")
        counter = cursor.fetchone()[0]
        counter = counter + 1
        cursor.execute("UPDATE user_counter SET counter = %s WHERE user_id = %s", (counter, 1))
        conn.commit()
    
    cursor.close()
    conn.close()

def test_row_level_locking():
    print("\n" + "="*70)
    print("ВАРІАНТ 4: Row-level locking (SELECT ... FOR UPDATE)")
    print("="*70)
    
    reset_counter()
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        futures = [executor.submit(row_level_locking_worker, i) for i in range(NUM_THREADS)]
        for future in futures:
            future.result()
    
    elapsed_time = time.time() - start_time
    final_value = get_counter_value()
    
    print(f"Час виконання: {elapsed_time:.2f} секунд")
    print(f"Очікуване значення: {EXPECTED_RESULT}")
    print(f"Фактичне значення: {final_value}")
    print(f"Коректність: {'ПРАВИЛЬНО' if final_value == EXPECTED_RESULT else 'ПОМИЛКА'}")

# Варіант 5: Optimistic concurrency control
def optimistic_concurrency_worker(thread_id):
    """Робочий потік для optimistic concurrency підходу"""
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    for i in range(ITERATIONS_PER_THREAD):
        while True:
            cursor.execute("SELECT counter, version FROM user_counter WHERE user_id = 1")
            counter, version = cursor.fetchone()
            
            counter = counter + 1
            cursor.execute(
                "UPDATE user_counter SET counter = %s, version = %s WHERE user_id = %s AND version = %s",
                (counter, version + 1, 1, version)
            )
            conn.commit()
            
            if cursor.rowcount > 0:
                break  # Успішно оновлено
            # Інакше повторюємо спробу
    
    cursor.close()
    conn.close()

def test_optimistic_concurrency():
    print("\n" + "="*70)
    print("ВАРІАНТ 5: Optimistic concurrency control")
    print("="*70)
    
    reset_counter()
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        futures = [executor.submit(optimistic_concurrency_worker, i) for i in range(NUM_THREADS)]
        for future in futures:
            future.result()
    
    elapsed_time = time.time() - start_time
    final_value = get_counter_value()
    
    print(f"Час виконання: {elapsed_time:.2f} секунд")
    print(f"Очікуване значення: {EXPECTED_RESULT}")
    print(f"Фактичне значення: {final_value}")
    print(f"Коректність: {'ПРАВИЛЬНО' if final_value == EXPECTED_RESULT else 'ПОМИЛКА'}")

def main():
    print(f"Кількість потоків: {NUM_THREADS}")
    print(f"Ітерацій на потік: {ITERATIONS_PER_THREAD}")
    print(f"Очікуване фінальне значення: {EXPECTED_RESULT}")
    print("="*70)
    
    try:
        create_database()
        create_table()
    except Exception as e:
        print(f"Помилка ініціалізації: {e}")
        return
    
    test_lost_update()
    test_serializable_update()
    test_in_place_update()
    test_row_level_locking()
    test_optimistic_concurrency()



In [None]:
main()

Кількість потоків: 10
Ітерацій на потік: 10000
Очікуване фінальне значення: 100000
База даних 'testdb' створена
Таблицю user_counter створено та ініціалізовано

ВАРІАНТ 1: Lost-update (з втратою значень)
Час виконання: 307.78 секунд
Очікуване значення: 100000
Фактичне значення: 10047
Втрачено: 89953
Успішність: 10.05%

ВАРІАНТ 2: Serializable update (з retry логікою)
Час виконання: 715.85 секунд
Очікуване значення: 100000
Фактичне значення: 100000
Коректність: ПРАВИЛЬНО

ВАРІАНТ 3: In-place update
Час виконання: 364.40 секунд
Очікуване значення: 100000
Фактичне значення: 100000
Коректність: ПРАВИЛЬНО

ВАРІАНТ 4: Row-level locking (SELECT ... FOR UPDATE)
Час виконання: 429.62 секунд
Очікуване значення: 100000
Фактичне значення: 100000
Коректність: ПРАВИЛЬНО

ВАРІАНТ 5: Optimistic concurrency control
Час виконання: 2953.83 секунд
Очікуване значення: 100000
Фактичне значення: 100000
Коректність: ПРАВИЛЬНО
