Improve upgrade speed by creating indexes at separate stage
kozlovsky authored and ichorid committed Dec 1, 2020
1 parent abec721 commit 3c9fb74
Showing 3 changed files with 84 additions and 25 deletions.
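In outline, the commit splits the upgrade into stages: drop the secondary indexes and FTS triggers, bulk-copy the rows, then recreate indexes and backfill the full-text index in one pass. A minimal standalone sketch of the idea with plain sqlite3 (the table and index names are illustrative, not Tribler's actual schema): every INSERT into an indexed table updates each index's B-tree, so dropping the indexes first and recreating them after the copy replaces many incremental updates with a single bulk build.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ChannelNode(id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("CREATE INDEX idx_channelnode__title ON ChannelNode(title)")

# Stage 1: drop secondary indexes so bulk inserts only write the table itself
for (name,) in conn.execute(
        "SELECT name FROM sqlite_master WHERE type='index' AND name LIKE 'idx_%'").fetchall():
    conn.execute(f"DROP INDEX {name}")

# Stage 2: bulk-copy the data (fake rows here, standing in for the old database)
conn.executemany("INSERT INTO ChannelNode(title) VALUES (?)",
                 ((f"channel {i}",) for i in range(100_000)))

# Stage 3: recreate the index in one pass over the finished table
conn.execute("CREATE INDEX idx_channelnode__title ON ChannelNode(title)")
conn.commit()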
36 changes: 29 additions & 7 deletions src/tribler-core/tribler_core/modules/metadata_store/store.py
@@ -142,15 +142,11 @@ def sqlite_disable_sync(_, connection):
         self.ChannelMetadata._channels_dir = channels_dir

         self._db.bind(provider='sqlite', filename=str(db_filename), create_db=create_db, timeout=120.0)
-        if create_db:
-            with db_session:
-                self._db.execute(sql_create_fts_table)
         self._db.generate_mapping(create_tables=create_db)  # Must be run out of session scope
         if create_db:
-            with db_session:
-                self._db.execute(sql_add_fts_trigger_insert)
-                self._db.execute(sql_add_fts_trigger_delete)
-                self._db.execute(sql_add_fts_trigger_update)
+            with db_session(ddl=True):
+                self._db.execute(sql_create_fts_table)
+                self.create_fts_triggers()

         if create_db:
             with db_session:
@@ -162,6 +158,32 @@ def sqlite_disable_sync(_, connection):
                 default_vsids = self.Vsids.create_default_vsids()
             self.ChannelMetadata.votes_scaling = default_vsids.max_val

+    def drop_indexes(self):
+        cursor = self._db.get_connection().cursor()
+        cursor.execute("select name from sqlite_master where type='index' and name like 'idx_%'")
+        for [index_name] in cursor.fetchall():
+            cursor.execute("drop index %s" % index_name)
+
+    def recreate_indexes(self):
+        connection = self._db.get_connection()
+        self._db.schema.create_tables(self._db.provider, connection)
+
+    def drop_fts_triggers(self):
+        cursor = self._db.get_connection().cursor()
+        cursor.execute("select name from sqlite_master where type='trigger' and name like 'fts_%'")
+        for [trigger_name] in cursor.fetchall():
+            cursor.execute("drop trigger %s" % trigger_name)
+
+    def create_fts_triggers(self):
+        cursor = self._db.get_connection().cursor()
+        cursor.execute(sql_add_fts_trigger_insert)
+        cursor.execute(sql_add_fts_trigger_delete)
+        cursor.execute(sql_add_fts_trigger_update)
+
+    def fill_fts_index(self):
+        cursor = self._db.get_connection().cursor()
+        cursor.execute("insert into FtsIndex(rowid, title) select rowid, title from ChannelNode")
+
     @db_session
     def upsert_vote(self, channel, peer_pk):
         voter = self.ChannelPeer.get(public_key=peer_pk)
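The helpers added above give the upgrader the same stage-level control over the search index: drop the fts_% triggers while rows are bulk-copied, then rebuild FtsIndex with a single INSERT ... SELECT (fill_fts_index). Note that recreate_indexes simply re-runs Pony's schema.create_tables against the existing connection, which recreates whatever tables and indexes are missing, so no hand-written DDL is needed. A rough sketch of the trigger/backfill pattern, assuming an SQLite build with FTS5 and an external-content FTS table (Tribler's actual DDL lives in sql_create_fts_table and is not shown in this diff):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ChannelNode(id INTEGER PRIMARY KEY, title TEXT);
    -- external-content FTS table: indexes ChannelNode.title without copying it
    CREATE VIRTUAL TABLE FtsIndex USING fts5(title, content='ChannelNode', content_rowid='id');
    CREATE TRIGGER fts_insert AFTER INSERT ON ChannelNode BEGIN
        INSERT INTO FtsIndex(rowid, title) VALUES (new.id, new.title);
    END;
""")

# During migration: drop the per-row trigger and insert in bulk...
conn.execute("DROP TRIGGER fts_insert")
conn.executemany("INSERT INTO ChannelNode(title) VALUES (?)", [("tribler",), ("metadata",)])

# ...then backfill the FTS index in a single statement (cf. fill_fts_index)
conn.execute("INSERT INTO FtsIndex(rowid, title) SELECT id, title FROM ChannelNode")
print(conn.execute("SELECT rowid FROM FtsIndex WHERE FtsIndex MATCH 'tribler'").fetchall())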
49 changes: 32 additions & 17 deletions src/tribler-core/tribler_core/upgrade/db8_to_db10.py
@@ -3,6 +3,8 @@
 import logging
 import sqlite3
 from asyncio import sleep
+from collections import deque
+from time import time as now

 TABLE_NAMES = (
     "ChannelNode", "TorrentState", "TorrentState_TrackerState", "ChannelPeer", "ChannelVote", "TrackerState", "Vsids")
@@ -17,50 +19,62 @@ def __init__(self, old_db_path, new_db_path, notifier_callback=None, logger=None):
         self.new_db_path = new_db_path
         self.shutting_down = False

-    async def update_convert_progress(self, amount, total, elapsed, message=""):
+    async def update_convert_progress(self, amount, total, eta, message=""):
         if self.notifier_callback:
-            elapsed = 0.0001 if elapsed == 0.0 else elapsed
-            amount = amount or 1
-            est_speed = amount / elapsed
-            eta = str(datetime.timedelta(seconds=int((total - amount) / est_speed)))
             self.notifier_callback(
                 f"{message}\nConverted: {amount}/{total} ({(amount * 100) // total}%).\nTime remaining: {eta}"
             )
             await sleep(0.001)

-    async def convert_async(self, convert_command, total_to_convert, offset=0, message=""):
+    async def convert_async(self, cursor, convert_command, total_to_convert, offset=0, message=""):
         """
         This method copies entries from one Pony db into another one, splitting the process into chunks dynamically.
         Chunk splitting uses a congestion-control-like algorithm. Awaits are necessary so the
         reactor gets an opportunity to serve other tasks, such as sending progress notifications to
         the GUI through the REST API.
         """
-        start_time = datetime.datetime.utcnow()
+        last_commit_time = now()
         batch_size = 100

-        reference_timedelta = datetime.timedelta(milliseconds=1000)
+        speed_list = deque(maxlen=20)
+
+        reference_timedelta = 1.0
         while offset < total_to_convert:
             if self.shutting_down:
                 break
             end = offset + batch_size

-            batch_start_time = datetime.datetime.now()
+            batch_start_time = now()
             convert_command(offset, batch_size)
-            batch_end_time = datetime.datetime.now() - batch_start_time
+            batch_end_time = now()
+            batch_duration = batch_end_time - batch_start_time

+            remaining = total_to_convert - offset
+            est_speed = batch_size / max(batch_duration, 0.001)
+            speed_list.append(est_speed)
+            avg_est_speed = sum(speed_list) / len(speed_list)
+            eta = str(datetime.timedelta(seconds=int(remaining / avg_est_speed)))
+
+            await self.update_convert_progress(offset, total_to_convert, eta, message)
+
-            elapsed = (datetime.datetime.utcnow() - start_time).total_seconds()
-            await self.update_convert_progress(offset, total_to_convert, elapsed, message)
-            target_coeff = (batch_end_time.total_seconds() / reference_timedelta.total_seconds())
+            target_coeff = batch_duration / reference_timedelta
             if target_coeff < 0.8:
                 batch_size += batch_size
             elif target_coeff > 1.1:
-                batch_size = int(float(batch_size) / target_coeff)
+                batch_size = int(batch_size / target_coeff)
             # we want to guarantee that at least some entries will go through
-            batch_size = batch_size if batch_size > 10 else 10
+            batch_size = max(10, batch_size)

             self._logger.info("Converted: %i/%i %f ",
-                              offset + batch_size, total_to_convert, float(batch_end_time.total_seconds()))
+                              offset + batch_size, total_to_convert, batch_duration)
             offset = end
+
+            if offset >= total_to_convert or now() - last_commit_time > 10:
+                cursor.execute("commit")
+                cursor.execute("begin transaction")
+                self._logger.info('batch size: %d' % batch_size)
+                last_commit_time = now()

     def get_table_entries_count(self, cursor, table_name):
         return cursor.execute(f"SELECT COUNT(*) FROM {table_name};").fetchone()[0]
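For the progress display, the rewritten loop above smooths the per-batch speed with a bounded deque, so the reported ETA reflects the average of the last 20 batches instead of the whole run (one slow batch no longer makes the estimate jump). The same arithmetic as a tiny self-contained function (the function name is illustrative):

import datetime
from collections import deque

speed_list = deque(maxlen=20)  # only the 20 most recent batch speeds survive

def eta_after_batch(batch_size, batch_duration, remaining):
    speed_list.append(batch_size / max(batch_duration, 0.001))  # rows per second
    avg_est_speed = sum(speed_list) / len(speed_list)
    return str(datetime.timedelta(seconds=int(remaining / avg_est_speed)))

print(eta_after_batch(100, 0.5, 10_000))  # 200 rows/s over 10k rows -> '0:00:50'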

@@ -84,7 +98,7 @@ def convert_command(offset, batch_size):
                 self._logger.error("Error while executing conversion command: %s, SQL %s ", str(e), sql_command)

         old_entries_count = self.get_table_entries_count(cursor, f"old_db.{table_name}")
-        await self.convert_async(convert_command, old_entries_count, message=f"Converting DB table {table_name}")
+        await self.convert_async(cursor, convert_command, old_entries_count, message=f"Converting DB table {table_name}")

@@ -106,6 +120,7 @@ async def do_migration(self):
             await self.convert_table(cursor, table_name, old_table_columns[table_name])
         cursor.execute("COMMIT;")
         self.notifier_callback("Synchronizing the upgraded DB to disk, please wait.")
+        await sleep(0.001)


 def get_table_columns(db_path, table_name):
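The batch-size tuning in convert_async, extracted as a self-contained sketch (the 0.8 and 1.1 thresholds, the 1-second budget, and the floor of 10 come from the diff; the function and callback names are illustrative): each batch is timed against the budget, the size doubles while batches finish well under it, and shrinks proportionally when they overshoot, much like congestion control.

from time import time

def run_in_adaptive_batches(total, work, reference_timedelta=1.0):
    offset, batch_size = 0, 100
    while offset < total:
        step = min(batch_size, total - offset)   # the last batch may be smaller
        started = time()
        work(offset, step)                       # process rows [offset, offset + step)
        batch_duration = time() - started
        offset += step

        target_coeff = batch_duration / reference_timedelta
        if target_coeff < 0.8:
            batch_size += batch_size             # well under the time budget: double
        elif target_coeff > 1.1:
            batch_size = int(batch_size / target_coeff)  # over budget: back off proportionally
        batch_size = max(10, batch_size)         # always keep making progress

run_in_adaptive_batches(5_000, lambda offset, limit: sum(range(limit)))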
24 changes: 23 additions & 1 deletion src/tribler-core/tribler_core/upgrade/upgrade.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import shutil
+from asyncio import sleep
 from configparser import MissingSectionHeaderError, ParsingError

 from pony.orm import db_session
@@ -15,8 +16,8 @@
     convert_config_to_tribler75,
     convert_config_to_tribler76,
 )
-from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration, cleanup_pony_experimental_db, should_upgrade
 from tribler_core.upgrade.db8_to_db10 import PonyToPonyMigration, get_db_version
+from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration, cleanup_pony_experimental_db, should_upgrade
 from tribler_core.utilities.configparser import CallbackConfigParser


@@ -114,14 +115,34 @@ async def upgrade_pony_db_8to10(self):
         # Clean the previous temp database
         if tmp_database_path.exists():
             tmp_database_path.unlink()

         # Create the new database
         mds = MetadataStore(tmp_database_path, None, self.session.trustchain_keypair, disable_sync=True)
+        with db_session(ddl=True):
+            mds.drop_indexes()
+            mds.drop_fts_triggers()
         mds.shutdown()

         self._pony2pony = PonyToPonyMigration(database_path, tmp_database_path, self.update_status, logger=self._logger)

         await self._pony2pony.do_migration()
+
+        try:
+            if not self._pony2pony.shutting_down:
+                # Connect again to recreate indexes and FTS
+                self.update_status("recreating indexes...")
+                await sleep(0.001)
+                with db_session(ddl=True):
+                    mds.recreate_indexes()
+                self.update_status("adding full text search index...")
+                await sleep(0.001)
+                with db_session(ddl=True):
+                    mds.create_fts_triggers()
+                    mds.fill_fts_index()
+            mds.shutdown()
+        except Exception as e:
+            self._pony2pony.shutting_down = True
+
         # Remove the old DB
         database_path.unlink()
         if not self._pony2pony.shutting_down:
@@ -131,6 +152,7 @@
             # The upgrade process was either skipped or interrupted. Delete the temp upgrade DB.
             if tmp_database_path.exists():
                 tmp_database_path.unlink()
+
         self.notify_done()

     def upgrade_pony_db_7to8(self):
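A note on the update_status / await sleep(0.001) pairing in upgrade_pony_db_8to10 above: the DDL steps are blocking calls running on the event-loop thread, so the coroutine yields briefly after each status update to let the notification actually be delivered (e.g. through the REST API to the GUI) before the next blocking step starts. A toy illustration of the pattern (the names are made up):

import asyncio
import time

async def staged_upgrade(update_status):
    for message, seconds in (("recreating indexes...", 0.3),
                             ("adding full text search index...", 0.3)):
        update_status(message)
        await asyncio.sleep(0.001)  # yield so the loop can deliver the status update
        time.sleep(seconds)         # stand-in for a blocking DDL statement

asyncio.run(staged_upgrade(print))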
