Skip to content

Commit

Permalink
use synchronous sqlite3 library in db conversion function (#10259)
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn committed Feb 17, 2022
1 parent 4920a46 commit a7c73e4
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 79 deletions.
178 changes: 100 additions & 78 deletions chia/cmds/db_upgrade_func.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
from typing import Dict, Optional
import sqlite3
from pathlib import Path
import sys
from time import time

import asyncio
import zstd

from chia.util.config import load_config, save_config
from chia.util.path import mkdir, path_from_root
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.hint_store import HintStore
from chia.util.ints import uint32
from chia.types.blockchain_format.sized_bytes import bytes32


Expand Down Expand Up @@ -46,7 +40,7 @@ def db_upgrade_func(
out_db_path = path_from_root(root_path, db_path_replaced)
mkdir(out_db_path.parent)

asyncio.run(convert_v1_to_v2(in_db_path, out_db_path))
convert_v1_to_v2(in_db_path, out_db_path)

if update_config:
print("updating config.yaml")
Expand All @@ -65,40 +59,40 @@ def db_upgrade_func(
COIN_COMMIT_RATE = 30000


async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
import aiosqlite
from chia.util.db_wrapper import DBWrapper
def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
import sqlite3
import zstd

from contextlib import closing

if out_path.exists():
print(f"output file already exists. {out_path}")
raise RuntimeError("already exists")

print(f"opening file for reading: {in_path}")
async with aiosqlite.connect(in_path) as in_db:
with closing(sqlite3.connect(in_path)) as in_db:
try:
async with in_db.execute("SELECT * from database_version") as cursor:
row = await cursor.fetchone()
with closing(in_db.execute("SELECT * from database_version")) as cursor:
row = cursor.fetchone()
if row is not None and row[0] != 1:
print(f"blockchain database already version {row[0]}\nDone")
raise RuntimeError("already v2")
except aiosqlite.OperationalError:
except sqlite3.OperationalError:
pass

store_v1 = await BlockStore.create(DBWrapper(in_db, db_version=1))

print(f"opening file for writing: {out_path}")
async with aiosqlite.connect(out_path) as out_db:
await out_db.execute("pragma journal_mode=OFF")
await out_db.execute("pragma synchronous=OFF")
await out_db.execute("pragma cache_size=131072")
await out_db.execute("pragma locking_mode=exclusive")
with closing(sqlite3.connect(out_path)) as out_db:
out_db.execute("pragma journal_mode=OFF")
out_db.execute("pragma synchronous=OFF")
out_db.execute("pragma cache_size=131072")
out_db.execute("pragma locking_mode=exclusive")

print("initializing v2 version")
await out_db.execute("CREATE TABLE database_version(version int)")
await out_db.execute("INSERT INTO database_version VALUES(?)", (2,))
out_db.execute("CREATE TABLE database_version(version int)")
out_db.execute("INSERT INTO database_version VALUES(?)", (2,))

print("initializing v2 block store")
await out_db.execute(
out_db.execute(
"CREATE TABLE full_blocks("
"header_hash blob PRIMARY KEY,"
"prev_hash blob,"
Expand All @@ -109,16 +103,22 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
"block blob,"
"block_record blob)"
)
await out_db.execute(
out_db.execute(
"CREATE TABLE sub_epoch_segments_v3(" "ses_block_hash blob PRIMARY KEY," "challenge_segments blob)"
)
await out_db.execute("CREATE TABLE current_peak(key int PRIMARY KEY, hash blob)")

peak_hash, peak_height = await store_v1.get_peak()
out_db.execute("CREATE TABLE current_peak(key int PRIMARY KEY, hash blob)")

with closing(in_db.execute("SELECT header_hash, height from block_records WHERE is_peak = 1")) as cursor:
peak_row = cursor.fetchone()
if peak_row is None:
print("v1 database does not have a peak block, there is no blockchain to convert")
raise RuntimeError("no blockchain")
peak_hash = bytes32(bytes.fromhex(peak_row[0]))
peak_height = uint32(peak_row[1])
print(f"peak: {peak_hash.hex()} height: {peak_height}")

await out_db.execute("INSERT INTO current_peak VALUES(?, ?)", (0, peak_hash))
await out_db.commit()
out_db.execute("INSERT INTO current_peak VALUES(?, ?)", (0, peak_hash))
out_db.commit()

print("[1/5] converting full_blocks")
height = peak_height + 1
Expand All @@ -130,23 +130,27 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
block_start_time = start_time
block_values = []

async with in_db.execute(
"SELECT header_hash, prev_hash, block, sub_epoch_summary FROM block_records ORDER BY height DESC"
with closing(
in_db.execute(
"SELECT header_hash, prev_hash, block, sub_epoch_summary FROM block_records ORDER BY height DESC"
)
) as cursor:
async with in_db.execute(
"SELECT header_hash, height, is_fully_compactified, block FROM full_blocks ORDER BY height DESC"
with closing(
in_db.execute(
"SELECT header_hash, height, is_fully_compactified, block FROM full_blocks ORDER BY height DESC"
)
) as cursor_2:

await out_db.execute("begin transaction")
async for row in cursor:
out_db.execute("begin transaction")
for row in cursor:

header_hash = bytes.fromhex(row[0])
if header_hash != hh:
continue

# progress cursor_2 until we find the header hash
while True:
row_2 = await cursor_2.fetchone()
row_2 = cursor_2.fetchone()
if row_2 is None:
print(f"ERROR: could not find block {hh.hex()}")
raise RuntimeError(f"block {hh.hex()} not found")
Expand All @@ -158,7 +162,7 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
is_fully_compactified = row_2[2]
block_bytes = row_2[3]

prev_hash = bytes.fromhex(row[1])
prev_hash = bytes32.fromhex(row[1])
block_record = row[2]
ses = row[3]

Expand All @@ -185,18 +189,18 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
commit_in -= 1
if commit_in == 0:
commit_in = BLOCK_COMMIT_RATE
await out_db.executemany(
out_db.executemany(
"INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)", block_values
)
await out_db.commit()
await out_db.execute("begin transaction")
out_db.commit()
out_db.execute("begin transaction")
block_values = []
end_time = time()
rate = BLOCK_COMMIT_RATE / (end_time - start_time)
start_time = end_time

await out_db.executemany("INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)", block_values)
await out_db.commit()
out_db.executemany("INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)", block_values)
out_db.commit()
end_time = time()
print(f"\r {end_time - block_start_time:.2f} seconds ")

Expand All @@ -205,10 +209,12 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
commit_in = SES_COMMIT_RATE
ses_values = []
ses_start_time = time()
async with in_db.execute("SELECT ses_block_hash, challenge_segments FROM sub_epoch_segments_v3") as cursor:
with closing(
in_db.execute("SELECT ses_block_hash, challenge_segments FROM sub_epoch_segments_v3")
) as cursor:
count = 0
await out_db.execute("begin transaction")
async for row in cursor:
out_db.execute("begin transaction")
for row in cursor:
block_hash = bytes32.fromhex(row[0])
ses = row[1]
ses_values.append((block_hash, ses))
Expand All @@ -220,13 +226,13 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
commit_in -= 1
if commit_in == 0:
commit_in = SES_COMMIT_RATE
await out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
await out_db.commit()
await out_db.execute("begin transaction")
out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
out_db.commit()
out_db.execute("begin transaction")
ses_values = []

await out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
await out_db.commit()
out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
out_db.commit()

end_time = time()
print(f"\r {end_time - ses_start_time:.2f} seconds ")
Expand All @@ -236,32 +242,32 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
commit_in = HINT_COMMIT_RATE
hint_start_time = time()
hint_values = []
await out_db.execute("CREATE TABLE hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
await out_db.commit()
out_db.execute("CREATE TABLE hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
out_db.commit()
try:
async with in_db.execute("SELECT coin_id, hint FROM hints") as cursor:
with closing(in_db.execute("SELECT coin_id, hint FROM hints")) as cursor:
count = 0
await out_db.execute("begin transaction")
async for row in cursor:
out_db.execute("begin transaction")
for row in cursor:
hint_values.append((row[0], row[1]))
commit_in -= 1
if commit_in == 0:
commit_in = HINT_COMMIT_RATE
await out_db.executemany("INSERT OR IGNORE INTO hints VALUES(?, ?)", hint_values)
await out_db.commit()
await out_db.execute("begin transaction")
out_db.executemany("INSERT OR IGNORE INTO hints VALUES(?, ?)", hint_values)
out_db.commit()
out_db.execute("begin transaction")
hint_values = []
except sqlite3.OperationalError:
print(" no hints table, skipping")

await out_db.executemany("INSERT OR IGNORE INTO hints VALUES (?, ?)", hint_values)
await out_db.commit()
out_db.executemany("INSERT OR IGNORE INTO hints VALUES (?, ?)", hint_values)
out_db.commit()

end_time = time()
print(f"\r {end_time - hint_start_time:.2f} seconds ")

print("[4/5] converting coin_store")
await out_db.execute(
out_db.execute(
"CREATE TABLE coin_record("
"coin_name blob PRIMARY KEY,"
" confirmed_index bigint,"
Expand All @@ -272,21 +278,24 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
" amount blob," # we use a blob of 8 bytes to store uint64
" timestamp bigint)"
)
await out_db.commit()
out_db.commit()

commit_in = COIN_COMMIT_RATE
rate = 1.0
start_time = time()
coin_values = []
coin_start_time = start_time
async with in_db.execute(
"SELECT coin_name, confirmed_index, spent_index, coinbase, puzzle_hash, coin_parent, amount, timestamp "
"FROM coin_record WHERE confirmed_index <= ?",
(peak_height,),
with closing(
in_db.execute(
"SELECT coin_name, confirmed_index, spent_index, coinbase, "
"puzzle_hash, coin_parent, amount, timestamp "
"FROM coin_record WHERE confirmed_index <= ?",
(peak_height,),
)
) as cursor:
count = 0
await out_db.execute("begin transaction")
async for row in cursor:
out_db.execute("begin transaction")
for row in cursor:
spent_index = row[2]

# in order to convert a consistent snapshot of the
Expand Down Expand Up @@ -314,26 +323,39 @@ async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
commit_in -= 1
if commit_in == 0:
commit_in = COIN_COMMIT_RATE
await out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
await out_db.commit()
await out_db.execute("begin transaction")
out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
out_db.commit()
out_db.execute("begin transaction")
coin_values = []
end_time = time()
rate = COIN_COMMIT_RATE / (end_time - start_time)
start_time = end_time

await out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
await out_db.commit()
out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
out_db.commit()
end_time = time()
print(f"\r {end_time - coin_start_time:.2f} seconds ")

print("[5/5] build indices")
index_start_time = time()
print(" block store")
await BlockStore.create(DBWrapper(out_db, db_version=2))
out_db.execute("CREATE INDEX height on full_blocks(height)")
out_db.execute(
"CREATE INDEX is_fully_compactified ON"
" full_blocks(is_fully_compactified, in_main_chain) WHERE in_main_chain=1"
)
out_db.execute("CREATE INDEX main_chain ON full_blocks(height, in_main_chain) WHERE in_main_chain=1")
out_db.commit()
print(" coin store")
await CoinStore.create(DBWrapper(out_db, db_version=2))

out_db.execute("CREATE INDEX IF NOT EXISTS coin_confirmed_index on coin_record(confirmed_index)")
out_db.execute("CREATE INDEX IF NOT EXISTS coin_spent_index on coin_record(spent_index)")
out_db.execute("CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)")
out_db.execute("CREATE INDEX IF NOT EXISTS coin_parent_index on coin_record(coin_parent)")
out_db.commit()
print(" hint store")
await HintStore.create(DBWrapper(out_db, db_version=2))

out_db.execute("CREATE TABLE IF NOT EXISTS hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
out_db.commit()
end_time = time()
print(f"\r {end_time - index_start_time:.2f} seconds ")
4 changes: 4 additions & 0 deletions chia/full_node/block_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ async def create(cls, db_wrapper: DBWrapper):
# peak. The "key" field is there to make update statements simple
await self.db.execute("CREATE TABLE IF NOT EXISTS current_peak(key int PRIMARY KEY, hash blob)")

# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
await self.db.execute("CREATE INDEX IF NOT EXISTS height on full_blocks(height)")

# Sub epoch segments for weight proofs
Expand All @@ -61,6 +63,8 @@ async def create(cls, db_wrapper: DBWrapper):
"challenge_segments blob)"
)

# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
await self.db.execute(
"CREATE INDEX IF NOT EXISTS is_fully_compactified ON"
" full_blocks(is_fully_compactified, in_main_chain) WHERE in_main_chain=1"
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_db_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def test_blocks(self, default_1000_blocks, with_hints: bool):
assert err is None

# now, convert v1 in_file to v2 out_file
await convert_v1_to_v2(in_file, out_file)
convert_v1_to_v2(in_file, out_file)

async with aiosqlite.connect(in_file) as conn, aiosqlite.connect(out_file) as conn2:

Expand Down

0 comments on commit a7c73e4

Please sign in to comment.