Skip to content

Commit

Permalink
Git storage extra files (#4767)
Browse files Browse the repository at this point in the history
* Pass file path as __file__ when extracting flow from a file

* Add docs for Git storage extra files

* Use textwrap.dedent for test readability

* Add changelog

* Update typing to ignore less stuff

* Clarify typing
  • Loading branch information
zangell44 committed Jul 16, 2021
1 parent c423a77 commit 8cc4478
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
2 changes: 2 additions & 0 deletions changes/pr4767.yaml
@@ -0,0 +1,2 @@
enhancement:
- "Enable and document loading additional repository files with `Git` storage - [#4767](https://github.com/PrefectHQ/prefect/pull/4767)"
41 changes: 41 additions & 0 deletions docs/orchestration/flow_config/storage.md
Expand Up @@ -309,7 +309,10 @@ storage = Git(
git_token_username="myuser" # username associated with the Deploy Token
)
```
:::

:::tip Loading additional files from a Git repository
`Git` storage allows you to load additional files alongside your flow file. For more information, see [Loading Additional Files with Git Storage](/orchestration/flow_config/storage.html#loading-additional-files-with-git-storage).
:::

### GitHub
Expand Down Expand Up @@ -529,6 +532,44 @@ secrets](/core/concepts/secrets.md) `SOME_TOKEN`. Because this resolution is
at runtime, this storage option never has your sensitive information stored in
it and that sensitive information is never sent to Prefect Cloud.

## Loading Additional Files with Git Storage

`Git` storage clones the full repository when loading a flow from storage. This allows you to load non-Python files that live alongside your flow in your repository. For example, you may have a `.sql` file containing a query that one of your tasks runs.

To get the file path of your flow, use Python's module-level `__file__` variable, which Prefect sets to the flow file's path when the flow is loaded from storage.

For example, let's say we want to say hello to a person and their name is specified by a `.txt` file in our repository.

Our git repository contains two files in the root directory, `flow.py` and `person.txt`.

`flow.py` contains our flow, including the logic for loading information from `person.txt`, and should look like this:

```python
from pathlib import Path

import prefect
from prefect import task, Flow
from prefect.storage import Git

# get the directory containing the flow file using pathlib and __file__
# this path is dynamically populated when the flow is loaded from storage
file_path = Path(__file__).resolve().parent

# using our flow path, load the file that lives next to the flow file
with open(file_path / "person.txt", "r") as my_file:
    name = my_file.read()


@task
def say_hello(name):
    """Log a greeting for `name` using the Prefect run logger."""
    logger = prefect.context.get("logger")
    logger.info(f"Hi {name}")


with Flow("my-hello-flow") as flow:
    say_hello(name)

# configure our flow to use `Git` storage
flow.storage = Git(flow_path="flow.py", repo='org/repo')
```

## SSH + Git Storage

Expand Down
6 changes: 4 additions & 2 deletions src/prefect/utilities/storage.py
Expand Up @@ -4,7 +4,7 @@
import sys
import warnings
from operator import attrgetter
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict, Any
from distutils.version import LooseVersion

import cloudpickle
Expand Down Expand Up @@ -82,7 +82,9 @@ def extract_flow_from_file(
raise ValueError("Provide either `file_path` or `file_contents`.")

# Load objects from file into dict
exec_vals = {} # type: ignore
# if a file_path has been provided, provide __file__ as a global variable
# so it resolves correctly during extraction
exec_vals: Dict[str, Any] = {"__file__": file_path} if file_path else {}
exec(contents, exec_vals)

# Grab flow name from values loaded via exec
Expand Down
30 changes: 30 additions & 0 deletions tests/utilities/test_storage.py
@@ -1,6 +1,7 @@
import os
import sys
import types
import textwrap

import pytest
import cloudpickle
Expand Down Expand Up @@ -61,6 +62,28 @@ def flow_path(self, tmpdir):

return full_path

@pytest.fixture
def flow_path_with_additional_file(self, tmpdir):
    """Write a flow file and a sibling text file into ``tmpdir``.

    The generated ``flow.py`` opens ``test.txt`` from its own directory,
    which it locates through ``__file__``, and uses the file's contents as
    the flow name.

    Returns:
        str: the path of the generated ``flow.py``
    """
    # The literal is indented to match this method; `textwrap.dedent` strips
    # the common leading whitespace, keeping the extra indent under `with`.
    contents = """\
    from prefect import Flow
    from pathlib import Path
    with open(str(Path(__file__).resolve().parent)+"/test.txt", "r") as f:
        name = f.read()
    f2 = Flow(name)
    """

    full_path = os.path.join(tmpdir, "flow.py")

    with open(full_path, "w") as f:
        f.write(textwrap.dedent(contents))

    # the additional file that the flow file reads when it is executed
    with open(os.path.join(tmpdir, "test.txt"), "w") as f:
        f.write("test-flow")

    return full_path

def test_extract_flow_from_file_path(self, flow_path):
flow = extract_flow_from_file(file_path=flow_path)
assert flow.name == "flow-1"
Expand All @@ -72,6 +95,13 @@ def test_extract_flow_from_file_path(self, flow_path):
flow = extract_flow_from_file(file_path=flow_path, flow_name="flow-2")
assert flow.name == "flow-2"

def test_extract_flow_from_file_path_can_load_files_from_same_directory(
    self, flow_path_with_additional_file
):
    """A flow file can read a sibling file located through ``__file__``."""
    extracted = extract_flow_from_file(file_path=flow_path_with_additional_file)

    # the flow name comes from the text file written next to the flow file
    assert extracted.name == "test-flow"
    assert extracted.run().is_successful()

def test_extract_flow_from_file_contents(self, flow_path):
with open(flow_path, "r") as f:
contents = f.read()
Expand Down

0 comments on commit 8cc4478

Please sign in to comment.