Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Cancel Exception Handling. #3891

Merged
merged 9 commits into from
Jun 5, 2021

Conversation

davinchia
Copy link
Contributor

@davinchia davinchia commented Jun 4, 2021

What

As part of working on the Kube cancelling, I realised our exception handling for the cancel operation is noisy and confusing. It was quite scary to me as I tried to debug why Kube was not happening. Decided to clean this up as is before nailing down Kube cancel behaviour.

This PR tries to clean this up. Unnecessary exceptions have been removed. Exceptions that happen on cancel have been converted into log lines that state this exception is typically thrown on cancel.

Overall this felt much cleaner after I was done with this. However, this isn't a 'standard' PR since the bulk of the changes are judgement based, so feel free to push back on this.

Logs Before Changes

�[32mINFO�[m i.a.i.b.IntegrationRunner(run):82 - {} - Command: READ
2021-06-04 12:53:36 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:36 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='input_state.json'}
2021-06-04 12:53:37 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:36 �[32mINFO�[m i.a.i.s.p.PostgresSource(isCdc):277 - {} - using CDC: false
2021-06-04 12:53:37 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:37 �[32mINFO�[m i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):150 - {} - Attempting to get metadata from the database to see if we can connect.
2021-06-04 12:53:38 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:38 �[32mINFO�[m i.a.i.d.l.LocalJsonDestination$JsonConsumer(<init>):158 - {} - initializing consumer.
2021-06-04 12:53:41 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:41 �[32mINFO�[m i.a.i.s.j.JdbcCdcStateManager(<init>):46 - {} - Initialized CDC state with: null
2021-06-04 12:53:41 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:41 �[32mINFO�[m i.a.i.s.j.JdbcStateManager(createCursorInfoForStream):138 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='med_table', namespace='public'}, New Cursor Field: null. Resetting cursor value
2021-06-04 12:53:57 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:57 �[32mINFO�[m i.a.i.s.p.PostgresSource(isCdc):277 - {} - using CDC: false
2021-06-04 12:53:57 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:53:57 �[32mINFO�[m i.a.i.s.j.AbstractJdbcSource(queryTableFullRefresh):518 - {} - Queueing query for table: med_table
2021-06-04 12:54:01 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:54:01 �[32mINFO�[m i.a.i.s.j.AbstractJdbcSource(lambda$queryTableFullRefresh$29):523 - {} - Preparing query for table: med_table
2021-06-04 12:54:01 INFO (/tmp/workspace/39/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:54:01 �[32mINFO�[m i.a.i.s.j.AbstractJdbcSource(lambda$queryTableFullRefresh$29):528 - {} - Executing query for table: med_table
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):162 - Running sync worker cancellation...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultReplicationWorker(cancel):246 - Cancelling replication worker...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultReplicationWorker(cancel):249 - Cancelling source...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteSource(cancel):136 - Attempting to cancel source process...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteSource(cancel):141 - Source process exists, cancelling...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteSource(cancel):143 - Cancelled source process!
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultReplicationWorker(cancel):256 - Cancelling destination...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteDestination(cancel):132 - Attempting to cancel destination process...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteDestination(cancel):137 - Destination process exists, cancelling...
2021-06-04 12:54:25 ERROR (/tmp/workspace/39/0) LineGobbler(voidCall):72 - Error when reading stream
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:168) ~[?:?]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:281) ~[?:?]
at java.io.BufferedInputStream.read(BufferedInputStream.java:343) ~[?:?]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:297) ~[?:?]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:?]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188) ~[?:?]
at java.io.InputStreamReader.read(InputStreamReader.java:181) ~[?:?]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:?]
at java.io.BufferedReader.readLine(BufferedReader.java:326) ~[?:?]
at java.io.BufferedReader.readLine(BufferedReader.java:392) ~[?:?]
at io.airbyte.commons.io.LineGobbler.voidCall(LineGobbler.java:68) [io.airbyte-airbyte-commons-0.24.6-alpha.jar:?]
at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:35) [io.airbyte-airbyte-commons-0.24.6-alpha.jar:?]
at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:29) [io.airbyte-airbyte-commons-0.24.6-alpha.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultAirbyteDestination(cancel):139 - Cancelled destination process!
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):165 - Interrupting worker thread...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):168 - Cancelling completable future...
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2021-06-04 12:54:25 ERROR (/tmp/workspace/39/0) TemporalAttemptExecution(lambda$getCancellationChecker$4):174 - Cancellation checker exception
io.airbyte.workers.WorkerException: Worker cleaned up after exception
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:67) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:172) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: io.temporal.client.ActivityCanceledException: WorkflowId=401948d0-6c04-49ad-aa2a-3633821583de, RunId=8650a318-e211-46dc-a7be-cfef4273ae38, ActivityType=Replicate, ActivityId=f0e2ed74-5abf-35b7-bfbf-fc6a615152b4
at io.temporal.internal.sync.ActivityExecutionContextImpl.sendHeartbeatRequest(ActivityExecutionContextImpl.java:205) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.ActivityExecutionContextImpl.doHeartBeat(ActivityExecutionContextImpl.java:147) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.ActivityExecutionContextImpl.heartbeat(ActivityExecutionContextImpl.java:108) ~[temporal-sdk-1.0.4.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:64) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
... 7 more
2021-06-04 12:54:25 WARN (/tmp/workspace/39/0) POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=f0e2ed74-5abf-35b7-bfbf-fc6a615152b4, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:169) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:66) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:172) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-06-04 12:54:25 ERROR (/tmp/workspace/39/0) DefaultReplicationWorker(run):146 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.UncheckedIOException: java.io.IOException: Stream closed
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:138) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:51) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:147) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Suppressed: io.airbyte.workers.WorkerException: Source process wasn't successful
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:130) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:119) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:51) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:147) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Suppressed: io.airbyte.workers.WorkerException: destination process wasn't successful
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:126) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:119) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:51) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:147) [io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: java.io.UncheckedIOException: java.io.IOException: Stream closed
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:218) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
... 1 more
Caused by: java.io.UncheckedIOException: java.io.IOException: Stream closed
at java.io.BufferedReader$1.hasNext(BufferedReader.java:577) ~[?:?]
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:?]
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294) ~[?:?]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206) ~[?:?]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169) ~[?:?]
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300) ~[?:?]
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.attemptRead(DefaultAirbyteSource.java:112) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:208) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
... 1 more
Caused by: java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:168) ~[?:?]
at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[?:?]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:297) ~[?:?]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:?]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188) ~[?:?]
at java.io.InputStreamReader.read(InputStreamReader.java:181) ~[?:?]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:?]
at java.io.BufferedReader.readLine(BufferedReader.java:326) ~[?:?]
at java.io.BufferedReader.readLine(BufferedReader.java:392) ~[?:?]
at java.io.BufferedReader$1.hasNext(BufferedReader.java:574) ~[?:?]
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:?]
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294) ~[?:?]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206) ~[?:?]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169) ~[?:?]
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300) ~[?:?]
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:?]
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.attemptRead(DefaultAirbyteSource.java:112) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:208) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
... 1 more
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultReplicationWorker(run):167 - sync summary: io.airbyte.config.ReplicationAttemptSummary@3b8cff96[status=cancelled,recordsSynced=6000,bytesSynced=30606000,startTime=1622811215221,endTime=1622811265379]
2021-06-04 12:54:25 INFO (/tmp/workspace/39/0) DefaultReplicationWorker(run):176 - Source did not output any state messages
2021-06-04 12:54:25 WARN (/tmp/workspace/39/0) DefaultReplicationWorker(run):184 - State capture: No new state, falling back on input state: io.airbyte.config.State@118f4432[state={}]

Logs After Changes (truncated from READ onwards, the lines with ===== are debug lines and have been removed)
The last exception is printed by Temporal and outside our control.

2021-06-04 12:33:26 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:26 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):82 - {} - Command: READ
2021-06-04 12:33:26 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:26 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2021-06-04 12:33:26 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:26 �[32mINFO�[m i.a.i.d.l.LocalJsonDestination$JsonConsumer(<init>):158 - {} - initializing consumer.
2021-06-04 12:33:26 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:26 �[32mINFO�[m i.a.i.s.p.PostgresSource(isCdc):277 - {} - using CDC: false
2021-06-04 12:33:26 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:26 �[32mINFO�[m i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):150 - {} - Attempting to get metadata from the database to see if we can connect.
2021-06-04 12:33:32 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:32 �[32mINFO�[m i.a.i.s.j.JdbcCdcStateManager(<init>):46 - {} - Initialized CDC state with: null
2021-06-04 12:33:32 INFO (/tmp/workspace/35/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-04 12:33:32 �[32mINFO�[m i.a.i.s.j.JdbcStateManager(createCursorInfoForStream):138 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tiny_table', namespace='public'}, New Cursor Field: null. Resetting cursor value
2021-06-04 12:33:34 INFO (/tmp/workspace/35/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):172 - Running sync worker cancellation...
2021-06-04 12:33:34 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(cancel):256 - Cancelling replication worker...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(cancel):263 - ====== cancelled set to True
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(cancel):265 - Cancelling destination...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteDestination(cancel):134 - Attempting to cancel destination process...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteDestination(cancel):139 - Destination process exists, cancelling...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) LineGobbler(close):102 - closing line gobbler
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) LineGobbler(close):104 - closed line gobbler
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) LineGobbler(voidCall):92 - airbyte-destination gobbler IOException: Stream closed. Typically happens when cancelling a job.
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteDestination(cancel):142 - Cancelled destination process!
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(cancel):272 - Cancelling source...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteSource(cancel):138 - Attempting to cancel source process...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteSource(cancel):143 - Source process exists, cancelling...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) LineGobbler(close):102 - closing line gobbler
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) LineGobbler(close):104 - closed line gobbler
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultAirbyteSource(cancel):146 - Cancelled source process!
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):176 - Interrupting worker thread...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(run):141 - Source thread complete.
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(run):144 - Destination thread complete.
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) DefaultAirbyteSource(close):132 - Source process might not have shut down correctly. source process alive: false, source process exit value: 143
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) DefaultAirbyteDestination(close):128 - Destination process might not have shut down correctly. destination process alive: false, destination process exit value: 1
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):179 - Cancelling completable future...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):180 - ===== future: java.util.concurrent.CompletableFuture@4f27ecde[Not completed, 1 dependents]
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@1153ace0[status=cancelled,recordsSynced=0,bytesSynced=0,startTime=1622810004409,endTime=1622810024573]
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):72 - Job either timeout-ed or was cancelled.
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) TemporalAttemptExecution(get):136 - Stopping cancellation check scheduling...
2021-06-04 12:33:44 INFO (/tmp/workspace/35/0) DefaultReplicationWorker(run):178 - Source did not output any state messages
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) DefaultReplicationWorker(run):189 - State capture: No state retained.
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=6e970354-969b-35b5-9ced-54772bf598a5, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:182) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:71) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:185) ~[io.airbyte-airbyte-workers-0.24.6-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-06-04 12:33:44 WARN (/tmp/workspace/35/0) CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):72 - Job either timeout-ed or was cancelled.

How

Rationale have either been left as source comments or PR comments.

Recommended reading order

  1. DefaultReplicationWorker
  2. CancellationHandler and TemporalAttemptExecution
  3. LineGobbler, DefaultAirbyteSource and DefaultAirbyteDestination

@@ -64,7 +69,7 @@ public void checkAndHandleCancellation(Runnable onCancellationCallback) throws W
context.heartbeat(null);
} catch (ActivityCompletionException e) {
onCancellationCallback.run();
throw new WorkerException("Worker cleaned up after exception", e);
LOGGER.warn("Job either timeout-ed or was cancelled.");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of throwing a noisy exception, a log line here felt more useful to me. this also simplifies the interface by letting us get rid of the throws

outputFuture.cancel(false);
};

cancellationHandler.checkAndHandleCancellation(onCancellationCallback);
} catch (WorkerException e) {
} catch (Exception e) {
Copy link
Contributor Author

@davinchia davinchia Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the cancellation handler no longer throws a worker exception, this becomes a general exception check in case we run into anything. We could get rid of this; felt it was safer to keep it.

@@ -147,14 +147,4 @@ public void testCloseNotifiesLifecycle() throws Exception {
verify(outputStream).close();
}

@Test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got rid of these test because I didn't feel there gave us much.

As is, the cancel operation was resulting in exit values of 1 and 127 and resulting in exception even though the system was doing as instructed. 127 is the general sig term exit code and is expected. The meaning of the 1 exit code depends on the operating system.

Regardless of exit code, my tests showed that the containers/processes exited cleanly.

}

private final BufferedReader is;
private final Consumer<String> consumer;
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without the caller field, it was difficult to debug what gobbler was running into errors.

@@ -68,8 +86,10 @@ public void voidCall() {
while ((line = is.readLine()) != null) {
consumer.accept(line);
}
} catch (IOException i) {
Copy link
Contributor Author

@davinchia davinchia Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen if the Gobbler tries to read the error stream after the process has been closed.

The first thing I tried was adding an atomic boolean running, changing the while loop to be while(running.get() && (line = is.readLine()) != null) and a close method to be called before the process is closed.

However this was still happening since the boolean evaluation isn't atomic i.e. running is set to false after we evaluate it and the input stream is closed. We could use a lock here, but that felt like too much complexity for little gain. This felt like a good middle ground for now. What do you think?

@davinchia davinchia marked this pull request as ready for review June 4, 2021 14:09
@davinchia davinchia removed the request for review from ChristopheDuong June 4, 2021 14:09
@davinchia davinchia merged commit 70a48f5 into master Jun 5, 2021
@davinchia davinchia deleted the davinchia/clean-up-exception-checking branch June 5, 2021 01:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants