[DOM-52828] Add interface vals to the domino task #194

ddl-ryan-connor
Collaborator

Link to JIRA

https://dominodatalab.atlassian.net/browse/DOM-52824

What issue does this pull request solve?

The Flyte input-downloader init container and the output-uploader sidecar container need the task's input/output interfaces passed as arguments to their container commands, as base64-encoded strings.

What is the solution?

After instantiating the DominoTask, serialize the task's input/output interfaces to base64 strings and include them in the job config so that the Flyte agent can send them along to the job API.
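
A minimal sketch of the encoding step, using only the standard library. The bytes literal stands in for the output of `SerializeToString()`, and `encode_interface` and the `interfaceBase64` key are hypothetical names, not taken from this PR. `base64.b64encode` returns `bytes`, so the sketch decodes to `str` on the assumption that the job config ends up in a JSON payload.

```python
import base64
import json


def encode_interface(serialized: bytes) -> str:
    """Base64-encode serialized protobuf bytes and return a plain str.

    Hypothetical helper for illustration; not part of this PR.
    """
    # b64encode returns bytes; decode so the value is JSON-serializable.
    return base64.b64encode(serialized).decode("ascii")


# Stand-in for self._interface.to_flyte_idl().SerializeToString().
fake_serialized_interface = b"\x0a\x03abc"

encoded = encode_interface(fake_serialized_interface)

# The string survives a JSON round trip and decodes back to the same bytes.
payload = json.dumps({"interfaceBase64": encoded})  # hypothetical key name
decoded = base64.b64decode(json.loads(payload)["interfaceBase64"])
```

Passing the raw `bytes` from `b64encode` into `json.dumps` would raise a `TypeError`, which is why the helper returns `str`.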

Testing

Tested the serialized args produced by the Python code with an arbitrary Domino job that uses the init and sidecar containers, after placing data matching the interface at the expected input location in S3. The init container downloaded the data, the job code used the data and saved its results to the local output dir, and the sidecar then successfully uploaded the output to S3.


@@ -149,6 +170,7 @@ def _resolve_job_properties(self, domino_job_config: DominoJobConfig) -> Dict[st
resolved_job_config["apiKey"] = domino_job_config.ApiKey
resolved_job_config["command"] = domino_job_config.Command
resolved_job_config["title"] = domino_job_config.Title
resolved_job_config["pipelineConfig"] = asdict(PipelineConfig())
Contributor

Since we just return a dict with no required keys and you're only inserting an empty PipelineConfig, it's probably fine to omit this line altogether.

Comment on lines 137 to 143
serialized_input_interface = task._interface.to_flyte_idl().inputs.SerializeToString()
serialized_input_output_interface = job._interface.to_flyte_idl().SerializeToString()
pipelineConfig = PipelineConfig(
InputInterfaceBase64=base64.b64encode(serialized_input_interface),
InputOutputInterfaceBase64=base64.b64encode(serialized_input_output_interface),
)
task.task_config.PipelineConfig = pipelineConfig
Contributor

@noahjax noahjax Jan 23, 2024

Seems like you could create the Interface you use here before you call super().__init__() and avoid having to update the task after it is created. Not sure if it matters either way.

With my recent changes, task.task_config is no longer strongly typed. It's just a dict, so you'll need to update it differently. Sorry about that: the multiple places where we needed to serialize and deserialize the nested dicts were giving me a headache, so using a dict internally seemed like the easier option. This probably means you don't need the PipelineConfig class, since it isn't user facing and we are using a dict internally anyway.

Also, is there a reason that one of these uses task._interface and the other uses job._interface? I'm not sure where job is defined here.
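
One possible shape for the dict-based update, as a rough sketch. The key names (`pipelineConfig`, `inputInterfaceBase64`, `inputOutputInterfaceBase64`) and the helper `with_interface_vals` are assumptions for illustration; the actual keys depend on what the job API expects.

```python
from typing import Any, Dict


def with_interface_vals(
    task_config: Dict[str, Any],
    input_interface_b64: str,
    input_output_interface_b64: str,
) -> Dict[str, Any]:
    """Return a copy of task_config with the interface values attached.

    Hypothetical helper; key names are assumed, not taken from the PR.
    """
    updated = dict(task_config)  # shallow copy so the caller's dict is untouched
    updated["pipelineConfig"] = {
        "inputInterfaceBase64": input_interface_b64,
        "inputOutputInterfaceBase64": input_output_interface_b64,
    }
    return updated


# Placeholder strings stand in for real base64-encoded interfaces.
config = with_interface_vals({"title": "example job"}, "aW4=", "aW4rb3V0")
```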

Collaborator Author

On the job question: it was a copy/paste mistake from my other code. Will change it to task.

Collaborator Author

I reworked this to address your recent changes. (I also forgot that __init__ methods don't return anything and that I can just work with self, so I fixed that.)

Agreed that ideally we would compute the base64 interface values first and not modify the task afterwards, but I can't see an easy way to do that, so I added a comment about it.

domino/flyte/task.py (two outdated, resolved review threads)
Comment on lines 137 to 138
serialized_input_interface = self._interface.to_flyte_idl().inputs.SerializeToString()
serialized_input_output_interface = self._interface.to_flyte_idl().SerializeToString()
Contributor

What is the difference between serialized_input_interface and serialized_input_output_interface? Seems like they are identical

Collaborator Author

@ddl-ryan-connor ddl-ryan-connor Jan 23, 2024

They are different: one is the input interface only, and the other is the combined input and output interface.

This is how the Flyte containers expect the args.

Contributor

Nvm, am illiterate

@ddl-ryan-connor ddl-ryan-connor merged commit 99e0d66 into noahjax.DOM-52828.define-domino-job-flyte-task Jan 23, 2024