# Benchmark: SQL inserting

Several different ways of inserting data into Postgres, and a benchmark suite at the bottom.

In [None]:
import time
from io import StringIO
from io import BytesIO
from struct import pack

import joblib
import matplotlib.pyplot as plt
import psycopg2
import psycopg2.extras as extras
import numpy as np
import pandas as pd

# insert

In [None]:
def insert(df, table):
    """Insert every row of ``df`` into ``table`` with one ``INSERT`` per row.

    Each row is sent through the module-level cursor ``cur`` as its own
    parameterised statement. Nothing is committed here.

    Args:
        df: DataFrame whose columns are, in order, date, loc, varA, varB, varC.
        table: Name of the target table (interpolated into the SQL text).
    """
    statement = f"""
            INSERT INTO {table} (date, loc, varA, varB, varC)
            VALUES (%s, %s, %s, %s, %s)
            """
    for _index, record in df.iterrows():
        cur.execute(statement, record.tolist())

# execute_values

In [None]:
def values(df, table):
    """Insert all rows of ``df`` into ``table`` via ``extras.execute_values``.

    Runs on the module-level cursor ``cur``; nothing is committed here.

    Args:
        df: DataFrame whose columns are, in order, date, loc, varA, varB, varC.
        table: Name of the target table (interpolated into the SQL text).
    """
    statement = f"INSERT INTO {table} (date, loc, varA, varB, varC) VALUES %s"
    rows = df.values
    extras.execute_values(cur, statement, rows)

# upsert

In [None]:
def upsert(df, table):
    """Insert ``df`` into ``table``, overwriting rows that already exist.

    A row conflicts when its ``(date, loc)`` pair is already present; in that
    case varA, varB and varC are replaced by the incoming values. Runs on the
    module-level cursor ``cur``; nothing is committed here.

    Args:
        df: DataFrame whose columns are, in order, date, loc, varA, varB, varC.
        table: Name of the target table (interpolated into the SQL text).
    """
    statement = f"""
    INSERT INTO {table} (date, loc, varA, varB, varC) 
    VALUES %s
    ON CONFLICT (date, loc)
    DO UPDATE SET varA = excluded.varA, 
                  varB = excluded.varB,
                  varC = excluded.varC;
    """
    rows = df.values
    extras.execute_values(cur, statement, rows)

# copy_from

In [None]:
def copy(df, table):
    """Load ``df`` into ``table`` with ``COPY``, staging the data in a CSV file.

    The frame is written to ``tmp.csv`` in the current working directory
    (without index or header) and then streamed to the server through the
    module-level cursor ``cur``. The staging file is left on disk afterwards.
    Nothing is committed here.

    Args:
        df: DataFrame whose column order matches the table's column order.
        table: Name of the target table.
    """
    tmp = "tmp.csv"
    df.to_csv(tmp, index=False, header=False)
    # Use a context manager so the file handle is closed even if COPY fails;
    # otherwise every benchmark iteration leaks an open descriptor.
    with open(tmp, "r") as f:
        cur.copy_from(f, table, sep=",")

# copy_from memory

In [None]:
def copy_mem(df, table):
    """Load ``df`` into ``table`` with ``COPY`` from an in-memory CSV buffer.

    The frame is rendered to CSV text (no index, no header), wrapped in a
    ``StringIO`` and streamed through the module-level cursor ``cur``.
    Nothing is committed here.

    Args:
        df: DataFrame whose column order matches the table's column order.
        table: Name of the target table.
    """
    csv_text = df.to_csv(index=False, header=False)
    cur.copy_from(StringIO(csv_text), table, sep=",")

# copy_from memory with upsert (update on duplicate keys)

In [None]:
def make_temp(table, temp="tmp"):
    """Create an empty temporary table with the same columns as ``table``.

    The temporary table is created ``ON COMMIT DROP`` through the module-level
    cursor ``cur``, so it only lives until the current transaction commits.

    Args:
        table: Name of the table whose column layout is copied.
        temp: Name for the temporary table. Defaults to ``"tmp"``.

    Returns:
        The name of the temporary table, so callers can use
        ``temp = make_temp(table)``.
    """
    # The original interpolated an undefined ``name`` here, which raised
    # NameError on every call; the parameter is called ``temp``.
    cur.execute(
        f"""
    CREATE TEMP TABLE {temp} ON COMMIT DROP
    AS SELECT * FROM {table} WITH NO DATA;
    """
    )
    return temp

In [None]:
def temp_to_main(temp, table):
    """Merge every row of the staging table ``temp`` into ``table``.

    Rows whose ``(date, loc)`` pair already exists in ``table`` have their
    varA, varB and varC overwritten; all other rows are inserted. Runs on the
    module-level cursor ``cur``; nothing is committed here.

    Args:
        temp: Name of the staging table to read from.
        table: Name of the destination table.
    """
    merge_sql = f"""
    INSERT INTO {table}
    SELECT * FROM {temp}
    ON CONFLICT (date, loc)
    DO UPDATE SET varA = excluded.varA, 
                  varB = excluded.varB,
                  varC = excluded.varC;
    """
    cur.execute(merge_sql)

In [None]:
def copy_mem_upsert(df, table):
    """Upsert ``df`` into ``table`` by COPYing into a staging table first.

    Steps: create a temporary table shaped like ``table``, bulk-load ``df``
    into it from memory, then merge the staging rows into ``table``.

    Args:
        df: DataFrame whose column order matches the table's column order.
        table: Name of the destination table.
    """
    temp = "tmp"
    make_temp(table, temp)
    copy_mem(df, temp)
    # temp_to_main takes (temp, table); the original passed them swapped,
    # which would have merged the main table into the staging table.
    temp_to_main(temp, table)

# copy_from binary
Binary serialisation adapted from https://stackoverflow.com/a/8150329

In [None]:
def prepare_binary(data):
    """Serialise a NumPy structured array into PostgreSQL binary COPY format.

    The output is the fixed file header, then one tuple per record, then the
    file trailer. Each tuple is a big-endian int16 field count followed, for
    every field, by a big-endian int32 byte length and the field's bytes.
    Little-endian field dtypes are converted to big-endian.

    Args:
        data: Structured array or record array, e.g. from
            ``DataFrame.to_records(index=False)``.

    Returns:
        A ``BytesIO`` positioned at offset 0, ready to hand to ``COPY ... FROM
        STDIN WITH BINARY``.
    """
    names = data.dtype.names
    pgcopy_dtype = [("num_fields", ">i2")]
    for field, dtype in data.dtype.descr:
        pgcopy_dtype += [(field + "_length", ">i4"), (field, dtype.replace("<", ">"))]
    pgcopy = np.empty(data.shape, pgcopy_dtype)
    pgcopy["num_fields"] = len(names)
    for field in names:
        # The length prefix is the payload size in bytes, i.e. itemsize.
        # alignment only happens to equal it for some scalar types and is
        # wrong for e.g. fixed-width byte strings.
        pgcopy[field + "_length"] = data.dtype[field].itemsize
        pgcopy[field] = data[field]
    byt = BytesIO()
    # Header: 11-byte signature, int32 flags field, int32 header-extension length.
    byt.write(pack("!11sii", b"PGCOPY\n\377\r\n\0", 0, 0))
    byt.write(pgcopy.tobytes())
    # Trailer: an int16 field count of -1 marks end of data.
    byt.write(pack("!h", -1))
    byt.seek(0)
    return byt

In [None]:
def copy_bin(df, table):
    """Load ``df`` into ``table`` using PostgreSQL's binary ``COPY`` protocol.

    The frame is converted to a record array (index dropped), serialised by
    ``prepare_binary`` and streamed through the module-level cursor ``cur``.
    Nothing is committed here.

    NOTE(review): the column dtypes of ``df`` presumably have to match the
    table's column types byte-for-byte (e.g. int32 for ``integer``, float32
    for ``real``) -- confirm against the data being loaded.

    Args:
        df: DataFrame whose column order matches the table's column order.
        table: Name of the target table (interpolated into the SQL text).
    """
    payload = prepare_binary(df.to_records(index=False))
    cur.copy_expert(f"COPY {table} FROM STDIN WITH BINARY", payload)

# copy_from binary with upsert (update on duplicate keys)

In [None]:
def copy_bin_upsert(df, table):
    """Upsert ``df`` into ``table`` by binary-COPYing into a staging table first.

    Steps: create a temporary table shaped like ``table``, bulk-load ``df``
    into it in binary format, then merge the staging rows into ``table``.

    Args:
        df: DataFrame whose column order matches the table's column order.
        table: Name of the destination table.
    """
    # Name the staging table explicitly: make_temp is declared with two
    # parameters, so the original one-argument call whose result was used as
    # the table name could not work.
    temp = "tmp"
    make_temp(table, temp)
    copy_bin(df, temp)
    temp_to_main(temp, table)

# Benchmarking

In [None]:
class Benchmarker:
    """Time insert strategies against a freshly recreated PostgreSQL table.

    All database work goes through the module-level ``conn`` and ``cur``.

    Attributes are plain instance fields:
      nums  -- row counts to benchmark
      loops -- repetitions per row count; the fastest one is recorded
      df    -- source DataFrame; the first ``num`` rows are used per run
      table -- name of the scratch table that is dropped and recreated
      times -- DataFrame indexed by ``nums`` with one column per strategy
    """

    def __init__(self, nums, loops, df, table):
        self.nums = nums
        self.loops = loops
        self.df = df
        self.table = table
        self.times = pd.DataFrame(index=nums)

    def _run_ddl(self, sql):
        """Execute one DDL statement and commit; roll back and re-raise on error."""
        try:
            cur.execute(sql)
        except Exception:
            # Leave the connection usable instead of stuck in an aborted
            # transaction, but do not hide the failure.
            conn.rollback()
            raise
        conn.commit()

    def prep(self):
        """Create the scratch table if it does not exist yet, then commit."""
        # IF NOT EXISTS replaces the old bare ``except: pass``, which also
        # swallowed unrelated errors (and KeyboardInterrupt).
        self._run_ddl(
            f"""
            CREATE TABLE IF NOT EXISTS {self.table}
            (
                date integer NOT NULL,
                loc integer NOT NULL,
                varA real NOT NULL,
                varB real NOT NULL,
                varC real NOT NULL,
                UNIQUE (date, loc)
            )
            """
        )

    def drop(self):
        """Drop the scratch table if it exists, then commit."""
        self._run_ddl(f"DROP TABLE IF EXISTS {self.table}")

    def bench(self, func):
        """Benchmark ``func`` for every row count and store the best timings.

        For each entry of ``self.nums`` the table is recreated and
        ``func(rows, self.table)`` plus the following commit are timed
        ``self.loops`` times. The minimum is stored in
        ``self.times[func.__name__]``; it is NaN if ``self.loops`` is 0.
        Progress is printed to stdout.

        Args:
            func: Callable taking ``(df, table)`` that performs the insert.
        """
        res = []
        print(func.__name__, end=": ")
        # Use the instance's own row counts; the original read the global
        # ``nums`` and ignored what was passed to the constructor.
        for num in self.nums:
            print(num, end="  ")
            d = self.df[:num]
            timings = []
            for _ in range(self.loops):
                conn.commit()
                self.drop()
                self.prep()
                # perf_counter is monotonic and high-resolution, unlike
                # time.time, which can jump with wall-clock adjustments.
                start = time.perf_counter()
                func(d, self.table)
                conn.commit()
                timings.append(time.perf_counter() - start)
            res.append(min(timings, default=float("nan")))
        self.times[func.__name__] = res
        print()

In [None]:
# Load the benchmark data set from disk.
# NOTE(review): joblib.load unpickles the file, so only load files from a
# trusted source. The frame presumably has the columns date, loc, varA, varB,
# varC in that order (the insert helpers rely on it) -- confirm.
df = joblib.load("data/df.joblib")

In [None]:
# Shared connection and cursor used by the insert helpers and the Benchmarker.
conn = psycopg2.connect(
    dbname="climate",
    user="chris",
)
cur = conn.cursor()

In [None]:
# Row counts to benchmark: powers of ten from 1 up to 1,000,000.
nums = [10 ** exponent for exponent in range(7)]
# Number of repetitions per row count.
loops = 4
b = Benchmarker(nums=nums, loops=loops, df=df, table="test")

In [None]:
# Benchmark every insert strategy in turn, slowest-expected first.
for strategy in (
    insert,
    values,
    upsert,
    copy,
    copy_mem,
    copy_mem_upsert,
    copy_bin,
    copy_bin_upsert,
):
    b.bench(strategy)

In [None]:
# Persist the timing table (one row per row count, one column per strategy)
# to the current working directory.
b.times.to_csv("sql_times.csv")

In [None]:
# Plot one line per strategy on a single wide figure.
# NOTE(review): ``fix`` is presumably a typo for ``fig``; the figure handle is
# not used afterwards in the visible code.
fix, ax = plt.subplots(figsize=(20, 10))
b.times.plot(ax=ax)
plt.show()