feat(dataset): filter ls-files by tag (#2950)
m-alisafaee committed Jun 23, 2022
1 parent 6995098 commit 73866f2
Showing 11 changed files with 247 additions and 103 deletions.
62 changes: 35 additions & 27 deletions renku/command/format/dataset_files.py
@@ -52,42 +52,36 @@ def tabular(records, *, columns=None):


@inject.autoparams()
def get_lfs_tracking(records, client_dispatcher: IClientDispatcher):
"""Check if files are tracked in git lfs.
Args:
records: File records to check.
client_dispatcher(IClientDispatcher): Injected client dispatcher.
"""
client = client_dispatcher.current_client

paths = (r.path for r in records)
attrs = client.repository.get_attributes(*paths)

for record in records:
if attrs.get(str(record.path), {}).get("filter") == "lfs":
record.is_lfs = True
else:
record.is_lfs = False


@inject.autoparams()
def get_lfs_file_sizes(records, client_dispatcher: IClientDispatcher):
"""Try to get file size from Git LFS.
def get_lfs_tracking_and_file_sizes(records, has_tag: bool, client_dispatcher: IClientDispatcher):
"""Try to get file size from Git LFS and check if files are tracked in git lfs.
Args:
records: File records to get size for.
has_tag(bool): Whether sizes are retrieved for a given tag instead of the HEAD commit.
client_dispatcher(IClientDispatcher): Injected client dispatcher.
"""
from humanize import naturalsize # Slow import

client = client_dispatcher.current_client

def get_lfs_tracking():
paths = (r.path for r in records)
attrs = client.repository.get_attributes(*paths)

for record in records:
if attrs.get(str(record.path), {}).get("filter") == "lfs":
record.is_lfs = True
else:
record.is_lfs = False

lfs_files_sizes = {}

try:
lfs_run = run(
("git", "lfs", "ls-files", "--name-only", "--size"), stdout=PIPE, cwd=client.path, universal_newlines=True
("git", "lfs", "ls-files", "--name-only", "--size", "--deleted"),
stdout=PIPE,
cwd=client.path,
universal_newlines=True,
)
except SubprocessError:
pass
@@ -106,15 +100,29 @@ def get_lfs_file_sizes(records, client_dispatcher: IClientDispatcher):
size = size.replace(" B", " B")
lfs_files_sizes[path] = size

non_lfs_files_sizes = {
o.path: o.size for o in client.repository.head.commit.traverse() if o.path not in lfs_files_sizes
}
non_lfs_files_sizes = {k: naturalsize(v).upper().replace("BYTES", " B") for k, v in non_lfs_files_sizes.items()}
if has_tag:
checksums = [r.entity.checksum for r in records]
sizes = client.repository.get_sizes(*checksums)
non_lfs_files_sizes = {
r.entity.path: naturalsize(s).upper().replace("BYTES", " B") for r, s in zip(records, sizes)
}
else:
non_lfs_files_sizes = {
o.path: o.size for o in client.repository.head.commit.traverse() if o.path not in lfs_files_sizes
}
non_lfs_files_sizes = {k: naturalsize(v).upper().replace("BYTES", " B") for k, v in non_lfs_files_sizes.items()}

# NOTE: Check .gitattributes file to see if a file is in LFS
get_lfs_tracking()

for record in records:
size = lfs_files_sizes.get(record.path) or non_lfs_files_sizes.get(record.path)
record.size = size

# NOTE: When listing a tag we assume that the file is in LFS if it was in LFS at some point in time
if has_tag:
record.is_lfs = lfs_files_sizes.get(record.path) is not None
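
The parsing of the "git lfs ls-files" output is collapsed in this hunk. A rough sketch of what that step does, assuming the usual "<path> (<size>)" line format printed by git lfs ls-files --name-only --size (the commit's actual parsing may differ):

# Sketch only: each output line of "git lfs ls-files --name-only --size"
# looks like "data/file.csv (1.2 MB)"; split it into a path and a
# human-readable size.
for line in lfs_run.stdout.splitlines():
    if not line.strip():
        continue
    path, _, size = line.rpartition(" (")
    lfs_files_sizes[path] = size.rstrip(")")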


def jsonld(records, **kwargs):
"""Format dataset files as JSON-LD.
142 changes: 74 additions & 68 deletions renku/core/dataset/dataset.py
@@ -36,7 +36,7 @@
from renku.core.dataset.providers import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset, ProviderDatasetFile
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import add_dataset_tag, prompt_access_token, prompt_tag_selection
from renku.core.dataset.tag import add_dataset_tag, get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.interface.database_dispatcher import IDatabaseDispatcher
from renku.core.interface.dataset_gateway import IDatasetGateway
@@ -249,10 +249,11 @@ def edit_dataset(
return updated


@inject.autoparams()
@inject.autoparams("client_dispatcher")
def list_dataset_files(
client_dispatcher: IClientDispatcher,
datasets=None,
datasets: Optional[List[str]] = None,
tag: Optional[str] = None,
creators=None,
include=None,
exclude=None,
@@ -261,19 +262,22 @@ def list_dataset_files(
Args:
client_dispatcher(IClientDispatcher): Injected client dispatcher.
datasets: Datasets to list files for (Default value = None).
datasets(List[str]): Datasets to list files for (Default value = None).
tag(str): Tag to filter by (Default value = None).
creators: Creators to filter by (Default value = None).
include: Include filters for file paths (Default value = None).
exclude: Exclude filters for file paths (Default value = None).
Returns:
List[DynamicProxy]: Filtered dataset files.
"""
from renku.command.format.dataset_files import get_lfs_file_sizes, get_lfs_tracking
from renku.command.format.dataset_files import get_lfs_tracking_and_file_sizes

client = client_dispatcher.current_client

records = filter_dataset_files(names=datasets, creators=creators, include=include, exclude=exclude, immutable=True)
records = filter_dataset_files(
names=datasets, tag=tag, creators=creators, include=include, exclude=exclude, immutable=True
)
for record in records:
record.title = record.dataset.title
record.dataset_name = record.dataset.name
@@ -285,8 +289,7 @@ def list_dataset_files(
record.name = Path(record.entity.path).name
record.added = record.date_added

get_lfs_file_sizes(records)
get_lfs_tracking(records)
get_lfs_tracking_and_file_sizes(records, has_tag=bool(tag))

return records
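
A minimal usage sketch of the new parameter (the dataset name and tag below are hypothetical, and the call assumes a project context where dependency injection is already configured):

from renku.core.dataset.dataset import list_dataset_files

# List the files of dataset "my-data" as recorded at tag "v1.0"; with a
# tag, sizes are resolved from entity checksums and LFS membership is
# inferred from LFS history.
records = list_dataset_files(datasets=["my-data"], tag="v1.0")
for record in records:
    print(record.path, record.size, record.is_lfs)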

@@ -1145,90 +1148,93 @@ def update_external_files(client: "LocalClient", records: List[DynamicProxy], dr
return updated_files


@inject.autoparams()
@inject.autoparams("client_dispatcher", "dataset_gateway")
def filter_dataset_files(
client_dispatcher: IClientDispatcher,
dataset_gateway: IDatasetGateway,
names=None,
creators=None,
include=None,
exclude=None,
ignore=None,
immutable=False,
names: Optional[List[str]] = None,
tag: Optional[str] = None,
creators: Optional[Union[str, List[str], Tuple[str]]] = None,
include: Optional[List[str]] = None,
exclude: Optional[List[str]] = None,
ignore: Optional[List[str]] = None,
immutable: bool = False,
) -> List[DynamicProxy]:
"""Filter dataset files by specified filters.
Args:
client_dispatcher(IClientDispatcher): Injected client dispatcher.
dataset_gateway(IDatasetGateway): Injected dataset gateway.
names: Filter by specified dataset names. (Default value = None).
creators: Filter by creators. (Default value = None).
include: Include files matching file pattern. (Default value = None).
exclude: Exclude files matching file pattern. (Default value = None).
ignore: Ignored datasets. (Default value = None).
immutable: Return immutable copies of dataset objects. (Default value = False).
names(Optional[List[str]]): Filter by specified dataset names (Default value = None).
tag(Optional[str]): Filter by specified tag (Default value = None).
creators(Optional[Union[str, List[str], Tuple[str]]]): Filter by creators (Default value = None).
include(Optional[List[str]]): Patterns to include in the result (Default value = None).
exclude(Optional[List[str]]): Patterns to exclude from the result (Default value = None).
ignore(Optional[List[str]]): Ignored datasets (Default value = None).
immutable(bool): Return immutable copies of dataset objects (Default value = False).
Returns:
List[DynamicProxy]: List of filtered files sorted by date added.
"""

def should_include(filepath: Path) -> bool:
"""Check if file matches one of include filters and not in exclude filter."""
if exclude:
for pattern in exclude:
if filepath.match(pattern):
return False

if include:
for pattern in include:
if filepath.match(pattern):
return True
return False

return True

client = client_dispatcher.current_client

if isinstance(creators, str):
creators = set(creators.split(","))

if isinstance(creators, list) or isinstance(creators, tuple):
creators = set(creators)
creators_set = set(creators.split(","))
elif isinstance(creators, list) or isinstance(creators, tuple):
creators_set = set(creators)
else:
creators_set = set()

records = []
unused_names = set(names)
unused_names = set(names) if names is not None else set()

for dataset in dataset_gateway.get_all_active_datasets():
if (names and dataset.name not in names) or (ignore and dataset.name in ignore):
continue

if tag:
dataset = get_dataset_by_tag(dataset=dataset, tag=tag) # type: ignore
if not dataset:
continue

if not immutable:
dataset = dataset.copy()
if (not names or dataset.name in names) and (not ignore or dataset.name not in ignore):
if unused_names:
unused_names.remove(dataset.name)
for file in dataset.files:
record = DynamicProxy(file)
record.dataset = dataset
record.client = client
path = Path(record.entity.path)
match = _include_exclude(path, include, exclude)

if creators:
c: Person
dataset_creators = {c.name for c in dataset.creators}
match = match and creators.issubset(dataset_creators)

if match:
records.append(record)

if unused_names:
unused_names_str = ", ".join(unused_names)
raise errors.ParameterError(f"Dataset does not exist: {unused_names_str}")
if unused_names:
unused_names.remove(dataset.name)

return sorted(records, key=lambda r: r.date_added)
if creators_set:
dataset_creators = {creator.name for creator in dataset.creators}
if not creators_set.issubset(dataset_creators):
continue

for file in dataset.files:
if not should_include(Path(file.entity.path)):
continue

def _include_exclude(file_path, include=None, exclude=None):
"""Check if file matches one of include filters and not in exclude filter.
record = DynamicProxy(file)
record.dataset = dataset
record.client = client
records.append(record)

Args:
file_path: Path to the file.
include: Tuple containing patterns to which include from result (Default value = None).
exclude: Tuple containing patterns to which exclude from result (Default value = None).
if unused_names:
unused_names_str = ", ".join(unused_names)
raise errors.ParameterError(f"These datasets don't exist: {unused_names_str}")

Returns:
bool: True if a file should be included, False otherwise.
"""
if exclude is not None and exclude:
for pattern in exclude:
if file_path.match(pattern):
return False

if include is not None and include:
for pattern in include:
if file_path.match(pattern):
return True
return False

return True
return sorted(records, key=lambda r: r.date_added)
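
The include/exclude filters rely on pathlib's Path.match, which matches glob patterns against the right end of the path. A small illustration of the semantics the should_include helper inherits:

from pathlib import Path

# Path.match anchors glob patterns at the right end of the path, which is
# the behavior the include/exclude filters build on.
Path("data/raw/file.csv").match("*.csv")      # True
Path("data/raw/file.csv").match("raw/*.csv")  # True
Path("data/raw/file.csv").match("docs/*")     # False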
22 changes: 21 additions & 1 deletion renku/core/dataset/tag.py
@@ -24,7 +24,8 @@
from renku.core import errors
from renku.core.dataset.datasets_provenance import DatasetsProvenance
from renku.core.util import communication
from renku.domain_model.dataset import DatasetTag, Url
from renku.domain_model.dataset import Dataset, DatasetTag, Url
from renku.infrastructure.gateway.dataset_gateway import DatasetGateway
from renku.infrastructure.immutable import DynamicProxy


@@ -94,6 +95,25 @@ def remove_dataset_tags(dataset_name: str, tags: List[str]):
datasets_provenance.remove_tag(dataset, tag)


def get_dataset_by_tag(dataset: Dataset, tag: str) -> Optional[Dataset]:
"""Return a version of dataset that has a specific tag.
Args:
dataset(Dataset): The dataset whose tagged version to return.
tag(str): Tag name to search for.
Returns:
Optional[Dataset]: The dataset version pointed to by the tag, or None if no such tag exists.
"""
dataset_gateway = DatasetGateway()

tags = dataset_gateway.get_all_tags(dataset)
selected_tag = next((t for t in tags if t.name == tag), None)
if selected_tag is None:
return None
return dataset_gateway.get_by_id(selected_tag.dataset_id.value)
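
Resolution goes through the tag's stored dataset id, so the returned object is the immutable dataset version the tag was created from. A usage sketch (the tag name is hypothetical):

# Resolve the dataset version that tag "v1.0" points to; None means the
# dataset has no tag with that name.
tagged = get_dataset_by_tag(dataset=dataset, tag="v1.0")
if tagged is not None:
    print(tagged.name)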


def prompt_access_token(exporter):
"""Prompt user for an access token for a provider.
7 changes: 5 additions & 2 deletions renku/core/util/metadata.py
@@ -34,7 +34,9 @@
from renku.domain_model.provenance.agent import Person


def construct_creators(creators: Optional[List[Union[dict, str]]], ignore_email=False):
def construct_creators(
creators: List[Union[dict, str]], ignore_email=False
) -> Tuple[List["Person"], List[Union[dict, str]]]:
"""Parse input and return a list of Person."""
creators = creators or []

@@ -46,7 +48,8 @@ def construct_creators(creators: Optional[List[Union[dict, str]]], ignore_email=
for creator in creators:
person, no_email_warning = construct_creator(creator, ignore_email=ignore_email)

people.append(person)
if person:
people.append(person)

if no_email_warning:
no_email_warnings.append(no_email_warning)
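
With the added guard, entries that fail to parse into a Person no longer end up as None in the result. A usage sketch, assuming the usual "Name <email>" creator string format (the example value is hypothetical):

# Parse creator strings into Person objects; the second return value
# collects entries that came without an email address.
people, no_email_warnings = construct_creators(["Jane Doe <jane@example.com>"])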
