Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions datashare-python/datashare_python/objects.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import os
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum, unique
from io import BytesIO
from pathlib import Path
from typing import Any, Literal, Self, TypeVar, cast
from typing import Annotated, Any, Literal, Self, TypeVar, cast

from temporalio import workflow

Expand All @@ -30,8 +31,8 @@
merge_configs,
no_enum_values_config,
)
from pydantic import AfterValidator, Field
from pydantic import BaseModel as _BaseModel
from pydantic import Field
from pydantic.main import IncEx

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -162,9 +163,17 @@ class DocumentLocation(StrEnum):
WORKDIR = "workdir"


def _is_relative(value: Path) -> Path:
if value.is_absolute():
raise ValueError(
f"FilesystemDocument path should always be relative, found {value}"
)
return value


class FilesystemDocument(DatashareModel):
id: str
path: Path
path: Annotated[Path, AfterValidator(_is_relative)]
index: str
location: DocumentLocation
resource_name: str
Expand All @@ -174,11 +183,11 @@ def locate(
) -> Path:
from datashare_python.utils import artifacts_dir # noqa: PLC0415

project = self.index
match self.location:
case DocumentLocation.ORIGINAL:
return original_root / self.path
case DocumentLocation.ARTIFACTS:
project = self.index
return artifacts_root / artifacts_dir(self.id, project=project) / "raw"
case DocumentLocation.WORKDIR:
return workdir / self.path
Expand Down Expand Up @@ -236,6 +245,10 @@ def to_filesystem(self) -> FilesystemDocument:
raise ValueError(msg)
path = artifacts_dir(doc_id=self.id, project=self.index) / "raw"
location = DocumentLocation.ARTIFACTS
# The filesystem dod is alway relative to the base location, let's make sure
# we store a relative path otherwise joining with the location will fail
if path.parts and path.parts[0] == os.path.sep:
path = Path(*path.parts[1:])
return FilesystemDocument(
id=self.id,
path=path,
Expand Down
43 changes: 42 additions & 1 deletion datashare-python/tests/test_object.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import re
from datetime import datetime
from pathlib import Path

from datashare_python.objects import Task, TaskState
import pytest
from datashare_python.conftest import TEST_PROJECT
from datashare_python.constants import TIKA_METADATA_RESOURCENAME
from datashare_python.objects import (
Document,
DocumentLocation,
FilesystemDocument,
Task,
TaskState,
)
from pydantic import ValidationError


def test_task_ser() -> None:
Expand All @@ -26,3 +38,32 @@ def test_task_ser() -> None:
"state": TaskState.CREATED,
}
assert serialized == expected


def test_filesystem_document_should_raise_on_absolute_path() -> None:
# Given
path = Path("/some/absolute/path")
# When/Then
expected = re.escape("FilesystemDocument path should always be relative")
with pytest.raises(ValidationError, match=expected):
FilesystemDocument(
id="some_id",
path=path,
index="id",
location=DocumentLocation.ORIGINAL,
resource_name="aa",
)


def test_document_to_filesystem_document_use_relative_path() -> None:
# Given
path = Path("/some/absolute/path/resource.file")
assert path.is_absolute()
meta = {TIKA_METADATA_RESOURCENAME: "resource.file"}
doc = Document(
index=TEST_PROJECT, path=path, id="some_id", language="ENGLISH", metadata=meta
)
# When
fs_doc = doc.to_filesystem()
relative_path = Path("some/absolute/path/resource.file")
assert fs_doc.path == relative_path
13 changes: 2 additions & 11 deletions workers/asr-worker/asr_worker/activities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import logging
import os
from asyncio import AbstractEventLoop
from collections.abc import AsyncGenerator, AsyncIterable, Iterable
Expand Down Expand Up @@ -219,26 +218,18 @@ def preprocess_act(
worker_config: ASRWorkerConfig,
output_dir: Path,
) -> list[Path]:
# TODO: remove this debug code
import datashare_python # noqa: PLC0415

logger = logging.getLogger(datashare_python.__name__)
logger.info("worker_config: %s", worker_config)
audios_root = worker_config.audios_root
docs_root = worker_config.docs_root
artifacts_root = worker_config.artifacts_root
workdir = worker_config.workdir
audios = (
FilesystemDocument.model_validate(fs_doc) for fs_doc in read_jsonl(audio_batch)
)
audios = (
fs_doc.locate(
original_root=audios_root, artifacts_root=artifacts_root, workdir=workdir
original_root=docs_root, artifacts_root=artifacts_root, workdir=workdir
)
for fs_doc in audios
)
# TODO: removeme
audios = list(audios)
logger.info("audios: %s", audios)
audios = (str(a) for a in audios)
# TODO: implement a caching strategy here, we could avoid processing files
# which have already been preprocessed
Expand Down
1 change: 0 additions & 1 deletion workers/asr-worker/asr_worker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class ASRWorkerConfig(WorkerConfig):
artifacts_root: Path
workdir: Path

@property
def audios_root(self) -> Path:
return self.docs_root

Expand Down
4 changes: 2 additions & 2 deletions workers/asr-worker/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def with_audio_docs(
audio_path = AUDIOS_PATH / "asr_test.wav"
for doc in docs:
if doc.root_document is None:
config.audios_root.mkdir(parents=True, exist_ok=True)
shutil.copy(audio_path, config.audios_root / doc.path)
config.docs_root.mkdir(parents=True, exist_ok=True)
shutil.copy(audio_path, config.docs_root / doc.path)
else:
artifact_path = (
config.artifacts_root / artifacts_dir(doc.id, project=doc.index) / "raw"
Expand Down
Loading