Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote in dynamic #248

Merged
merged 9 commits into from Sep 28, 2023
Merged

Support remote in dynamic #248

merged 9 commits into from Sep 28, 2023

Conversation

honnix
Copy link
Collaborator

@honnix honnix commented Sep 27, 2023

TL;DR

This is an attempt to support remote tasks/launch plans in dynamic workflows.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

I don't see a good reason why we don't support the remote in dynamic setup, while we already have code doing the latest lookup etc.

Tracking Issue

NA

Follow-up issue

NA


@Override
protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the current logic is, we only support task without any pdv coordinate, which means tasks in the workflow definition itself. For a remote entity, we do have pd(v).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take an example from IdentifierRewrite.

  PartialTaskIdentifier apply(PartialTaskIdentifier taskId) {
    String name = Preconditions.checkNotNull(taskId.name(), "name is null");
    String project = coalesce(taskId.project(), project());
    String domain = coalesce(taskId.domain(), domain());
    String version =
        coalesce(
            taskId.version(),
            () ->
                taskId.project() == null
                    ? version()
                    : getLatestTaskVersion(
                        /* project= */ project, /* domain= */ domain, /* name= */ name));
    return PartialTaskIdentifier.builder()
        .name(name)
        .domain(domain)
        .project(project)
        .version(version)
        .build();
  }

This means:

  • we take task name from task id, which is the same as DynamicWorkflowIdentifierRewrite
  • we take project if task id has it and in this case it is a remote one; but if it is a task in the workflow itself, this should be null, and this should cover the existing path in DynamicWorkflowIdentifierRewrite
  • the same applies to domain
  • we take version if task id has it and in this case it is a remote one with resolved (or hardcoded) version; but if it is a task in the workflow itself, this should be null and the same for project, so this should cover the existing path in DynamicWorkflowIdentifierRewrite; but if project is not null we will try to do a latest lookup to resolve the remote one

With this, we should both support local and remote tasks. The same applies to remote launch plan, and subworkflow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be some corner cases that we may eventually need to write a new visitor, but I honestly don't see any blocker.

@honnix honnix marked this pull request as ready for review September 28, 2023 10:00
SdkNode<Void> hello =
builder.apply(
"hello",
SdkRemoteTask.create(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the remote task we reference from a dynamic workflow task.

builder.apply("fib-" + i, new SumTask(), SumInput.create(value, prev)).getOutputs();
builder
.apply(
"fib-" + i, new SumTask().withUpstreamNode(hello), SumInput.create(value, prev))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make a connection for fun, as it is not required.

DynamicJobSpec spec,
Map<TaskIdentifier, TaskTemplate> taskTemplates,
Map<WorkflowIdentifier, WorkflowTemplate> workflowTemplates) {

DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config);
WorkflowNodeVisitor workflowNodeVisitor =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core change.

@honnix honnix mentioned this pull request Sep 28, 2023
8 tasks
@@ -33,7 +33,7 @@

public class FlyteSandboxContainer extends GenericContainer<FlyteSandboxContainer> {

public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.1.0";
public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.9.1";
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required by this PR, but it's nice to be on latest version.

@@ -200,7 +200,7 @@
<environment>
<FLYTE_INTERNAL_MODULE_DIR>/jflyte/modules</FLYTE_INTERNAL_MODULE_DIR>
<FLYTE_INTERNAL_IMAGE>${docker.image}:${docker.tag}</FLYTE_INTERNAL_IMAGE>
<FLYTE_PLATFORM_URL>flyte:30081</FLYTE_PLATFORM_URL>
<FLYTE_PLATFORM_URL>flyteadmin.flyte.svc.cluster.local:81</FLYTE_PLATFORM_URL>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required for remote lookup to work because the task container running in k3 will need to talk to flyteadmin and using service DNS name is the best option.

Strictly speaking this is a breaking change but it is extremely unlikely that the existing config flyte:30081 would work for any users anyway and they have likely already customized it.

return withTaskOutput0(task, input, output);
}

public <InputT, OutputT> SdkTestingExecutor withTaskOutput(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is weird, but there is no change made to withTaskOutput(SdkRemoteTask, ...),

Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
Signed-off-by: Hongxin Liang <honnix@users.noreply.github.com>
@honnix honnix merged commit 9b56702 into master Sep 28, 2023
4 checks passed
@honnix honnix deleted the remote-in-dynamic branch September 28, 2023 15:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants