In [135]:
import kagglehub
import psycopg2
import pandas as pd

import csv
from io import StringIO

from time import sleep, time
from threading import Thread
import os

In [136]:
# WORKDIR = '.'
# DATAFILE = 'amazon.csv'
# DATAPATH = os.path.join(WORKDIR, DATAFILE)

In [137]:
# if not os.path.isfile(DATAPATH):
#     # https://www.kaggle.com/datasets/karkavelrajaj/amazon-sales-dataset
#     PATH = kagglehub.dataset_download('karkavelrajaj/amazon-sales-dataset')
#     os.system(f'mv {PATH}/amazon.csv {DATAPATH}')

In [138]:
# df = pd.read_csv(DATAPATH)
# print(f'Columns of the dataset: {df.columns}')
# df = df[['product_id', 'product_text', 'rating', 'actual_price']].retext(columns= {
#     'product_id': 'origin_id',
#     'product_text': 'text',
#     'actual_price': 'price'
# })
# df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
# df['price'] = df['price'].map(lambda s: float(s[1:].replace(',', '')))
# print(f'New columns of the dataset: {df.columns}')
# df.dropna(inplace=True)

In [139]:
WORKDIR = '.'
DATAFILE = 'data/cut.json'
# DATAFILE = 'data/tmp.json'
DATAPATH = os.path.join(WORKDIR, DATAFILE)

In [140]:
df = pd.read_json(DATAFILE, lines=True)
print(f'Columns of the dataset: {df.columns}')
df = df[['review_id', 'text', 'stars', 'date']].rename(columns= {
    'review_id': 'origin_id',
    'text': 'text',
    'stars': 'rating'
})
df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
print(f'New columns of the dataset: {df.columns}')
df.dropna(inplace=True)

Columns of the dataset: Index(['review_id', 'user_id', 'business_id', 'stars', 'useful', 'funny',
       'cool', 'text', 'date'],
      dtype='object')
New columns of the dataset: Index(['origin_id', 'text', 'rating', 'date'], dtype='object')


In [141]:
df.head()

Unnamed: 0,origin_id,text,rating,date
0,KU_O5udG6zpxOg-VcAEodg,"If you decide to eat here, just be aware it is...",3,2018-07-07 22:09:11
1,BiTunyQ73aT9WBnpR9DZGw,I've taken a lot of spin classes over the year...,5,2012-01-03 15:28:18
2,saUsX_uimxRlCVr67Z4Jig,Family diner. Had the buffet. Eclectic assortm...,3,2014-02-05 20:30:30
3,AqPFMleE6RsU23_auESxiA,"Wow! Yummy, different, delicious. Our favo...",5,2015-01-04 00:01:03
4,Sx8TMOWLNuJBWer-0pcmoA,Cute interior and owner (?) gave us tour of up...,4,2017-01-14 20:54:15


In [142]:
df.describe()

Unnamed: 0,rating,date
count,10000.0,10000
mean,3.8543,2015-04-17 08:27:40.820000
min,1.0,2005-03-01 17:47:15
25%,3.0,2013-11-14 11:16:35.500000
50%,4.0,2015-09-09 23:20:24
75%,5.0,2017-03-27 02:25:32.500000
max,5.0,2018-10-04 18:22:35
std,1.346719,


In [143]:
def get_db_connection():
    return psycopg2.connect(
        dbname='db',
        user='postgres',
        password='admin123',
        host='localhost',
        port=5432
    )

conn = get_db_connection()
cursor = conn.cursor()

In [144]:
def commit(statement: str):
    try:
        cursor.execute(statement)
    except Exception as e:
        conn.rollback()
        print(e)
    finally:
        conn.commit()
        
def commit_copy(table, output):
    try:
        cursor.copy_expert(
            f'COPY {table} (origin_id, text, rating, date) FROM STDIN;', 
            output
        )
    except Exception as e:
        conn.rollback()
        print(e)
    finally:
        conn.commit()

In [145]:
# drop
commit('''
    DROP SCHEMA IF EXISTS lab CASCADE;
    CREATE SCHEMA lab;
    GRANT ALL ON SCHEMA lab to postgres;
''')

commit('''
    CREATE TABLE lab.data (
        id VARCHAR(22) PRIMARY KEY,
        text TEXT,
        rating FLOAT,
        date TIMESTAMP
    );
''')

In [146]:
import gc

csv_file = 'temp_data.csv'

try:
    # Save DataFrame to CSV
    df.to_csv(csv_file, index=False, encoding='utf-8', quoting=csv.QUOTE_MINIMAL)

    try:
        cursor.execute('''
            CREATE TABLE lab.tmp (
                id SERIAL PRIMARY KEY,
                origin_id VARCHAR(22),
                text TEXT,
                rating FLOAT,
                date TIMESTAMP
            );
        ''')
        with open(csv_file, 'r', encoding='utf-8') as cf:
            cursor.copy_expert('''
                COPY lab.tmp (origin_id, text, rating, date)
                FROM STDIN WITH (FORMAT CSV, HEADER TRUE)
                ''',
                cf
            )
            
        cursor.execute('''
            INSERT INTO lab.data (id, text, rating, date)
            SELECT origin_id, text, rating, date
            FROM lab.tmp
            ON CONFLICT (id) DO NOTHING;
        ''')
        
        cursor.execute('TRUNCATE TABLE lab.tmp')
        conn.commit()
        print("Data successfully loaded into lab.tmp using COPY")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        conn.rollback()

except Exception as e:
    print(f"Error: {e}")
finally:
    # Step 5: Clean up temporary CSV and memory
    if os.path.exists(csv_file):
        os.remove(csv_file)
    del df  # Clear DataFrame
    gc.collect()  # Free memory


Data successfully loaded into lab.tmp using COPY


# 1.1: Типы индексов и их использование на практике.

In [147]:
def time_filter_rating(rating: int) -> tuple[float, str]:
    s = time()

    cursor.execute('''
        EXPLAIN ANALYZE
        SELECT * FROM lab.data WHERE rating > %s
    ''', (rating,))
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)


def time_filter_date(start, end) -> tuple[float, str]:
    s = time()

    cursor.execute('''
        EXPLAIN ANALYZE
        SELECT * FROM lab.data WHERE date BETWEEN %s AND %s
    ''', (start, end))
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)


def time_filter_text(word: str) -> tuple[float, str]:
    s = time()

    cursor.execute(f'''
        EXPLAIN ANALYZE
        SELECT * FROM lab.data WHERE text LIKE %s
    ''', (f'*{word}*',))
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)

## Запросы без индексов

In [148]:
(t_rating_no_index, p_rating_no_index) = time_filter_rating(2)
(t_date_no_index, p_date_no_index) = time_filter_date('2011-01-01 00:00:00', '2012-01-01 00:00:00')
(t_text_no_index, p_text_no_index) = time_filter_text('good')

print('Время запросов и план запроса без индексов:')
print(f'\tБез BTREE: {t_rating_no_index} сек, план:\n{p_rating_no_index}')
print(f'\tБез BRIN: {t_date_no_index} сек, план:\n{p_date_no_index}')
print(
    f'\tБез GIN: {t_text_no_index} сек, план:\n{p_text_no_index}')

Время запросов и план запроса без индексов:
	Без BTREE: 0.005046844482421875 сек, план:
		Seq Scan on data  (cost=0.00..1257.95 rows=14239 width=110) (actual time=0.011..2.502 rows=8158 loops=1)
		  Filter: (rating > '2'::double precision)
		  Rows Removed by Filter: 1842
		Planning Time: 0.063 ms
		Execution Time: 2.807 ms
	Без BRIN: 0.0033118724822998047 сек, план:
		Seq Scan on data  (cost=0.00..1364.74 rows=214 width=110) (actual time=0.014..2.229 rows=476 loops=1)
		  Filter: ((date >= '2011-01-01 00:00:00'::timestamp without time zone) AND (date <= '2012-01-01 00:00:00'::timestamp without time zone))
		  Rows Removed by Filter: 9524
		Planning Time: 0.064 ms
		Execution Time: 2.263 ms
	Без GIN: 0.00443577766418457 сек, план:
		Seq Scan on data  (cost=0.00..1257.95 rows=214 width=110) (actual time=3.623..3.624 rows=0 loops=1)
		  Filter: (text ~~ '*good*'::text)
		  Rows Removed by Filter: 10000
		Planning Time: 0.037 ms
		Execution Time: 3.633 ms


## Запросы с индексами

In [149]:
commit('''
    CREATE INDEX IF NOT EXISTS idx_btree_rating ON lab.data USING BTREE (rating)
''')

commit('''
    CREATE INDEX IF NOT EXISTS idx_brin_date ON lab.data USING BRIN (date)
''')

commit('''
    CREATE EXTENSION IF NOT EXISTS pg_trgm;
    ALTER EXTENSION pg_trgm SET SCHEMA lab;
    CREATE INDEX IF NOT EXISTS idx_gin_text ON lab.data USING GIN (text lab.gin_trgm_ops);
''')

In [150]:
(t_rating_index, p_rating_index) = time_filter_rating(2)
(t_date_index, p_date_index) = time_filter_date('2011-01-01 00:00:00', '2012-01-01 00:00:00')
(t_text_index, p_text_index) = time_filter_text('good')

print('Время запросов и план запроса c индексами:')
print(f'\tBTREE: {t_rating_index} сек, план:\n{p_rating_index}')
print(f'\tBRIN: {t_date_index} сек, план:\n{p_date_index}')
print(
    f'\tGIN: {t_text_index} сек, план:\n{p_text_index}')

Время запросов и план запроса c индексами:
	BTREE: 0.0041310787200927734 сек, план:
		Bitmap Heap Scan on data  (cost=66.12..831.78 rows=3333 width=110) (actual time=0.637..2.146 rows=8158 loops=1)
		  Recheck Cond: (rating > '2'::double precision)
		  Heap Blocks: exact=723
		  ->  Bitmap Index Scan on idx_btree_rating  (cost=0.00..65.28 rows=3333 width=0) (actual time=0.549..0.549 rows=8158 loops=1)
		        Index Cond: (rating > '2'::double precision)
		Planning Time: 0.190 ms
		Execution Time: 2.392 ms
	BRIN: 0.002749919891357422 сек, план:
		Seq Scan on data  (cost=0.00..874.00 rows=50 width=110) (actual time=0.012..1.837 rows=476 loops=1)
		  Filter: ((date >= '2011-01-01 00:00:00'::timestamp without time zone) AND (date <= '2012-01-01 00:00:00'::timestamp without time zone))
		  Rows Removed by Filter: 9524
		Planning Time: 0.097 ms
		Execution Time: 1.864 ms
	GIN: 0.0041027069091796875 сек, план:
		Bitmap Heap Scan on data  (cost=64.39..222.77 rows=50 width=110) (actual time=3

In [151]:
print('Разница между запросами с индексами и без:')
print(f'\tBTREE: {t_rating_index/t_rating_no_index:.02}%')
print(f'\tBRIN: {t_date_index/t_date_no_index:.02}%')
print(f'\tGIN: {t_text_index/t_text_no_index:.02}%')

Разница между запросами с индексами и без:
	BTREE: 0.82%
	BRIN: 0.83%
	GIN: 0.92%


## Вывод
Даже на таком маленьком датасете, есть прирост к скорости выполнения запросов, с его увеличением ускорение будет тоже расти

Еще интересное наблюдение, если брать слишком большой рендж для колонки, индексированной с помощью BRIN, то БД не будет использовать индекс, так как быстрее будет искать как обычно

Чтобы этого избежать, можно настроить размер страницы индекса (сделать ее больше)

# 1.2: Транзакции в PostgreSQL: виды и использование на практике

## READ COMMITTED

Демонстрация аномалии Nonrepeatable read.

In [153]:
def read_commited_session_1(id: str):
    conn = get_db_connection()
    cur = conn.cursor()

    try:
        cur.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED;")
        cur.execute("BEGIN;")
        cur.execute("SELECT id, rating FROM lab.data WHERE id = %s;", (id,))
        r1 = cur.fetchall()
        print("Session 1 - First read:", r1)

        sleep(2)

        cur.execute("SELECT id, rating FROM lab.data WHERE id = %s;", (id,))
        r2 = cur.fetchall()
        print("Session 1 - Second read:", r2)
        cur.execute("COMMIT")
        
        print("Consistent" if r1 == r2 else "Anomaly")
    except Exception as e:
        conn.rollback()
        print('Error:', e)
    finally:
        cur.close()
        conn.close()


def read_commited_session_2(id: str, rating: float):
    conn = get_db_connection()
    cur = conn.cursor()

    try:
        sleep(1)  # Чтобы Session 1 начала раньше
        cur.execute("BEGIN;")
        cur.execute("UPDATE lab.data SET rating = %s WHERE id = %s;", (rating, id))
        print(f"Session 2 - Updated {id} to {rating}")
        cur.execute("COMMIT")
    except Exception as e:
        conn.rollback()
        print('Error:', e)
    finally:
        cur.close()
        conn.close()


id = 'KU_O5udG6zpxOg-VcAEodg'
rating = 4

t1 = Thread(target=lambda: read_commited_session_1(id))
t2 = Thread(target=lambda: read_commited_session_2(id, rating))

t1.start()
t2.start()

t1.join()
t2.join()

Session 1 - First read: [('KU_O5udG6zpxOg-VcAEodg', 3.0)]
Session 2 - Updated KU_O5udG6zpxOg-VcAEodg to 4
Session 1 - Second read: [('KU_O5udG6zpxOg-VcAEodg', 4.0)]
Anomaly


# REPEATABLE READ

Уровень Repeatable read недопускает фантомные чтения.

In [None]:
def repeatable_read_session_1(threshold: float):
    conn = get_db_connection()
    cur = conn.cursor()

    try:
        cur.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;")
        cur.execute("BEGIN;")
        cur.execute("SELECT id, rating FROM lab.data WHERE rating  < %s;", (threshold,))
        r1 = cur.fetchall()
        print("Session 1 - First read:", r1)

        sleep(1)

        cur.execute("SELECT id, rating FROM lab.data WHERE rating < %s;", (threshold,))
        r2 = cur.fetchall()
        print("Session 1 - Second read:", r2)
        cur.execute("COMMIT")

        # Если количество строк или значения отличаются — это Phantom Read
        print("Consistent" if r1 == r2 else "Phantom Read Detected")
    except Exception as e:
        conn.rollback()
        print('Error:', e)
    finally:
        cur.close()
        conn.close()


def repeatable_read_session_2(id: str, rating: float):
    conn = get_db_connection()
    cur = conn.cursor()

    try:
        sleep(1)  # Session 1 должна начаться первой
        cur.execute("BEGIN;")
        cur.execute("INSERT INTO lab.data (id, rating) \
                     VALUES (%s, %s);", 
                     (id, rating))
        print("Session 2 - Inserted phantom row")
        cur.execute("COMMIT")
    except Exception as e:
        conn.rollback()
        print('Error:', e)
    finally:
        cur.close()
        conn.close()

id = 'ABC'
rating = 1
threshold = 2

# Запуск сценария
t1 = Thread(target=lambda: repeatable_read_session_1(threshold))
t2 = Thread(target=lambda: repeatable_read_session_2(id, rating))

t1.start()
t2.start()

t1.join()
t2.join()

Session 1 - First read: [('JrIxlS1TzJ-iCu79ul40cQ', 1.0), ('TcCcHzc3L6Aboq3DteEfZA', 1.0), ('qdzNocGBnh8U-cvE_N8qbA', 1.0), ('PDHRlnEdkEcwATry4w71PQ', 1.0), ('5obXxR0b94b5q6j1zYCAzw', 1.0), ('meGaFP7yxQdjyABrYDVeoQ', 1.0), ('PPgbLBvi34A6m7bKJfTwhw', 1.0), ('pl5AjpEcFxFTltkBvHjsRA', 1.0), ('2zN9R1C5RTZ9oxuQ1uhP6Q', 1.0), ('xS4qtB70SM-q7kX7smKlRg', 1.0), ('O2QGOa66t5BCYDvxLiOtbA', 1.0), ('e4H9r1BSvBGAVQ-HvelnhA', 1.0), ('zM7u3iffMjniyvQAaN9Tnw', 1.0), ('bi6GaeWDGceGv62lXTIKQA', 1.0), ('_-D9UYciNNTIe6amQAsJUg', 1.0), ('a5JHzBrWxRd_OmIvV7znDA', 1.0), ('lt-tsizQin_wNCkQv71YPQ', 1.0), ('Zb_27vX8weaYyDn-_2ZhVA', 1.0), ('bVgpM_sA9AMAlL2R5TPNAQ', 1.0), ('9YtDeiEdfrnRj1ykyp7Utw', 1.0), ('B_O-c_YGiNRXFk3yGzYhyQ', 1.0), ('RC8D-iY19aZFHXUHaS8xLg', 1.0), ('ydXJhVuaE6r7kN1niSqA1w', 1.0), ('22n_UbOS_iqKKR3IrYhNGw', 1.0), ('NUqNm6gqj-iOWo30r_iFrA', 1.0), ('INSGnYhsLHPc7v0VjuNxzw', 1.0), ('nCbPmH8TTnk42YvWCthLLg', 1.0), ('NU4_03KfndoUcE8mDNIErw', 1.0), ('Q7sc8JXBOfA0gqKObNWsnA', 1.0), ('qeSxL-POvGLZD6aQ

Пример аномалии write skew.

Пусть у нас есть инвариант: на заданной выборке средний рейтинг товаров должен быть не меньше 3, и пусть будет две транзакции, в которых проверяется это условие и обновляются данные. 
При конкуренции таких транзакций будет нарушение инварианта.

In [156]:
# Session 1: Lowers rating of id1
def repeatable_read_session_1_write_skew(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;")
        cur.execute("BEGIN;")
        cur.execute("SELECT AVG(rating) FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        avg_rating = cur.fetchone()[0]
        print("Session 1 - Read:", avg_rating)
        
        sleep(1)

        # avg_rating = sum(rating for _, rating in items) / len(items)
        print(f"Session 1 - Average rating: {avg_rating}")
        if avg_rating >= 3.0:  # Simplified check for demo
            print(f"Session 1 - Lowering rating of {id1} to 2.0")
            sleep(1)  # Simulate delay
            cur.execute("UPDATE lab.data SET rating = 2.0 WHERE id = %s;", (id1,))

        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Session 1 - Rolled back:", e)
    finally:
        cur.close()
        conn.close()

# Session 2: Lowers rating of id2
def repeatable_read_session_2_write_skew(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        sleep(1)
        cur.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;")
        cur.execute("BEGIN;")
        cur.execute("SELECT AVG(rating) FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        avg_rating = cur.fetchone()[0]
        print("Session 1 - Read:", avg_rating)

        # avg_rating = sum(rating for _, rating in items) / len(items)
        print(f"Session 2 - Average rating: {avg_rating}")
        if avg_rating >= 3.0:  # Simplified check for demo
            print(f"Session 2 - Lowering rating of {id2} to 2.0")
            sleep(2)  # Simulate delay
            cur.execute("UPDATE lab.data SET rating = 2.0 WHERE id = %s;", (id2,))

        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Session 2 - Rolled back:", e)
    finally:
        cur.close()
        conn.close()

# Check final state (same as above)
def check_result(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute("SELECT id, rating FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        result = cur.fetchall()
        avg_rating = sum(rating for _, rating in result) / len(result)
        print(f"\nFinal Result: {result}, Average rating: {avg_rating}")
    finally:
        cur.close()
        conn.close()

commit('''
       INSERT INTO lab.data (id, rating) 
       VALUES
            ('1', 4.0),
            ('2', 4.0);
''')

# Run the example
id1 = '1'
id2 = '2'

print("\n=== Starting concurrent transactions (Serializable) ===")
t1 = Thread(target=lambda: repeatable_read_session_1_write_skew(id1, id2))
t2 = Thread(target=lambda: repeatable_read_session_2_write_skew(id1, id2))

t1.start()
t2.start()

t1.join()
t2.join()

print("\n=== Checking final state ===")
check_result(id1, id2)


=== Starting concurrent transactions (Serializable) ===
Session 1 - Read: 4.0
Session 1 - Read: 4.0
Session 2 - Average rating: 4.0
Session 2 - Lowering rating of 2 to 2.0
Session 1 - Average rating: 4.0
Session 1 - Lowering rating of 1 to 2.0

=== Checking final state ===

Final Result: [('1', 2.0), ('2', 2.0)], Average rating: 2.0


# SERIALIZABLE

Тут все должно быть хорошо.

In [157]:
# Session 1: Lowers rating of id1
def serializable_session_1_write_skew(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
        cur.execute("BEGIN;")
        cur.execute("SELECT AVG(rating) FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        avg_rating = cur.fetchone()[0]
        print("Session 1 - Read:", avg_rating)

        sleep(1)

        # avg_rating = sum(rating for _, rating in items) / len(items)
        print(f"Session 1 - Average rating: {avg_rating}")
        if avg_rating >= 3.0:  # Simplified check for demo
            print(f"Session 1 - Lowering rating of {id1} to 2.0")
            sleep(1)  # Simulate delay
            cur.execute("UPDATE lab.data SET rating = 2.0 WHERE id = %s;", (id1,))

        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Session 1 - Rolled back:", e)
    finally:
        cur.close()
        conn.close()

# Session 2: Lowers rating of id2
def serializable_session_2_write_skew(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        sleep(1)
        cur.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
        cur.execute("BEGIN;")
        cur.execute("SELECT AVG(rating) FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        avg_rating = cur.fetchone()[0]
        print("Session 1 - Read:", avg_rating)

        # avg_rating = sum(rating for _, rating in items) / len(items)
        print(f"Session 2 - Average rating: {avg_rating}")
        if avg_rating >= 3.0:  # Simplified check for demo
            print(f"Session 2 - Lowering rating of {id2} to 2.0")
            sleep(2)  # Simulate delay
            cur.execute("UPDATE lab.data SET rating = 2.0 WHERE id = %s;", (id2,))

        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Session 2 - Rolled back:", e)
    finally:
        cur.close()
        conn.close()

# Check final state (same as above)
def check_result(id1: str, id2: str):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute("SELECT id, rating FROM lab.data WHERE id IN (%s, %s);", (id1, id2))
        result = cur.fetchall()
        avg_rating = sum(rating for _, rating in result) / len(result)
        print(f"\nFinal Result: {result}, Average rating: {avg_rating}")
    finally:
        cur.close()
        conn.close()

commit('''
       INSERT INTO lab.data (id, rating) 
       VALUES
            ('3', 4.0),
            ('4', 4.0);
''')

# Run the example
id1 = '3'
id2 = '4'

print("\n=== Starting concurrent transactions (Serializable) ===")
t1 = Thread(target=lambda: serializable_session_1_write_skew(id1, id2))
t2 = Thread(target=lambda: serializable_session_2_write_skew(id1, id2))

t1.start()
t2.start()

t1.join()
t2.join()

print("\n=== Checking final state ===")
check_result(id1, id2)


=== Starting concurrent transactions (Serializable) ===
Session 1 - Read: 4.0
Session 1 - Average rating: 4.0
Session 1 - Lowering rating of 3 to 2.0
Session 1 - Read: 4.0
Session 2 - Average rating: 4.0
Session 2 - Lowering rating of 4 to 2.0
Session 2 - Rolled back: could not serialize access due to read/write dependencies among transactions
DETAIL:  Reason code: Canceled on identification as a pivot, during write.
HINT:  The transaction might succeed if retried.


=== Checking final state ===

Final Result: [('4', 4.0), ('3', 2.0)], Average rating: 3.0


# 1.3: Использование расширений PostgreSQL для полнотекстового поиска и криптографических операций

## `pg_trgm`

`pg_trgm` -- расширение postgresql, позволяющее искать в тексте эффективно, строя три-граммы

n-граммы -- последовательности из n символов *(3 буквоцифры в случае `pg_trgm`)* в определенном порядке

Это расширение привносит в БД большой функционал работы с текстом, в том числе индексирование текста при помощи `GIN`

## `pg_bigm`

`pg_bigm` -- брат `pg_trgm`, которые считает би-граммы, суть та же, но немного отличается моментами. Например в запросах с ключевыми словами длиной 1-2 символа он быстрее, однако `pg_trgm` может искать с опечатками, а `pg_bigm` только по точным частичным совпадениям

In [158]:
(t1, p) = time_filter_text('good')
print(f'Результаты pg_trgm:\n\t{t1} сек, план:\n{p}')

cursor.execute('''
    DROP INDEX IF EXISTS idx_gin_text
''')
cursor.execute('''
    CREATE EXTENSION IF NOT EXISTS pg_bigm;
    ALTER EXTENSION pg_bigm SET SCHEMA lab;
''')
cursor.execute('''
    CREATE INDEX IF NOT EXISTS idx_bigm_text
    ON lab.data USING GIN (text lab.gin_bigm_ops)
''')
conn.commit()

(t2, p) = time_filter_text('good')
print(f'Результаты pg_bigm:\n\t{t2} сек, план:\n{p}')

print(f'\nРазница -- {t2/t1:.03}')

Результаты pg_trgm:
	0.006681680679321289 сек, план:
		Bitmap Heap Scan on data  (cost=68.01..72.02 rows=1 width=548) (actual time=4.441..4.442 rows=0 loops=1)
		  Recheck Cond: (text ~~ '*good*'::text)
		  Rows Removed by Index Recheck: 3511
		  Heap Blocks: exact=723
		  ->  Bitmap Index Scan on idx_gin_text  (cost=0.00..68.01 rows=1 width=0) (actual time=0.867..0.867 rows=3512 loops=1)
		        Index Cond: (text ~~ '*good*'::text)
		Planning Time: 0.207 ms
		Execution Time: 4.469 ms
Результаты pg_bigm:
	0.0016529560089111328 сек, план:
		Bitmap Heap Scan on data  (cost=60.01..64.02 rows=1 width=548) (actual time=0.182..0.182 rows=0 loops=1)
		  Recheck Cond: (text ~~ '*good*'::text)
		  ->  Bitmap Index Scan on idx_bigm_text  (cost=0.00..60.01 rows=1 width=0) (actual time=0.180..0.180 rows=0 loops=1)
		        Index Cond: (text ~~ '*good*'::text)
		Planning Time: 0.183 ms
		Execution Time: 0.218 ms

Разница -- 0.247


## `pgcrypto`

`pgcrypto` -- расширение, вносящее в БД криптографические средства

В моем курсовом проекте можно использовать это расширение для хеширования паролей аккаунтов

In [None]:
cursor.execute('CREATE EXTENSION IF NOT EXISTS pgcrypto')
cursor.execute('''
    DROP TABLE IF EXISTS users;
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        username VARCHAR(50) UNIQUE NOT NULL,
        password_hash TEXT NOT NULL
    )
''')

cursor.execute('''
    INSERT INTO users (username, password_hash) 
    VALUES (%s, crypt(%s, gen_salt('bf')))
''', ('admin', 'qwerty123'))
conn.commit()

SyntaxError: syntax error at or near "NOT"
LINE 2:     DROP TABLE IF NOT EXISTS users;
                          ^


In [161]:
conn.rollback();