[BEAM-2718] Add test to fix partial writeouts after a bundle retry#3833
[BEAM-2718] Add test to fix partial writeouts after a bundle retry#3833mariapython wants to merge 5 commits intoapache:masterfrom
Conversation
438e576 to
af94b7b
Compare
|
|
||
| def start_bundle(self): | ||
| self.step_context = self._execution_context.get_step_context() | ||
| self.step_context.clear_partial_states() |
There was a problem hiding this comment.
I think it would be better to have a reset() method on _ExecutionContext that can be called to clear the step context and remove clear_partial_states(). We can then call this in attempt_call().
| from collections import defaultdict | ||
| from apache_beam.runners.direct.transform_evaluator import _TransformEvaluator | ||
| from apache_beam.runners.direct.transform_evaluator import _GroupByKeyOnlyEvaluator | ||
| from apache_beam.runners.direct.evaluation_context import _ExecutionContext |
There was a problem hiding this comment.
These can be at the top of the file (underscore imports are ok in tests).
| self.partial_keyed_state[key] = self.existing_keyed_state[key].clone() | ||
| return self.partial_keyed_state[key] | ||
|
|
||
| def clear_partial_states(self): |
There was a problem hiding this comment.
I think it would be better to have a reset() method on _ExecutionContext that can be called to clear the step context and remove clear_partial_states(). We can then call this in attempt_call().
af94b7b to
3000042
Compare
| from collections import defaultdict | ||
| from apache_beam.runners.direct.transform_evaluator import _TransformEvaluator | ||
| from apache_beam.runners.direct.transform_evaluator import _GroupByKeyOnlyEvaluator | ||
| from apache_beam.runners.direct.evaluation_context import _ExecutionContext |
There was a problem hiding this comment.
charlesccychen wrote:
These can be at the top of the file (underscore imports are ok in tests).
Done.
| self.partial_keyed_state[key] = self.existing_keyed_state[key].clone() | ||
| return self.partial_keyed_state[key] | ||
|
|
||
| def clear_partial_states(self): |
There was a problem hiding this comment.
charlesccychen wrote:
I think it would be better to have areset()method on_ExecutionContextthat can be called to clear the step context and removeclear_partial_states(). We can then call this inattempt_call().
Done.
|
|
||
| def start_bundle(self): | ||
| self.step_context = self._execution_context.get_step_context() | ||
| self.step_context.clear_partial_states() |
There was a problem hiding this comment.
charlesccychen wrote:
I think it would be better to have areset()method on_ExecutionContextthat can be called to clear the step context and removeclear_partial_states(). We can then call this inattempt_call().
Done.
|
Retest this please |
1 similar comment
|
Retest this please |
charlesccychen
left a comment
There was a problem hiding this comment.
Thanks!
At sdks/python/apache_beam/runners/direct/evaluation_context.py:344:
self.existing_keyed_state[key].copy())This can fit on just one line.
| side_input_values, scoped_metrics_container) | ||
| evaluator._execution_context.reset() | ||
| if hasattr(evaluator, 'step_context'): | ||
| evaluator.global_state = evaluator.step_context.get_keyed_state(None) |
There was a problem hiding this comment.
Why do we set the global_state property here? The evaluator should be responsible for doing this.
| return self._step_context | ||
|
|
||
| def reset(self): | ||
| if self._step_context: |
There was a problem hiding this comment.
This method can be simplified to:
# Reset step context, which may contain partial state.
self._step_context = None3000042 to
2bc3b24
Compare
mariapython
left a comment
There was a problem hiding this comment.
At sdks/python/apache_beam/runners/direct/evaluation_context.py:344:
self.existing_keyed_state[key].copy())charlesccychen wrote:
This can fit on just one line.
Done.
| side_input_values, scoped_metrics_container) | ||
| evaluator._execution_context.reset() | ||
| if hasattr(evaluator, 'step_context'): | ||
| evaluator.global_state = evaluator.step_context.get_keyed_state(None) |
There was a problem hiding this comment.
charlesccychen wrote:
Why do we set theglobal_stateproperty here? The evaluator should be responsible for doing this.
Done.
| return self._step_context | ||
|
|
||
| def reset(self): | ||
| if self._step_context: |
There was a problem hiding this comment.
charlesccychen wrote:
This method can be simplified to:# Reset step context, which may contain partial state. self._step_context = None
Done.
|
Changes Unknown when pulling 2bc3b24 on mariapython:retry_tests into ** on apache:master**. |
| if not self.partial_keyed_state.get(key): | ||
| self.partial_keyed_state[key] = ( | ||
| self.existing_keyed_state[key].copy()) | ||
| self.partial_keyed_state[key] = (self.existing_keyed_state[key].copy()) |
There was a problem hiding this comment.
No need for extra parentheses.
| return self._step_context | ||
|
|
||
| def reset(self): | ||
| if self._step_context: |
There was a problem hiding this comment.
You don't need the if here.
|
|
||
| def __init__(self): | ||
| self._execution_context = _ExecutionContext(None, {}) | ||
| self._execution_context.get_step_context().get_keyed_state(None) |
There was a problem hiding this comment.
Why do we do this here? Can you clarify or comment?
| from collections import defaultdict | ||
| from apache_beam.runners.direct.transform_evaluator import _TransformEvaluator | ||
| from apache_beam.runners.direct.transform_evaluator import _GroupByKeyOnlyEvaluator | ||
| from apache_beam.runners.direct.evaluation_context import _ExecutionContext |
There was a problem hiding this comment.
mariapython wrote:
Done.
I don't see the change here.
2bc3b24 to
be5e425
Compare
| if not self.partial_keyed_state.get(key): | ||
| self.partial_keyed_state[key] = ( | ||
| self.existing_keyed_state[key].copy()) | ||
| self.partial_keyed_state[key] = (self.existing_keyed_state[key].copy()) |
There was a problem hiding this comment.
charlesccychen wrote:
No need for extra parentheses.
Done.
| return self._step_context | ||
|
|
||
| def reset(self): | ||
| if self._step_context: |
There was a problem hiding this comment.
charlesccychen wrote:
You don't need the if here.
Done.
|
|
||
| def __init__(self): | ||
| self._execution_context = _ExecutionContext(None, {}) | ||
| self._execution_context.get_step_context().get_keyed_state(None) |
There was a problem hiding this comment.
charlesccychen wrote:
Why do we do this here? Can you clarify or comment?
Done.
| from collections import defaultdict | ||
| from apache_beam.runners.direct.transform_evaluator import _TransformEvaluator | ||
| from apache_beam.runners.direct.transform_evaluator import _GroupByKeyOnlyEvaluator | ||
| from apache_beam.runners.direct.evaluation_context import _ExecutionContext |
There was a problem hiding this comment.
charlesccychen wrote:
I don't see the change here.
Done.
No description provided.