# Advanced Databases 2025/2026 
### Prof. Márcia Barros and Prof. Francisco Couto
TP5 - Week6 
 Testing concurrency with Python: MySQL


# Part 1 - Creating the databases

### Create the database in MySQL

In [None]:
""" 
Create the same database in mySQL
"""

# pip install sqlalchemy (if it's not installed)
# pip install mysql-connector-python (if it's not installed)

import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np

# Load the catalogue; map empty strings to None and +/-infinity (which MySQL
# cannot store) to NaN, which to_sql writes as NULL.
df = (
    pd.read_csv("dias_catalogue.csv")
    .replace('', None)
    .replace([np.inf, -np.inf], np.nan)
)

# Server-level connection URL (no default schema).
# Replace user, password, host and port with your own settings.
server_url = "mysql+mysqlconnector://root:1234@localhost:3306/"

# Create the `openclusters` database if it is missing.
engine = create_engine(server_url)
with engine.connect() as conn:
    conn.execute(text("CREATE DATABASE IF NOT EXISTS openclusters"))
    conn.commit()

# Rebind `engine` to a URL whose default schema is `openclusters`.
engine = create_engine(server_url + "openclusters")

# if_exists='replace' drops any existing `clusters` table and recreates it
# from df; index=False keeps the DataFrame index out of the table.
df.to_sql(name='clusters', con=engine, if_exists='replace', index=False)


# Part 2 - Testing concurrency

In [None]:
import mysql.connector
from mysql.connector import errors

def get_conn(user="root", password="1234", host="localhost", port=3306, database="openclusters", autocommit=False):
    """Open a new mysql-connector connection to the course database.

    Each call returns a separate connection, i.e. a separate MySQL session,
    which the caller must close. With the default ``autocommit=False`` the
    caller must also commit or roll back its work explicitly.
    """
    settings = dict(
        user=user,
        password=password,
        host=host,
        port=port,
        database=database,
        autocommit=autocommit,
    )
    return mysql.connector.connect(**settings)

def read_value(conn, pk_col, pk_val, value_col, table="clusters"):
    """Return ``value_col`` of the row whose ``pk_col`` equals ``pk_val``.

    Args:
        conn: an open DB-API connection (e.g. one returned by ``get_conn``).
        pk_col: column used to locate the row.
        pk_val: value to match; sent as a bound query parameter.
        value_col: column whose value is returned.
        table: table to query (default ``"clusters"``).

    Returns:
        The value from one matching row, or ``None`` if no row matches.

    Note:
        ``pk_col``, ``value_col`` and ``table`` are interpolated into the SQL
        text (identifiers cannot be bound as parameters), so they must come
        from trusted code, never from user input.
    """
    # LIMIT 1: the table built by to_sql has no uniqueness constraint, and
    # leaving extra rows unread can make mysql-connector refuse to close the
    # cursor ("Unread result found").
    sql = f"SELECT {value_col} FROM {table} WHERE {pk_col} = %s LIMIT 1"
    cur = conn.cursor()
    try:
        cur.execute(sql, (pk_val,))
        row = cur.fetchone()
    finally:
        # Release the cursor even when execute/fetch raises.
        cur.close()
    return row[0] if row else None

### Test 1 - Concurrent reads (many clients performing SELECT)
This is the simplest concurrency test (read-only). It measures throughput for concurrent SELECT queries.

In [None]:

import time
from concurrent.futures import ThreadPoolExecutor
#from db_helpers import get_conn

def do_select(name_value):
    """Fetch ``(name, r50)`` for one cluster on its own short-lived connection.

    Every call opens and closes a dedicated connection, so concurrent calls
    run in separate MySQL sessions.

    Returns:
        The ``(name, r50)`` row tuple, or ``None`` if no cluster matches.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            # LIMIT 1: the table built by to_sql has no uniqueness constraint
            # on name, and unread extra rows can make mysql-connector refuse
            # to close the cursor ("Unread result found").
            cur.execute("SELECT name, r50 FROM clusters WHERE name = %s LIMIT 1", (name_value,))
            return cur.fetchone()
        finally:
            cur.close()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()

def concurrent_reads(names, workers=20):
    """Run one ``do_select`` per name on a thread pool and report the wall time.

    Args:
        names: iterable of cluster names; each entry triggers one SELECT.
        workers: maximum number of concurrent threads (and thus connections).

    Returns:
        List of ``do_select`` results, in the same order as ``names``.

    Raises:
        Re-raises the first exception (in input order) raised by a worker.
    """
    names = list(names)  # accept any iterable; len() is needed for the report
    start = time.perf_counter()  # monotonic clock, meant for measuring intervals
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # map() keeps input order and re-raises worker exceptions on iteration.
        results = list(ex.map(do_select, names))
    elapsed = time.perf_counter() - start
    print(f"{len(names)} selects with {workers} workers took {elapsed:.3f}s")
    return results


# Workload: the first 100 non-null catalogue names, each queried twice,
# i.e. 200 SELECTs issued by a pool of 50 threads.
import pandas as pd

N_SAMPLED = 100   # how many names to take from the head of the catalogue
REPEATS = 2       # how many times each sampled name is queried

df = pd.read_csv("dias_catalogue.csv")
sample_names = df['name'].dropna().head(N_SAMPLED).tolist()
names = sample_names * REPEATS
res = concurrent_reads(names, workers=50)
first_row = res[0]
print("Sample result:", first_row)


## Test 2 - Lost update (two transactions updating same row without locking)

This demonstrates the classic lost-update problem when two transactions read, compute, write without synchronization.

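We’ll pick a row, read a numeric column (e.g. r50), and have two threads each try to increment it in their own transaction. A single atomic statement such as `UPDATE clusters SET r50 = r50 + 1 WHERE name = %s` would not lose updates, because the UPDATE reads and writes the row under an exclusive lock. The code below instead uses the unsafe read-modify-write pattern: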

Unsafe pattern (read -> compute -> write): when the read does not lock the row, this is where lost updates happen, under both READ COMMITTED and REPEATABLE READ.

In [None]:

import time
from concurrent.futures import ThreadPoolExecutor


def unsafe_increment(name_value, delay_before_write=0.5):
    """Increment ``r50`` for one cluster with an UNSAFE read-modify-write.

    The value is read with a plain, non-locking SELECT. It is incremented in
    Python after ``delay_before_write`` seconds and written back as an
    absolute value. Two concurrent calls can therefore both read the same
    value, and the later commit silently overwrites the earlier one: a lost
    update.

    Args:
        name_value: ``name`` of the cluster row to update.
        delay_before_write: seconds to sleep between the read and the write,
            to widen the race window.

    Returns:
        ``("ok", old, new)`` after a commit, ``"no-row"`` if no row matches,
        or ``("err", message)`` if a statement fails (after rolling back).
    """
    conn = get_conn()
    cur = conn.cursor()
    try:
        conn.start_transaction()  # explicit transaction
        # 1) read -- deliberately WITHOUT a locking clause. With FOR SHARE,
        # both transactions would hold shared locks and then deadlock when
        # each tries to UPDATE, instead of demonstrating a lost update.
        cur.execute("SELECT r50 FROM clusters WHERE name = %s", (name_value,))
        row = cur.fetchone()
        if not row:
            conn.rollback()
            return "no-row"
        current = row[0]
        # 2) simulate compute time; the other transaction may commit meanwhile
        time.sleep(delay_before_write)
        new = current + 1
        # 3) write the value computed from the (possibly stale) read, no re-check
        cur.execute("UPDATE clusters SET r50 = %s WHERE name = %s", (new, name_value))
        conn.commit()
        return ("ok", current, new)
    except Exception as e:
        conn.rollback()
        return ("err", str(e))
    finally:
        cur.close()
        conn.close()

def run_lost_update_test(name_value):
    """Race two ``unsafe_increment`` calls on one row and report the outcome.

    The first worker sleeps 1.0 s between its read and write, the second
    0.2 s, so both read the starting value before either writes. Prints:
    - the starting ``r50``;
    - each worker's result;
    - the value expected if no update were lost;
    - the actual final ``r50``.
    """
    def _current_r50():
        # Read r50 on a short-lived connection; None if the row is missing.
        conn = get_conn()
        try:
            cur = conn.cursor()
            cur.execute("SELECT r50 FROM clusters WHERE name = %s", (name_value,))
            row = cur.fetchone()
            cur.close()
            return row[0] if row else None
        finally:
            conn.close()

    initial = _current_r50()
    print("Initial r50:", initial)
    # run two concurrent unsafe increments
    with ThreadPoolExecutor(max_workers=2) as ex:
        f1 = ex.submit(unsafe_increment, name_value, 1.0)
        f2 = ex.submit(unsafe_increment, name_value, 0.2)
        r1 = f1.result()
        r2 = f2.result()
    print("Result1:", r1)
    print("Result2:", r2)
    # Each committed worker returned ("ok", old, new).
    committed = sum(1 for r in (r1, r2) if isinstance(r, tuple) and r[0] == "ok")
    if initial is not None:
        print("Expected r50 if no update were lost:", initial + committed)
    print("Final r50:", _current_r50())


run_lost_update_test("ASCC_105")


## Test 3 - Preventing lost-update with pessimistic locking (SELECT ... FOR UPDATE)

Use SELECT ... FOR UPDATE inside a transaction to lock the selected row (InnoDB row-level lock), so concurrent update transactions serialize access.

In [None]:

from concurrent.futures import ThreadPoolExecutor
import time

def increment_with_for_update(name_value, delay=0.5):
    """Increment ``r50`` for one cluster using pessimistic row locking.

    ``SELECT ... FOR UPDATE`` takes exclusive locks on the rows it reads.
    A concurrent call therefore blocks at its own SELECT until this
    transaction commits or rolls back, and then reads the updated value.

    Returns:
        ``("ok", old, new)`` on success, ``"no-row"`` when nothing matches,
        or ``("err", message)`` after rolling back on any exception.
    """
    conn = get_conn()
    cursor = conn.cursor()
    try:
        conn.start_transaction()  # explicit
        cursor.execute(
            "SELECT r50 FROM clusters WHERE name = %s FOR UPDATE",
            (name_value,),
        )
        locked_row = cursor.fetchone()
        if not locked_row:
            conn.rollback()
            return "no-row"
        old_value = locked_row[0]
        time.sleep(delay)  # keep holding the lock while "computing"
        new_value = old_value + 1
        cursor.execute(
            "UPDATE clusters SET r50 = %s WHERE name = %s",
            (new_value, name_value),
        )
        conn.commit()
        return ("ok", old_value, new_value)
    except Exception as exc:
        conn.rollback()
        return ("err", str(exc))
    finally:
        cursor.close()
        conn.close()

def run_for_update_test(name_value):
    """Race two ``increment_with_for_update`` calls on one row and report.

    Each call locks the row with SELECT ... FOR UPDATE, so the transaction
    that locks second waits for the first to finish and no increment is
    lost. Prints:
    - the starting ``r50``;
    - each worker's result;
    - the expected value;
    - the actual final ``r50``.
    """
    def _current_r50():
        # Read r50 on a short-lived connection; None if the row is missing.
        conn = get_conn()
        try:
            cur = conn.cursor()
            cur.execute("SELECT r50 FROM clusters WHERE name = %s", (name_value,))
            row = cur.fetchone()
            cur.close()
            return row[0] if row else None
        finally:
            conn.close()

    initial = _current_r50()
    print("Initial r50:", initial)
    with ThreadPoolExecutor(max_workers=2) as ex:
        f1 = ex.submit(increment_with_for_update, name_value, 1.0)
        f2 = ex.submit(increment_with_for_update, name_value, 0.2)
        r1 = f1.result()
        r2 = f2.result()
    print("r1:", r1)
    print("r2:", r2)
    # Each committed worker returned ("ok", old, new).
    committed = sum(1 for r in (r1, r2) if isinstance(r, tuple) and r[0] == "ok")
    if initial is not None:
        print("Expected r50 if no update were lost:", initial + committed)
    print("Final r50:", _current_r50())

run_for_update_test('ASCC_105')


## Test 4 - Transaction isolation level effects (e.g., REPEATABLE READ vs READ COMMITTED)
You can change the session isolation level and observe behaviors like non-repeatable reads or phantom reads. MySQL default (InnoDB) is REPEATABLE READ.

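Example of testing non-repeatable reads: transaction A reads a value, transaction B updates it and commits, and transaction A reads it again. Under READ COMMITTED (or READ UNCOMMITTED), A's second read sees B's update. Under REPEATABLE READ, used below, both reads return the same snapshot value.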

In [None]:

import threading
import time

def txn_a(name, isolation="REPEATABLE READ"):
    """Transaction A: read ``r50`` twice, 2 s apart, inside one transaction.

    Prints both reads so they can be compared after transaction B commits an
    update in between. Under REPEATABLE READ both reads normally return the
    same snapshot value; under READ COMMITTED the second read sees B's commit.

    Args:
        name: ``name`` of the cluster row to read.
        isolation: session isolation level for this connection (default
            ``"REPEATABLE READ"``, the previously hard-coded level).

    Raises:
        ValueError: if ``isolation`` is not one of MySQL's isolation levels.
    """
    levels = ("READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE")
    level = isolation.upper()
    if level not in levels:
        # The level is spliced into the SQL text, so only accept known values.
        raise ValueError(f"unknown isolation level: {isolation!r}")
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {level}")
        conn.start_transaction()
        cur.execute("SELECT r50 FROM clusters WHERE name = %s", (name,))
        v1 = cur.fetchone()[0]
        print("[A] first read:", v1)
        time.sleep(2.0)  # let B run and commit
        cur.execute("SELECT r50 FROM clusters WHERE name = %s", (name,))
        v2 = cur.fetchone()[0]
        print("[A] second read:", v2)
        conn.commit()
    except Exception:
        conn.rollback()  # do not leave the transaction open on failure
        raise
    finally:
        cur.close()
        conn.close()

def txn_b(name):
    """Transaction B: add 10 to ``r50`` for ``name`` and commit.

    Sleeps 0.5 s first so that, when run alongside ``txn_a``, the update is
    committed between A's two reads.
    """
    time.sleep(0.5)  # start after A's first read
    conn = get_conn()
    cur = conn.cursor()
    try:
        conn.start_transaction()
        cur.execute("UPDATE clusters SET r50 = r50 + 10 WHERE name = %s", (name,))
        conn.commit()
        print("[B] updated and committed")
    except Exception:
        conn.rollback()  # leave no half-finished transaction behind
        raise
    finally:
        cur.close()
        conn.close()

# Run A and B concurrently; B delays itself so it commits between A's reads.
name = "ASCC_105"
threads = [
    threading.Thread(target=txn_a, args=(name,)),
    threading.Thread(target=txn_b, args=(name,)),
]
for th in threads:
    th.start()
for th in threads:
    th.join()


## Exercise 1 -  Basic Transaction Control
Goal: Observe isolation level behavior when one session updates a cluster’s radius but does not commit.

In [None]:
# Sessions A and B must be two separate connections: one connection is one
# MySQL session, and a session always sees its own uncommitted changes.
# NOTE(review): assumes `clusters` has an `id` column; the table built by
# df.to_sql(..., index=False) only has the CSV's columns, so confirm it
# exists or filter by `name` as in Tests 1-4.
conn_a, conn_b = get_conn(), get_conn()
cur_a, cur_b = conn_a.cursor(), conn_b.cursor()
try:
    # Session A
    cur_a.execute("START TRANSACTION;")
    cur_a.execute("UPDATE clusters SET r50 = r50 + 5 WHERE id = 1;")
    print("Session A: Updated r50, not committed yet.")

    # Session B
    cur_b.execute("SELECT r50 FROM clusters WHERE id = 1;")
    print("Session B reads r50:", cur_b.fetchall())
finally:
    # Closing without COMMIT discards A's change and releases its locks,
    # so later cells are not blocked.
    conn_a.close()
    conn_b.close()


Question: Does Session B see the uncommitted change?

## Exercise 2 – Dirty Read

Goal: Demonstrate dirty reads under READ UNCOMMITTED.

In [None]:
# Two separate connections = two MySQL sessions.
# NOTE(review): assumes `clusters` has an `id` column; confirm it exists or
# filter by `name` as in Tests 1-4.
conn_a, conn_b = get_conn(), get_conn()
cur_a, cur_b = conn_a.cursor(), conn_b.cursor()
try:
    # Session A
    cur_a.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;")
    cur_a.execute("START TRANSACTION;")
    cur_a.execute("UPDATE clusters SET Vr = Vr + 10 WHERE id = 2;")

    # Session B (its own isolation level is what allows or prevents a dirty read)
    cur_b.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;")
    cur_b.execute("SELECT Vr FROM clusters WHERE id = 2;")
    print("Session B reads Vr:", cur_b.fetchall())
finally:
    # Closing without COMMIT rolls A back: the value B may have read never
    # becomes permanent.
    conn_a.close()
    conn_b.close()


Question: Does Session B see Session A’s uncommitted value?

## Exercise 3 – Non-Repeatable Read

Goal: Show that a value can change between two reads in the same transaction under READ COMMITTED.

In [None]:
# Two separate connections = two MySQL sessions.
# NOTE(review): assumes `clusters` has an `id` column; confirm it exists or
# filter by `name` as in Tests 1-4.
conn_a, conn_b = get_conn(), get_conn()
cur_a, cur_b = conn_a.cursor(), conn_b.cursor()
try:
    # Session A
    cur_a.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
    cur_a.execute("START TRANSACTION;")
    cur_a.execute("SELECT FeH FROM clusters WHERE id = 3;")
    print("Session A, first read:", cur_a.fetchall())

    # Session B: one statement per execute(); commit through the connection
    # instead of appending "; COMMIT;" to the SQL string.
    cur_b.execute("UPDATE clusters SET FeH = FeH + 0.1 WHERE id = 3;")
    conn_b.commit()

    # Session A (again)
    cur_a.execute("SELECT FeH FROM clusters WHERE id = 3;")
    print("Session A, second read:", cur_a.fetchall())
    conn_a.commit()
finally:
    conn_a.close()
    conn_b.close()


Question: Did the FeH value change between reads?

## Exercise 4 – Phantom Read
Goal: Check whether phantom rows (rows inserted and committed by another session) appear when Session A repeats its query under REPEATABLE READ.

In [None]:
# Two separate connections = two MySQL sessions.
# Note: Session B commits its INSERT, so 'NewCluster' stays in the table.
conn_a, conn_b = get_conn(), get_conn()
cur_a, cur_b = conn_a.cursor(), conn_b.cursor()
try:
    # Session A
    cur_a.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;")
    cur_a.execute("START TRANSACTION;")
    cur_a.execute("SELECT COUNT(*) FROM clusters WHERE Diam_pc > 10;")
    print("Session A, first count:", cur_a.fetchone()[0])

    # Session B: one statement per execute(); commit through the connection.
    cur_b.execute("INSERT INTO clusters (name, r50, Vr, age, FeH, Diam_pc) VALUES ('NewCluster', 5.0, 0.0, 2.5, -0.1, 12.0);")
    conn_b.commit()

    # Session A (again)
    cur_a.execute("SELECT COUNT(*) FROM clusters WHERE Diam_pc > 10;")
    print("Session A, second count:", cur_a.fetchone()[0])
    conn_a.commit()
finally:
    conn_a.close()
    conn_b.close()


Question: Did the count change in Session A?

## Exercise 5 – Serializable Locking
Goal: Show how SERIALIZABLE isolation blocks concurrent inserts.

In [None]:
# Two separate connections = two MySQL sessions. Both run in this one thread,
# so a blocked Session B would otherwise wait for the server's full
# innodb_lock_wait_timeout (50 s by default) before failing.
conn_a, conn_b = get_conn(), get_conn()
cur_a, cur_b = conn_a.cursor(), conn_b.cursor()
try:
    # Session A
    cur_a.execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
    cur_a.execute("START TRANSACTION;")
    cur_a.execute("SELECT * FROM clusters WHERE FeH < 0;")
    print("Session A read", len(cur_a.fetchall()), "rows")

    # Session B: give up after 5 s of waiting for A's locks.
    cur_b.execute("SET SESSION innodb_lock_wait_timeout = 5;")
    try:
        cur_b.execute("INSERT INTO clusters (name, r50, Vr, age, FeH, Diam_pc) VALUES ('TempCluster', 4.0, 0.0, 1.2, -0.3, 7.5);")
        print("Session B: INSERT went through without waiting.")
    except errors.Error as exc:
        print("Session B: INSERT failed while waiting for Session A:", exc)
finally:
    # Neither session commits: closing discards B's insert and A's locks.
    conn_a.close()
    conn_b.close()


Question: Is Session B blocked until Session A commits?