Skip to content

Commit

Permalink
commit 0
Browse files Browse the repository at this point in the history
- unfinished code.
  • Loading branch information
valayDave committed Mar 2, 2022
1 parent 3e69f3f commit b176311
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 0 deletions.
2 changes: 2 additions & 0 deletions metaflow/plugins/__init__.py
Expand Up @@ -79,6 +79,7 @@ def get_plugin_cli():
from .aws.batch import batch_cli
from .aws.eks import kubernetes_cli
from .aws.step_functions import step_functions_cli
from .airflow import airflow_cli
from .cards import card_cli

return _ext_plugins["get_plugin_cli"]() + [
Expand All @@ -87,6 +88,7 @@ def get_plugin_cli():
card_cli.cli,
kubernetes_cli.cli,
step_functions_cli.cli,
airflow_cli.cli,
]


Expand Down
Empty file.
36 changes: 36 additions & 0 deletions metaflow/plugins/airflow/airflow.py
@@ -0,0 +1,36 @@
class Airflow(object):
def __init__(
self,
name,
graph,
flow,
code_package_sha,
code_package_url,
metadata,
flow_datastore,
environment,
event_logger,
monitor,
tags=None,
namespace=None,
username=None,
max_workers=None,
is_project=False,
):
self.name = name
self.graph = graph
self.flow = flow
self.code_package_sha = code_package_sha
self.code_package_url = code_package_url
self.metadata = metadata
self.flow_datastore = flow_datastore
self.environment = environment
self.event_logger = event_logger
self.monitor = monitor
self.tags = tags
self.namespace = namespace
self.username = username
self.max_workers = max_workers

def compile(self):
pass
91 changes: 91 additions & 0 deletions metaflow/plugins/airflow/airflow_cli.py
@@ -0,0 +1,91 @@
from metaflow._vendor import click
from metaflow import decorators
from metaflow.util import get_username
from metaflow.package import MetaflowPackage
from .airflow import Airflow


@click.group()
def cli():
pass


@cli.group(help="Commands related to Airflow.")
@click.pass_context
def airflow(ctx):
pass


def make_flow(obj, name, tags, namespace, worker_pools, is_project):
# Attach AWS Batch decorator to the flow
decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
)

obj.package = MetaflowPackage(
obj.flow, obj.environment, obj.echo, obj.package_suffixes
)
package_url, package_sha = obj.flow_datastore.save_data(
[obj.package.blob], len_hint=1
)[0]
return Airflow(
name,
obj.graph,
obj.flow,
package_sha,
package_url,
obj.metadata,
obj.flow_datastore,
obj.environment,
obj.event_logger,
obj.monitor,
tags=tags,
namespace=namespace,
max_workers=worker_pools,
username=get_username(),
is_project=is_project,
)


@airflow.command(help="Create an airflow workflow from this metaflow workflow")
@click.option(
"--tag",
"tags",
multiple=True,
default=None,
help="Annotate all objects produced by AWS Step Functions runs "
"with the given tag. You can specify this option multiple "
"times to attach multiple tags.",
)
@click.option(
"--namespace",
"user_namespace",
default=None,
)
@click.option(
"--only-json",
is_flag=True,
default=False,
help="Only print out JSON",
)
@click.option(
"--worker-pools",
default=100,
show_default=True,
)
@click.pass_obj
def create(
obj,
tags=None,
user_namespace=None,
only_json=False,
worker_pools=None,
):
flow = make_flow(
obj,
obj.state_machine_name,
tags,
user_namespace,
worker_pools,
obj.is_project,
)

0 comments on commit b176311

Please sign in to comment.