In [1]:
import math
import random
from tqdm import tqdm

# Model definition

主キーの他に2つのIntegerFieldをもつモデルを定義。
一方はそれらのカラムそれぞれに対して・およびペアでインデクスを張るモデル、もう一方はインデクスを張らないモデル。

In [2]:
import peewee

from peewee import IntegerField
from peewee import Model
from peewee import SqliteDatabase

from peewee import chunked

db = SqliteDatabase(None)

class NoIndexedTable(Model):
    value_1 = IntegerField(unique=False)
    value_2 = IntegerField(unique=False)
    
    class Meta:
        database = db
        
    def __str__(self):
        return "{}-{}".format(self.value_1, self.value_2)
        
class IndexedTable(Model):
    value_1 = IntegerField(index=True, unique=False)
    value_2 = IntegerField(index=True, unique=False)
    
    class Meta:
        database = db
        
        indexes = ((("value_1", "value_2"), True),)
        
    def __str__(self):
        return "{}-{}".format(self.value_1, self.value_2)

# Initialize DB

まっさらなDBを作成する。
IO速度による影響を軽減するためにいくつかのオプションを使用。

In [3]:
import os
os.remove('tmp.db')
db.init('tmp.db', pragmas={'synchronous': 0, 'journal_mode': 'memory'})
db.connect()
db.create_tables([NoIndexedTable, IndexedTable])

def delete_all():
    """Delete all the columns in IndexedTable and NoIndexedTable"""
    IndexedTable.delete().execute()
    NoIndexedTable.delete().execute()

# 実験条件の設定

1億件のinsertをベンチマーク。
SQLiteでは1クエリには最大999変数まで含められ、各モデルの1レコードのinsertに2変数を保存できるため、1クエリ（チャンク）サイズは最大で499である。ここでは`chunk_size`として400を設定。

レコードの各カラムの値としてランダム値を用い、その最大値を`max_value`とする。

このnotebookの実行に全体で約5時間（i7 8700K上で）。

In [4]:
N = 1000 * 1000 * 100
chunk_size = 400
max_value = N ** 2

# (1) Peeweeの通常のインタフェースでのバッチinsertの速度の計測

これがベースライン。

時間についてはtqdmのit/sではなく%timeのWall timeを参照のこと。

## 両モデルにレコードを`N`要素追加するコード

In [5]:
def insert_records_peewee_standard_api(table, tpl=False):
    with db.atomic(), tqdm(total=N) as pbar:
        for _ in range(int(math.ceil(N / chunk_size))):
            if tpl:
                table.insert_many([(random.randint(0, max_value), random.randint(0, max_value))
                                   for _ in range(chunk_size)],
                                  fields=[NoIndexedTable.value_1, NoIndexedTable.value_2]).execute()
            else:
                table.insert_many([{'value_1': random.randint(0, max_value),
                                    'value_2': random.randint(0, max_value)}
                                  for _ in range(chunk_size)]).execute()
            pbar.update(chunk_size)
    assert table.select().count() == N

## インデクスを（主キー以外）もたないモデル

### (1a) Baseline (dict)
通常のinsert_manyをチャンクごとに呼ぶパターン。

### (1b) Baseline (dict)
insert_manyに渡す際小さなdictを大量生成しそれをPeewee内部で確認するオーバヘッドの懸念があるためタプルで渡すようにしたパターン。

In [6]:
%time insert_records_peewee_standard_api(NoIndexedTable)
delete_all()
%time insert_records_peewee_standard_api(NoIndexedTable, tpl=True)
delete_all()

100%|██████████| 100000000/100000000 [19:16<00:00, 86472.91it/s]


CPU times: user 19min 17s, sys: 4.62 s, total: 19min 22s
Wall time: 19min 18s


100%|██████████| 100000000/100000000 [18:08<00:00, 91852.23it/s]


CPU times: user 18min 10s, sys: 3.95 s, total: 18min 14s
Wall time: 18min 10s


dictを大量に生成し渡すよりタプルを用いたほうが約1分高速。

### インデクスをもつモデル

In [7]:
%time insert_records_peewee_standard_api(IndexedTable)
delete_all()
%time insert_records_peewee_standard_api(IndexedTable, tpl=True)
delete_all()

100%|██████████| 100000000/100000000 [54:52<00:00, 30370.43it/s]


CPU times: user 35min 2s, sys: 20min 1s, total: 55min 3s
Wall time: 54min 54s


100%|██████████| 100000000/100000000 [53:54<00:00, 30915.55it/s]


CPU times: user 33min 48s, sys: 20min 17s, total: 54min 6s
Wall time: 53min 56s


インデクスがある場合はinsertの速度が大幅に遅くなっていることがわかる。タプルを渡すのはやはり有効で、インデクスがある場合・ない場合とも約1分の高速化。この1分はdictに絡むオーバヘッドで消費されていることがわかる（1レコードあたり0.6マイクロ秒なのでそんなものだろうか）。

| Table     | Method                                 | Wall Time |  Average speed   | 
|-----------|----------------------------------------|-----------|------------------|
| NoIndexed | (1a) Baseline (dict)                   | 19min 18s |  86356 records/s |
| NoIndexed | (1b) Baseline (tuple)                  | 18min 10s |  91743 records/s |
| Indexed   | (1a) Baseline (dict)                   | 54min 54s |  30358 records/s |
| Indexed   | (1b) Baseline (tuple)                  | 53min 56s |  30902 records/s |

# (2) インデクスを一旦無効化してinsertし再有効化する

アクロバティックでかつあまり安全ではない方法。

インデクスの更新をレコード追加のたびに行うのはオーバヘッドが大きいため、一旦インデクスを削除した状態でレコードを追加した上でインデクスを張り直す。まとめてインデクスを再構築したほうが、レコード追加のたびに更新を行うより効率がいいという仮説に基づく方法。

Peeweeの拡張モジュールplayhouseにあるDBマイグレーションAPIを用いて、インデクスの削除および再構築を行う。

※DBをオンラインに保った状態でこれをするのは危険もしくは動かない。

In [8]:
from playhouse.migrate import SqliteMigrator, migrate
migrator = SqliteMigrator(db)

def drop_index_insert_remake_index():
    table_name = IndexedTable._meta.table_name
    with db.atomic():
        migrate(
            migrator.drop_index(table_name, table_name + '_value_1'),
            migrator.drop_index(table_name, table_name + '_value_2'),
            migrator.drop_index(table_name, table_name + '_value_1_value_2'),
        )

    insert_records_peewee_standard_api(IndexedTable, tpl=True)
    
    with db.atomic():
        migrate(
            migrator.add_index(table_name, ('value_1',), False),
            migrator.add_index(table_name, ('value_2',), False),
            migrator.add_index(table_name, ('value_1', 'value_2'), True),
        )
    assert IndexedTable.select().count() == N

In [9]:
%time drop_index_insert_remake_index()
delete_all()

100%|██████████| 100000000/100000000 [18:13<00:00, 91489.88it/s]


CPU times: user 20min 33s, sys: 15.1 s, total: 20min 48s
Wall time: 20min 45s


| Table       | Method                                 | Wall Time |  Average speed   |
|-------------|----------------------------------------|-----------|------------------|
| NoIndexed   | (1a) Baseline (dict)                   | 19min 18s |  86356 records/s |
| NoIndexed   | (1b) Baseline (tuple)                  | 18min 10s |  91743 records/s |
| Indexed     | (1a) Baseline (dict)                   | 54min 54s |  30358 records/s |
| Indexed     | (1b) Baseline (tuple)                  | 53min 56s |  30902 records/s |
| **Indexed** | **(2) Drop index->insert->Reindex**    | **20min 45s** | **80321 records/s** |

インデクスのあるテーブルに対して、53分56秒→20分45秒（2.6倍）高速化。

# (3) SQLクエリを効率化する

## (3-1) モデルを経由せずSQLiteクエリを直接Peeweeから実行する

前述の方法とは直交する改善案。

極端に大量のレコードを一括で挿入する場合、ORMそのもののオーバヘッドが速度の制約となる可能性がある。

そこで、Peeweeのexecute_sqlを呼び生SQLでレコードを作成する。

In [10]:
def insert_records_peewee_execute_sql(table):
    with db.atomic():
        query = 'insert into {}(value_1, value_2) values (?,?)'.format(table._meta.table_name)
        for _ in tqdm(range(N)):
            db.execute_sql(query, (random.randint(0, max_value), random.randint(0, max_value)))
    assert table.select().count() == N

下記はインデクスのないテーブルの場合の結果。

In [11]:
%time insert_records_peewee_execute_sql(NoIndexedTable)
delete_all()

100%|██████████| 100000000/100000000 [07:21<00:00, 226344.94it/s]


CPU times: user 7min 21s, sys: 3.63 s, total: 7min 25s
Wall time: 7min 23s


下記はインデクスのあるテーブルの場合の結果。

In [12]:
%time insert_records_peewee_execute_sql(IndexedTable)
delete_all()

100%|██████████| 100000000/100000000 [39:46<00:00, 41903.39it/s]


CPU times: user 19min 39s, sys: 20min 13s, total: 39min 52s
Wall time: 39min 47s


| Table       | Method                                 | Wall Time |  Average speed   |
|-------------|----------------------------------------|-----------|------------------|
| NoIndexed   | (1a) Baseline (dict)                   | 19min 18s |  86356 records/s |
| NoIndexed   | (1b) Baseline (tuple)                  | 18min 10s |  91743 records/s |
| Indexed     | (1a) Baseline (dict)                   | 54min 54s |  30358 records/s |
| Indexed     | (1b) Baseline (tuple)                  | 53min 56s |  30902 records/s |
| Indexed     | (2) Drop index->insert->Reindex        | 20min 45s |  80321 records/s |
| **NoIndexed** | **(3-1) Raw SQL through ORM**        | **7min 23s** | **225734 records/s** |
| **Indexed** | **(3-1) Raw SQL through ORM**        | **39min 47s** | **41893 records/s** |

(1)で示した標準APIで追加する場合では、インデクスなしで18分、インデクスありで約54分であり、その差は約36分であった。
ここで示した直接SQL実行でのインデクスなしとありの差は32分であった。
つまりこのおよそ30分強がインデクスの更新で消費されていると言えそう。1要素あたりにならすと18マイクロ秒程度であるが1億件insertでは無視できない。

## (3-2) SQLite APIでPeeweeを一切介さずにレコードを挿入

上記の直接SQL発行では1要素ずつクエリを発行していた。これは現在のPeeweeのAPIの制約によるものであるが、SQLite自体は複数レコードを一度にinsertできる。

オーバヘッドを極力減らすため、ORMを介さずレコード挿入をしてみる。

なおこのときPeeweeで開いたDBは一旦閉じる必要がある。

In [13]:
import sqlite3

def insert_records_direct_sqlite(table):
    con = sqlite3.connect("tmp.db")
    c = con.cursor()
    query = "insert into {}(value_1, value_2) values (?,?)".format(table._meta.table_name)
    c.executemany(query, [(random.randint(0, max_value), random.randint(0, max_value))
                          for _ in tqdm(range(N))])
    con.commit()
    con.close()

def measure(table):
    db.close()
    %time insert_records_direct_sqlite(table)
    db.init('tmp.db')
    assert table.select().count() == N
    delete_all()

インデクスのないテーブルの場合とインデクスのあるテーブルの場合。

In [14]:
measure(NoIndexedTable)
measure(IndexedTable)

100%|██████████| 100000000/100000000 [03:03<00:00, 543872.58it/s]


CPU times: user 3min 56s, sys: 6.18 s, total: 4min 2s
Wall time: 4min 2s


100%|██████████| 100000000/100000000 [03:01<00:00, 550154.27it/s]


CPU times: user 15min 12s, sys: 19min 16s, total: 34min 29s
Wall time: 53min 23s


| Table       | Method                                 | Wall Time |  Average speed   |
|-------------|----------------------------------------|-----------|------------------|
| NoIndexed   | (1a) Baseline (dict)                   | 19min 18s |  86356 records/s |
| NoIndexed   | (1b) Baseline (tuple)                  | 18min 10s |  91743 records/s |
| Indexed     | (1a) Baseline (dict)                   | 54min 54s |  30358 records/s |
| Indexed     | (1b) Baseline (tuple)                  | 53min 56s |  30902 records/s |
| Indexed     | (2) Drop index->insert->Reindex        | 20min 45s |  80321 records/s |
| NoIndexed   | (3-1) Raw SQL through ORM              |  7min 23s | 225734 records/s |
| Indexed     | (3-1) Raw SQL through ORM              | 39min 47s |  41893 records/s |
| **NoIndexed** | **(3-2) Direct SQLite**              |  **4min 2s** | **413223 records/s** |
| **Indexed** | **(3-2) Raw SQL through ORM**          | **53min 23s** |  **31221 records/s** |

インデクスのないテーブルの場合は、execute_sqlを呼ぶ場合に比べてさらに大幅に高速化（7分23秒→4分2秒、差は約3分強）。

当然、インデクスのあるテーブルの場合は全く恩恵に与れない。

# (4) 上記のテクニックを組み合わせる

(2)で示したように、インデクスを一旦削除しレコードを追加した上でインデクス再構築することが（安全性と引き換えに）パフォーマンス上有効。

また(3)で示したように、ORMをすっとばしてレコードを追加することがパフォーマンス上有効。

これらを組み合わせて最速insertを目指す。

## Peeweeにおける汎用的なindexの削除・再構築コード

index名を手打ちしてPeeweeのマイグレータに投げるのは能率が悪いので、これを自動化する。

In [3]:
from playhouse.migrate import SqliteMigrator, migrate

class drop_and_recreate_index:
    def __init__(self, table):
        self.table = table
        self.table_name = table._meta.table_name
        self.migrator = SqliteMigrator(table._meta.database)
        
        self.indexed_columns = [(name, col.unique)
                                for name, col in self.table._meta.columns.items()
                                if col.index]
        
        # [(('value_1', 'value_2'), True), ...]
        self.multi_column_indexes = self.table._meta.indexes

    def drop_index(self):
        with self.table._meta.database.atomic():
            # Drop column indexes
            for name, _ in self.indexed_columns:
                idx_name = self.table_name + '_' + name
                migrate(migrator.drop_index(self.table_name, idx_name))
                
            # Drop multi-column indexes
            for columns, _ in self.multi_column_indexes:
                idx_name = "_".join([self.table_name] + list(columns))
                migrate(migrator.drop_index(self.table_name, idx_name))

    def recreate_index(self):
        with self.table._meta.database.atomic():
            # Recreate column indexes
            for name, unique in self.indexed_columns:
                migrate(migrator.add_index(self.table_name, (name,), unique))
            
            # Recreate multi-column indexes
            for (columns, unique) in self.multi_column_indexes:
                migrate(migrator.add_index(self.table_name, columns, unique))

    def __enter__(self):
        self.drop_index()
    
    def __exit__(self, exception_type, exception_value, traceback):
        self.recreate_index()

上記のコードで、このように使える。

```python
with drop_and_recreate_index(TableName):
    # insert records to DB
```

なお途中で失敗した場合にインデクスがない状態のままDBが残る危険があることなど実装としては十分安全に詰められていないので実際に使う場合は注意されたい。

## (4-1) execute_sqlでのレコード挿入

In [16]:
def insert_records_peewee_execute_sql_drop_index():
    with drop_and_recreate_index(IndexedTable):
        insert_records_peewee_execute_sql(IndexedTable)
    assert IndexedTable.select().count() == N

In [17]:
%time insert_records_peewee_execute_sql_drop_index()
delete_all()

100%|██████████| 100000000/100000000 [07:23<00:00, 225438.44it/s]


CPU times: user 9min 45s, sys: 13.8 s, total: 9min 59s
Wall time: 9min 57s


## (4-2) sqlite3直叩きでのレコード挿入

In [18]:
def insert_records_direct_sqlite_drop_index():
    with drop_and_recreate_index(IndexedTable):
        insert_records_direct_sqlite(IndexedTable)
    assert IndexedTable.select().count() == N

In [19]:
%time insert_records_direct_sqlite_drop_index()
delete_all()

100%|██████████| 100000000/100000000 [03:02<00:00, 548998.48it/s]


CPU times: user 6min 15s, sys: 17.8 s, total: 6min 33s
Wall time: 6min 33s


| Table       | Method                                 | Wall Time |  Average speed   |
|-------------|----------------------------------------|-----------|------------------|
| NoIndexed   | (1a) Baseline (dict)                   | 19min 18s |  86356 records/s |
| NoIndexed   | (1b) Baseline (tuple)                  | 18min 10s |  91743 records/s |
| Indexed     | (1a) Baseline (dict)                   | 54min 54s |  30358 records/s |
| Indexed     | (1b) Baseline (tuple)                  | 53min 56s |  30902 records/s |
| Indexed     | (2) Drop index->insert->Reindex        | 20min 45s |  80321 records/s |
| NoIndexed   | (3-1) Raw SQL through ORM              |  7min 23s | 225734 records/s |
| Indexed     | (3-1) Raw SQL through ORM              | 39min 47s |  41893 records/s |
| NoIndexed   | (3-2) Direct SQLite                    | 4min 2s   | 413223 records/s |
| Indexed     | (3-2) Raw SQL through ORM              | 53min 23s |  31221 records/s |
| **Indexed** | **(4-1) (2)+(3-1)**                    | **9min 57s** |  **167504 records/s** |
| **Indexed** | **(4-2) (2)+(3-2)**                    | **6min 33s** |  **254453 records/s** |

インデクスをもつモデルでも1億件のinsertを6分33秒で完了（インデクス破棄・再構築の時間を全て含む）、平均して254000レコード毎秒を達成。通常のバッチ＋トランザクションでのinsertでは3.1万レコード毎秒が最速であったので、8倍以上の高速化を達成。

インデクスをもたないモデルの場合でも、9.2万レコード毎秒であったものが41万レコード毎秒と4倍以上の高速化。

ORMはもちろん柔軟性や安全性を担保するために必要な各種の仕組みのために細かなオーバヘッドが生じているが、DBが壊れるリスクを承知の上、また挿入する値がアプリケーションロジック上正当であることが確実である場合でありかつ極端に多数のinsertを行う場合については、これらのテクニックが有効。