-
Notifications
You must be signed in to change notification settings - Fork 174
/
objects.py
198 lines (157 loc) · 5.92 KB
/
objects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""Storing objects in tables.
This is also used to store data structures such as sets/lists.
"""
import abc
from typing import (
Any,
Callable,
ClassVar,
Dict,
Iterable,
MutableMapping,
Optional,
Set,
Type,
)
from mode import Service
from faust.stores.base import Store
from faust.streams import current_event
from faust.types import TP, EventT
from faust.types.stores import StoreT
from faust.types.tables import CollectionT
from .table import Table
class ChangeloggedObject:
    """A changelogged object in a :class:`ChangeloggedObjectManager` store.

    Subclasses implement the three abstract methods below to define how
    the object's state round-trips through storage and how changelog
    events mutate it.

    NOTE(review): there is no ``ABCMeta`` metaclass here, so the
    ``abc.abstractmethod`` markers are *not* enforced at instantiation
    time — confirm this is intentional.
    """

    # The manager that owns this object; set by __init__.
    manager: "ChangeloggedObjectManager"

    def __init__(self, manager: "ChangeloggedObjectManager", key: Any) -> None:
        self.manager = manager
        self.key = key
        self.__post_init__()

    def __post_init__(self) -> None:  # pragma: no cover
        # Hook for subclasses to add initialization without overriding
        # __init__ (mirrors the dataclass convention).
        ...

    @abc.abstractmethod
    def sync_from_storage(self, value: Any) -> None:
        """Sync value from storage."""
        ...

    @abc.abstractmethod
    def as_stored_value(self) -> Any:
        """Return value as represented in storage."""
        ...

    @abc.abstractmethod
    def apply_changelog_event(self, operation: int, value: Any) -> None:
        """Apply event in changelog topic to local table state."""
        ...
class ChangeloggedObjectManager(Store):
    """Store of changelogged objects.

    Keeps an in-memory mapping of key -> :class:`ChangeloggedObject`,
    tracks which keys are dirty, and periodically flushes their stored
    representation to the underlying storage backend.
    """

    # Concrete subclasses set this to the ChangeloggedObject type managed here.
    ValueType: ClassVar[Type[ChangeloggedObject]]

    # The table this manager stores objects for.
    table: Table
    # In-memory key -> ChangeloggedObject mapping.
    data: MutableMapping
    # Lazily-created backing store (see the ``storage`` property).
    _storage: Optional[StoreT] = None
    # Keys modified since the last flush.
    _dirty: Set

    def __init__(self, table: Table, **kwargs: Any) -> None:
        self.table = table
        self.table_name = table.name
        self.data = {}
        self._dirty = set()
        Service.__init__(self, **kwargs)

    def send_changelog_event(self, key: Any, operation: int, value: Any) -> None:
        """Send changelog event to the tables changelog topic."""
        # Mark dirty first so the next flush persists this key.
        self._dirty.add(key)
        self.table._send_changelog(current_event(), (operation, key), value)

    def __getitem__(self, key: Any) -> ChangeloggedObject:
        # EAFP: create the managed object on first access.
        try:
            return self.data[key]
        except KeyError:
            obj = self.data[key] = self.ValueType(self, key)
            return obj

    def __setitem__(self, key: Any, value: Any) -> None:
        # Direct assignment is unsupported; mutate via the managed objects.
        raise NotImplementedError(f"{self._table_type_name}: cannot set key")

    def __delitem__(self, key: Any) -> None:
        # Direct deletion is unsupported; mutate via the managed objects.
        raise NotImplementedError(f"{self._table_type_name}: cannot del key")

    @property
    def _table_type_name(self) -> str:
        # Concrete table class name, used in error messages.
        return f"{type(self.table).__name__}"

    async def on_start(self) -> None:
        """Call when the changelogged object manager starts."""
        await self.add_runtime_dependency(self.storage)

    async def on_stop(self) -> None:
        """Call when the changelogged object manager stops."""
        # Persist any remaining dirty keys before shutting down.
        self.flush_to_storage()

    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Get the last persisted offset for changelog topic partition."""
        return self.storage.persisted_offset(tp)

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Set the last persisted offset for changelog topic partition."""
        self.storage.set_persisted_offset(tp, offset)

    async def on_rebalance(
        self,
        table: CollectionT,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
    ) -> None:
        """Call when cluster is rebalancing."""
        # Delegate partition bookkeeping to the underlying store.
        await self.storage.on_rebalance(table, assigned, revoked, newly_assigned)

    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None:
        """Call when table recovery is completed after rebalancing."""
        self.sync_from_storage()

    def sync_from_storage(self) -> None:
        """Sync set contents from storage."""
        for stored_key, stored_value in self.storage.items():
            self[stored_key].sync_from_storage(stored_value)

    def flush_to_storage(self) -> None:
        """Flush set contents to storage."""
        storage = self.storage
        data = self.data
        for dirty_key in self._dirty:
            storage[dirty_key] = data[dirty_key].as_stored_value()
        self._dirty.clear()

    @Service.task
    async def _periodic_flush(self) -> None:  # pragma: no cover
        # Background task: flush dirty keys to storage every two seconds.
        async for interval in self.itertimer(2.0, name="SetManager.flush"):
            await self.sleep(interval)
            self.flush_to_storage()

    def reset_state(self) -> None:
        """Reset table local state."""
        # delegate to underlying RocksDB store.
        self.storage.reset_state()

    @property
    def storage(self) -> StoreT:
        """Return underlying storage used by this set table."""
        # Created lazily from the table's store URL (or the app default).
        if self._storage is None:
            store_url = self.table._store or self.table.app.conf.store
            self._storage = self.table._new_store_by_url(store_url)
        return self._storage

    def apply_changelog_batch(
        self,
        batch: Iterable[EventT],
        to_key: Callable[[Any], Any],
        to_value: Callable[[Any], Any],
    ) -> None:
        """Apply batch of changelog events to local state."""
        latest_offsets: Dict[TP, int] = {}
        for event in batch:
            message = event.message
            tp, offset = message.tp, message.offset
            # Track the highest offset seen per partition.
            latest_offsets[tp] = max(latest_offsets.get(tp, offset), offset)
            if event.key is None:
                raise RuntimeError("Changelog key cannot be None")
            operation, raw_key = event.key
            self[to_key(raw_key)].apply_changelog_event(
                operation, to_value(event.value)
            )
        for tp, offset in latest_offsets.items():
            self.set_persisted_offset(tp, offset)

    async def backup_partition(
        self, tp, flush: bool = True, purge: bool = False, keep: int = 1
    ) -> None:
        # Backups are not supported by this store type.
        raise NotImplementedError

    def restore_backup(
        self,
        tp,
        latest: bool = True,
        backup_id: int = 0,
    ) -> None:
        # Backup restore is not supported by this store type.
        raise NotImplementedError