Task Runs Concurrency slots not released when flow runs in Kubernetes are cancelled #8566

Open
4 tasks done
masonmenges opened this issue Feb 16, 2023 · 5 comments
Labels
api Related the Prefect REST API bug Something isn't working concurrency

Comments

@masonmenges
Contributor

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When a Prefect flow runs as a Kubernetes job and the flow run is cancelled while its tasks are in a Running state, the concurrency slots held by those tasks are not released, even though the tasks themselves move to a Cancelled state.

This is reproducible with the steps and code below, with the flow run triggered as a Kubernetes job:

  1. Create a concurrency limit in Prefect Cloud
  2. Add a tag to a task so that it uses that concurrency limit
  3. Trigger the flow, then cancel it once the tasks that use that concurrency limit are in a “Running” state and populate the concurrency limit queue
  4. The tasks’ state changes from “Running” to “Cancelled”, but the task runs remain in the concurrency limit queue
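
One way to confirm the symptom in step 4 is to compare the slot holders of the limit with the states of the corresponding task runs. The following is a minimal sketch, not Prefect code: it assumes the limit's active slots and each task run's state name have already been fetched, and both the `find_leaked_slots` helper and the set of state names are illustrative assumptions.

```python
from uuid import uuid4

# States after which a task run can no longer be doing work.
# Assumption: these names are illustrative, not an exhaustive list from Prefect.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def find_leaked_slots(active_slots, state_by_task_run):
    """Return slot holders whose task run is terminal or unknown."""
    leaked = []
    for task_run_id in active_slots:
        state = state_by_task_run.get(task_run_id)
        # A slot is considered leaked if its holder has finished or cannot be found.
        if state is None or state in TERMINAL_STATES:
            leaked.append(task_run_id)
    return leaked


running, cancelled = uuid4(), uuid4()
leaked = find_leaked_slots(
    active_slots=[running, cancelled],
    state_by_task_run={running: "RUNNING", cancelled: "CANCELLED"},
)
print(leaked == [cancelled])  # True
```

If the bug is present, the cancelled task runs from step 4 would show up in the returned list.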

KubernetesJob Config:
[screenshot: k8sjobconfig]

Potentially related but separate issue: #7732

Reproduction

from prefect import flow, task, get_run_logger
import time

@task(tags=["some_concurrency_tag"])
def log_something(x):
    logger = get_run_logger()
    logger.info(f"this is log number {x}")
    time.sleep(60)

@flow
def smoke_test_flow():
    for x in range(0, 100):
        log_something.submit(x)

if __name__ == "__main__":
    smoke_test_flow()

Error

No response

Versions

Runs from the base Docker image prefecthq/prefect:2.8.0-python3.10

Additional context

Cluster config, minus any sensitive information

{
    "location": "southcentralus",
    "name": "prefect-k8s-dev",
    "tags": {
        "Application": "",
        "BudgetAlert": "",
        "BusinessGroup": "Data Analytics",
        "CostCode": "",
        "Priority": "",
        "TechnicalContact": "",
        "environment": "dev",
        "prefect": "true"
    },
    "type": "Microsoft.ContainerService/ManagedClusters",
    "properties": {
        "provisioningState": "Succeeded",
        "powerState": {
            "code": "Running"
        },
        "kubernetesVersion": "1.24.9",
        "dnsPrefix": "prefect-k8s-dev",
        "agentPoolProfiles": [
            {
                "name": "default",
                "count": 2,
                "vmSize": "Standard_DS2_v2",
                "osDiskSizeGB": 50,
                "osDiskType": "Ephemeral",
                "kubeletDiskType": "OS",
                "maxPods": 110,
                "type": "VirtualMachineScaleSets",
                "enableAutoScaling": false,
                "provisioningState": "Succeeded",
                "powerState": {
                    "code": "Running"
                },
                "orchestratorVersion": "1.24.9",
                "enableNodePublicIP": false,
                "mode": "System",
                "enableEncryptionAtHost": false,
                "enableUltraSSD": false,
                "osType": "Linux",
                "osSKU": "Ubuntu",
                "nodeImageVersion": "AKSUbuntu-1804gen2containerd-2023.01.20",
                "upgradeSettings": {},
                "enableFIPS": false
            }
        ]
    }
}
@masonmenges masonmenges added bug Something isn't working status:triage labels Feb 16, 2023
@zanieb zanieb added status:accepted api Related the Prefect REST API and removed status:triage labels Feb 23, 2023
@Samreay

Samreay commented Mar 27, 2023

I've posted to the Slack group about this too, but this is not exclusive to Kubernetes. I have stock-standard tasks being sent to a Dask cluster, and when the parent flow crashes for any reason, the slots aren't released.

I wonder if this is the same issue as reported over in #5995

@zanieb
Contributor

zanieb commented Mar 27, 2023

Thanks @Samreay, is this helped by #8408?

@Samreay

Samreay commented Mar 28, 2023

Hmmm, if I've understood the merge, then potentially, though it would be good to have that CLI command invoked by Prefect itself. I can see the reset method seems to be available in https://docs.prefect.io/api-ref/prefect/cli/concurrency_limit/, so I could add a flow that runs every few minutes and simply calls reset on all the active limits I've got defined.

That said, does that reset endpoint clear all slots, or just zombie slots? It looks like the slot override would end up being None, so it would remove even valid, still-running tasks from the slots, right?
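
To make the concern concrete: a selective reset would need to pass the still-running holders as the override rather than leaving it empty. Here is a rough sketch under stated assumptions. `surviving_slots` and `reset_keeping_running` are hypothetical helpers, and the client calls (`read_concurrency_limit_by_tag`, `reset_concurrency_limit_by_tag` with a `slot_override` argument) are my reading of the Prefect 2 client and should be checked against the installed version.

```python
def surviving_slots(active_slots, running_task_run_ids):
    """Keep only the slot holders that are still running, preserving order."""
    running = set(running_task_run_ids)
    return [slot for slot in active_slots if slot in running]


async def reset_keeping_running(tag, running_task_run_ids):
    """Reset a limit but keep the slots of task runs known to be running.

    Assumption: the Prefect 2 client exposes the two methods used below;
    verify the names and signatures against your Prefect version.
    """
    from prefect import get_client  # imported lazily so the helper above works alone

    async with get_client() as client:
        limit = await client.read_concurrency_limit_by_tag(tag)
        keep = surviving_slots(limit.active_slots, running_task_run_ids)
        await client.reset_concurrency_limit_by_tag(tag, slot_override=keep)
        return keep


# The pure helper can be checked without a Prefect server:
print(surviving_slots(["a", "b", "c"], {"b", "c"}))  # ['b', 'c']
```

Note that this read-then-reset sequence is not atomic, so a task that acquires a slot between the two calls could lose it.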

@ghislainp

I have the same problem with tasks run with the concurrent runner (the default runner). They become zombies, probably because they use Dask for the xarray math and some deadlock occurs in Dask.
The problem is that my tasks have a timeout of 1000 s, but they can still be stuck for much longer (hours).

@task(tags=["memory_intensive"], retries=2, retry_delay_seconds=400, timeout_seconds=1000)

The fact that the task timeout does not work reliably is the main problem, but in any case concurrency_limit should be able to release the slots of long-running tasks. It could use the task timeout and release them even if the tasks are still running, or it could have its own dedicated timeout.
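
To illustrate the idea of slots that expire on their own, here is a small self-contained sketch of a lease-based slot table. It is not how Prefect implements concurrency limits; the `LeasedSlots` class and its methods are hypothetical and only show the proposed behaviour.

```python
class LeasedSlots:
    """Toy concurrency limit where every slot carries an expiry time."""

    def __init__(self, limit):
        self.limit = limit
        self._expires_at = {}  # task_run_id -> expiry timestamp in seconds

    def expire(self, now):
        """Drop every slot whose lease has run out and return the holders removed."""
        stale = [t for t, deadline in self._expires_at.items() if deadline <= now]
        for task_run_id in stale:
            del self._expires_at[task_run_id]
        return stale

    def acquire(self, task_run_id, now, timeout_seconds):
        """Try to take a slot; stale leases are cleared first."""
        self.expire(now)
        if len(self._expires_at) >= self.limit:
            return False
        self._expires_at[task_run_id] = now + timeout_seconds
        return True

    def release(self, task_run_id):
        """Normal release path when a task finishes cleanly."""
        self._expires_at.pop(task_run_id, None)


slots = LeasedSlots(limit=1)
print(slots.acquire("stuck", now=0, timeout_seconds=1000))    # True
print(slots.acquire("next", now=500, timeout_seconds=1000))   # False, slot still leased
print(slots.acquire("next", now=1000, timeout_seconds=1000))  # True, stale lease expired
```

With this design a stuck task can block a slot for at most its timeout, and a reset of the whole limit is no longer needed to recover.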

Currently this has a big impact on my work: I'm under-using a cluster because, roughly every 10 h of processing, about 50% of the tasks are stuck. I have to reset the concurrency_limit, which is a problem because it also releases the slots of the tasks that are genuinely running (the other 50%). This uses more memory and makes my whole system less stable.

@ytl-liva

ytl-liva commented Dec 7, 2023

I have a similar issue with task concurrency on Kubernetes as well; I mentioned this in Slack.
I also notice this in task.map() runs. Loops that go item by item do not have this issue.


9 participants