Merge pull request #5758 from ichorid/feature/upgrade_76
Add upgrade procedure for Channels DB v8 to v10
ichorid committed Dec 1, 2020
2 parents 7ce99c4 + 334e32d commit abec721
Showing 13 changed files with 249 additions and 71 deletions.
15 changes: 9 additions & 6 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/Tribler/mirrors-autoflake
rev: v1.1
rev: v1.4
hooks:
- id: autoflake
args: ['--in-place', '--remove-all-unused-imports', '--remove-unused-variable']
@@ -13,7 +13,7 @@ repos:
types: [file, python]

- repo: https://github.com/ambv/black
rev: 19.3b0
rev: 20.8b1
hooks:
- id: black
files: |
@@ -24,8 +24,11 @@ repos:
)
types: [file, python]

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.2.3
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.3
hooks:
- id: flake8
types: [file, python]
- id: flake8
types: [file, python]
additional_dependencies: [
'flake8-import-order==0.18',
]
5 changes: 3 additions & 2 deletions src/tribler-core/tribler_core/modules/metadata_store/store.py
@@ -112,8 +112,9 @@ def __init__(self, db_filename, channels_dir, my_key, disable_sync=False, notifi
def sqlite_disable_sync(_, connection):
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = WAL")
cursor.execute("PRAGMA synchronous = 1")
cursor.execute("PRAGMA temp_store = 2")
cursor.execute("PRAGMA synchronous = NORMAL")
cursor.execute("PRAGMA temp_store = MEMORY")
cursor.execute("PRAGMA foreign_keys = ON")

# Disable disk sync for special cases
if disable_sync:
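A side note on the store.py change above (not part of the diff): the new symbolic PRAGMA values are equivalent to the old numeric ones, since SQLite maps synchronous = 1 to NORMAL and temp_store = 2 to MEMORY, so the change affects readability only. A minimal sketch with a plain sqlite3 connection (the file path is hypothetical):

import sqlite3

connection = sqlite3.connect("metadata.db")  # hypothetical path
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = WAL")
cursor.execute("PRAGMA synchronous = NORMAL")  # same effect as: PRAGMA synchronous = 1
cursor.execute("PRAGMA temp_store = MEMORY")   # same effect as: PRAGMA temp_store = 2
cursor.execute("PRAGMA foreign_keys = ON")
connection.close()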
5 changes: 1 addition & 4 deletions src/tribler-core/tribler_core/session.py
@@ -293,10 +293,7 @@ async def start(self):
if self.upgrader_enabled and not self.core_test_mode:
self.upgrader = TriblerUpgrader(self)
self.readable_status = STATE_UPGRADING_READABLE
try:
await self.upgrader.run()
except Exception as e:
self._logger.error("Error in Upgrader callback chain: %s", e)
await self.upgrader.run()

self.tracker_manager = TrackerManager(self)

124 changes: 124 additions & 0 deletions src/tribler-core/tribler_core/upgrade/db8_to_db10.py
@@ -0,0 +1,124 @@
import contextlib
import datetime
import logging
import sqlite3
from asyncio import sleep

TABLE_NAMES = (
"ChannelNode", "TorrentState", "TorrentState_TrackerState", "ChannelPeer", "ChannelVote", "TrackerState", "Vsids")


class PonyToPonyMigration(object):

def __init__(self, old_db_path, new_db_path, notifier_callback=None, logger=None):
self._logger = logger or logging.getLogger(self.__class__.__name__)
self.notifier_callback = notifier_callback
self.old_db_path = old_db_path
self.new_db_path = new_db_path
self.shutting_down = False

async def update_convert_progress(self, amount, total, elapsed, message=""):
if self.notifier_callback:
elapsed = 0.0001 if elapsed == 0.0 else elapsed
amount = amount or 1
est_speed = amount / elapsed
eta = str(datetime.timedelta(seconds=int((total - amount) / est_speed)))
self.notifier_callback(
f"{message}\nConverted: {amount}/{total} ({(amount * 100) // total}%).\nTime remaining: {eta}"
)
await sleep(0.001)

async def convert_async(self, convert_command, total_to_convert, offset=0, message=""):
"""
This method copies entries from one Pony DB into another one, splitting the work into dynamically sized chunks.
Chunk sizing uses a congestion-control-like algorithm. The awaits are necessary so the
reactor gets an opportunity to serve other tasks, such as sending progress notifications to
the GUI through the REST API.
"""
start_time = datetime.datetime.utcnow()
batch_size = 100

reference_timedelta = datetime.timedelta(milliseconds=1000)
while offset < total_to_convert:
if self.shutting_down:
break
end = offset + batch_size

batch_start_time = datetime.datetime.now()
convert_command(offset, batch_size)
batch_end_time = datetime.datetime.now() - batch_start_time

elapsed = (datetime.datetime.utcnow() - start_time).total_seconds()
await self.update_convert_progress(offset, total_to_convert, elapsed, message)
target_coeff = (batch_end_time.total_seconds() / reference_timedelta.total_seconds())
if target_coeff < 0.8:
batch_size += batch_size
elif target_coeff > 1.1:
batch_size = int(float(batch_size) / target_coeff)
# we want to guarantee that at least some entries will go through
batch_size = batch_size if batch_size > 10 else 10
self._logger.info("Converted: %i/%i %f ",
offset + batch_size, total_to_convert, float(batch_end_time.total_seconds()))
offset = end

def get_table_entries_count(self, cursor, table_name):
return cursor.execute(f"SELECT COUNT(*) FROM {table_name};").fetchone()[0]

async def convert_table(self, cursor, table_name, column_names):
def convert_command(offset, batch_size):
sql_command = None
try:
column_names_joined = ", ".join(column_names)
if "rowid" in column_names:
order_column = "rowid"
else:
order_column = column_names[0]
sql_command = f"INSERT OR IGNORE INTO {table_name} ({column_names_joined}) " + \
f"SELECT {column_names_joined} FROM old_db.{table_name} " + \
f"ORDER BY {order_column} " + \
f"LIMIT {batch_size} {('OFFSET ' + str(offset)) if offset else ''} ;"
cursor.execute(sql_command)
except Exception as e:
# Bail out and stop the upgrade process
self.shutting_down = True
self._logger.error("Error while executing conversion command: %s, SQL %s ", str(e), sql_command)

old_entries_count = self.get_table_entries_count(cursor, f"old_db.{table_name}")
await self.convert_async(convert_command, old_entries_count, message=f"Converting DB table {table_name}")

async def do_migration(self):

old_table_columns = {}
for table_name in TABLE_NAMES:
old_table_columns[table_name] = get_table_columns(self.old_db_path, table_name)

with contextlib.closing(sqlite3.connect(self.new_db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = OFF;")
cursor.execute("PRAGMA synchronous = OFF;")
cursor.execute("PRAGMA foreign_keys = OFF;")
cursor.execute("PRAGMA temp_store = MEMORY;")
cursor.execute(f'ATTACH DATABASE "{self.old_db_path}" as old_db;')

for table_name in TABLE_NAMES:
cursor.execute("BEGIN TRANSACTION;")
if not self.shutting_down:
await self.convert_table(cursor, table_name, old_table_columns[table_name])
cursor.execute("COMMIT;")
self.notifier_callback("Synchronizing the upgraded DB to disk, please wait.")


def get_table_columns(db_path, table_name):
with contextlib.closing(sqlite3.connect(db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute(f'SELECT * FROM {table_name} LIMIT 1')
names = [description[0] for description in cursor.description]
return names


def get_db_version(db_path):
with contextlib.closing(sqlite3.connect(db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute('SELECT value FROM MiscData WHERE name == "db_version"')
version = int(cursor.fetchone()[0])
return version
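A minimal usage sketch of the new db8_to_db10 module (not part of the diff). It assumes the target database has already been created with the new v10 schema, which the real call site (upgrade_pony_db_8to10 in upgrade.py below) does via MetadataStore; the file paths here are hypothetical:

import asyncio

from tribler_core.upgrade.db8_to_db10 import PonyToPonyMigration

async def migrate():
    # Copies every table listed in TABLE_NAMES from the old DB into the new one
    # in adaptively sized batches, reporting progress through the callback.
    migration = PonyToPonyMigration("metadata.db", "metadata_upgraded.db", notifier_callback=print)
    await migration.do_migration()

asyncio.run(migrate())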
20 changes: 17 additions & 3 deletions src/tribler-core/tribler_core/upgrade/tests/test_upgrader.py
@@ -10,7 +10,7 @@
from tribler_common.simpledefs import NTFY

from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import CHANNEL_DIR_NAME_LENGTH
from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.modules.metadata_store.store import CURRENT_DB_VERSION, MetadataStore
from tribler_core.tests.tools.common import TESTS_DATA_DIR
from tribler_core.upgrade.upgrade import cleanup_noncompliant_channel_torrents
from tribler_core.utilities.configparser import CallbackConfigParser
@@ -66,7 +66,7 @@ def test_upgrade_pony_db_6to7(upgrader, session):

def test_upgrade_pony_db_7to8(upgrader, session):
"""
Test that proper additionald index is created.
Test that proper additional index is created.
Also, check that the DB version is upgraded.
"""
old_db_sample = TESTS_DATA_DIR / 'upgrade_databases' / 'pony_v7.db'
@@ -98,7 +98,7 @@ async def test_upgrade_pony_db_complete(upgrader, session):
with db_session:
assert mds.TorrentMetadata.select().count() == 23
assert mds.ChannelMetadata.select().count() == 2
assert int(mds.MiscData.get(name="db_version").value) == 8
assert int(mds.MiscData.get(name="db_version").value) == CURRENT_DB_VERSION
assert list(mds._db.execute('PRAGMA index_info("idx_channelnode__metadata_type")'))
mds.shutdown()

@@ -140,3 +140,17 @@ def test_delete_noncompliant_state(tmpdir):
pstate = CallbackConfigParser()
pstate.read_file(file_path)
assert CHANNEL_DIR_NAME_LENGTH == len(pstate.get('state', 'metainfo')['info']['name'])

@pytest.mark.asyncio
async def test_upgrade_pony_8to10(upgrader, session):
old_db_sample = TESTS_DATA_DIR / 'upgrade_databases' / 'pony_v6.db'
database_path = session.config.get_state_dir() / 'sqlite' / 'metadata.db'
shutil.copyfile(old_db_sample, database_path)

await upgrader.run()
channels_dir = session.config.get_chant_channels_dir()
mds = MetadataStore(database_path, channels_dir, session.trustchain_keypair)
with db_session:
assert int(mds.MiscData.get(name="db_version").value) == CURRENT_DB_VERSION
assert mds.ChannelNode.select().count() == 23
mds.shutdown()
51 changes: 48 additions & 3 deletions src/tribler-core/tribler_core/upgrade/upgrade.py
@@ -9,10 +9,14 @@

from tribler_core.modules.category_filter.l2_filter import is_forbidden
from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import CHANNEL_DIR_NAME_LENGTH
from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.upgrade.config_converter import convert_config_to_tribler74, convert_config_to_tribler75, \
convert_config_to_tribler76
from tribler_core.modules.metadata_store.store import CURRENT_DB_VERSION, MetadataStore
from tribler_core.upgrade.config_converter import (
convert_config_to_tribler74,
convert_config_to_tribler75,
convert_config_to_tribler76,
)
from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration, cleanup_pony_experimental_db, should_upgrade
from tribler_core.upgrade.db8_to_db10 import PonyToPonyMigration, get_db_version
from tribler_core.utilities.configparser import CallbackConfigParser


@@ -66,12 +70,15 @@ def __init__(self, session):
self.failed = True

self._dtp72 = None
self._pony2pony = None
self.skip_upgrade_called = False

def skip(self):
self.skip_upgrade_called = True
if self._dtp72:
self._dtp72.shutting_down = True
if self._pony2pony:
self._pony2pony.shutting_down = True

async def run(self):
"""
@@ -84,10 +91,48 @@ async def run(self):
await self.upgrade_72_to_pony()
self.upgrade_pony_db_6to7()
self.upgrade_pony_db_7to8()
await self.upgrade_pony_db_8to10()
convert_config_to_tribler74(self.session.config.get_state_dir())
convert_config_to_tribler75(self.session.config.get_state_dir())
convert_config_to_tribler76(self.session.config.get_state_dir())

async def upgrade_pony_db_8to10(self):
"""
Upgrade GigaChannel DB from version 8 (7.5.x) to version 10 (7.6.x).
This recreates the database from scratch, which can take quite some time.
The code is modelled on the upgrade_72_to_pony routine, which is asynchronous and
reports progress to the user.
"""
database_path = self.session.config.get_state_dir() / 'sqlite' / 'metadata.db'
if not database_path.exists() or get_db_version(database_path) >= CURRENT_DB_VERSION:
# Either no old db exists, or the old db version is up to date - nothing to do
return

# Otherwise, start upgrading
self.notify_starting()
tmp_database_path = database_path.parent / 'metadata_upgraded.db'
# Clean the previous temp database
if tmp_database_path.exists():
tmp_database_path.unlink()
# Create the new database
mds = MetadataStore(tmp_database_path, None, self.session.trustchain_keypair, disable_sync=True)
mds.shutdown()

self._pony2pony = PonyToPonyMigration(database_path, tmp_database_path, self.update_status, logger=self._logger)

await self._pony2pony.do_migration()

# Remove the old DB
database_path.unlink()
if not self._pony2pony.shutting_down:
# Move the upgraded db into its place
tmp_database_path.rename(database_path)
else:
# The upgrade process was either skipped or interrupted. Delete the temp upgrade DB.
if tmp_database_path.exists():
tmp_database_path.unlink()
self.notify_done()

def upgrade_pony_db_7to8(self):
"""
Upgrade GigaChannel DB from version 7 (7.4.x) to version 8 (7.5.x).
22 changes: 12 additions & 10 deletions src/tribler-core/tribler_core/upgrade/version_manager.py
@@ -5,11 +5,7 @@
from distutils.version import LooseVersion
from pathlib import Path

from tribler_common.simpledefs import (
STATEDIR_CHANNELS_DIR,
STATEDIR_CHECKPOINT_DIR,
STATEDIR_DB_DIR,
)
from tribler_common.simpledefs import STATEDIR_CHANNELS_DIR, STATEDIR_CHECKPOINT_DIR, STATEDIR_DB_DIR

from tribler_core.utilities import json_util as json
from tribler_core.utilities.osutils import dir_copy
@@ -137,7 +133,7 @@ def copy_state_directory(src_dir, tgt_dir):
shutil.copy(src_dir / filename, tgt_dir / filename)


def fork_state_directory_if_necessary(root_state_dir, code_version):
def should_fork_state_directory(root_state_dir, code_version):
version_history = VersionHistory(root_state_dir / VERSION_HISTORY_FILE)
# The previous version has the same major/minor number as the code version, and there exists
# a corresponding versioned state directory. Nothing to do here (except possibly updating version history).
@@ -146,8 +142,8 @@ def fork_state_directory_if_necessary(root_state_dir, code_version):
and LooseVersion(version_history.last_version).version[:2] == LooseVersion(code_version).version[:2]
and code_version_dir.exists()):
if str(version_history.last_version) != str(code_version):
version_history.update(code_version)
return
return version_history, code_version, None, None
return None, None, None, None

src_dir = None
tgt_dir = code_version_dir
@@ -161,12 +157,18 @@
# Legacy version
src_dir = root_state_dir

return version_history, code_version, src_dir, tgt_dir


def fork_state_directory_if_necessary(root_state_dir, code_version):
version_history, code_version, src_dir, tgt_dir = should_fork_state_directory(root_state_dir, code_version)
if src_dir is not None:
# Borderline case where user got an unused code version directory: rename it out of the way
if tgt_dir.exists():
if tgt_dir is not None and tgt_dir.exists():
moved_out_of_way_dirname = "unused_v" + str(tgt_dir.name) + "_" + datetime.now().strftime(
"%Y-%m-%d_%Hh%Mm%Ss")
tgt_dir.rename(tgt_dir.with_name(moved_out_of_way_dirname))
copy_state_directory(src_dir, tgt_dir)

version_history.update(code_version)
if version_history:
version_history.update(code_version)
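Not part of the diff: a minimal sketch of the new split in version_manager.py. should_fork_state_directory only inspects the version history and reports what a fork would do (src_dir is None when nothing is pending), while fork_state_directory_if_necessary performs the copy; this is what lets the GUI code further down peek at the decision without side effects. The state-directory path and version string below are hypothetical:

from pathlib import Path

from tribler_core.upgrade.version_manager import (
    fork_state_directory_if_necessary,
    should_fork_state_directory,
)

root_state_dir = Path.home() / ".Tribler"  # hypothetical location
_, _, src_dir, tgt_dir = should_fork_state_directory(root_state_dir, "7.6.0")
if src_dir is not None:
    # A copy from src_dir into tgt_dir (plus a version-history update) is pending.
    fork_state_directory_if_necessary(root_state_dir, "7.6.0")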
2 changes: 1 addition & 1 deletion src/tribler-core/tribler_core/version.py
@@ -1,4 +1,4 @@
version_id = "7.5.0-GIT"
version_id = "7.6.0-GIT"
build_date = "Mon Jan 01 00:00:01 1970"
commit_id = "none"
sentry_url = ""
10 changes: 10 additions & 0 deletions src/tribler-gui/tribler_gui/core_manager.py
@@ -8,6 +8,10 @@

from tribler_common.utilities import is_frozen

from tribler_core.upgrade.version_manager import should_fork_state_directory
from tribler_core.utilities.osutils import get_root_state_directory
from tribler_core.version import version_id

from tribler_gui.event_request_manager import EventRequestManager
from tribler_gui.tribler_request_manager import TriblerNetworkRequest
from tribler_gui.utilities import get_base_path
@@ -86,6 +90,12 @@ def on_request_error(_):

self.events_manager.connect()
self.events_manager.reply.error.connect(on_request_error)
# This is a hack to determine if we have to notify the user to wait for the directory fork to finish
_, _, src_dir, tgt_dir = should_fork_state_directory(get_root_state_directory(), version_id)
if src_dir is not None:
# There is going to be a directory fork, so we extend the core connection timeout and notify the user
self.events_manager.remaining_connection_attempts = 1200
self.events_manager.change_loading_text.emit("Copying data from previous Tribler version, please wait")

def start_tribler_core(self, core_args=None, core_env=None):
if not START_FAKE_API:
Expand Down
