forked from ethereum/py-evm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
atomic.py
127 lines (101 loc) · 3.8 KB
/
atomic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from contextlib import contextmanager
import logging
from typing import Generator
from eth_utils import (
ValidationError,
)
from eth.db.diff import (
DBDiff,
DBDiffTracker,
DiffMissingError,
)
from eth.db.backends.base import BaseDB, BaseAtomicDB
from eth.db.backends.memory import MemoryDB
class AtomicDB(BaseAtomicDB):
    """
    Database wrapper that is nearly the same as BatchDB, except that any
    change made outside of an atomic_batch() context is written to the
    wrapped database immediately.
    """
    logger = logging.getLogger("eth.db.AtomicDB")

    wrapped_db = None  # type: BaseDB
    _track_diff = None  # type: DBDiffTracker

    def __init__(self, wrapped_db: BaseDB = None) -> None:
        """
        Wrap ``wrapped_db``; when none is supplied, a fresh MemoryDB is
        created and used as the backing store.
        """
        self.wrapped_db = MemoryDB() if wrapped_db is None else wrapped_db

    def __getitem__(self, key: bytes) -> bytes:
        """Read ``key`` directly from the wrapped database."""
        stored = self.wrapped_db[key]
        return stored

    def __setitem__(self, key: bytes, value: bytes) -> None:
        """Write ``value`` under ``key`` directly to the wrapped database."""
        self.wrapped_db[key] = value

    def __delitem__(self, key: bytes) -> None:
        """Delete ``key`` directly from the wrapped database."""
        del self.wrapped_db[key]

    def _exists(self, key: bytes) -> bool:
        """Report whether ``key`` is present in the wrapped database."""
        is_present = key in self.wrapped_db
        return is_present

    @contextmanager
    def atomic_batch(self) -> Generator['AtomicDBWriteBatch', None, None]:
        """
        Context manager yielding an AtomicDBWriteBatch whose write target is
        this database. Commit/discard handling on exit is delegated to
        AtomicDBWriteBatch.commit_unexceptional().
        """
        batch_context = AtomicDBWriteBatch.commit_unexceptional(self)
        with batch_context as pending_batch:
            yield pending_batch
class AtomicDBWriteBatch(BaseDB):
    """
    Temporary, readable view of a database handed out by a BaseAtomicDB for
    the duration of an atomic_batch, before the changes are committed.

    Pending changes are recorded in a DBDiffTracker. Once the batch has been
    shut down (the tracker is reset to None), reads, writes, deletes and
    existence checks raise ValidationError.
    """
    logger = logging.getLogger("eth.db.AtomicDBWriteBatch")

    _write_target_db = None  # type: BaseDB
    _track_diff = None  # type: DBDiffTracker

    def __init__(self, _write_target_db: BaseDB) -> None:
        """Start an empty batch whose changes will be applied to ``_write_target_db``."""
        self._track_diff = DBDiffTracker()
        self._write_target_db = _write_target_db

    def __getitem__(self, key: bytes) -> bytes:
        """
        Return the pending value for ``key`` if the batch has one, otherwise
        the value held by the write target.

        Raises KeyError when the tracker reports the key as deleted, and
        ValidationError when the batch is used out of context.
        """
        if self._track_diff is None:
            raise ValidationError("Cannot get data from a write batch, out of context")

        try:
            return self._track_diff[key]
        except DiffMissingError as missing:
            if not missing.is_deleted:
                # Untouched by this batch: defer to the underlying database.
                return self._write_target_db[key]
            raise KeyError(key)

    def __setitem__(self, key: bytes, value: bytes) -> None:
        """Record ``value`` for ``key`` in the pending diff."""
        if self._track_diff is None:
            raise ValidationError("Cannot set data from a write batch, out of context")

        self._track_diff[key] = value

    def __delitem__(self, key: bytes) -> None:
        """
        Record a deletion of ``key`` in the pending diff.

        Raises KeyError when the key is not currently visible through this
        batch.
        """
        if self._track_diff is None:
            raise ValidationError("Cannot delete data from a write batch, out of context")

        if key in self:
            del self._track_diff[key]
        else:
            raise KeyError(key)

    def _diff(self) -> DBDiff:
        """Return the DBDiff produced by the tracker for the pending changes."""
        tracker = self._track_diff
        return tracker.diff()

    def _commit(self) -> None:
        """Apply the pending diff, deletions included, to the write target."""
        pending = self._diff()
        pending.apply_to(self._write_target_db, apply_deletes=True)

    def _exists(self, key: bytes) -> bool:
        """Report whether ``key`` can be read through this batch."""
        if self._track_diff is None:
            raise ValidationError("Cannot test data existance from a write batch, out of context")

        try:
            self[key]
        except KeyError:
            return False
        return True

    @classmethod
    @contextmanager
    def commit_unexceptional(cls, write_target_db):
        """
        Context manager yielding a new batch targeting ``write_target_db``.

        If the body raises an Exception, the pending diff is logged and the
        exception is re-raised without committing. If the body completes
        normally, the batch is committed. In every case the batch is shut
        down afterwards.
        """
        batch = cls(write_target_db)
        try:
            yield batch
        except Exception:
            cls.logger.exception(
                "Unexpected error in atomic db write, dropped partial writes: %r",
                batch._diff(),
            )
            raise
        else:
            batch._commit()
        finally:
            # Drop the tracker so any later use of the batch is rejected as
            # out-of-context.
            batch._track_diff = None