Working Kube Cancel. #3983

Merged
merged 2 commits into davinchia/kube-queueing-poc from davinchia/kube-cancel-2
Jun 9, 2021

Conversation

davinchia
Contributor

@davinchia davinchia commented Jun 9, 2021

What

Continuation of #3922 - get Kube cancel working.

After these changes, we still have one noisy exception remaining, but it comes from Temporal and is not easily muted.

complete.
2021-06-09 08:55:39 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:55:39 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9024
2021-06-09 08:55:39 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9025
2021-06-09 08:55:39 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:55:39 INFO (/workspace/22/0) DefaultAirbyteDestination(cancel):140 - Cancelled destination process!
2021-06-09 08:55:39 INFO (/workspace/22/0) DefaultReplicationWorker(cancel):276 - Cancelling source...
2021-06-09 08:55:39 INFO (/workspace/22/0) DefaultAirbyteSource(cancel):134 - Attempting to cancel source process...
2021-06-09 08:55:39 INFO (/workspace/22/0) DefaultAirbyteSource(cancel):139 - Source process exists, cancelling...
2021-06-09 08:55:39 INFO (/workspace/22/0) KubePodProcess(destroy):435 - Destroying Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:39 WARN (/workspace/22/0) LineGobbler(voidCall):88 - airbyte-source gobbler IOException: Socket closed. Typically happens when cancelling a job.
2021-06-09 08:55:39 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):80 - Port consumer releasing: 9026
2021-06-09 08:55:39 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):80 - Port consumer releasing: 9027
2021-06-09 08:55:39 INFO (/workspace/22/0) KubePodProcess(destroy):441 - Destroyed Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9026
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9027
2021-06-09 08:55:49 INFO (/workspace/22/0) KubePodProcess(destroy):435 - Destroying Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9026
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9027
2021-06-09 08:55:49 INFO (/workspace/22/0) KubePodProcess(destroy):441 - Destroyed Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:49 INFO (/workspace/22/0) DefaultAirbyteSource(cancel):141 - Cancelled source process!
2021-06-09 08:55:49 INFO (/workspace/22/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):185 - Interrupting worker thread...
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9026
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9027
2021-06-09 08:55:49 INFO (/workspace/22/0) TemporalAttemptExecution(lambda$getCancellationChecker$3):188 - Cancelling completable future...
2021-06-09 08:55:49 INFO (/workspace/22/0) TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2021-06-09 08:55:49 ERROR (/workspace/22/0) WorkerUtils(gentleCloseWithHeartbeat):116 - Exception during grace period for process to finish. This can happen when cancelling jobs.
2021-06-09 08:55:49 WARN (/workspace/22/0) CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):71 - Job either timeout-ed or was cancelled.
2021-06-09 08:55:49 WARN (/workspace/22/0) POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=5df00f51-d9db-3660-928e-3a4a69f8f406, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:191) ~[io.airbyte-airbyte-workers-0.24.7-alpha.jar:?]
at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:70) ~[io.airbyte-airbyte-workers-0.24.7-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:194) ~[io.airbyte-airbyte-workers-0.24.7-alpha.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-06-09 08:55:49 WARN (/workspace/22/0) WorkerUtils(forceShutdown):128 - Process is taking too long to finish. Killing it
2021-06-09 08:55:49 INFO (/workspace/22/0) KubePodProcess(destroy):435 - Destroying Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9026
2021-06-09 08:55:49 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9027
2021-06-09 08:55:49 INFO (/workspace/22/0) KubePodProcess(destroy):441 - Destroyed Kube process: airbyte-worker-22-0-rbysz
2021-06-09 08:55:49 INFO (/workspace/22/0) LoggingTrackingClient(track):60 - track. version: dev, userId: d6452139-98e9-4958-b170-5c5a51aa85c5, action: Connector Jobs, metadata: {job_type=sync, connector_source=Postgres, config.source.host=set, config.source.port=set, config.source.username=set, frequency=manual, connector_source_definition_id=decd338e-5647-4c0b-adf4-da0e75f5a750, config.source.ssl=false, config.source.database=set, config.source.password=set, attempt_stage=ENDED, attempt_completion_status=FAILED, catalog.sync_mode.full_refresh=set, connection_id=85aa5cff-24e7-4127-8e65-1468cd1f692a, job_id=22, connector_source_version=0.3.2, config.destination.destination_path=set, catalog.destination_sync_mode.append=set, connector_destination_version=0.2.6, attempt_id=0, connector_destination=Local CSV, connector_destination_definition_id=8be1cf83-fde1-477f-a4ad-318d23c9f3c6}
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9026
2021-06-09 08:56:19 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9027
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-rbysz to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 WARN (/workspace/22/0) DefaultAirbyteSource(close):126 - Source process might not have shut down correctly. source process alive: false, source process exit value: 143. This warning is normal if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9024
2021-06-09 08:56:19 INFO (/workspace/22/0) KubeProcessFactory(lambda$create$0):82 - Port consumer skipping releasing: 9025
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) KubePodProcess(getReturnCode):475 - Unable to find pod airbyte-worker-22-0-epznb to retrieve exit value. Defaulting to 143. This is expected if the job was cancelled.
2021-06-09 08:56:19 WARN (/workspace/22/0) DefaultAirbyteDestination(close):125 - Destination process might not have shut down correctly. destination process alive: false, destination process exit value: 143. This warning is normal if the job was cancelled.
2021-06-09 08:56:19 INFO (/workspace/22/0) DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@1eae3eef[status=cancelled,recordsSynced=0,bytesSynced=0,startTime=1623228910570,endTime=1623228979233]
2021-06-09 08:56:19 INFO (/workspace/22/0) DefaultReplicationWorker(run):178 - Source did not output any state messages
2021-06-09 08:56:19 WARN (/workspace/22/0) DefaultReplicationWorker(run):189 - State capture: No state retained.

How

As we discussed, the only remaining gotcha is that we cannot retrieve the exit value once the pod is terminated (Kube does not persist it). Instead, we set a boolean and return 143, the default SIGTERM exit code (128 + 15).
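
For illustration, here is a minimal sketch of that pattern, assuming a hypothetical wasKilled flag and KILLED_EXIT_CODE constant (the names are not taken from the actual KubePodProcess):

import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the "set a boolean and default to 143" pattern described above.
// All names here are hypothetical; the real KubePodProcess differs in detail.
class CancellableKubeProcessSketch {

  private static final int KILLED_EXIT_CODE = 143; // 128 + SIGTERM (15)

  private final AtomicBoolean wasKilled = new AtomicBoolean(false);

  // Deleting the pod means Kube no longer holds an exit value, so remember the kill.
  void destroy() {
    wasKilled.set(true);
    // ... delete the pod via the Kubernetes client ...
  }

  // Report the conventional SIGTERM exit code when the pod is gone because we killed it.
  int exitValue() {
    if (wasKilled.get()) {
      return KILLED_EXIT_CODE;
    }
    // ... otherwise read the exit code from the pod's terminated container status ...
    return 0;
  }
}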

Also added some comments explaining the functions.

Recommended reading order

  • KubePodProcess

@github-actions github-actions bot added area/platform issues related to the platform area/worker Related to worker labels Jun 9, 2021
@davinchia davinchia changed the base branch from master to davinchia/kube-queueing-poc June 9, 2021 09:01
/**
* Intended to gracefully clean up after a completed Kube Pod. This should only be called if the
* process is successful.
*/
@Override
davinchia (Contributor, Author) commented:

@jrhizor did I correctly interpret the difference between the two waitFor functions? I double checked the WorkerUtils class to confirm this is how they are used today.
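
For context, the two waitFor variants in question are presumably the java.lang.Process overloads, whose contracts differ as sketched below. This is a generic illustration, not the KubePodProcess implementation.

import java.util.concurrent.TimeUnit;

// Generic illustration of the two java.lang.Process waitFor contracts that any
// Process subclass (such as KubePodProcess) has to honour.
class WaitForContracts {
  static void demo(final Process process) throws InterruptedException {
    // waitFor(): blocks indefinitely until the process exits, then returns its exit code.
    final int exitCode = process.waitFor();

    // waitFor(timeout, unit): blocks for at most the given duration; returns true if the
    // process exited within it, false if it is still running (no exit code available then).
    final boolean exited = process.waitFor(1, TimeUnit.MINUTES);
  }
}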

@davinchia davinchia marked this pull request as ready for review June 9, 2021 09:06
@auto-assign auto-assign bot requested review from cgardens and sherifnada June 9, 2021 09:06
@davinchia davinchia requested review from jrhizor and removed request for sherifnada and cgardens June 9, 2021 09:07
@davinchia davinchia mentioned this pull request Jun 9, 2021
@jrhizor jrhizor merged commit c7db626 into davinchia/kube-queueing-poc Jun 9, 2021
@jrhizor jrhizor deleted the davinchia/kube-cancel-2 branch June 9, 2021 20:32
jrhizor added a commit that referenced this pull request Jun 10, 2021
* Use CDK to generate source that can be configured to emit a certain number of records and always works.

* Checkpoint: socat works from inside the docker container.

* Override the entry point.

* Clean up and add ReadMe.

* Clean up socat.

* Checkpoint: connect to Kube cluster and list all the pods.

* Checkpoint: Sync worker pod is able to send output to the destination pod.

* Checkpoint: Sync worker creates Dest pod if none existed previously. It also waits for the pod to be ready before doing anything else. Sync worker will also remove the pod on termination.

* update readme

* Checkpoint: Dest pod does not restart after finishing. Comment out delete command in Sync worker.

* working towards named pipes

* named pipes working

* update readme

* WIP named pipe / socat sidecar kube port forwarding (#3518)

* nearly working sources

* update

* stdin example

* move all kube testing yamls into the airbyte-workers directories. sort the airbyte-workers resource folder; place all the poc yamls together.

* Format.

* Put back the original KubeProcessBuilderFactory.

* Fix slight errors.

* Checkpoint: Worker pod knows its own IP. Successfully starts and writes to Dest pod after refactor.

* remove unused file and update readme

* Dest pod loops back into worker pod. However, the right messages do not seem to be passing in.

* Switch back to worker ip.

* SWEET VICTORY!.

* wrap kube pod in process (#3540)

also clean up kubernetes deploys.

* More clean up. (#3586)

The first 6 points of #3464.

The only interesting thing about this PR is the kube pod shutdown. For whatever reason, the OkHttpPool isn't respecting the evictAll call and 1 idle thread remains. So instead of shutting down immediately, the worker pod shuts down after 5 mins when the idle thread is reaped. There isn't an easy way to modify the pool's idle reap configuration right now. I do not think this issue is blocking since it's relatively benign, so I vote we create a ticket and come back to this once we do an e2e test.

* Implements redirecting standard error as well. (#3623)

* Clean up before next implementation.

* kube process launching (#3790)

* processes must handle file mounting

* remove comment

* default to base entrypoint

* use process builder factory / select stdin / use a pool of ports

* fix up

* add super hacky copying example

* Checkpoint: Works end to end!

* Checkpoint: Use API to make sure init container is ready instead of blind sleep. Propagate exception in DefaultCheckConnectionWorker.

* Refactor KubePodProcess. Checked to make sure everything still works.

* Format.

* Clean up code. Begin putting this into variables and breaking up long constructor function.

* Add comments to explain what is happening.

* fix normalization test

* increase timeout for initcontainer

Co-authored-by: Davin Chia <davinchia@gmail.com>

* facepalm moment

* clean up kube poc pr (#3834)

* clean up

* remove source-always-works

* create separate commons-docker

* fix test

* enable kube e2e tests (#3866)

* enable kube e2e tests

* use more generally accepted env definition

* use new runners

* use its own runner and install minikube differently

* update name

* use kubectl alias

* use link instead of alias that doesn't propagate

* start minikube

* use driver=none

* go back to using action

* mess with versions

* revert runner

* install socat

* print logs after run

* also try re-runnining tasks

* always wait for file transfer

* use ports

* increase wait timeout for kube

* use different localhost ips and bump normalization to include an entrypoint

* proposed fix

* all working locally

* revert temporary changes

* revert normalization image change that's happening in a separate pr

* readability

* final comment

* Working Kube Cancel. (#3983)

* Port over the basic changes.

* Add logic to return proper exit code in the event of termination. Add comments to explain why.

* revert envs change and merge master to fix kube acceptance tests (#4012)

* use older env format

* fix build

Co-authored-by: jrhizor <me@jaredrhizor.com>
Co-authored-by: Jared Rhizor <jared@dataline.io>