Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LocalStorage Implementation #13981

Merged
merged 15 commits into from
Jun 20, 2024
11 changes: 7 additions & 4 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@
from prefect.results import ResultSerializer, ResultStorage
from prefect.runner.storage import (
BlockStorageAdapter,
LocalStorage,
RunnerStorage,
create_storage_from_url,
create_storage_from_source,
)
from prefect.settings import (
PREFECT_DEFAULT_WORK_POOL_NAME,
Expand Down Expand Up @@ -910,7 +911,7 @@ async def from_source(
```
"""
if isinstance(source, str):
storage = create_storage_from_url(source)
storage = create_storage_from_source(source)
elif isinstance(source, RunnerStorage):
storage = source
elif hasattr(source, "get_directory"):
Expand All @@ -920,9 +921,11 @@ async def from_source(
f"Unsupported source type {type(source).__name__!r}. Please provide a"
" URL to remote storage or a storage object."
)

with tempfile.TemporaryDirectory() as tmpdir:
storage.set_base_path(Path(tmpdir))
await storage.pull_code()
if not isinstance(storage, LocalStorage):
storage.set_base_path(Path(tmpdir))
await storage.pull_code()

full_entrypoint = str(storage.destination / entrypoint)
flow: "Flow" = await from_async.wait_for_call_in_new_thread(
Expand Down
85 changes: 79 additions & 6 deletions src/prefect/runner/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,74 @@ def __eq__(self, __value) -> bool:
return False


def create_storage_from_url(
url: str, pull_interval: Optional[int] = 60
class LocalStorage:
"""
Sets the working directory in the local filesystem.

Parameters:
Path: Local file path to set the working directory for the flow

Examples:
Sets the working directory for the local path to the flow:

```python
from prefect.runner.storage import Localstorage

storage = LocalStorage(
path="/path/to/local/flow_directory",
)
```
"""

def __init__(
self,
path: str,
pull_interval: Optional[int] = None,
):
self._path = Path(path).resolve()
self._logger = get_logger("runner.storage.local-storage")
self._storage_base_path = Path.cwd()
self._pull_interval = pull_interval

@property
def destination(self) -> Path:
return self._path

def set_base_path(self, path: Path):
self._storage_base_path = path

@property
def pull_interval(self) -> Optional[int]:
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved
return self._pull_interval

async def pull_code(self):
# Local storage assumes the code already exists on the local filesystem
# and does not need to be pulled from a remote location
pass

def to_pull_step(self) -> dict:
"""
Returns a dictionary representation of the storage object that can be
used as a deployment pull step.
"""
step = {
"prefect.deployments.steps.set_working_directory": {
"directory": str(self.destination)
}
}
return step

def __eq__(self, __value) -> bool:
if isinstance(__value, LocalStorage):
return self._path == __value._path
return False

def __repr__(self) -> str:
return f"LocalStorage(path={self._path!r})"


def create_storage_from_source(
source: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
"""
Creates a storage object from a URL.
Expand All @@ -579,11 +645,18 @@ def create_storage_from_url(
Returns:
RunnerStorage: A runner storage compatible object
"""
parsed_url = urlparse(url)
if parsed_url.scheme == "git" or parsed_url.path.endswith(".git"):
return GitRepository(url=url, pull_interval=pull_interval)
logger = get_logger("runner.storage")
parsed_source = urlparse(source)
if parsed_source.scheme == "git" or parsed_source.path.endswith(".git"):
return GitRepository(url=source, pull_interval=pull_interval)
elif parsed_source.scheme in ("file", "local"):
source_path = source.split("://", 1)[-1]
return LocalStorage(path=source_path, pull_interval=pull_interval)
elif parsed_source.scheme in fsspec.available_protocols():
return RemoteStorage(url=source, pull_interval=pull_interval)
else:
return RemoteStorage(url=url, pull_interval=pull_interval)
logger.debug("No valid fsspec protocol found for URL, assuming local storage.")
return LocalStorage(path=source, pull_interval=pull_interval)


def _format_token_from_credentials(netloc: str, credentials: dict) -> str:
Expand Down
66 changes: 61 additions & 5 deletions tests/runner/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from prefect.runner.storage import (
BlockStorageAdapter,
GitRepository,
LocalStorage,
RemoteStorage,
RunnerStorage,
create_storage_from_url,
create_storage_from_source,
)
from prefect.testing.utilities import AsyncMock, MagicMock
from prefect.utilities.filesystem import tmpchdir
Expand All @@ -26,7 +27,7 @@
import pytest


class TestCreateStorageFromUrl:
class TestCreateStorageFromSource:
@pytest.mark.parametrize(
"url, expected_type",
[
Expand All @@ -35,7 +36,7 @@ class TestCreateStorageFromUrl:
],
)
def test_create_git_storage(self, url, expected_type):
storage = create_storage_from_url(url)
storage = create_storage_from_source(url)
assert isinstance(storage, eval(expected_type))
assert storage.pull_interval == 60 # default value

Expand All @@ -47,7 +48,7 @@ def test_create_git_storage(self, url, expected_type):
],
)
def test_create_git_storage_custom_pull_interval(self, url, pull_interval):
storage = create_storage_from_url(url, pull_interval=pull_interval)
storage = create_storage_from_source(url, pull_interval=pull_interval)
assert isinstance(
storage, GitRepository
) # We already know it's GitRepository from above tests
Expand All @@ -61,11 +62,29 @@ def test_create_git_storage_custom_pull_interval(self, url, pull_interval):
],
)
def test_alternative_storage_url(self, url):
storage = create_storage_from_url(url)
storage = create_storage_from_source(url)
assert isinstance(storage, RemoteStorage)
assert storage._url == url
assert storage.pull_interval == 60 # default value

@pytest.mark.parametrize(
"path",
[
"/path/to/local/flows",
"C:\\path\\to\\local\\flows",
"file:///path/to/local/flows",
"flows", # Relative Path
],
)
def test_local_storage_path(self, path):
storage = create_storage_from_source(path)

path = path.split("://")[-1] # split from Scheme when present

assert isinstance(storage, LocalStorage)
assert storage._path == Path(path).resolve()
assert storage.pull_interval == 60 # default value


@pytest.fixture
def mock_run_process(monkeypatch):
Expand Down Expand Up @@ -635,6 +654,43 @@ def test_repr(self):
assert repr(rs) == "RemoteStorage(url='s3://bucket/path')"


class TestLocalStorage:
def test_init(self):
ls = LocalStorage("/path/to/directory", pull_interval=60)
assert ls._path == Path("/path/to/directory")
assert ls.pull_interval == 60

def test_set_base_path(self):
locals = LocalStorage("/path/to/directory")
path = Path.cwd() / "new_base_path"
locals.set_base_path(path)
assert locals._storage_base_path == path

def test_destination(self):
locals = LocalStorage("/path/to/directory")
assert locals.destination == Path("/path/to/directory")

def test_to_pull_step(self):
locals = LocalStorage("/path/to/directory")
pull_step = locals.to_pull_step()
assert pull_step == {
"prefect.deployments.steps.set_working_directory": {
"directory": "/path/to/directory"
}
}

def test_eq(self):
local1 = LocalStorage(path="/path/to/local/flows")
local2 = LocalStorage(path="/path/to/local/flows")
local3 = LocalStorage(path="C:\\path\\to\\local\\flows")
assert local1 == local2
assert local1 != local3

def test_repr(self):
local = LocalStorage(path="/path/to/local/flows")
assert repr(local) == "LocalStorage(path=PosixPath('/path/to/local/flows'))"


class TestBlockStorageAdapter:
@pytest.fixture
async def test_block(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3683,11 +3683,11 @@ def test_loaded_flow_can_be_updated_with_options(self):
assert deployment.storage == storage

async def test_load_flow_from_source_with_url(self, monkeypatch):
def mock_create_storage_from_url(url):
def mock_create_storage_from_source(url):
return MockStorage()

monkeypatch.setattr(
"prefect.flows.create_storage_from_url", mock_create_storage_from_url
"prefect.flows.create_storage_from_source", mock_create_storage_from_source
) # adjust the import path as per your module's name and location

loaded_flow = await Flow.from_source(
Expand Down