Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to explicit atomic_batch() database handle #1304

Merged
merged 3 commits into from Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 63 additions & 35 deletions eth/db/atomic.py
@@ -1,6 +1,5 @@
from contextlib import contextmanager
import logging
import threading
from typing import Generator

from eth_utils import (
Expand All @@ -19,7 +18,7 @@
class AtomicDB(BaseAtomicDB):
"""
This is nearly the same as BatchDB, but it immediately writes out changes if they are
not in a batch_write() context.
not in an atomic_batch() context.
"""
logger = logging.getLogger("eth.db.AtomicDB")

Expand All @@ -31,69 +30,98 @@ def __init__(self, wrapped_db: BaseDB = None) -> None:
self.wrapped_db = MemoryDB()
else:
self.wrapped_db = wrapped_db
self._track_diff = DBDiffTracker()
self._batch_lock = threading.Lock()

# Read the value for key directly from the wrapped database.
def __getitem__(self, key: bytes) -> bytes:
return self.wrapped_db[key]

# Write the value for key directly to the wrapped database.
def __setitem__(self, key: bytes, value: bytes) -> None:
self.wrapped_db[key] = value

# Delete key directly from the wrapped database.
def __delitem__(self, key: bytes) -> None:
del self.wrapped_db[key]

# Report whether key is present, by delegating the membership test to the wrapped database.
def _exists(self, key: bytes) -> bool:
return key in self.wrapped_db

@contextmanager
def atomic_batch(self) -> Generator[None, None, None]:
if self._batch_lock.locked():
raise ValidationError("AtomicDB does not support recursive batching of writes")
def atomic_batch(self) -> Generator['AtomicDBWriteBatch', None, None]:
with AtomicDBWriteBatch.commit_unexceptional(self) as readable_batch:
yield readable_batch

try:
with self._batch_lock:
yield
except Exception:
self.logger.exception(
"Unexpected error in atomic db write, dropped partial writes: %r",
self._diff(),
)
self._clear()
raise
else:
self._commit()

class AtomicDBWriteBatch(BaseDB):
"""
This is returned by a BaseAtomicDB during an atomic_batch, to provide a temporary view
of the database, before commit.
"""
logger = logging.getLogger("eth.db.AtomicDBWriteBatch")

_write_target_db = None # type: BaseDB
_track_diff = None # type: DBDiffTracker

# Bind this batch to the database that will receive its writes, and start with an empty diff.
def __init__(self, _write_target_db: BaseDB) -> None:
self._write_target_db = _write_target_db
# A fresh tracker per batch; the other methods of this class treat a None tracker
# as "batch is out of context" and refuse to operate.
self._track_diff = DBDiffTracker()

def __getitem__(self, key: bytes) -> bytes:
if not self._batch_lock.locked():
return self.wrapped_db[key]
if self._track_diff is None:
raise ValidationError("Cannot get data from a write batch, out of context")

try:
value = self._track_diff[key]
except DiffMissingError as missing:
if missing.is_deleted:
raise KeyError(key)
else:
return self.wrapped_db[key]
return self._write_target_db[key]
else:
return value

def __setitem__(self, key: bytes, value: bytes) -> None:
if self._batch_lock.locked():
self._track_diff[key] = value
else:
self.wrapped_db[key] = value
if self._track_diff is None:
raise ValidationError("Cannot set data from a write batch, out of context")

self._track_diff[key] = value

def __delitem__(self, key: bytes) -> None:
if self._track_diff is None:
raise ValidationError("Cannot delete data from a write batch, out of context")

if key not in self:
raise KeyError(key)
if self._batch_lock.locked():
del self._track_diff[key]
else:
del self.wrapped_db[key]
del self._track_diff[key]

# Return the changes recorded so far, as produced by the tracker's diff().
def _diff(self) -> DBDiff:
return self._track_diff.diff()

def _clear(self):
self._track_diff = DBDiffTracker()

def _commit(self) -> None:
self._diff().apply_to(self.wrapped_db, apply_deletes=True)
self._clear()
self._diff().apply_to(self._write_target_db, apply_deletes=True)

# Report whether key can currently be read through this batch.
# Raises ValidationError when the tracker has been set to None (batch used out of context).
def _exists(self, key: bytes) -> bool:
if self._track_diff is None:
raise ValidationError("Cannot test data existance from a write batch, out of context")

# Reuse this object's own item lookup, so existence agrees with what a read would return;
# a KeyError from that lookup means "not present".
try:
self[key]
except KeyError:
return False
else:
return True

@classmethod
@contextmanager
def commit_unexceptional(cls, write_target_db):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is odd, but am I correct that it's an internal API? If so, then I don't really care that much, other than maybe prefixing it with an `_` to make that clear.

readable_write_batch = cls(write_target_db)
try:
yield readable_write_batch
except Exception:
cls.logger.exception(
"Unexpected error in atomic db write, dropped partial writes: %r",
readable_write_batch._diff(),
)
raise
else:
readable_write_batch._commit()
finally:
# force a shutdown of this batch, to prevent out-of-context usage
readable_write_batch._track_diff = None
4 changes: 2 additions & 2 deletions eth/db/backends/base.py
Expand Up @@ -64,8 +64,8 @@ class BaseAtomicDB(BaseDB):

::

db = AtomicDB()
with db.atomic_batch():
atomic_db = AtomicDB()
with atomic_db.atomic_batch() as db:
# changes are not immediately saved to the db, inside this context
db[key] = val

Expand Down
97 changes: 87 additions & 10 deletions eth/db/backends/level.py
@@ -1,12 +1,25 @@
from contextlib import contextmanager
import logging
from pathlib import Path
from typing import Generator
from typing import (
Generator,
TYPE_CHECKING,
)

from eth_utils import ValidationError

from eth.db.diff import (
DBDiffTracker,
DiffMissingError,
)
from .base import (
BaseAtomicDB,
BaseDB,
)

if TYPE_CHECKING:
import plyvel # noqa: F401


class LevelDB(BaseAtomicDB):
logger = logging.getLogger("eth.db.backends.LevelDB")
Expand All @@ -16,10 +29,11 @@ def __init__(self, db_path: Path = None) -> None:
if not db_path:
raise TypeError("Please specifiy a valid path for your database.")
try:
import plyvel
import plyvel # noqa: F811
except ImportError:
raise ImportError("LevelDB requires the plyvel \
library which is not available for import.")
raise ImportError(
"LevelDB requires the plyvel library which is not available for import."
)
self.db_path = db_path
self.db = plyvel.DB(str(db_path), create_if_missing=True, error_if_exists=False)

Expand All @@ -39,10 +53,73 @@ def __delitem__(self, key: bytes) -> None:
self.db.delete(key)

@contextmanager
def atomic_batch(self) -> Generator[None, None, None]:
with self.db.write_batch(transaction=True):
def atomic_batch(self) -> Generator['LevelDBWriteBatch', None, None]:
with self.db.write_batch(transaction=True) as atomic_batch:
readable_batch = LevelDBWriteBatch(self, atomic_batch)
try:
yield
except Exception:
self.logger.exception("Unexpected error occurred during atomic database write")
raise
yield readable_batch
finally:
readable_batch.shutdown()


class LevelDBWriteBatch(BaseDB):
"""
A native leveldb write batch does not permit reads on the in-progress data.
This class fills that gap, by tracking the in-progress diff, and adding
a read interface.
"""
logger = logging.getLogger("eth.db.backends.LevelDBWriteBatch")

# Pair a database used for reads with the native write batch that receives the writes.
# original_read_db: consulted for keys this batch has not touched.
# write_batch: receives every put/delete issued through this object.
def __init__(self, original_read_db: BaseDB, write_batch: 'plyvel.WriteBatch') -> None:
self._original_read_db = original_read_db
self._write_batch = write_batch
# keep track of the temporary changes made
self._track_diff = DBDiffTracker()

# Read key, preferring the pending changes recorded in this batch.
# Raises ValidationError when the tracker is None (batch used out of context), and
# KeyError when the key was deleted within this batch.
def __getitem__(self, key: bytes) -> bytes:
if self._track_diff is None:
raise ValidationError("Cannot get data from a write batch, out of context")

try:
changed_value = self._track_diff[key]
except DiffMissingError as missing:
# The tracker holds no pending value for this key; is_deleted distinguishes a
# pending delete from a key this batch has not recorded a value for.
if missing.is_deleted:
raise KeyError(key)
else:
# No pending delete: fall back to the database this batch reads from.
return self._original_read_db[key]
else:
return changed_value

# Queue a put in the native write batch and record the same change in the diff tracker.
# Raises ValidationError when the tracker is None (batch used out of context).
def __setitem__(self, key: bytes, value: bytes) -> None:
if self._track_diff is None:
raise ValidationError("Cannot set data from a write batch, out of context")

self._write_batch.put(key, value)
# Mirror the write in the tracker: reads on this object consult the tracker first.
self._track_diff[key] = value

# Report whether key exists, taking this batch's pending changes into account.
# Raises ValidationError when the tracker is None (batch used out of context).
def _exists(self, key: bytes) -> bool:
if self._track_diff is None:
raise ValidationError("Cannot test data existance from a write batch, out of context")

try:
self._track_diff[key]
except DiffMissingError as missing:
# No pending value in the tracker: a pending delete means "absent"; otherwise the
# answer comes from a membership test on the database this batch reads from.
if missing.is_deleted:
return False
else:
return key in self._original_read_db
else:
# The tracker returned a pending value for this key.
return True

# Queue a delete in the native write batch and record the deletion in the diff tracker.
# Raises ValidationError when the tracker is None (batch used out of context).
# NOTE(review): unlike AtomicDBWriteBatch.__delitem__, there is no `key not in self` check
# here; whether deleting a missing key raises depends on DBDiffTracker, which is not
# visible in this view -- confirm the intended behaviour.
def __delitem__(self, key: bytes) -> None:
if self._track_diff is None:
raise ValidationError("Cannot delete data from a write batch, out of context")

self._write_batch.delete(key)
del self._track_diff[key]

def shutdown(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to `decommission`?

"""
Prevent any further actions to be taken on this write batch, called after leaving context
"""
self._track_diff = None