Skip to content

Commit

Permalink
[BEAM-12174] Propertly close ExecutableStageContext in DoFnOp#close (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kw2542 committed Apr 29, 2021
1 parent e284374 commit 3eb3dc9
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
private transient BundleManager<OutT> bundleManager;
private transient Instant sideInputWatermark;
private transient List<WindowedValue<InT>> pushbackValues;
private transient ExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
private DoFnSchemaInformation doFnSchemaInformation;
private transient boolean bundleDisabled;
Expand Down Expand Up @@ -222,8 +223,7 @@ public void open(
.stateInternalsForKey(null)
.state(StateNamespaces.global(), StateTags.bag(bundleStateId, windowedValueCoder));
final ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
final ExecutableStageContext stageContext =
SamzaExecutableStageContextFactory.getInstance().get(jobInfo);
stageContext = SamzaExecutableStageContextFactory.getInstance().get(jobInfo);
stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
this.fnRunner =
SamzaDoFnRunners.createPortable(
Expand Down Expand Up @@ -396,7 +396,8 @@ public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> em
@Override
public void close() {
doFnInvoker.invokeTeardown();
try (AutoCloseable closer = stageBundleFactory) {
try (AutoCloseable factory = stageBundleFactory;
AutoCloseable context = stageContext) {
// do nothing
} catch (Exception e) {
LOG.error("Failed to close stage bundle factory", e);
Expand Down

0 comments on commit 3eb3dc9

Please sign in to comment.