[BEAM-2718] Improve performance of bundle retry #3815
mariapython wants to merge 4 commits into apache:master
eba397b to 8650ea6
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = copy.copy(self.state)
for window in self.state:
  cloned_object.state[window] = copy.copy(self.state[window])
This can just be an empty dictionary.
def custom_copy(self):
  cloned_object = copy.copy(self)
  cloned_object.timers = copy.deepcopy(self.timers)
  cloned_object.global_state = copy.deepcopy(self.global_state)
We should do similar logic to the inner loop below here too.
cloned_object = copy.copy(self)
cloned_object.timers = copy.deepcopy(self.timers)
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = copy.copy(self.state)
We can just make this an empty dictionary.
  self.global_state = {}
  self.defensive_copy = defensive_copy

def custom_copy(self):
Let's just rename this to clone().
if not self.partial_keyed_state.get(key):
  self.partial_keyed_state[key] = self.existing_keyed_state[key].clone()
  self.partial_keyed_state[key] = (
      self.existing_keyed_state[key].custom_copy())
Let's just rename this to clone().
8650ea6 to 320f609
cloned_object = copy.copy(self)
cloned_object.timers = copy.deepcopy(self.timers)
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = copy.copy(self.state)
charlesccychen wrote:
We can just make this an empty dictionary.
Done.
  self.global_state = {}
  self.defensive_copy = defensive_copy

def custom_copy(self):
charlesccychen wrote:
Let's just rename this to clone().
See my comments above.
if not self.partial_keyed_state.get(key):
  self.partial_keyed_state[key] = self.existing_keyed_state[key].clone()
  self.partial_keyed_state[key] = (
      self.existing_keyed_state[key].custom_copy())
charlesccychen wrote:
Let's just rename this to clone().
I preferred to avoid that name, since it is not really a deep copy. I changed it to copy().
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = copy.copy(self.state)
for window in self.state:
  cloned_object.state[window] = copy.copy(self.state[window])
charlesccychen wrote:
This can just be an empty dictionary.
Done.
def custom_copy(self):
  cloned_object = copy.copy(self)
  cloned_object.timers = copy.deepcopy(self.timers)
  cloned_object.global_state = copy.deepcopy(self.global_state)
charlesccychen wrote:
We should do similar logic to the inner loop below here too.
global_state doesn't appear to be used by the transform evaluators. In fact, I was thinking of eliminating the line completely (for the same reason that defensive_copy is not copied).
mariapython
left a comment
mariapython wrote:
PTAL
Done.
320f609 to 3f1a714
cloned_object.state = (
    collections.defaultdict(lambda: collections.defaultdict(list)))
for window in self.state:
  cloned_object.state[window] = collections.defaultdict(list)
You can omit this line--the dict is created for you by the defaultdict mechanism on cloned_object.state.
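A minimal sketch of the defaultdict behavior referred to here. The standalone `state` mapping and the `'w1'` / `'tag_a'` keys are hypothetical stand-ins, not the actual attributes or keys used in this PR:

```python
import collections

# Hypothetical stand-in for the nested mapping: window -> tag -> list of values.
state = collections.defaultdict(lambda: collections.defaultdict(list))

# No explicit `state['w1'] = collections.defaultdict(list)` is needed:
# the first lookup of a missing window creates the inner defaultdict,
# and the first lookup of a missing tag creates the list.
state['w1']['tag_a'].append(1)

inner_type = type(state['w1'])
windows = sorted(state.keys())
```

Note that a plain read of a missing key also inserts it, so lookups on a defaultdict are not side-effect free.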
cloned_object.timers = copy.deepcopy(self.timers)
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = (
    collections.defaultdict(lambda: collections.defaultdict(list)))
You can remove these two lines if you use the approach in the comment above.
  self.defensive_copy = defensive_copy

def copy(self):
  cloned_object = copy.copy(self)
Let's omit the copy and just create a new object:
cloned_object = InMemoryUnmergedState(defensive_copy=self.defensive_copy)
3f1a714 to 585d8f9
cloned_object.timers = copy.deepcopy(self.timers)
cloned_object.global_state = copy.deepcopy(self.global_state)
cloned_object.state = (
    collections.defaultdict(lambda: collections.defaultdict(list)))
charlesccychen wrote:
You can remove these two lines if you use the approach in the comment above.
Done.
cloned_object.state = (
    collections.defaultdict(lambda: collections.defaultdict(list)))
for window in self.state:
  cloned_object.state[window] = collections.defaultdict(list)
charlesccychen wrote:
You can omit this line--the dict is created for you by the defaultdict mechanism on cloned_object.state.
Done.
  self.defensive_copy = defensive_copy

def copy(self):
  cloned_object = copy.copy(self)
charlesccychen wrote:
Let's omit the copy and just create a new object:
cloned_object = InMemoryUnmergedState(defensive_copy=self.defensive_copy)
Done.
charlesccychen
left a comment
Thanks!
R: @chamikaramj for merge
To keep the state consistent while handling bundles, a copy is made every time the state is accessed and is kept until the bundle is committed. This change uses a custom copy that replicates only the inner structures that are needed and shallow-copies the rest.
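The copy strategy described above could look roughly like the sketch below. `SketchState` is a hypothetical stand-in written for illustration; its attributes follow the names that appear in the review snippets, but it is not the merged implementation, and the window, tag, and timer values in the usage are made up:

```python
import collections
import copy


class SketchState(object):
  """Hypothetical stand-in for the state class discussed in this PR."""

  def __init__(self, defensive_copy=True):
    self.timers = collections.defaultdict(dict)
    # window -> tag -> list of values
    self.state = collections.defaultdict(
        lambda: collections.defaultdict(list))
    self.defensive_copy = defensive_copy

  def copy(self):
    # Start from a fresh object instead of copy.copy(self).
    cloned_object = SketchState(defensive_copy=self.defensive_copy)
    # Timers are replicated in full.
    cloned_object.timers = copy.deepcopy(self.timers)
    # Only the per-tag lists are replicated; the elements they hold
    # are shared with the original (shallow copy).
    for window in self.state:
      for tag in self.state[window]:
        cloned_object.state[window][tag] = copy.copy(self.state[window][tag])
    return cloned_object


# Usage: later writes to the original do not leak into the snapshot.
original = SketchState()
element = ['payload']
original.state['w']['t'].append(element)
original.timers['w']['timer'] = 10

snapshot = original.copy()
original.state['w']['t'].append(['later'])
```

Because the elements are shared rather than duplicated, this is cheaper than a full deepcopy, at the cost of assuming the stored elements are not mutated in place.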