## PostgreSQL

#### Замеры времени вставки данных в БД PostgreSQL

Справочно: мои значения при вставке 1_000_000 записей на сервер Postgres, расположенный в локальной 1G сети:

**1. psycopg2**

- 1.1. Cursor.execute: в цикле при вставке из генератора: 246.3413 секунд
- 1.2. Cursor.executemany: при вставке из генератора: 240.3837 секунд
- 1.3. extras.execute_values при вставке из генератора: 14.0147 секунд
- 2.1. Cursor.copy_from при вставке из CSV-файла: 1.5685 секунд

**2. sqlalchemy + pandas DataFrame.to_sql**

- 2.1. pandas DataFrame.to_sql при вставке из DataFrame: 40.1986 секунд

In [None]:
import time
import csv

import psycopg2
import psycopg2.extras
import sqlalchemy
import pandas as pd


def db_init(db_params: dict, query_drop: str, query_create:str):
    """Инициализирует базу данных Postgres."""    
    conn = psycopg2.connect(**db_params)
    cur = conn.cursor()
    cur.execute(query_drop)
    cur.execute(query_create)
    conn.commit()
    cur.close()
    conn.close()


def iter_data(size: int):
    """Генерирует последовательность кортежей с данными пользователя."""    
    for i in range(size):
        yield (i, f"user_{i}", f"user_{i}@example.org")


# num_records = 10_000
# num_records = 100_000
num_records = 1_000_000

query_drop_table = "drop table if exists users"
query_create_table = "create table if not exists users(id int, username varchar(15), email varchar(30))"

db_params = {
    "host": "192.168.1.55",
    "port": "15432",
    "user": "postgres",
    "password": "postgres",
    "dbname": "test",
}
dsn = f"postgresql+psycopg2://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"

# сформировать CSV-файл
csv_file = "/tmp/users.tsv"
fieldnames = ["id", "username", "email"]
with open(csv_file, "w", encoding="utf-8") as f:
    writer = csv.writer(f, delimiter="\t")
    writer.writerow(fieldnames)
    writer.writerows(iter_data(num_records))

### 1. psycopg2

1.1. Вставка с использованием execute в цикле

In [None]:
data_to_insert = iter_data(num_records)

db_init(db_params, query_drop_table, query_create_table)

query = "insert into users values (%s, %s, %s)"

conn = psycopg2.connect(**db_params)
cur = conn.cursor()
# <measuring time start block>
start_time = time.perf_counter()
for record in data_to_insert:
    cur.execute(query, record)
conn.commit()
end_time = time.perf_counter()
# <measuring time end block>
cur.close()
conn.close()
print(
    f"psycopg2 execute в цикле: операция вставки заняла {end_time - start_time:.4f} секунд"
)

1.2. Вставка c использованием executemany

In [None]:
data_to_insert = iter_data(num_records)

db_init(db_params, query_drop_table, query_create_table)

query = "insert into users values (%s, %s, %s)"

conn = psycopg2.connect(**db_params)
cur = conn.cursor()
# <measuring time start block>
start_time = time.perf_counter()
cur.executemany(query, data_to_insert)
conn.commit()
end_time = time.perf_counter()
# <measuring time end block>
cur.close()
conn.close()
print(
    f"psycopg2 executemany: операция вставки заняла {end_time - start_time:.4f} секунд"
)

1.3. Вставка с использованием execute_values

In [None]:
data_to_insert = iter_data(num_records)

db_init(db_params, query_drop_table, query_create_table)

query = "insert into users values %s"

conn = psycopg2.connect(**db_params)
cur = conn.cursor()
# <measuring time start block>
start_time = time.perf_counter()

# execute_values: по умолчанию page_size = 100, при этом значении время вставки ~35 секунд:
psycopg2.extras.execute_values(cur=cur, sql=query, argslist=data_to_insert, page_size=500_000)
conn.commit()
end_time = time.perf_counter()
# <measuring time end block>
cur.close()
conn.close()
print(
    f"psycopg2.extras execute_values: операция вставки заняла {end_time - start_time:.4f} секунд"
)

1.4. Вставка с использованием copy_from

In [None]:
db_init(db_params, query_drop_table, query_create_table)

conn = psycopg2.connect(**db_params)
cur = conn.cursor()
f = open("/tmp/users.tsv", "r", encoding="utf-8")
f.readline()  # пропускаем строку заголовков
# <measuring time start block>
start_time = time.perf_counter()
cur.copy_from(f, "users")
conn.commit()
end_time = time.perf_counter()
# <measuring time end block>
f.close()
cur.close()
conn.close()
print(
    f"psycopg2 copy_from: операция вставки заняла {end_time - start_time:.4f} секунд"
)

### 2. sqlalchemy + pandas DataFrame.to_sql

2.2. Вставка с использованием DataFrame.to_sql

In [None]:
db_init(db_params, query_drop_table, query_create_table)

engine = sqlalchemy.create_engine(dsn)
df = pd.read_csv(csv_file, delimiter="\t")
# <measuring time start block>
start_time = time.perf_counter()
df.to_sql(
    con=engine, name="users", if_exists="append", index=False
)
# df.to_sql(
#     con=engine, name="users", if_exists="append", method='multi', index=False, chunksize=100_000
# )
end_time = time.perf_counter()
# <measuring time end block>
print(
    f"pandas.DataFrame.to_sql: операция вставки заняла {end_time - start_time:.4f} секунд"
)