In [81]:
import psycopg2
from threading import Thread

 Створення таблиці та ініціалізація початковго counter

In [82]:
connect = psycopg2.connect(host='localhost',
                           user='postgres',
                           password='postgres',
                           dbname='postgres')
cursor = connect.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS task1.user_counter(
    user_id int not null primary key ,
    counter int,
    version int
);
""")
cursor.execute("""
INSERT INTO task1.user_counter (user_id, counter, version)
VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, (1, 1, 0))
connect.commit()

cursor.close()
connect.close()

Допоміжний метод для обнулення counter після кожного завдання

In [83]:
def reset_counter():
    connect = psycopg2.connect(host='localhost',
                               user='postgres',
                               password='postgres',
                               dbname='postgres')
    cursor = connect.cursor()

    cursor.execute(f"""
    update task1.user_counter set counter = 0 where user_id = 1;""")
    connect.commit()

    cursor.close()
    connect.close()

1. Lost-update

In [84]:
def increment_counter_lost(increment_value=10000):
    connect = psycopg2.connect(host='localhost',
                           user='postgres',
                           password='postgres',
                           dbname='postgres')
    cursor = connect.cursor()

    for _ in range(increment_value):
        cursor.execute(
            """SELECT counter FROM task1.user_counter WHERE user_id = 1""")

        fetched = cursor.fetchone()

        if fetched:
            counter = fetched[0] + 1

            cursor.execute(f"""
            update task1.user_counter
            set counter = {counter} where user_id = 1""")
            connect.commit()

    cursor.close()
    connect.close()

In [85]:
reset_counter()

In [86]:
%%time
threads = []
for i in range(10):
    thread = Thread(target=increment_counter_lost)
    threads.append(thread)
    thread.start()

for thr in threads:
    thr.join()

CPU times: user 3.52 s, sys: 3.48 s, total: 7 s
Wall time: 53.7 s


2. In-place update

In [87]:
def increment_counter_in_place(increment_value=10000):
    connect = psycopg2.connect(host='localhost',
                               user='postgres',
                               password='postgres',
                               dbname='postgres')
    cursor = connect.cursor()

    for _ in range(increment_value):
        cursor.execute(f"""
        update task1.user_counter
        set counter = counter + 1 where user_id = 1""")
        connect.commit()

    cursor.close()
    connect.close()

In [88]:
reset_counter()

In [89]:
%%time
threads = []
for i in range(10):
    thread = Thread(target=increment_counter_in_place)
    threads.append(thread)
    thread.start()

for thr in threads:
    thr.join()

CPU times: user 3.3 s, sys: 3.67 s, total: 6.97 s
Wall time: 55.8 s


3. Row-level locking

In [90]:
def increment_counter_row_locking(increment_value=10000):
    connect = psycopg2.connect(host='localhost',
                               user='postgres',
                               password='postgres',
                               dbname='postgres')
    cursor = connect.cursor()

    for _ in range(increment_value):
        cursor.execute("""
        SELECT counter FROM task1.user_counter WHERE user_id = 1 FOR UPDATE""")

        fetched = cursor.fetchone()

        if fetched:
            counter = fetched[0] + 1

            cursor.execute(f"""
            update task1.user_counter
            set counter = {counter} where user_id = 1""")
            connect.commit()

    cursor.close()
    connect.close()

In [91]:
reset_counter()

In [92]:
%%time
threads = []
for i in range(10):
    thread = Thread(target=increment_counter_row_locking)
    threads.append(thread)
    thread.start()

for thr in threads:
    thr.join()

CPU times: user 7.77 s, sys: 7.7 s, total: 15.5 s
Wall time: 1min 19s


4. Optimistic concurrency control

In [93]:
def increment_counter_optimistic_concurrency(increment_value=10000):
    connect = psycopg2.connect(host='localhost',
                               user='postgres',
                               password='postgres',
                               dbname='postgres')
    cursor = connect.cursor()

    for _ in range(increment_value):
        while True:
            cursor.execute("""
            SELECT counter, version FROM task1.user_counter
            WHERE user_id = 1 FOR UPDATE""")

            fetched = cursor.fetchone()

            if fetched:
                counter = fetched[0] + 1
                version = fetched[1]

                cursor.execute(f"""
                update task1.user_counter
                set counter = {counter}, version={version + 1}
                where user_id = 1 and version={version}""")
                connect.commit()

                count = cursor.rowcount
                if count > 0:
                    break

    cursor.close()
    connect.close()

In [96]:
reset_counter()

In [97]:
%%time
threads = []
for i in range(10):
    thread = Thread(target=increment_counter_optimistic_concurrency)
    threads.append(thread)
    thread.start()

for thr in threads:
    thr.join()

CPU times: user 8.31 s, sys: 7.6 s, total: 15.9 s
Wall time: 1min 21s
