Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent Sync Plugin #4107

Merged
merged 68 commits into from Dec 20, 2023
Merged

Agent Sync Plugin #4107

merged 68 commits into from Dec 20, 2023

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Sep 30, 2023

Tracking issue

#3936
flyteorg/flytekit#1822

Describe your changes

  • flyteidl
  1. add a DoTask function in agent.proto
  2. change the purpose of runtime metadata's flavor in tasks.proto

https://github.com/flyteorg/flyte/pull/4107/files#diff-36edb1a8db489fabd420007c4542f657ed7e6d4f9d11052f81536c35f422ac14R91-R104
https://github.com/flyteorg/flyte/pull/4107/files#diff-d9c6122ef023db0768b904e6d6307684dfc35b4d03f0b2152ff7791344178a14R85

  • Add sync plugin routing mechanism to support sync plugins
  1. by adding the type Plugin interface here
    https://github.com/flyteorg/flyte/pull/4107/files#diff-3119631e7cd5a6d4fb999fc50bbb8509bb685347c4a6e3cceda14acec6aa0eb3R100-R103

https://github.com/flyteorg/flyte/pull/4107/files#diff-5e3f52f33e7ff5f718bc03e33dd574152be1a784901745e7c652b37e0bfc46a7R63-R114

  • Split Handle function into 2 functions, syncHandle and asyncHandle

  • Add test for the above changes

  • Finish above without introducing breaking changes

Note: api_task is a task type which has only Do Task Function, the sync plugin meets it's need.

How to test it?

Configuration Example

To set up agent sync plugins, users can now utilize the following configuration:

tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
    default-for-task-types:
      api_task: agent-service
plugins:
  agent-service:
    supportedTaskTypes:
      - api_task

Run in Remote Environment

pyflyte run --remote --image futureoutlier/flytekit:chatgpt-v2 chatgpt_example.py wf

Routing Mechanism

To determine the route to the sync plugin, we're leveraging the task's metadata flavor.
The logic can be found in the PR's changes:

Here's the code
if taskTemplate.GetMetadata().GetRuntime().GetFlavor() == syncPlugin
https://github.com/flyteorg/flyte/pull/4107/files#diff-da53a6f262afa680bad44d5bc3e5a6a95f1e8b8764002e0db9a7f015af7d2831R77

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Note to reviewers

Please also take a look at this PR.
flyteorg/flytekit#1822
Both PRs are interconnected and address related functionalities.

Screen Shot

image
Note: You can find that the ChatGPT task is an api_task.

How it works at high level

image

Additional Information

The typecasting for the webapi.Plugin to webapi.SyncPlugin or webapi.AsyncPlugin might be a little bit tricky, webapi.Plugin's interface has a common function with webapi.SyncPlugin and webapi.AsyncPlugin.

In the file flyte/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go, since the function p.GetConfig() performances the same no matter the plugin is webapi.AsyncPlugin or webapi.SyncPlugin, and all the parts for webapi.AsyncPlugin and webapi.SyncPlugin are the same, it is OK to share the same snippets in the function, but don't need to create more PluginLoaders interface as the conversation below.

func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.PluginEntry {
return core.PluginEntry{
ID: pluginEntry.ID,
RegisteredTaskTypes: pluginEntry.SupportedTaskTypes,
LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (
core.Plugin, error) {
p, err := pluginEntry.PluginLoader(ctx, iCtx)
if err != nil {
return nil, err
}
err = validateConfig(p.GetConfig())
if err != nil {
return nil, fmt.Errorf("config validation failed. Error: %w", err)
}
// If the plugin will use a custom state, register it to be able to
// serialize/deserialize interfaces later.
if customState := p.GetConfig().ResourceMeta; customState != nil {
gob.Register(customState)
}
if quotas := p.GetConfig().ResourceQuotas; len(quotas) > 0 {
for ns, quota := range quotas {
err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, ns, quota)
if err != nil {
return nil, err
}
}
}
resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching,
iCtx.MetricsScope().NewSubScope("cache"))
if err != nil {
return nil, err
}
err = resourceCache.Start(ctx)
if err != nil {
return nil, err
}
return CorePlugin{
id: pluginEntry.ID,
p: p,
cache: resourceCache,
metrics: newMetrics(iCtx.MetricsScope()),
tokenAllocator: newTokenAllocator(c),
}, nil
},
}
}

Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier Future-Outlier marked this pull request as draft September 30, 2023 14:37
@Future-Outlier Future-Outlier changed the title sync plugin idl Agent Sync Plugin Sep 30, 2023
@codecov
Copy link

codecov bot commented Oct 2, 2023

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (1e397e6) 59.02% compared to head (36742d9) 59.03%.

Files Patch % Lines
...s/go/tasks/pluginmachinery/internal/webapi/core.go 0.00% 2 Missing ⚠️
...yteplugins/go/tasks/plugins/webapi/agent/plugin.go 92.30% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##           master    #4107   +/-   ##
=======================================
  Coverage   59.02%   59.03%           
=======================================
  Files         622      622           
  Lines       52780    52793   +13     
=======================================
+ Hits        31153    31164   +11     
- Misses      19140    19142    +2     
  Partials     2487     2487           
Flag Coverage Δ
unittests 59.03% <80.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Future Outlier added 2 commits October 2, 2023 16:38
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
taskTemplate, err := tCtx.TaskReader().Read(ctx)

if taskTemplate.Type == "dispatcher" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't taskTemplate.Type being used for agent routing in in agent/plugin.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, taskTemplate.Type is indeed utilized there.
However, we also need to ensure the plugin is directed to use the sync plugin interface.
This is why we have to make the routing here.

I chose not to incorporate a cache mechanism for the sync plugin.
And all cases of do task can go through the dispatcher task type.
You can refer this pr: flyteorg/flytekit#1822

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for replying.

I'm not sure how routing is supposed to be done for a sync agent. If I understood correctly, it would be utilizing custom of task template: https://github.com/flyteorg/flytekit/pull/1822/files#diff-cac07a79ac0ced0778cd505ccfb5f8f1b4e33f78e1c40a91bba926171f9d2246R39-R41, is that correct?

I'm also confused by https://github.com/flyteorg/flytekit/pull/1822/files#diff-9f7af27264f8773b069e8200804c224fe19a6fcaaf9dc33edc644f5351cbb3beR137. Does it mean you are assuming there is at most one "dispatcher" (i.e task_template.type == "dispatcher") agent connected to flyte? If so, I think this is a very strong assumption which at least won't work in our setup, because we won't have a centralized agent but multiple ones developed by different teams.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could extend task template proto to include a new field indicating it is a "dispatcher", or encode this information in custom with a preserved key, so here we don't need to rely on type, and all the routing is handled by agent plugin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes you are right!
    In the case of the chatgpt task, the task type is designated as dispatcher. We then utilize the ChatGPTDispatcher as specified in the configuration.

  2. To illustrate a practical scenario:
    Consider a situation where there are two dispatchers - the ChatGPTDispatcher and the LangChainDispatcher.
    When either is invoked to call the agent-service, the task type remains dispatcher.
    And both of them can be used at the same time with only one server!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More clarification: I meant we have multiple gRPC endpoints, with everything being "dispatcher", agent plugin will not be able to know which endpoint to route to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any new development on this? I still think it is not a safe assumption that there is one and only one "dispatcher" typed agent endpoint in the system.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pingsutw and I is currently trying to fix this!
Maybe we can have a discussion on slack.
And yes, you are right, we will improve the implementation of it.
We will definitely not use only one "dispatcher" typed agent endpoint in the system.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is great! Sure please feel free to pull me to a Slack chat and I will follow up tomorrow. Sorry I am not able to discuss today.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, I will book a time with you, thanks for your attention!

Future Outlier and others added 10 commits October 3, 2023 12:12
Signed-off-by: Future Outlier <eric901201@gmai.com>
… agent-sync-plugin

Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
… agent-sync-plugin

Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier Future-Outlier marked this pull request as ready for review October 17, 2023 04:45
@Future-Outlier
Copy link
Member Author

@honnix if possible, please take a look at this PR and the relevant flytekit PR.
Thanks a lot!
flyteorg/flytekit#1822
Feel free to mention us to improve the PR, thanks really much!

@@ -20,7 +20,7 @@ import (

// A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed
// that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error)
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (Plugin, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, thanks for the review.
with this change, users who previously used AsyncPlugin will now need to modify their code in the same way as the BigQuery plugin does.
BigQuery example: https://github.com/flyteorg/flyte/pull/4107/files#diff-6a9d13d1a2c7f030da6eaa91b837eee26b780d7c26e1084d1a76f2349977ee2dR562

I get it might feel a bit forceful making users adjust their private plugins. But I genuinely think this route is smoother. Going the other way, like suggesting a change where existing plugins (take BigQuery, for example) need deeper tweaks, doesn't seem as clean. Imagine this:
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, SyncPlugin, error)
Here's the previous version:
adc1c2c#diff-3119631e7cd5a6d4fb999fc50bbb8509bb685347c4a6e3cceda14acec6aa0eb3R25

In comparison, the current plugin interface is far more easier for users to tweak their code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in general we should be very cautious with breaking changes given the number of flyte deployments and possible plugins in the wild. Unless it is absolutely necessary, we may not want to do that. This case to me doesn't seem to be so convincing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered this approach?

// Deprecated, please use AsyncPluginLoader instead for new plugin development
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error)

type AsyncPluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error)
type SyncPluginLoader func(ctx context.Context, iCtx PluginSetupContext) (SyncPlugin, error)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good advice, looking

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not take my words as the proposal, I was just trying to explore how to ship this feature without introducing breaking changes, because I believe we should be able to achieve that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your thought is pretty awesome, I didn't think of this solution before.
I will discuss this with @pingsutw and tell you our thoughts.
If the implementation doesn't work, we will tell you why, and we will update the latest discussion with you tomorrow, thanks a lot for the review!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @honnix
Sorry for the late reply.
Your proposal works, but @pingsutw thought that we can come out a better solution to make the interface better and prettier, so please give us sometime and we will tell you our new implementation!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we face a problem that one of flytekit sync plugin, chatgpt, will need more than 10 seconds for the reply if we use openai API and gpt-4 as the model.
However, the grpc agent server shouldn't wait that long, so we need to either
add a new server to handle a task like this, or we need to
set the timeout limit in the request to the openai server.

If you have time, can you help us to think about the solution?
I think you are really worth learning for me, thanks a lot!
If you don't have time, I can understand your difficulty, since you might be busy.

The below are the 2 solutions' example:

(solution 1) add a new server
CREATE:

  • return a task ID to get the result from the API_TASK server,
  • API_TASK server create a thread to execute OPENAI REST API REQUEST

GET:

  • Use the task ID to check if the thread finish the execute, and store the output to a place.
  • if finished
    return result
    else
    return RUNNING

DELETE:

  • Kill the Thread

image

(solution 2) set the timeout limit in the request

timeout = aiohttp.ClientTimeout(total=10)

async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.post(
        url=openai_url, headers=get_header(openai_organization=self._openai_organization), data=data
    ) as resp:
        if resp.status != 200:
            raise Exception(f"Failed to execute chatgpt job with error: {resp.reason}")
        response = await resp.json()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will solve this in the future, reviewers can ignore this now.

Future Outlier added 2 commits October 26, 2023 11:01
Signed-off-by: Future Outlier <eric901201@gmai.com>

// Use the sync plugin to execute the task if the task template has the sync plugin flavor.
if taskTemplate.GetMetadata().GetRuntime().GetFlavor() == syncPlugin {
phaseInfo, err := c.p.(webapi.SyncPlugin).Do(ctx, tCtx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if the contract is broken? For example the tasks does not correctly set the flavour that is expected by the plugin?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task will be executed by AsyncPlugin.
It is implemented by flytekit this PR.
We will set the variable use_sync_plugin to True.
https://github.com/flyteorg/flytekit/pull/1822/files#diff-63787311eeb5747eb3170136f7c05abe69fe01da23d94222a39baf798273e687R20-R52
And then eventually set the flavor variable to here.
https://github.com/flyteorg/flytekit/pull/1822/files#diff-d123cf5b0acf27c386c1ceb74fd4b0de0775378fe1864c76908b5a334f58fffaR173-R440

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what's happening in flytekit, but there are also tasks not developed in the flytekit repository so we cannot assume they would all be designed correctly. The backend side should be resilient to this type of contract-breaking situation. I was just wondering what if task doesn't set the flag while the plugin handling the task expects that, and vice versa.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the agent plugin task will return an error.
As we know, flytepropeller will call the function invokePlugin, and if the plugin is agent service, it will send a grpc request to agent server, start by pyflyte serve.
I think we can write 2 things to help users understand how to use the sync and async plugins easily.

  1. add logger.errorf when the plugin doesn't execute successfully, tell users to check whether they use the correct ones or not.
  2. write more annotation for the function and the use_sync_plugin variable in both flytekit and flyteplugins.

Do you think this is a good solution?
Or there's any other ideas?
I would like to improve it, thanks a lot for your time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure we should provide good ways for users to figured out the error.

Maybe a more concrete question, what happens if the cast c.p.(webapi.SyncPlugin) fails here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't try it before, but I totally agree that we should add a cast to check if it is correct!
Maybe something like this. (please ignore the implementation error here)

plugin, err := c.p.(webapi.SyncPlugin)
if err != nil {
                       logger.Errorf("please check if the sync plugin interface is implemented or not")
			return core.UnknownTransition, err
		}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is the direction to go, and there might be other things to consider as well. Generally, if there exists a contract we need to be more protective on the backend side to avoid a total failure and have a better isolation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot, as a backend beginner, I really appreciate your advice!

Future Outlier added 4 commits November 7, 2023 16:24
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
nit
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier Future-Outlier marked this pull request as draft December 16, 2023 14:55
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@Future-Outlier
Copy link
Member Author

Future-Outlier commented Dec 19, 2023

pyflyte run --remote --image futureoutlier/flytekit:chatgpt-0306 chatgpt_example.py wf

image

from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask

chatgpt_job = ChatGPTTask(
    name="chatgpt",
    config={
        "openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
        "chatgpt_conf": {
            "model": "gpt-3.5-turbo",
            "temperature": 0.7,
        },
    },
)

@workflow
def wf() -> str:
    message = chatgpt_job(message="Who are you?")
    return t1(s=message)

@task(
    # container_image=image_spec,
)
def t1(s: str) -> str:
    s = "Repsonse: " + s
    return s

Future Outlier and others added 2 commits December 19, 2023 15:50
Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier Future-Outlier marked this pull request as ready for review December 19, 2023 07:52
@dosubot dosubot bot added the enhancement New feature or request label Dec 19, 2023
Future-Outlier and others added 2 commits December 19, 2023 23:33
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
Future Outlier added 4 commits December 20, 2023 11:30
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
… agent-sync-plugin

Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier
Copy link
Member Author

Future-Outlier commented Dec 20, 2023

update:
sync task
image
async task
image

Future Outlier and others added 2 commits December 20, 2023 18:13
Signed-off-by: Future Outlier <eric901201@gmai.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
pingsutw
pingsutw previously approved these changes Dec 20, 2023
@Future-Outlier
Copy link
Member Author

I just test it again, it is correct.
image

@pingsutw pingsutw merged commit 6a7e620 into flyteorg:master Dec 20, 2023
45 checks passed
@Future-Outlier Future-Outlier deleted the agent-sync-plugin branch April 23, 2024 10:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request size:XXL This PR changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants