Skip to content

Commit

Permalink
Merge b3294a0 into 1e0f608
Browse files Browse the repository at this point in the history
  • Loading branch information
dmichaels-harvard committed Jan 20, 2024
2 parents 1e0f608 + b3294a0 commit 5134a04
Show file tree
Hide file tree
Showing 13 changed files with 21,661 additions and 120 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ dcicutils
Change Log
----------


8.7.1
=====

* Changed scripts/publish_to_pypi.py to allow gitinfo.json to have unstaged changes;
this is so we can optionally have repos write relevant git (repo, branch, commit) info
to this file (via GitHub Actions) and make it accessible to the package for inspection.
* Added is_schema_type and is_specified_schema to portal_utils.Portal.
* Refactoring in portal_utils; added portal_object_utils; added file_utils.py.


8.7.0
=====

Expand Down
54 changes: 54 additions & 0 deletions dcicutils/file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import glob
import os
import pathlib
from typing import List, Optional, Union


def search_for_file(file: str,
                    location: Union[str, Optional[List[str]]] = None,
                    recursive: bool = False,
                    single: bool = False) -> Union[List[str], Optional[str]]:
    """
    Searches for the existence of the given file name, first directly in the given directory or list
    of directories, if specified, and if not then just in the current (working) directory; if the
    given recursive flag is True then also searches all sub-directories of these directories;
    returns the full (absolute) path name to the file if found. If the single flag is True then just
    the first file which is found is returned (as a string), or None if none; if the single flag
    is False, then all matched files are returned in a list, or an empty list if none.

    The file and each directory may be given either as a string or as a pathlib path object; paths
    are always returned as strings. If the given file is an absolute path then the location is
    ignored and only the existence of that path is checked. Entries in the location list which are
    neither strings nor path objects are ignored; an empty string entry means the current directory.
    """
    def is_path(value) -> bool:
        return isinstance(value, (str, pathlib.PurePath))

    if not file or not is_path(file):
        return None if single else []

    # Work with plain strings from here on; the glob pattern below is built with string operations.
    file = os.fspath(file)

    if os.path.isabs(file):
        if os.path.exists(file):
            return file if single else [file]
        return None if single else []

    if not location:
        location = ["."]
    elif is_path(location):
        location = [location]
    elif not isinstance(location, list):
        location = []

    # Drop unusable entries once, so the direct and the recursive search see the same directories.
    # An empty string is mapped to "." as otherwise the recursive glob pattern below would begin
    # with "/" and would search from the file system root.
    directories = [os.fspath(directory) or "." for directory in location if is_path(directory)]

    files_found = []

    # First look for the file directly within each directory.
    for directory in directories:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            file_found = os.path.abspath(os.path.normpath(candidate))
            if single:
                return file_found
            if file_found not in files_found:
                files_found.append(file_found)

    if recursive:
        for directory in directories:
            # Only add the recursive wildcard if the caller has not already supplied one.
            if not directory.endswith("/**") and not file.startswith("**/"):
                path = f"{directory}/**/{file}"
            else:
                path = f"{directory}/{file}"
            for file_found in glob.glob(path, recursive=True):
                file_found = os.path.abspath(file_found)
                if single:
                    return file_found
                if file_found not in files_found:
                    files_found.append(file_found)

    # If single is True we would have returned above on the first match.
    return None if single else files_found
119 changes: 119 additions & 0 deletions dcicutils/portal_object_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from functools import cached_property, lru_cache
from typing import List, Optional, Tuple, Union

from dcicutils.portal_utils import Portal
from dcicutils.schema_utils import get_identifying_properties


class PortalObject:
    """
    Wraps the data (dictionary) for a single Portal object together with the Portal it belongs to,
    and provides access to its schema, its identifying properties, and the Portal URL paths by which
    it may be looked up. Derived values are computed once per instance, on first access, and the
    wrapped data is assumed not to change afterwards.
    """

    def __init__(self, portal: Portal, portal_object: dict, portal_object_type: Optional[str] = None) -> None:
        self._portal = portal
        self._data = portal_object
        # Any explicitly given type which is not a non-empty string is treated as not given.
        self._type = portal_object_type if isinstance(portal_object_type, str) and portal_object_type else None

    @property
    def data(self):
        """
        Returns the object data exactly as given to the constructor.
        """
        return self._data

    @cached_property
    def schema(self):
        """
        Returns the schema for the type of this object, as obtained from the Portal.
        """
        return self._portal.get_schema(self.schema_type)

    @cached_property
    def schema_type(self):
        """
        Returns the explicitly given type of this object, if any, otherwise
        the type which Portal.get_schema_type determines from the object data.
        """
        return self._type or Portal.get_schema_type(self._data)

    @cached_property
    def schema_types(self):
        """
        Returns the explicitly given type of this object, if any, otherwise
        whatever Portal.get_schema_types determines from the object data.
        """
        # NOTE(review): With an explicit type this is a single string rather than a list, which
        # is presumably what Portal.get_schema_types returns; confirm what callers expect.
        return self._type or Portal.get_schema_types(self._data)

    @cached_property
    def schema_identifying_properties(self) -> list:
        """
        Returns the identifying properties declared by the schema
        of this object, or an empty list if there is no schema.
        """
        if not (schema := self.schema):
            return []
        return get_identifying_properties(schema)

    @cached_property
    def uuid(self) -> Optional[str]:
        """
        Returns the "uuid" value of this object, or None if it has none.
        """
        return PortalObject.get_uuid(self._data)

    @staticmethod
    def get_uuid(portal_object: dict) -> Optional[str]:
        """
        Returns the "uuid" value of the given object data, or None
        if it has none or if the given value is not a dictionary.
        """
        return portal_object.get("uuid") if isinstance(portal_object, dict) else None

    @cached_property
    def identifying_properties(self) -> List[str]:
        """
        Returns the list of all identifying property names of this Portal object which actually have values.
        The "uuid" and "identifier" properties are implicitly included whenever the object has a value for
        them (regardless of the schema), and are favored (first, "uuid" before "identifier"); "aliases" is
        defavored (last) and is included only if the schema declares it; no other ordering is defined.
        """
        # Same guard as get_uuid; object data which is not a dictionary has no properties.
        data = self._data if isinstance(self._data, dict) else {}
        schema_identifying_properties = self.schema_identifying_properties
        identifying_properties = [name for name in ("uuid", "identifier") if data.get(name)]
        identifying_properties.extend(
            name for name in schema_identifying_properties
            if name not in ("uuid", "identifier", "aliases") and data.get(name))
        if "aliases" in schema_identifying_properties and data.get("aliases"):
            identifying_properties.append("aliases")
        return identifying_properties

    @cached_property
    def identifying_paths(self) -> List[str]:
        """
        Returns a list of the possible Portal URL paths identifying this Portal object.
        """
        if not (identifying_properties := self.identifying_properties):
            return []
        data = self._data if isinstance(self._data, dict) else {}
        schema_type = self.schema_type
        identifying_paths = []
        for identifying_property in identifying_properties:
            if not (identifying_value := data.get(identifying_property)):
                continue
            if identifying_property == "uuid":
                identifying_paths.append(f"/{identifying_value}")
                continue
            # For now at least we include the path both with and without the schema type component
            # as for some identifying values it works (only) with and some it works (only) without.
            # For example: If we have FileSet with "accession", an identifying property, with value
            # SMAFSFXF1RO4 then /SMAFSFXF1RO4 works but /FileSet/SMAFSFXF1RO4 does not; and
            # conversely using "submitted_id", also an identifying property, with value
            # UW_FILE-SET_COLO-829BL_HI-C_1 then /UW_FILE-SET_COLO-829BL_HI-C_1 does
            # not work but /FileSet/UW_FILE-SET_COLO-829BL_HI-C_1 does work.
            values = identifying_value if isinstance(identifying_value, list) else [identifying_value]
            for value in values:
                if value is None or value == "":
                    # An empty list item would only yield a meaningless path.
                    continue
                if schema_type:
                    # Without a known type the typed form of the path would be "/None/...".
                    identifying_paths.append(f"/{schema_type}/{value}")
                identifying_paths.append(f"/{value}")
        return identifying_paths

    @cached_property
    def identifying_path(self) -> Optional[str]:
        """
        Returns the first of the possible Portal URL paths identifying this object, or None if none.
        """
        if identifying_paths := self.identifying_paths:
            return identifying_paths[0]
        return None

    def lookup(self, include_identifying_path: bool = False,
               raw: bool = False) -> Optional[Union[Tuple[dict, str], dict]]:
        """
        Looks up this object in the Portal via its identifying paths and returns what was found, or
        None if it was not found. If include_identifying_path is True then returns instead a tuple
        with that result and the identifying path (as described for lookup_identifying_path).
        The raw flag is passed through to the Portal get call.
        """
        return self._lookup(raw=raw) if include_identifying_path else self._lookup(raw=raw)[0]

    def lookup_identifying_path(self) -> Optional[str]:
        """
        Looks up this object in the Portal and returns the identifying path by which it was found; if it
        was not found then returns the first possible identifying path, or None if there are none.
        """
        return self._lookup()[1]

    def _lookup(self, raw: bool = False) -> Tuple[Optional[dict], Optional[str]]:
        """
        Tries each identifying path in turn against the Portal and returns a tuple with the JSON of
        the first response having status code 200 and the path which yielded it; if no path succeeds
        then returns a tuple with None and the first possible identifying path (or None if none).
        """
        for identifying_path in self.identifying_paths:
            # Best effort: a failure for one candidate path (some forms of the path are expected
            # not to work) must not prevent the remaining candidates from being tried.
            try:
                if (value := self._portal.get(identifying_path, raw=raw)) and (value.status_code == 200):
                    return value.json(), identifying_path
            except Exception:
                continue
        return None, self.identifying_path

0 comments on commit 5134a04

Please sign in to comment.