Skip to content

Commit

Permalink
sync plugin, DoTask function interface
Browse files Browse the repository at this point in the history
Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
Future Outlier committed Sep 7, 2023
1 parent 25d6a5d commit 970bf3b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
25 changes: 25 additions & 0 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,28 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon
logger.error(f"failed to delete task with error {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to delete task with error {e}")

async def DoTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
try:
tmp = TaskTemplate.from_flyte_idl(request.template)
inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None
agent = AgentRegistry.get_agent(context, tmp.type)
logger.info(f"{agent.task_type} agent start doing the job")

Check warning on line 100 in flytekit/extend/backend/agent_service.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/agent_service.py#L95-L100

Added lines #L95 - L100 were not covered by tests
if agent.asynchronous:
try:
return await agent.async_do(

Check warning on line 103 in flytekit/extend/backend/agent_service.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/agent_service.py#L102-L103

Added lines #L102 - L103 were not covered by tests
context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
except Exception as e:
logger.error(f"failed to run async do with error {e}")
raise e
try:
return await asyncio.to_thread(

Check warning on line 110 in flytekit/extend/backend/agent_service.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/agent_service.py#L106-L110

Added lines #L106 - L110 were not covered by tests
agent.do, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
except Exception as e:
logger
except Exception as e:
logger.error(f"failed to do task with error {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to create task with error {e}")

Check warning on line 118 in flytekit/extend/backend/agent_service.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/agent_service.py#L113-L118

Added lines #L113 - L118 were not covered by tests
24 changes: 24 additions & 0 deletions flytekit/extend/backend/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteT
"""
raise NotImplementedError

def do(
self,
context: grpc.ServicerContext,
output_prefix: str,
task_template: TaskTemplate,
inputs: typing.Optional[LiteralMap] = None,
) -> CreateTaskResponse:
"""
Return the result of executing a task. It should return error code if the task creation failed.
"""
raise NotImplementedError

Check warning on line 92 in flytekit/extend/backend/base_agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/base_agent.py#L92

Added line #L92 was not covered by tests

async def async_create(
self,
context: grpc.ServicerContext,
Expand All @@ -105,6 +117,18 @@ async def async_delete(self, context: grpc.ServicerContext, resource_meta: bytes
"""
raise NotImplementedError

def async_do(
self,
context: grpc.ServicerContext,
output_prefix: str,
task_template: TaskTemplate,
inputs: typing.Optional[LiteralMap] = None,
) -> CreateTaskResponse:
"""
Return the result of executing a task. It should return error code if the task creation failed.
"""
raise NotImplementedError

Check warning on line 130 in flytekit/extend/backend/base_agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extend/backend/base_agent.py#L130

Added line #L130 was not covered by tests


class AgentRegistry(object):
"""
Expand Down

0 comments on commit 970bf3b

Please sign in to comment.