Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Jupyter run #1742

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ jobs:
- name: Install dependencies
run: |
make setup
# remove this when https://github.com/flyteorg/flytekit/pull/1746 is merged
pip install marshmallow-enum
pip freeze
- name: Test with coverage
run: |
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ htmlcov
*dat
docs/source/_tags/
.hypothesis
!workflow.ipynb
57 changes: 57 additions & 0 deletions flyte_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

from flytekit import task, workflow


@task
def get_data() -> pd.DataFrame:
"""Get the wine dataset."""
return load_wine(as_frame=True).frame


@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
"""Simplify the task from a 3-class to a binary classification problem."""
return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
"""Train a model on the wine dataset."""
features = data.drop("target", axis="columns")
target = data["target"]
return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
"""Put all of the steps together into a single workflow."""
data = get_data()
processed_data = process_data(data=data)
return train_model(
data=processed_data,
hyperparameters=hyperparameters,
)


if __name__ == "__main__":
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
Config.for_sandbox(),
default_project="flytesnacks",
default_domain="development",
)

wf = remote.register_script(
training_workflow,
source_path=".",
module_name="flyte_workflow",
)

print(f"registered workflow {wf.id}")
execution = remote.execute(wf, inputs={"hyperparameters": {"C": 0.1}})
print(remote.generate_console_url(execution))
10 changes: 2 additions & 8 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,8 @@ def load_task(self, loader_args: List[str]) -> PythonAutoContainerTask:
return task_def

def loader_args(self, settings: SerializationSettings, task: PythonAutoContainerTask) -> List[str]: # type:ignore
from flytekit.core.python_function_task import PythonFunctionTask

if isinstance(task, PythonFunctionTask):
_, m, t, _ = extract_task_module(task.task_function)
return ["task-module", m, "task-name", t]
if isinstance(task, TrackedInstance):
_, m, t, _ = extract_task_module(task)
return ["task-module", m, "task-name", t]
_, m, t, _ = extract_task_module(task)
return ["task-module", m, "task-name", t]

def get_all_tasks(self) -> List[PythonAutoContainerTask]: # type: ignore
raise Exception("should not be needed")
Expand Down
9 changes: 9 additions & 0 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ def execution_mode(self) -> ExecutionBehavior:
def task_function(self):
return self._task_function

@property
def name(self) -> str:
"""
Returns the name of the task.
"""
if self.instantiated_in and self.instantiated_in not in self._name:
return f"{self.instantiated_in}.{self._name}"
return self._name

def execute(self, **kwargs) -> Any:
"""
This method will be invoked to execute the task. If you do decide to override this method you must also
Expand Down
114 changes: 94 additions & 20 deletions flytekit/core/tracker.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
import importlib
import importlib as _importlib
import inspect
import inspect as _inspect
import os
import sys
import typing
from pathlib import Path
from types import ModuleType
from typing import Callable, Tuple, Union
from typing import Callable, Optional, Tuple, Union

from flytekit.configuration.feature_flags import FeatureFlags
from flytekit.exceptions import system as _system_exceptions
from flytekit.loggers import logger


def import_module_from_file(module_name, file):
try:
spec = importlib.util.spec_from_file_location(module_name, file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
except AssertionError:
# handle where we can't determine the module of functions within the module
return importlib.import_module(module_name)
except Exception as exc:
raise ModuleNotFoundError(f"Module from file {file} cannot be loaded") from exc


class InstanceTrackingMeta(type):
"""
Please see the original class :py:class`flytekit.common.mixins.registerable._InstanceTracker` also and also look
Expand All @@ -22,18 +35,49 @@ class InstanceTrackingMeta(type):
variable that the instance was assigned to.
"""

@staticmethod
def _get_module_from_main(globals) -> Optional[str]:
curdir = Path.cwd()
file = globals.get("__file__")
if file is None:
return None

file = Path(file)
try:
file_relative = file.relative_to(curdir)
except ValueError:
return None

module_components = [*file_relative.with_suffix("").parts]
module_name = ".".join(module_components)
if len(module_components) == 0:
return None

# make sure current directory is in the PYTHONPATH.
sys.path.insert(0, str(curdir))
return import_module_from_file(module_name, file)

@staticmethod
def _find_instance_module():
frame = _inspect.currentframe()
frame = inspect.currentframe()
while frame:
if frame.f_code.co_name == "<module>" and "__name__" in frame.f_globals:
return frame.f_globals["__name__"]
if frame.f_globals["__name__"] != "__main__":
return frame.f_globals["__name__"], frame.f_globals["__file__"]
# if the remote_deploy command is invoked in the same module as where
# the app is defined, get the module from the file name
mod = InstanceTrackingMeta._get_module_from_main(frame.f_globals)
if mod is None:
return None, None
return mod.__name__, mod.__file__
frame = frame.f_back
return None
return None, None

def __call__(cls, *args, **kwargs):
o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
o._instantiated_in = InstanceTrackingMeta._find_instance_module()
mod_name, mod_file = InstanceTrackingMeta._find_instance_module()
o._instantiated_in = mod_name
o._module_file = mod_file
return o


Expand All @@ -51,6 +95,7 @@ class TrackedInstance(metaclass=InstanceTrackingMeta):

def __init__(self, *args, **kwargs):
self._instantiated_in = None
self._module_file = None
self._lhs = None
super().__init__(*args, **kwargs)

Expand All @@ -77,7 +122,7 @@ def find_lhs(self) -> str:
raise _system_exceptions.FlyteSystemException(f"Object {self} does not have an _instantiated in")

logger.debug(f"Looking for LHS for {self} from {self._instantiated_in}")
m = _importlib.import_module(self._instantiated_in)
m = importlib.import_module(self._instantiated_in)
for k in dir(m):
try:
if getattr(m, k) is self:
Expand All @@ -92,6 +137,29 @@ def find_lhs(self) -> str:
# continue looping through m.
logger.warning("Caught ValueError {} while attempting to auto-assign name".format(err))

# try to find object in module when the tracked instance is defined in the __main__ module
module = import_module_from_file(self._instantiated_in, self._module_file)

def _candidate_name_matches(candidate) -> bool:
if not hasattr(candidate, "name") or not hasattr(self, "name"):
return False
return candidate.name == self.name

for k in dir(module):
try:
candidate = getattr(module, k)
# consider the variable equivalent to self if it's of the same type, name
if (
type(candidate) == type(self)
and _candidate_name_matches(candidate)
and candidate.instantiated_in == self.instantiated_in
):
self._lhs = k
return k
except ValueError as err:
logger.warning(f"Caught ValueError {err} while attempting to auto-assign name")
pass

logger.error(f"Could not find LHS for {self} in {self._instantiated_in}")
raise _system_exceptions.FlyteSystemException(f"Error looking for LHS in {self._instantiated_in}")

Expand Down Expand Up @@ -216,6 +284,13 @@ def get_absolute_module_name(self, path: str, package_root: typing.Optional[str]
_mod_sanitizer = _ModuleSanitizer()


def _task_module_from_callable(f: Callable):
mod = inspect.getmodule(f)
mod_name = getattr(mod, "__name__", f.__module__)
name = f.__name__.split(".")[-1]
return mod, mod_name, name


def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str, str, str]:
"""
Returns the task-name, absolute module and the string name of the callable.
Expand All @@ -224,21 +299,20 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str,
"""

if isinstance(f, TrackedInstance):
mod = importlib.import_module(f.instantiated_in)
mod_name = mod.__name__
name = f.lhs
# We cannot get the sourcefile for an instance, so we replace it with the module
g = mod
inspect_file = inspect.getfile(g)
if f.instantiated_in:
mod = importlib.import_module(f.instantiated_in)
mod_name = mod.__name__
name = f.lhs
elif hasattr(f, "task_function"):
mod, mod_name, name = _task_module_from_callable(f.task_function)
else:
mod = inspect.getmodule(f) # type: ignore
if mod is None:
raise AssertionError(f"Unable to determine module of {f}")
mod_name = mod.__name__
name = f.__name__.split(".")[-1]
inspect_file = inspect.getfile(f)
mod, mod_name, name = _task_module_from_callable(f)

if mod is None:
raise AssertionError(f"Unable to determine module of {f}")

if mod_name == "__main__":
inspect_file = inspect.getfile(f) # type: ignore
return name, "", name, os.path.abspath(inspect_file)

mod_name = get_full_module_path(mod, mod_name)
Expand Down
47 changes: 47 additions & 0 deletions run_notebook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import os
import subprocess

import click


@click.command(
context_settings=dict(
ignore_unknown_options=True,
)
)
@click.option("--remote", is_flag=True, default=False)
@click.argument("notebook")
@click.argument("pyflyte_run_args", nargs=-1, type=click.UNPROCESSED)
def run_notebook(remote, notebook, pyflyte_run_args):
with open(notebook) as f:
nb_raw = json.load(f)

src = []
for cell in nb_raw["cells"]:
if (
cell["cell_type"] != "code"
# remove notebook-mode cells
or "%%notebook_mode" in "".join(cell["source"])
):
continue

# remove notebook-mode import
lines = [l for l in cell["source"] if "notebook_mode" not in l]
src.extend([*lines, "\n\n"])
src = "".join(src)

script = notebook.replace(".ipynb", ".py")
with open(script, "w") as f:
f.write(src)

args = ["--remote"] if remote else []
args.extend([script, *pyflyte_run_args])
try:
subprocess.run(["pyflyte", "run", *args])
finally:
os.remove(script)


if __name__ == "__main__":
run_notebook()