
Implement --export-slice-airflow #320

Merged (16 commits) on Oct 25, 2021
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -156,6 +156,9 @@ housing.csv
# Result of --export-slice
sliced_housing.py

# Result of --export-slice-to-airflow-dag
sliced_housing_dag.py

tracer
tracer.pdf
devcontainer.json
13 changes: 13 additions & 0 deletions .vscode/launch.json
@@ -28,6 +28,19 @@
"tests/housing.py",
],
},
{
"name": "lineapy --airflow",
"type": "python",
"request": "launch",
"module": "lineapy.cli.cli",
"args": [
"--slice",
"p value",
"--airflow",
"sliced_housing_dag",
"tests/housing.py",
],
},
{
"name": "Python: Current File",
"type": "python",
4 changes: 2 additions & 2 deletions .vscode/settings.json
@@ -1,7 +1,7 @@
{
"python.formatting.provider": "black",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true,
"python.linting.enabled": false,
"python.testing.pytestEnabled": true,
"python.linting.flake8Enabled": true,
"editor.formatOnSave": true,
@@ -10,4 +10,4 @@
"source.organizeImports": true
}
}
}
}
13 changes: 13 additions & 0 deletions CONTRIBUTING.md
@@ -84,6 +84,19 @@ jupyter nbconvert --to notebook --execute tests/test_notebook.ipynb --inplace --
Or you can open it in a notebook UI (JupyterLab, JupyterNotebook, VS Code, etc.)
and re-run it manually

### Airflow
Contributor: This should really be in the README.md, but since it's not ready for "release", maybe we can create another doc called EXPERIMENTAL.md or something and move this there.

Contributor Author: I was under the impression that CONTRIBUTING.md serves this function.


Sliced code can be exported to an Airflow DAG using the following command:

```
lineapy tests/housing.py --slice "p value" --airflow sliced_housing_dag
```
This creates a `sliced_housing_dag.py` file in the current directory. It can be executed with:
Contributor: Could we move this into a separate folder (when generating)? Cleaner organization.

Contributor Author: Sure, what would you call it? output or export?

Contributor: Or something like airflow_jobs/airflow_dags?

Contributor Author: With this we are saving to the same directory where the original file is, which is tests for housing.py. This seems OK to me; I can change it to any other place, of course :)

```
airflow db init
airflow dags test sliced_housing_dag_dag $(date '+%Y-%m-%d') -S .
```
## Visual Graphs

Sometimes it's helpful to see a visual representation of the graph
22 changes: 21 additions & 1 deletion lineapy/cli/cli.py
@@ -12,6 +12,7 @@
from lineapy.db.relational.db import RelationalLineaDB
from lineapy.instrumentation.tracer import Tracer
from lineapy.logging import configure_logging
from lineapy.plugins.airflow import sliced_aiflow_dag
from lineapy.transformer.node_transformer import transform
from lineapy.utils import prettify

@@ -39,6 +40,12 @@
default=None,
help="Requires --slice. Export the sliced code that {slice} depends on to {export_slice}.py",
)
@click.option(
"--export-slice-to-airflow-dag",
"--airflow",
default=None,
    help="Requires --slice. Export the sliced code that {slice} depends on to an Airflow DAG {export_slice_to_airflow_dag}.py",
)
@click.option(
"--print-source", help="Whether to print the source code", is_flag=True
)
@@ -66,6 +73,7 @@ def linea_cli(
mode,
slice,
export_slice,
export_slice_to_airflow_dag,
print_source,
print_graph,
verbose,
@@ -94,7 +102,8 @@

if visualize:
tracer.visualize()
if slice and not export_slice:

if slice and not export_slice and not export_slice_to_airflow_dag:
tree.add(
rich.console.Group(
f"Slice of {repr(slice)}",
@@ -109,6 +118,17 @@
full_code = tracer.sliced_func(slice, export_slice)
pathlib.Path(f"{export_slice}.py").write_text(full_code)

if export_slice_to_airflow_dag:
if not slice:
print(
"Please specify --slice. It is required for --export-slice-to-airflow-dag"
)
exit(1)
full_code = sliced_aiflow_dag(
tracer, slice, export_slice_to_airflow_dag
)
pathlib.Path(f"{export_slice_to_airflow_dag}.py").write_text(full_code)

tracer.db.close()
if print_graph:
graph_code = prettify(
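The CLI change above adds `--export-slice-to-airflow-dag` (aliased as `--airflow`) and rejects it unless `--slice` is also given. The option dependency can be sketched with stdlib `argparse` instead of click; this is a hypothetical illustration of the same validation logic, not the actual lineapy CLI, and all names here are illustrative.

```python
import argparse
import sys


def build_parser() -> argparse.ArgumentParser:
    # Mirrors the click options: a long flag plus a short "--airflow" alias
    # that both store into the same destination.
    parser = argparse.ArgumentParser(prog="lineapy-sketch")
    parser.add_argument("file_name")
    parser.add_argument("--slice", default=None)
    parser.add_argument(
        "--export-slice-to-airflow-dag",
        "--airflow",
        dest="export_slice_to_airflow_dag",
        default=None,
    )
    return parser


def validate(args: argparse.Namespace) -> None:
    # Exporting an Airflow DAG is meaningless without a slice to export.
    if args.export_slice_to_airflow_dag and not args.slice:
        print(
            "Please specify --slice. It is required for --export-slice-to-airflow-dag"
        )
        sys.exit(1)
```

Both spellings of the flag parse into the same value, so `--airflow sliced_housing_dag` behaves identically to the long form.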
18 changes: 9 additions & 9 deletions lineapy/db/relational/schema/relational.py
@@ -96,7 +96,7 @@ def process_result_value(self, value, dialect):
return value


class SessionContextORM(Base):
class SessionContextORM(Base): # type: ignore
__tablename__ = "session_context"
id = Column(String, primary_key=True)
environment_type = Column(Enum(SessionType))
@@ -108,7 +108,7 @@ class SessionContextORM(Base):
execution_id = Column(String, ForeignKey("execution.id"))


class LibraryORM(Base):
class LibraryORM(Base): # type: ignore
__tablename__ = "library"
__table_args__ = (
UniqueConstraint(
@@ -125,7 +125,7 @@ class LibraryORM(Base):
path = Column(String)


class ArtifactORM(Base):
class ArtifactORM(Base): # type: ignore
"""
An artifact is a named pointer to a node.
"""
@@ -140,7 +140,7 @@ class ArtifactORM(Base):
)


class ExecutionORM(Base):
class ExecutionORM(Base): # type: ignore
"""
An execution represents one Python interpreter invocation of some number of nodes
"""
@@ -150,7 +150,7 @@ class ExecutionORM(Base):
timestamp = Column(DateTime, nullable=True, default=datetime.utcnow)


class NodeValueORM(Base):
class NodeValueORM(Base): # type: ignore
"""
A node value represents the value of a node during some execution.

@@ -170,7 +170,7 @@ class NodeValueORM(Base):
end_time = Column(DateTime, nullable=True)


class BaseNodeORM(Base):
class BaseNodeORM(Base): # type: ignore
"""
node.source_code has a path value if node.session.environment_type == "script"
otherwise the environment type is "jupyter" and it has a jupyter execution
@@ -214,7 +214,7 @@ class BaseNodeORM(Base):
}


class SourceCodeORM(Base):
class SourceCodeORM(Base): # type: ignore
__tablename__ = "source_code"

id = Column(String, primary_key=True)
@@ -261,7 +261,7 @@ class ImportNodeORM(BaseNodeORM):
# https://docs.sqlalchemy.org/en/14/orm/basic_relationships.html#association-object


class PositionalArgORM(Base):
class PositionalArgORM(Base): # type: ignore
__tablename__ = "positional_arg"
call_node_id: str = Column(
ForeignKey("call_node.id"), primary_key=True, nullable=False
@@ -273,7 +273,7 @@ class PositionalArgORM(Base):
argument = relationship(BaseNodeORM, uselist=False)


class KeywordArgORM(Base):
class KeywordArgORM(Base): # type: ignore
__tablename__ = "keyword_arg"
call_node_id: str = Column(
ForeignKey("call_node.id"), primary_key=True, nullable=False
33 changes: 21 additions & 12 deletions lineapy/instrumentation/tracer.py
@@ -125,18 +125,9 @@ def artifacts(self) -> dict[str, str]:

def sliced_func(self, slice_name: str, func_name: str) -> str:
artifact = self.db.get_artifact_by_name(slice_name)
if not artifact.node:
artifact_var = self.slice_var_name(artifact)
if not artifact_var:
return "Unable to extract the slice"
_line_no = artifact.node.lineno if artifact.node.lineno else 0
artifact_line = str(artifact.node.source_code.code).split("\n")[
_line_no - 1
]
_col_offset = (
artifact.node.col_offset if artifact.node.col_offset else 0
)
if _col_offset < 3:
return "Unable to extract the slice"
artifact_name = artifact_line[: _col_offset - 3]
slice_code = get_program_slice(self.graph, [artifact.id])
# We split the code into import and code blocks and join them into the full code text
import_block, code_block, main_block = split_code_blocks(
@@ -146,7 +137,7 @@
import_block
+ "\n\n"
+ code_block
+ f"\n\treturn {artifact_name}"
+ f"\n\treturn {artifact_var}"
+ "\n\n"
+ main_block
)
@@ -163,6 +154,24 @@ def slice(self, name: str) -> str:
artifact = self.db.get_artifact_by_name(name)
return get_program_slice(self.graph, [artifact.id])

def slice_var_name(self, artifact: ArtifactORM) -> str:
"""
Returns the variable name for the given artifact.
i.e. in lineapy.linea_publish(p, "p value") "p" is returned
"""
if not artifact.node:
return ""
_line_no = artifact.node.lineno if artifact.node.lineno else 0
artifact_line = str(artifact.node.source_code.code).split("\n")[
_line_no - 1
]
_col_offset = (
artifact.node.col_offset if artifact.node.col_offset else 0
)
if _col_offset < 3:
return ""
return artifact_line[: _col_offset - 3]

def visualize(
self,
filename="tracer",
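The `slice_var_name` helper recovers the artifact's variable name positionally: it takes the artifact node's `lineno`/`col_offset` and slices off everything from three characters before the column offset. This appears to assume the artifact node is the right-hand side of an assignment like `p = compute(...)`, so the three trimmed characters are the `" = "` separator. A standalone sketch of that heuristic (the function name and the assignment assumption are mine, not from the PR):

```python
def extract_assigned_var(source: str, lineno: int, col_offset: int) -> str:
    """Sketch of the slice_var_name column-offset heuristic.

    Assumes the tracked node is the right-hand side of an assignment,
    e.g. in `p = get_p_value(df)` the node starts at column 4, and the
    variable name is everything before the 3-character " = " separator.
    """
    line = source.split("\n")[lineno - 1]
    if col_offset < 3:
        # Not enough room for even `x = `, so no name can be recovered.
        return ""
    return line[: col_offset - 3]
```

For example, `extract_assigned_var("p = get_p_value(df)", 1, 4)` yields `"p"`, and a node whose column offset is less than 3 yields the empty string, matching the `"Unable to extract the slice"` fallback in the callers.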
63 changes: 63 additions & 0 deletions lineapy/plugins/airflow.py
@@ -0,0 +1,63 @@
from black import FileMode, format_str

from lineapy.graph_reader.program_slice import (
get_program_slice,
split_code_blocks,
)
from lineapy.instrumentation.tracer import Tracer

AIRFLOW_IMPORTS_TEMPLATE = """
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
"""

AIRFLOW_MAIN_TEMPLATE = """
default_dag_args = {"owner": "airflow", "retries": 2, "start_date": days_ago(1)}

dag = DAG(
dag_id="DAG_NAME_dag",
Contributor (@yifanwu, Oct 21, 2021): What happens if we have two artifacts? Will this logic break?

Contributor Author: Currently we can only slice one artifact through --slice; it produces code which we save as this DAG. If there are other artifacts in the DB, it won't be a problem.

Contributor: Sorry, I'm not following: do the dag_ids not need to be unique?

schedule_interval="*/15 * * * *", # Every 15 minutes
Contributor: We need to figure out how to parametrize this for API design. I'm not sure about the workflow where the user goes in and manually modifies things.

Contributor Author: Yes, but we need to think about the entire UX; we can't just keep adding options to the CLI, right? I think a config file with a set of configs is in order. I'd vote for YAML, but I'm open to anything.

Contributor: Yes, let's talk about it this afternoon.

max_active_runs=1,
catchup=False,
default_args=default_dag_args,
)

DAG_NAME = PythonOperator(
    dag=dag, task_id="DAG_NAME_task", python_callable=DAG_NAME,
)
"""


def sliced_aiflow_dag(tracer: Tracer, slice_name: str, func_name: str) -> str:
"""
Returns an Airflow DAG of the sliced code.

:param tracer: the tracer object.
:param slice_name: name of the artifact to get the code slice for.
:param func_name: name used for the generated DAG and its Python function.
:return: string containing the code of the Airflow DAG running this slice
"""
artifact = tracer.db.get_artifact_by_name(slice_name)
artifact_var = tracer.slice_var_name(artifact)
if not artifact_var:
return "Unable to extract the slice"
slice_code = get_program_slice(tracer.graph, [artifact.id])
# We split the code into import and code blocks and join them into the full code text
import_block, code_block, main_block = split_code_blocks(
slice_code, func_name
)
full_code = (
import_block
+ "\n"
+ AIRFLOW_IMPORTS_TEMPLATE
+ "\n\n"
+ code_block
+ f"\n\tprint({artifact_var})" # TODO What to do with artifact_var in a DAG?
Contributor (@yifanwu, Oct 21, 2021): I think we need to discuss and design this in more detail before we can commit this PR. @marov, you are better equipped than we are to help design what to do with this artifact in a deployed Airflow process. Let's look at some reference cases together at our next meeting (Friday).

+ "\n\n"
+ AIRFLOW_MAIN_TEMPLATE.replace("DAG_NAME", func_name)
)
# Black lint
black_mode = FileMode()
black_mode.line_length = 79
full_code = format_str(full_code, mode=black_mode)
return full_code
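The DAG assembly in `sliced_aiflow_dag` is plain string work: the templates use `DAG_NAME` as a placeholder, and every occurrence is rewritten to the requested function name, which is why the generated `dag_id` is always `<func_name>_dag` (hence `sliced_housing_dag_dag` in CONTRIBUTING.md and the test). A minimal stdlib sketch of just that substitution step, with an abbreviated copy of the template and without the black reformatting pass:

```python
# Abbreviated copy of the main template; the real one also sets owner,
# retries, start_date, max_active_runs, and catchup.
MAIN_TEMPLATE = """
dag = DAG(
    dag_id="DAG_NAME_dag",
    schedule_interval="*/15 * * * *",  # Every 15 minutes
)

DAG_NAME = PythonOperator(
    dag=dag, task_id="DAG_NAME_task", python_callable=DAG_NAME,
)
"""


def render_dag_main(func_name: str) -> str:
    # str.replace rewrites every DAG_NAME occurrence: the dag_id, the
    # task_id, the operator variable, and the python_callable all derive
    # from the same func_name.
    return MAIN_TEMPLATE.replace("DAG_NAME", func_name)
```

For `func_name="sliced_housing_dag"` this produces a `dag_id` of `"sliced_housing_dag_dag"`, matching the id passed to `airflow dags test`.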
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -20,7 +20,7 @@ relative_files = true

# https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
# https://pydantic-docs.helpmanual.io/mypy_plugin/#enabling-the-plugin
plugins = ["sqlalchemy.ext.mypy.plugin", "pydantic.mypy"]
Contributor: Could we try just removing the sqlalchemy plugin but keeping the mypy one? And commenting that it's disabled until we can add sqlalchemy >= 1.4, which adds mypy support, but is blocked on flask-appbuilder, which is a dep of airflow?

Contributor Author: We could do that. However, as I've mentioned to @yifanwu today, Airflow is but one of the data tools that linea can have "plugins" for. New tools will come with new dependencies and more issues like this. Hence I propose:

  • Merge this PR (removing the sqlalchemy plugin and leaving a TODO in comments)
  • Next, create in a different repo (or just a folder?) a place for external plugins with examples and tests
  • Each of the "plugins" will have its own setup.py, decoupling it from the rest
  • Does Jupyter also fall into this category? One can argue both ways: we can say that Jupyter support is the core feature of lineapy, but I'd argue that lineapy can work without Jupyter. And Jupyter is not the only notebook server either (Zeppelin etc.)

Contributor (quoting "Next, create in a different repo (or just a folder?) a place for external plugins with examples and tests"): I am +1 on monorepos when the code is tightly coupled!

Contributor Author: OK, @yifanwu @saulshanabrook, how shall we proceed?

# plugins = ["sqlalchemy.ext.mypy.plugin", "pydantic.mypy"]


# Enable function body type checking, even if function types are not annotated
1 change: 1 addition & 0 deletions setup.py
@@ -86,6 +86,7 @@ def version(path):
"coveralls",
"seaborn",
"graphviz",
"apache-airflow==2.2.0",
]
},
include_package_data=True,
35 changes: 35 additions & 0 deletions tests/test_airflow.py
@@ -0,0 +1,35 @@
import subprocess


def test_export_slice_housing_dag():
"""
Verifies that the "--airflow" CLI command produces a working Airflow DAG
"""
subprocess.check_call(
[
"lineapy",
"tests/housing.py",
"--slice",
"p value",
"--airflow",
"sliced_housing_dag",
]
)
subprocess.check_call(
[
"airflow",
"db",
"init",
]
)
subprocess.check_call(
[
"airflow",
"dags",
"test",
"sliced_housing_dag_dag",
"2020-10-19",
"-S",
".",
]
)