Merge pull request #26 from WIPACrepo/performance-improvements
ric-evans committed Mar 15, 2021
2 parents 7380fca + 135f7f4 commit 0d39ab1
Showing 4 changed files with 151 additions and 92 deletions.
200 changes: 134 additions & 66 deletions indexer.py
@@ -20,9 +20,9 @@
from indexer_api.metadata_manager import MetadataManager

try:
from typing import TypedDict, Final
from typing import Final, TypedDict
except ImportError:
from typing_extensions import TypedDict, Final # type: ignore[misc]
from typing_extensions import Final, TypedDict # type: ignore[misc]


_DEFAULT_TIMEOUT: Final[int] = 30 # seconds
@@ -46,7 +46,7 @@ class IndexerFlags(TypedDict):
"""TypedDict for Indexer bool parameters."""

basic_only: bool
no_patch: bool
patch: bool
iceprodv2_rc_token: str
iceprodv1_db_pass: str
dryrun: bool
@@ -76,21 +76,20 @@ def is_processable_path(path: str) -> bool:
)


# Functions ----------------------------------------------------------------------------


def sorted_unique_filepaths(
file_of_filepaths: Optional[str] = None,
list_of_filepaths: Optional[List[str]] = None,
abspaths: bool = False,
) -> List[str]:
"""Return an aggregated, sorted, and set-unique list of filepaths.
Read in lines from the `file_of_filepaths` file, and/or aggregate with those
in `list_of_filepaths` list. Do not check if filepaths exist.
Keyword Arguments:
file_of_filepaths {Optional[str]} -- a file with a filepath on each line (default: {None})
list_of_filepaths {Optional[List[str]]} -- a list of filepaths (default: {None})
file_of_filepaths -- a file with a filepath on each line
list_of_filepaths -- a list of filepaths
abspaths -- call `os.path.abspath()` on each filepath
Returns:
List[str] -- all unique filepaths
@@ -130,14 +129,19 @@ def convert_to_good_string(b_string: bytes) -> Optional[str]:
if path:
filepaths.append(path)

if abspaths:
filepaths = [os.path.abspath(p) for p in filepaths]
filepaths = [f for f in sorted(set(filepaths)) if f]
return filepaths
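
A hypothetical call showing the aggregation behavior (filenames are placeholders; with the new `abspaths=True` option, paths are normalized before the sort/dedup pass above):

    # Hypothetical usage -- "paths.txt" holds one filepath per line,
    # including a duplicate of /data/exp/f1.i3:
    paths = sorted_unique_filepaths(
        file_of_filepaths="paths.txt",
        list_of_filepaths=["/data/exp/f1.i3", "data/exp/f2.i3"],
        abspaths=True,  # resolves "data/exp/f2.i3" against the CWD first
    )
    # -> a sorted list with each filepath exactly once, all absolute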


async def request_post_patch(
# Indexing Functions -------------------------------------------------------------------


async def post_metadata(
fc_rc: RestClient,
metadata: types.Metadata,
dont_patch: bool = False,
patch: bool = False,
dryrun: bool = False,
) -> RestClient:
"""POST metadata, and PATCH if file is already in the file catalog."""
@@ -151,25 +155,38 @@ async def request_post_patch(
logging.debug("POSTed.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 409:
if dont_patch:
logging.debug("File already exists, not replacing.")
else:
if patch:
patch_path = e.response.json()["file"] # /api/files/{uuid}
_ = await fc_rc.request("PATCH", patch_path, metadata)
logging.debug("PATCHed.")
else:
logging.debug("File already exists, not patching entry.")
else:
raise
return fc_rc
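
In short, the rename flips the flag's polarity: callers now opt *in* to patching. A hedged usage sketch of the two outcomes:

    # Both calls POST first; on a 409 (path already catalogued) they diverge:
    await post_metadata(fc_rc, metadata, patch=True)   # PATCH /api/files/{uuid}
    await post_metadata(fc_rc, metadata, patch=False)  # log and leave entry as-is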


async def process_file(
async def file_exists_in_fc(fc_rc: RestClient, filepath: str) -> bool:
"""Return whether the filepath is currently in the File Catalog."""
ret = await fc_rc.request("GET", "/api/files", {"path": filepath})
return bool(ret["files"])
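
This pre-check is what lets the indexer skip already-catalogued files cheaply; the response shape is inferred from the code (the RestClient presumably sends the dict as query args):

    # GET /api/files with {"path": filepath} returns {"files": [...]};
    # an empty list means the path is not yet catalogued.
    if await file_exists_in_fc(fc_rc, "/data/exp/f1.i3"):
        ...  # skip, unless --patch was given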


async def index_file(
filepath: str,
manager: MetadataManager,
fc_rc: RestClient,
no_patch: bool,
patch: bool,
dryrun: bool,
) -> None:
"""Gather and POST metadata for a file."""
if not patch and await file_exists_in_fc(fc_rc, filepath):
logging.info(
f"File already exists in the File Catalog (use --patch to overwrite); "
f"skipping ({filepath})"
)
return

try:
metadata_file = manager.new_file(filepath)
metadata = metadata_file.generate()
@@ -183,27 +200,27 @@ async def process_file(

logging.debug(f"{filepath} gathered.")
logging.debug(metadata)
await request_post_patch(fc_rc, metadata, no_patch, dryrun)
await post_metadata(fc_rc, metadata, patch, dryrun)


async def process_paths(
async def index_paths(
paths: List[str],
manager: MetadataManager,
fc_rc: RestClient,
no_patch: bool,
patch: bool,
dryrun: bool,
) -> List[str]:
"""POST metadata of files given by paths, and return any directories."""
sub_files: List[str] = []
"""POST metadata of files given by paths, and return all child paths."""
child_paths: List[str] = []

for p in paths:
for p in paths: # pylint: disable=C0103
try:
if is_processable_path(p):
if os.path.isfile(p):
await process_file(p, manager, fc_rc, no_patch, dryrun)
await index_file(p, manager, fc_rc, patch, dryrun)
elif os.path.isdir(p):
logging.debug(f"Directory found, {p}. Queuing its contents...")
sub_files.extend(
child_paths.extend(
dir_entry.path
for dir_entry in os.scandir(p)
if not dir_entry.is_symlink()
@@ -214,7 +231,7 @@ async def process_paths(
except (PermissionError, FileNotFoundError, NotADirectoryError) as e:
logging.info(f"Skipping {p}, {e.__class__.__name__}.")

return sub_files
return child_paths
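
The `os.scandir` filter above is the traversal's only symlink guard; a standalone sketch of the same expansion:

    import os
    from typing import List

    def non_symlink_children(dirpath: str) -> List[str]:
        """The expansion index_paths() applies: list children, skip symlinks."""
        return [entry.path for entry in os.scandir(dirpath) if not entry.is_symlink()]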


def path_in_blacklist(path: str, blacklist: List[str]) -> bool:
@@ -233,26 +250,27 @@ def path_in_blacklist(path: str, blacklist: List[str]) -> bool:
return False
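
The function body is collapsed in this hunk, but per the `--blacklist` help text below ("/foo/bar/ will skip /foo/bar/*") it amounts to a prefix match; a hedged sketch:

    import os
    from typing import List

    def path_in_blacklist_sketch(path: str, blacklist: List[str]) -> bool:
        # Hedged reconstruction -- the real body is collapsed above.
        return any(
            os.path.commonpath([path, b]) == os.path.normpath(b) for b in blacklist
        )

    path_in_blacklist_sketch("/foo/bar/baz.i3", ["/foo/bar/"])  # -> True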


def process_work(
def index(
paths: List[str],
blacklist: List[str],
rest_client_args: RestClientArgs,
site: str,
indexer_flags: IndexerFlags,
) -> List[str]:
"""Wrap async function, `process_paths`.
"""Index paths, excluding any matching the blacklist.
Return files nested under any directories.
Return all child paths nested under any directories.
"""
if not isinstance(paths, list):
raise TypeError(f"`paths` object is not list {paths}")
if not paths:
return []

# Check blacklist
# Filter
paths = sorted_unique_filepaths(list_of_filepaths=paths)
paths = [p for p in paths if not path_in_blacklist(p, blacklist)]

# Process Paths
# Prep
fc_rc = RestClient(
rest_client_args["url"],
token=rest_client_args["token"],
@@ -265,27 +283,22 @@
iceprodv2_rc_token=indexer_flags["iceprodv2_rc_token"],
iceprodv1_db_pass=indexer_flags["iceprodv1_db_pass"],
)
sub_files = asyncio.get_event_loop().run_until_complete(
process_paths(
paths, manager, fc_rc, indexer_flags["no_patch"], indexer_flags["dryrun"]

# Index
child_paths = asyncio.get_event_loop().run_until_complete(
index_paths(
paths, manager, fc_rc, indexer_flags["patch"], indexer_flags["dryrun"]
)
)

fc_rc.close()
return sub_files
return child_paths


def check_path(path: str) -> None:
"""Check if `path` is rooted at a white-listed root path."""
for root in ACCEPTED_ROOTS:
if root == os.path.commonpath([path, root]):
return
message = f"{path} is not rooted at: {', '.join(ACCEPTED_ROOTS)}"
logging.critical(message)
raise Exception(f"Invalid path ({message}).")
# Recursively-Indexing Functions -------------------------------------------------------


def gather_file_info( # pylint: disable=R0913
def recursively_index_multiprocessed( # pylint: disable=R0913
starting_paths: List[str],
blacklist: List[str],
rest_client_args: RestClientArgs,
@@ -297,11 +310,6 @@ def gather_file_info( # pylint: disable=R0913
Do this multi-processed.
"""
# Get full paths
starting_paths = [os.path.abspath(p) for p in starting_paths]
for p in starting_paths:
check_path(p)

# Traverse paths and process files
futures: List[Future] = [] # type: ignore[type-arg]
with ProcessPoolExecutor() as pool:
@@ -319,7 +327,7 @@ def gather_file_info( # pylint: disable=R0913
)
futures.append(
pool.submit(
process_work,
index,
paths,
blacklist,
rest_client_args,
Expand All @@ -340,6 +348,41 @@ def gather_file_info( # pylint: disable=R0913
logging.debug(f"Worker finished: {future} (enqueued {len(result)}).")


def recursively_index( # pylint: disable=R0913
starting_paths: List[str],
blacklist: List[str],
rest_client_args: RestClientArgs,
site: str,
indexer_flags: IndexerFlags,
processes: int,
) -> None:
"""Gather and post metadata from files rooted at `starting_paths`."""
if processes > 1:
recursively_index_multiprocessed(
starting_paths, blacklist, rest_client_args, site, indexer_flags, processes
)
else:
queue = starting_paths
i = 0
while queue:
logging.debug(f"Queue Iteration #{i}")
queue = index(queue, blacklist, rest_client_args, site, indexer_flags)
i += 1
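
The single-process branch is a plain breadth-first walk: each pass indexes the current frontier, and `index()` returns the next. A hypothetical trace:

    # Iteration #0: queue = ["/data/exp"]            -> its non-symlink children
    # Iteration #1: queue = ["/data/exp/run1", ...]  -> grandchildren
    # ...
    # Stops when index() returns [] (no directories remained in the frontier).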


# Main ---------------------------------------------------------------------------------


def validate_path(path: str) -> None:
"""Check if `path` is rooted at a white-listed root path."""
for root in ACCEPTED_ROOTS:
if root == os.path.commonpath([path, root]):
return
message = f"{path} is not rooted at: {', '.join(ACCEPTED_ROOTS)}"
logging.critical(message)
raise Exception(f"Invalid path ({message}).")


def main() -> None:
"""Traverse paths, recursively, and index."""
parser = argparse.ArgumentParser(
@@ -354,13 +397,22 @@ def main() -> None:
parser.add_argument(
"--paths-file",
default=None,
help="file containing path(s) to scan for files. (use this option for a large number of paths)",
help="file containing path(s) to scan for files. "
"(use this option for a large number of paths)",
)
parser.add_argument(
"-n",
"--non-recursive",
default=False,
action="store_true",
help="do not recursively index / do not descend into subdirectories",
)
parser.add_argument(
"--processes",
type=int,
default=1,
help="number of processes for multi-processing",
help="number of processes for multi-processing "
"(ignored if using --non-recursive)",
)
parser.add_argument(
"-u",
@@ -393,18 +445,25 @@ def main() -> None:
help="only collect basic metadata",
)
parser.add_argument(
"--no-patch",
"--patch",
default=False,
action="store_true",
help="do not replace/overwrite existing File-Catalog entries (aka don't patch). "
"NOTE: this should be used *only* as a fail-safe mechanism "
"(this option will not save any processing time); "
"use --blacklist-file if you know what files you want to skip",
help="replace/overwrite any existing File-Catalog entries (aka patch)",
)
parser.add_argument(
"--blacklist-file", help="blacklist file containing filepaths to skip",
"--blacklist",
metavar="BLACKPATH",
nargs="+",
default=None,
help="list of blacklisted filepaths; Ex: /foo/bar/ will skip /foo/bar/*",
)
parser.add_argument("-l", "--log", default="DEBUG", help="the output logging level")
parser.add_argument(
"--blacklist-file",
help="a file containing blacklisted filepaths on each line "
"(this is a useful alternative to `--blacklist` when there's many blacklisted paths); "
"Ex: /foo/bar/ will skip /foo/bar/*",
)
parser.add_argument("-l", "--log", default="INFO", help="the output logging level")
parser.add_argument("--iceprodv2-rc-token", default="", help="IceProd2 REST token")
parser.add_argument("--iceprodv1-db-pass", default="", help="IceProd1 SQL password")
parser.add_argument(
@@ -423,13 +482,19 @@ def main() -> None:
f"Collecting metadata from {args.paths} and those in file (at {args.paths_file})..."
)

# Aggregate and sort filepaths
# Aggregate, sort, and validate filepaths
paths = sorted_unique_filepaths(
file_of_filepaths=args.paths_file, list_of_filepaths=args.paths
file_of_filepaths=args.paths_file, list_of_filepaths=args.paths, abspaths=True
)
for p in paths: # pylint: disable=C0103
validate_path(p)

# Aggregate & sort blacklisted paths
blacklist = sorted_unique_filepaths(
file_of_filepaths=args.blacklist_file,
list_of_filepaths=args.blacklist,
abspaths=True,
)

# Read blacklisted paths
blacklist = sorted_unique_filepaths(file_of_filepaths=args.blacklist_file)

# Grab and pack args
rest_client_args: RestClientArgs = {
Expand All @@ -440,16 +505,19 @@ def main() -> None:
}
indexer_flags: IndexerFlags = {
"basic_only": args.basic_only,
"no_patch": args.no_patch,
"patch": args.patch,
"iceprodv2_rc_token": args.iceprodv2_rc_token,
"iceprodv1_db_pass": args.iceprodv1_db_pass,
"dryrun": args.dryrun,
}

# Go!
gather_file_info(
paths, blacklist, rest_client_args, args.site, indexer_flags, args.processes
)
if args.non_recursive:
index(paths, blacklist, rest_client_args, args.site, indexer_flags)
else:
recursively_index(
paths, blacklist, rest_client_args, args.site, indexer_flags, args.processes
)
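
A hypothetical invocation of the new surface (only flags visible in this diff are used; the url/token/site options are collapsed above and omitted here, and bare paths are assumed to be positional):

    python indexer.py /data/exp --processes 4 --patch --blacklist /data/exp/bad/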


if __name__ == "__main__":