From 96110a0ad3c94c4bce30e261ac3b544492c43f87 Mon Sep 17 00:00:00 2001
From: Johannes Nussbaum <39048939+jnussbaum@users.noreply.github.com>
Date: Thu, 16 Nov 2023 09:17:08 +0100
Subject: [PATCH] refactor: apply sourcery suggestions (#644)

---
 src/dsp_tools/cli.py                           | 13 ++--
 src/dsp_tools/commands/excel2json/lists.py     | 11 +--
 src/dsp_tools/commands/excel2json/project.py   |  2 +-
 .../commands/excel2json/properties.py          | 27 ++++----
 .../commands/excel2json/resources.py           | 47 ++++++-------
 src/dsp_tools/commands/excel2json/utils.py     | 15 +---
 .../commands/excel2xml/excel2xml_cli.py        | 11 +--
 .../commands/excel2xml/excel2xml_lib.py        | 34 ++++-----
 .../commands/fast_xmlupload/process_files.py   | 30 ++++----
 .../commands/fast_xmlupload/upload_files.py    | 11 ++-
 src/dsp_tools/commands/id2iri.py               |  2 +-
 .../commands/project/create/project_create.py  | 39 +++++------
 .../project/create/project_create_lists.py     |  2 +-
 .../project/create/project_validate.py         | 69 +++++++++----------
 src/dsp_tools/commands/project/get.py          |  8 +--
 src/dsp_tools/commands/start_stack.py          |  5 +-
 src/dsp_tools/commands/xmlupload/ark2iri.py    |  2 +-
 .../commands/xmlupload/project_client.py       |  5 +-
 .../xmlupload/read_validate_xml_file.py        |  5 +-
 .../xmlupload/resource_create_client.py        | 45 ++++++------
 .../stash/upload_stashed_xml_texts.py          |  8 +--
 src/dsp_tools/models/exceptions.py             |  7 +-
 src/dsp_tools/utils/connection_live.py         | 10 +--
 23 files changed, 177 insertions(+), 231 deletions(-)

diff --git a/src/dsp_tools/cli.py b/src/dsp_tools/cli.py
index 313247d85..3ab34045c 100644
--- a/src/dsp_tools/cli.py
+++ b/src/dsp_tools/cli.py
@@ -312,14 +312,15 @@ def _log_cli_arguments(parsed_args: argparse.Namespace) -> None:
     Args:
         parsed_args: parsed arguments
     """
-    metadata_lines = []
-    metadata_lines.append(f"DSP-TOOLS: Called the action '{parsed_args.action}' from the command line")
-    metadata_lines.append(f"DSP-TOOLS version: {_get_version()}")
-    metadata_lines.append(f"Location of this installation: {__file__}")
-    metadata_lines.append("CLI arguments:")
+    metadata_lines = [
+        f"DSP-TOOLS: Called the action '{parsed_args.action}' from the command line",
+        f"DSP-TOOLS version: {_get_version()}",
+        f"Location of this installation: {__file__}",
+        "CLI arguments:",
+    ]
     metadata_lines = [f"*** {line}" for line in metadata_lines]

-    parameter_lines = list()
+    parameter_lines = []
     parameters_to_log = {key: value for key, value in vars(parsed_args).items() if key != "action"}
     longest_key_length = max(len(key) for key in parameters_to_log) if parameters_to_log else 0
     for key, value in parameters_to_log.items():
diff --git a/src/dsp_tools/commands/excel2json/lists.py b/src/dsp_tools/commands/excel2json/lists.py
index dcadf4e42..bf502b37f 100644
--- a/src/dsp_tools/commands/excel2json/lists.py
+++ b/src/dsp_tools/commands/excel2json/lists.py
@@ -102,7 +102,7 @@ def _get_values_from_excel(
         dict: The JSON list up to the current recursion. At the last recursion, this is the final JSON list.
     """
     nodes: list[dict[str, Any]] = []
-    currentnode: dict[str, Any] = dict()
+    currentnode: dict[str, Any] = {}
     base_file_ws: Worksheet = list(base_file.values())[0]
     cell: Cell = base_file_ws.cell(column=col, row=row)

@@ -162,7 +162,7 @@ def _get_values_from_excel(
             # append a number (p.ex. node-name-2) if there are list nodes with identical names
             n = list_of_previous_node_names.count(nodename)
             if n > 1:
-                nodename = nodename + "-" + str(n)
+                nodename = f"{nodename}-{n}"

             # read label values from the other Excel files (other languages)
             labels_dict: dict[str, str] = {}
@@ -224,9 +224,10 @@ def _make_json_lists_from_excel(
     startcol = 1

     # make a dict with the language labels and the worksheets
-    lang_to_worksheet: dict[str, Worksheet] = {}
-    for filepath in excel_file_paths:
-        lang_to_worksheet[os.path.basename(filepath)[0:2]] = load_workbook(filepath, read_only=True).worksheets[0]
+    lang_to_worksheet = {
+        os.path.basename(filepath)[:2]: load_workbook(filepath, read_only=True).worksheets[0]
+        for filepath in excel_file_paths
+    }

     # take English as base file. If English is not available, take a random one.
     base_lang = "en" if "en" in lang_to_worksheet else list(lang_to_worksheet.keys())[0]
diff --git a/src/dsp_tools/commands/excel2json/project.py b/src/dsp_tools/commands/excel2json/project.py
index c31e1a13d..7b649d797 100644
--- a/src/dsp_tools/commands/excel2json/project.py
+++ b/src/dsp_tools/commands/excel2json/project.py
@@ -51,7 +51,7 @@ def excel2json(
     processed_files = []
     onto_folders = [x for x in folder if os.path.isdir(x) and regex.search(r"([\w.-]+) \(([\w.\- ]+)\)", x.name)]
-    if len(onto_folders) == 0:
+    if not onto_folders:
         raise UserError(
             f"'{data_model_files}' must contain at least one subfolder named after the pattern 'onto_name (onto_label)'"
         )
diff --git a/src/dsp_tools/commands/excel2json/properties.py b/src/dsp_tools/commands/excel2json/properties.py
index 81aec19db..e4f542ffb 100644
--- a/src/dsp_tools/commands/excel2json/properties.py
+++ b/src/dsp_tools/commands/excel2json/properties.py
@@ -37,8 +37,7 @@ def _search_json_validation_error_get_err_msg_str(
         A string which is used in the Error message that contains detailed information about the problem
     """
     err_msg_list = [f"The 'properties' section defined in the Excel file '{excelfile}' did not pass validation."]
-    json_path_to_property = regex.search(r"^\$\[(\d+)\]", validation_error.json_path)
-    if json_path_to_property:
+    if json_path_to_property := regex.search(r"^\$\[(\d+)\]", validation_error.json_path):
         # fmt: off
         wrong_property_name = (
             jsonpath_ng.ext.parse(json_path_to_property.group(0))
@@ -48,11 +47,10 @@ def _search_json_validation_error_get_err_msg_str(
             .find(properties_list)[0]
             .value["name"]
         )
         # fmt: on
         excel_row = int(json_path_to_property.group(1)) + 2
         err_msg_list.append(f"The problematic property is '{wrong_property_name}' in Excel row {excel_row}.")
-        affected_field = regex.search(
+        if affected_field := regex.search(
             r"name|labels|comments|super|subject|object|gui_element|gui_attributes",
             validation_error.json_path,
-        )
-        if affected_field:
+        ):
             err_msg_list.append(
                 f"The problem is that the column '{affected_field.group(0)}' has an invalid value: "
                 f"{validation_error.message}"
             )
@@ -131,12 +129,12 @@ def _unpack_gui_attributes(attribute_str: str) -> dict[str, str]:
         IndexError: if the sub-lists do not contain each two items
     """
     # Create a list with several attributes
-    gui_list = [x.strip() for x in attribute_str.split(",") if not x.strip() == ""]
+    gui_list = [x.strip() for x in attribute_str.split(",") if x.strip() != ""]
     # create a sub list with the kex value pair of the attribute if it is an empty string we exclude it.
     # this error will be detected when checking for the length of the lists
     sub_gui_list = [[sub.strip() for sub in x.split(":") if sub.strip() != ""] for x in gui_list]
     # if not all sublist contain two items, something is wrong with the attribute
-    if not all(len(sub) == 2 for sub in sub_gui_list):
+    if any(len(sub) != 2 for sub in sub_gui_list):
         raise IndexError
     return {sub[0]: sub[1] for sub in sub_gui_list}
@@ -450,15 +448,14 @@ def excel2properties(
     )

     # transform every row into a property
-    props: list[dict[str, Any]] = []
-    for index, row in property_df.iterrows():
-        props.append(
-            _row2prop(
-                df_row=row,
-                row_num=int(str(index)) + 2,  # index is a label/index/hashable, but we need an int
-                excelfile=excelfile,
-            )
+    props = [
+        _row2prop(
+            df_row=row,
+            row_num=int(str(index)) + 2,  # index is a label/index/hashable, but we need an int
+            excelfile=excelfile,
         )
+        for index, row in property_df.iterrows()
+    ]

     # write final JSON file
     _validate_properties(properties_list=props, excelfile=excelfile)
diff --git a/src/dsp_tools/commands/excel2json/resources.py b/src/dsp_tools/commands/excel2json/resources.py
index 79622a951..c0d448840 100644
--- a/src/dsp_tools/commands/excel2json/resources.py
+++ b/src/dsp_tools/commands/excel2json/resources.py
@@ -41,45 +41,42 @@ def _validate_resources(
         jsonschema.validate(instance=resources_list, schema=resources_schema)
     except jsonschema.ValidationError as err:
         err_msg = f"The 'resources' section defined in the Excel file '{excelfile}' did not pass validation. "
-        json_path_to_resource = regex.search(r"^\$\[(\d+)\]", err.json_path)
-        if json_path_to_resource:
+        if json_path_to_resource := regex.search(r"^\$\[(\d+)\]", err.json_path):
             # fmt: off
-            wrong_resource_name = (
+            wrong_res_name = (
                 jsonpath_ng.ext.parse(json_path_to_resource.group(0))
                 .find(resources_list)[0]
                 .value["name"]
             )
             # fmt: on
-            affected_field = regex.search(r"name|labels|comments|super|cardinalities\[(\d+)\]", err.json_path)
-            if affected_field and affected_field.group(0) in ["name", "labels", "comments", "super"]:
-                excel_row = int(json_path_to_resource.group(1)) + 2
-                err_msg += (
-                    f"The problem is that the Excel sheet 'classes' contains an invalid value for resource "
-                    f"'{wrong_resource_name}', in row {excel_row}, column '{affected_field.group(0)}': {err.message}"
-                )
-            elif affected_field and "cardinalities" in affected_field.group(0):
-                excel_row = int(affected_field.group(1)) + 2
-                if err.json_path.endswith("cardinality"):
+            if affected_field := regex.search(r"name|labels|comments|super|cardinalities\[(\d+)\]", err.json_path):
+                if affected_field.group(0) in ["name", "labels", "comments", "super"]:
+                    excel_row = int(json_path_to_resource.group(1)) + 2
                     err_msg += (
-                        f"The problem is that the Excel sheet '{wrong_resource_name}' contains an invalid value "
-                        f"in row {excel_row}, column 'Cardinality': {err.message}"
-                    )
-                elif err.json_path.endswith("propname"):
-                    err_msg += (
-                        f"The problem is that the Excel sheet '{wrong_resource_name}' contains an invalid value "
-                        f"in row {excel_row}, column 'Property': {err.message}"
+                        f"The problem is that the Excel sheet 'classes' contains an invalid value for resource "
+                        f"'{wrong_res_name}', in row {excel_row}, column '{affected_field.group(0)}': {err.message}"
                     )
+                elif "cardinalities" in affected_field.group(0):
+                    excel_row = int(affected_field.group(1)) + 2
+                    if err.json_path.endswith("cardinality"):
+                        err_msg += (
+                            f"The problem is that the Excel sheet '{wrong_res_name}' contains an invalid value "
+                            f"in row {excel_row}, column 'Cardinality': {err.message}"
+                        )
+                    elif err.json_path.endswith("propname"):
+                        err_msg += (
+                            f"The problem is that the Excel sheet '{wrong_res_name}' contains an invalid value "
+                            f"in row {excel_row}, column 'Property': {err.message}"
+                        )
         else:
             err_msg += f"The error message is: {err.message}\nThe error occurred at {err.json_path}"
         raise UserError(err_msg) from None

     # check if resource names are unique
     all_names = [r["name"] for r in resources_list]
-    duplicates: dict[int, str] = dict()
-    for index, resdef in enumerate(resources_list):
-        if all_names.count(resdef["name"]) > 1:
-            duplicates[index + 2] = resdef["name"]
-    if duplicates:
+    if duplicates := {
+        index + 2: resdef["name"] for index, resdef in enumerate(resources_list) if all_names.count(resdef["name"]) > 1
+    }:
         err_msg = (
             f"Resource names must be unique inside every ontology, "
             f"but your Excel file '{excelfile}' contains duplicates:\n"
diff --git a/src/dsp_tools/commands/excel2json/utils.py b/src/dsp_tools/commands/excel2json/utils.py
index 10f6f401b..b0a575f82 100644
--- a/src/dsp_tools/commands/excel2json/utils.py
+++ b/src/dsp_tools/commands/excel2json/utils.py
@@ -214,10 +214,7 @@ def get_comments(df_row: pd.Series) -> dict[str, str] | None:
         A dictionary with the language tag and the content of the cell
     """
     comments = {lang: df_row[f"comment_{lang}"] for lang in languages if df_row[f"comment_{lang}"] is not pd.NA}
-    if comments == {}:
-        return None
-    else:
-        return comments
+    return comments or None


 def find_one_full_cell_in_cols(df: pd.DataFrame, required_columns: list[str]) -> pd.Series | None:
@@ -238,10 +235,7 @@ def find_one_full_cell_in_cols(df: pd.DataFrame, required_columns: list[str]) ->
     # If all are True logical_and returns True otherwise False
     combined_array = np.logical_and.reduce(result_arrays)
     # if any of the values are True, it is turned into a pd.Series
-    if any(combined_array):
-        return pd.Series(combined_array)
-    else:
-        return None
+    return pd.Series(combined_array) if any(combined_array) else None


 def col_must_or_not_empty_based_on_other_col(
@@ -281,10 +275,7 @@ def col_must_or_not_empty_based_on_other_col(
     substring_array = df[substring_colname].str.contains("|".join(substring_list), na=False, regex=True)
     # If both are True logical_and returns True otherwise False
     combined_array = np.logical_and(na_series, substring_array)
-    if any(combined_array):
-        return pd.Series(combined_array)
-    else:
-        return None
+    return pd.Series(combined_array) if any(combined_array) else None


 def add_optional_columns(df: pd.DataFrame, optional_col_set: set[str]) -> pd.DataFrame:
diff --git a/src/dsp_tools/commands/excel2xml/excel2xml_cli.py b/src/dsp_tools/commands/excel2xml/excel2xml_cli.py
index e9500867d..6245194ac 100644
--- a/src/dsp_tools/commands/excel2xml/excel2xml_cli.py
+++ b/src/dsp_tools/commands/excel2xml/excel2xml_cli.py
@@ -364,8 +364,7 @@ def _convert_row_to_property_elements(
         # if all other cells are empty, continue with next property element
         other_cell_headers = [f"{i}_{x}" for x in ["encoding", "permissions", "comment"]]
         notna_cell_headers = [x for x in other_cell_headers if check_notna(row.get(x))]
-        notna_cell_headers_str = ", ".join([f"'{x}'" for x in notna_cell_headers])
-        if notna_cell_headers_str:
+        if notna_cell_headers_str := ", ".join([f"'{x}'" for x in notna_cell_headers]):
             warnings.warn(
                 f"Error in resource '{resource_id}': Excel row {row_number} has an entry "
                 f"in column(s) {notna_cell_headers_str}, but not in '{i}_value'. "
@@ -388,7 +387,7 @@ def _convert_row_to_property_elements(
     property_elements.append(PropertyElement(**kwargs_propelem))

     # validate the end result before returning it
-    if len(property_elements) == 0:
+    if not property_elements:
         warnings.warn(
             f"At least one value per property is required, "
             f"but resource '{resource_id}', property '{row['prop name']}' (Excel row {row_number}) doesn't contain any values."
@@ -468,13 +467,9 @@ def _create_property(
     kwargs_propfunc: dict[str, Union[str, PropertyElement, list[PropertyElement]]] = {
         "name": row["prop name"],
         "calling_resource": resource_id,
+        "value": property_elements[0] if row.get("prop type") == "boolean-prop" else property_elements,
     }

-    if row.get("prop type") == "boolean-prop":
-        kwargs_propfunc["value"] = property_elements[0]
-    else:
-        kwargs_propfunc["value"] = property_elements
-
     if check_notna(row.get("prop list")):
         kwargs_propfunc["list_name"] = str(row["prop list"])
diff --git a/src/dsp_tools/commands/excel2xml/excel2xml_lib.py b/src/dsp_tools/commands/excel2xml/excel2xml_lib.py
index 601075462..3d187b252 100644
--- a/src/dsp_tools/commands/excel2xml/excel2xml_lib.py
+++ b/src/dsp_tools/commands/excel2xml/excel2xml_lib.py
@@ -102,7 +102,6 @@ def find_date_in_string(string: str) -> Optional[str]:
     # sanitize input, just in case that the method was called on an empty or N/A cell
     if not check_notna(string):
         return None
-    string = str(string)

     months_dict = {
         "January": 1,
@@ -214,9 +213,9 @@ def find_date_in_string(string: str) -> Optional[str]:
     elif year_range:
         startyear = int(year_range.group(1))
         endyear = int(year_range.group(2))
-        if int(endyear / 100) == 0:
+        if endyear // 100 == 0:
             # endyear is only 2-digit: add the first two digits of startyear
-            endyear = int(startyear / 100) * 100 + endyear
+            endyear = startyear // 100 * 100 + endyear

     elif year_only:
         startyear = int(year_only.group(0))
@@ -1265,10 +1264,7 @@ def make_text_prop(
         kwargs = {"permissions": val.permissions}
         if check_notna(val.comment):
             kwargs["comment"] = val.comment
-        if check_notna(val.encoding):
-            kwargs["encoding"] = val.encoding
-        else:
-            kwargs["encoding"] = "utf8"
+        kwargs["encoding"] = val.encoding if check_notna(val.encoding) else "utf8"
         value_ = etree.Element(
             "{%s}text" % xml_namespace_map[None],
             **kwargs,  # type: ignore[arg-type]
@@ -1680,7 +1676,7 @@ def create_json_excel_list_mapping(
     corrections = corrections or {}

     # split the values, if necessary
-    excel_values_new = list()
+    excel_values_new = []
     for val in excel_values:
         if isinstance(val, str):
             excel_values_new.extend([x.strip() for x in val.split(sep) if x])
@@ -1688,24 +1684,23 @@ def create_json_excel_list_mapping(
     # read the list of the JSON project (works also for nested lists)
     with open(path_to_json, encoding="utf-8") as f:
         json_file = json.load(f)
-    json_subset = list()
+    json_subset = []
     for elem in json_file["project"]["lists"]:
         if elem["name"] == list_name:
            json_subset = elem["nodes"]
     json_values = set(_nested_dict_values_iterator(json_subset))

     # build dictionary with the mapping, based on string similarity
-    res = dict()
+    res = {}
     for excel_value in excel_values_new:
         excel_value_corrected = corrections.get(excel_value, excel_value)
         excel_value_simpl = simplify_name(excel_value_corrected)  # increase match probability by removing illegal chars
-        matches: list[str] = difflib.get_close_matches(
+        if matches := difflib.get_close_matches(
             word=excel_value_simpl,
             possibilities=json_values,
             n=1,
             cutoff=0.6,
-        )
-        if matches:
+        ):
             res[excel_value] = matches[0]
             res[excel_value.lower()] = matches[0]
         else:
@@ -1731,8 +1726,7 @@ def _nested_dict_values_iterator(dicts: list[dict[str, Any]]) -> Iterable[str]:
     # Credits: https://thispointer.com/python-iterate-loop-over-all-nested-dictionary-values/
     for _dict in dicts:
         if "nodes" in _dict:
-            for value in _nested_dict_values_iterator(_dict["nodes"]):
-                yield value
+            yield from _nested_dict_values_iterator(_dict["nodes"])
         if "name" in _dict:
             yield _dict["name"]
@@ -1760,10 +1754,7 @@ def create_json_list_mapping(
     """
     with open(path_to_json, encoding="utf-8") as f:
         json_file = json.load(f)
-    json_subset = list()
-    for numbered_json_obj in json_file["project"]["lists"]:
-        if numbered_json_obj["name"] == list_name:
-            json_subset.append(numbered_json_obj)
+    json_subset = [x for x in json_file["project"]["lists"] if x["name"] == list_name]
     # json_subset is a list containing one item, namely the json object containing the entire json-list

     res = {}
@@ -1793,9 +1784,8 @@ def _name_label_mapper_iterator(
         # node is the json object containing the entire json-list
         if "nodes" in node:
             # "nodes" is the json sub-object containing the entries of the json-list
-            for value in _name_label_mapper_iterator(node["nodes"], language_label):
-                yield value
-                # "value" is a (label, name) pair of a single list entry
+            yield from _name_label_mapper_iterator(node["nodes"], language_label)
+            # each yielded value is a (label, name) pair of a single list entry
         if "name" in node:
             yield (node["labels"][language_label], node["name"])
             # the actual values of the name and the label
diff --git a/src/dsp_tools/commands/fast_xmlupload/process_files.py b/src/dsp_tools/commands/fast_xmlupload/process_files.py
index 07667b549..74b2a0fed 100644
--- a/src/dsp_tools/commands/fast_xmlupload/process_files.py
+++ b/src/dsp_tools/commands/fast_xmlupload/process_files.py
@@ -489,14 +489,14 @@ def _get_file_category_from_extension(file: Path) -> Optional[str]:
     Returns:
         the file category, either IMAGE, VIDEO or OTHER (or None)
     """
-    extensions: dict[str, list[str]] = dict()
-    extensions["image"] = [".jpg", ".jpeg", ".tif", ".tiff", ".jp2", ".png"]
-    extensions["video"] = [".mp4"]
-    extensions["archive"] = [".7z", ".gz", ".gzip", ".tar", ".tar.gz", ".tgz", ".z", ".zip"]
-    extensions["text"] = [".csv", ".txt", ".xml", ".xsd", ".xsl"]
-    extensions["document"] = [".doc", ".docx", ".pdf", ".ppt", ".pptx", ".xls", ".xlsx"]
-    extensions["audio"] = [".mp3", ".wav"]
-
+    extensions = {
+        "image": [".jpg", ".jpeg", ".tif", ".tiff", ".jp2", ".png"],
+        "video": [".mp4"],
+        "archive": [".7z", ".gz", ".gzip", ".tar", ".tar.gz", ".tgz", ".z", ".zip"],
+        "text": [".csv", ".txt", ".xml", ".xsd", ".xsl"],
+        "document": [".doc", ".docx", ".pdf", ".ppt", ".pptx", ".xls", ".xlsx"],
+        "audio": [".mp3", ".wav"],
+    }
     if file.suffix.lower() in extensions["video"]:
         category = "VIDEO"
     elif file.suffix.lower() in extensions["image"]:
@@ -524,10 +524,7 @@ def _extract_preview_from_video(file: Path) -> bool:
     """
     result = subprocess.call(["/bin/bash", f"{export_moving_image_frames_script}", "-i", f"{file}"])

-    if result != 0:
-        return False
-    else:
-        return True
+    return result == 0


 def _process_file(
@@ -560,7 +557,7 @@ def _process_file(

     # get random UUID for internal file handling, and create directory structure
     internal_filename = str(uuid.uuid4())
-    out_dir_full = Path(output_dir, internal_filename[0:2], internal_filename[2:4])
+    out_dir_full = Path(output_dir, internal_filename[:2], internal_filename[2:4])
     out_dir_full.mkdir(parents=True, exist_ok=True)

     # create .orig file
@@ -751,14 +748,13 @@ def _write_processed_and_unprocessed_files_to_txt_files(
     if Path("processed_files.txt").is_file():
         with open("processed_files.txt", "r", encoding="utf-8") as f:
             previously_processed_files = [Path(x) for x in f.read().splitlines()]
-        processed_original_paths = processed_original_paths + previously_processed_files
+        processed_original_paths += previously_processed_files

     with open("processed_files.txt", "w", encoding="utf-8") as f:
         f.write("\n".join([str(x) for x in processed_original_paths]))
     msg = "Wrote 'processed_files.txt'"

-    unprocessed_original_paths = [x for x in all_files if x not in processed_original_paths]
-    if unprocessed_original_paths:
+    if unprocessed_original_paths := [x for x in all_files if x not in processed_original_paths]:
         with open("unprocessed_files.txt", "w", encoding="utf-8") as f:
             f.write("\n".join([str(x) for x in unprocessed_original_paths]))
         msg += " and 'unprocessed_files.txt'"
@@ -828,7 +824,7 @@ def double_check_unprocessed_files(
     if unprocessed_files_txt_exists:
         # there is a 'unprocessed_files.txt' file. check it for consistency
         unprocessed_files_from_processed_files = [x for x in all_files if x not in processed_files]
-        if not sorted(unprocessed_files_from_processed_files) == sorted(unprocessed_files):
+        if sorted(unprocessed_files_from_processed_files) != sorted(unprocessed_files):
             logger.error("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent")
             raise UserError("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent")
diff --git a/src/dsp_tools/commands/fast_xmlupload/upload_files.py b/src/dsp_tools/commands/fast_xmlupload/upload_files.py
index 40e2cb459..de5501002 100644
--- a/src/dsp_tools/commands/fast_xmlupload/upload_files.py
+++ b/src/dsp_tools/commands/fast_xmlupload/upload_files.py
@@ -47,10 +47,10 @@ def get_pkl_files() -> list[Path]:
     Returns:
         list of pickle files
     """
-    pkl_file_paths = [Path(x) for x in glob.glob("processing_result_*.pkl")]
-    if len(pkl_file_paths) == 0:
+    if pkl_file_paths := [Path(x) for x in glob.glob("processing_result_*.pkl")]:
+        return pkl_file_paths
+    else:
         raise UserError("No pickle file found. Please run the processing step first.")
-    return pkl_file_paths


 def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]:
@@ -100,10 +100,7 @@ def _get_upload_candidates(
     Returns:
         list of all processed files that belong to the same original file
     """
-    upload_candidates: list[str] = []
-    upload_candidates.extend(
-        glob.glob(f"{dir_with_processed_files}/**/**/{internal_filename_of_processed_file.stem}/*.*")
-    )
+    upload_candidates = glob.glob(f"{dir_with_processed_files}/**/**/{internal_filename_of_processed_file.stem}/*.*")
     upload_candidates.extend(
         glob.glob(f"{dir_with_processed_files}/**/**/{internal_filename_of_processed_file.stem}*.*")
     )
diff --git a/src/dsp_tools/commands/id2iri.py b/src/dsp_tools/commands/id2iri.py
index 87e997723..b22cd53a2 100644
--- a/src/dsp_tools/commands/id2iri.py
+++ b/src/dsp_tools/commands/id2iri.py
@@ -203,7 +203,7 @@ def _write_output_file(
         tree: modified XML tree with replaced IDs
     """
     timestamp_str = datetime.now().strftime("%Y%m%d-%H%M%S")
-    out_file = orig_xml_file.stem + "_replaced_" + timestamp_str + ".xml"
+    out_file = f"{orig_xml_file.stem}_replaced_{timestamp_str}.xml"
     et = etree.ElementTree(tree)
     et.write(out_file, pretty_print=True, xml_declaration=True, encoding="utf-8")
     logger.info(f"XML with replaced IDs was written to file {out_file}.")
diff --git a/src/dsp_tools/commands/project/create/project_create.py b/src/dsp_tools/commands/project/create/project_create.py
index bdd8e93ce..ed470586f 100644
--- a/src/dsp_tools/commands/project/create/project_create.py
+++ b/src/dsp_tools/commands/project/create/project_create.py
@@ -1,5 +1,7 @@
 """This module handles the ontology creation, update and upload to a DSP server. This includes the creation and update
 of the project, the creation of groups, users, lists, resource classes, properties and cardinalities."""
+
+import contextlib
 from pathlib import Path
 from typing import Any, Optional, Union, cast

@@ -52,8 +54,8 @@ def _create_project_on_server(
     Returns:
         a tuple of the remote project and the success status (True if everything went smoothly, False otherwise)
     """
-    try:
-        # the normal, expected case is that this try block fails
+    with contextlib.suppress(BaseError):
+        # the normal, expected case is that this block fails
         project_local = Project(con=con, shortcode=shortcode)
         project_remote: Project = try_network_action(project_local.read)
         proj_designation = f"'{project_remote.shortname}' ({project_remote.shortcode})"
@@ -74,8 +76,6 @@ def _create_project_on_server(
         # There are other things from this file that can be created on the server,
         # e.g. the groups and users, so the process must continue.
         return project_remote, False
-    except BaseError:
-        pass

     success = True
     project_local = Project(
@@ -188,8 +188,7 @@ def _create_groups(
         group_name = group["name"]

         # if the group already exists, add it to "current_project_groups" (for later usage), then skip it
-        remotely_existing_group = [g for g in remote_groups if g.name == group_name]
-        if remotely_existing_group:
+        if remotely_existing_group := [g for g in remote_groups if g.name == group_name]:
             current_project_groups[group_name] = remotely_existing_group[0]
             print(f"\tWARNING: Group name '{group_name}' already exists on the DSP server. Skipping...")
             logger.warning(f"Group name '{group_name}' already exists on the DSP server. Skipping...")
@@ -403,16 +402,13 @@ def _create_users(
         username = json_user_definition["username"]

         # skip the user if he already exists
-        try:
-            # the normal case is that this try block fails
+        with contextlib.suppress(BaseError):
+            # the normal case is that this block fails
             try_network_action(User(con, email=json_user_definition["email"]).read)
             print(f"\tWARNING: User '{username}' already exists on the DSP server. Skipping...")
             logger.warning(f"User '{username}' already exists on the DSP server. Skipping...")
             overall_success = False
             continue
-        except BaseError:
-            pass
-
         # add user to the group(s)
         group_iris, sysadmin, success = _get_group_iris_for_user(
             json_user_definition=json_user_definition,
@@ -478,12 +474,11 @@ def _sort_resources(
     # do not modify the original unsorted_resources, which points to the original JSON project file
     resources_to_sort = unsorted_resources.copy()

-    sorted_resources: list[dict[str, Any]] = list()
-    ok_resource_names: list[str] = list()
-    while len(resources_to_sort) > 0:
+    sorted_resources: list[dict[str, Any]] = []
+    ok_resource_names: list[str] = []
+    while resources_to_sort:
         # inside the for loop, resources_to_sort is modified, so a copy must be made to iterate over
         for res in resources_to_sort.copy():
-            res_name = f'{onto_name}:{res["name"]}'
             parent_classes = res["super"]
             if isinstance(parent_classes, str):
                 parent_classes = [parent_classes]
@@ -491,6 +486,7 @@ def _sort_resources(
             parent_classes_ok = [not p.startswith(onto_name) or p in ok_resource_names for p in parent_classes]
             if all(parent_classes_ok):
                 sorted_resources.append(res)
+                res_name = f'{onto_name}:{res["name"]}'
                 ok_resource_names.append(res_name)
                 resources_to_sort.remove(res)
     return sorted_resources
@@ -514,9 +510,9 @@ def _sort_prop_classes(
     # do not modify the original unsorted_prop_classes, which points to the original JSON project file
     prop_classes_to_sort = unsorted_prop_classes.copy()

-    sorted_prop_classes: list[dict[str, Any]] = list()
-    ok_propclass_names: list[str] = list()
-    while len(prop_classes_to_sort) > 0:
+    sorted_prop_classes: list[dict[str, Any]] = []
+    ok_propclass_names: list[str] = []
+    while prop_classes_to_sort:
         # inside the for loop, resources_to_sort is modified, so a copy must be made to iterate over
         for prop in prop_classes_to_sort.copy():
             prop_name = f'{onto_name}:{prop["name"]}'
@@ -591,7 +587,7 @@ def _create_ontology(

     context.add_context(
         ontology_remote.name,
-        ontology_remote.iri + ("#" if not ontology_remote.iri.endswith("#") else ""),
+        ontology_remote.iri + ("" if ontology_remote.iri.endswith("#") else "#"),
     )

     # add the prefixes defined in the JSON file
@@ -898,7 +894,7 @@ def _add_cardinalities_to_resource_classes(
         "1-n": Cardinality.C_1_n,
     }
     for res_class in resclass_definitions:
-        res_class_remote = remote_res_classes.get(ontology_remote.iri + "#" + res_class["name"])
+        res_class_remote = remote_res_classes.get(f"{ontology_remote.iri}#{res_class['name']}")
         if not res_class_remote:
             msg = (
                 f"Unable to add cardinalities to resource class '{res_class['name']}': "
@@ -1040,8 +1036,7 @@ def create_project(
     context = Context(project_definition.get("prefixes", {}))

     # expand the Excel files referenced in the "lists" section of the project (if any), and add them to the project
-    new_lists = expand_lists_from_excel(project_definition.get("project", {}).get("lists", []))
-    if new_lists:
+    if new_lists := expand_lists_from_excel(project_definition.get("project", {}).get("lists", [])):
         project_definition["project"]["lists"] = new_lists

     # validate against JSON schema
diff --git a/src/dsp_tools/commands/project/create/project_create_lists.py b/src/dsp_tools/commands/project/create/project_create_lists.py
index 330cd6045..239804c02 100644
--- a/src/dsp_tools/commands/project/create/project_create_lists.py
+++ b/src/dsp_tools/commands/project/create/project_create_lists.py
@@ -103,7 +103,7 @@ def create_lists_on_server(
         )
     except BaseError:
         err_msg = "Unable to retrieve existing lists on DSP server. Cannot check if your lists are already existing."
-        print("WARNING: " + err_msg)
+        print(f"WARNING: {err_msg}")
         logger.warning(err_msg, exc_info=True)
         existing_lists = []
         overall_success = False
diff --git a/src/dsp_tools/commands/project/create/project_validate.py b/src/dsp_tools/commands/project/create/project_validate.py
index 6232641f2..6660dd52a 100644
--- a/src/dsp_tools/commands/project/create/project_validate.py
+++ b/src/dsp_tools/commands/project/create/project_validate.py
@@ -26,26 +26,26 @@ def _check_for_duplicate_names(project_definition: dict[str, Any]) -> bool:
     Returns:
         True if the resource/property names are unique
     """
-    resnames_duplicates: dict[str, set[str]] = dict()
-    propnames_duplicates: dict[str, set[str]] = dict()
+    resnames_duplicates: dict[str, set[str]] = {}
+    propnames_duplicates: dict[str, set[str]] = {}
     for onto in project_definition["project"]["ontologies"]:
         resnames = [r["name"] for r in onto["resources"]]
         if len(set(resnames)) != len(resnames):
             for elem in resnames:
                 if resnames.count(elem) > 1:
-                    if not resnames_duplicates.get(onto["name"]):
-                        resnames_duplicates[onto["name"]] = {elem}
-                    else:
+                    if resnames_duplicates.get(onto["name"]):
                         resnames_duplicates[onto["name"]].add(elem)
+                    else:
+                        resnames_duplicates[onto["name"]] = {elem}
         propnames = [p["name"] for p in onto["properties"]]
         if len(set(propnames)) != len(propnames):
             for elem in propnames:
                 if propnames.count(elem) > 1:
-                    if not propnames_duplicates.get(onto["name"]):
-                        propnames_duplicates[onto["name"]] = {elem}
-                    else:
+                    if propnames_duplicates.get(onto["name"]):
                         propnames_duplicates[onto["name"]].add(elem)
+                    else:
+                        propnames_duplicates[onto["name"]] = {elem}

     if not resnames_duplicates and not propnames_duplicates:
         return True
@@ -76,7 +76,7 @@ def _check_for_undefined_super_resource(project_definition: dict[str, Any]) -> b
     Returns:
         True if the superresource are valid
     """
-    errors: dict[str, list[str]] = dict()
+    errors: dict[str, list[str]] = {}
     for onto in project_definition["project"]["ontologies"]:
         ontoname = onto["name"]
         resnames = [r["name"] for r in onto["resources"]]
@@ -97,13 +97,13 @@ def _check_for_undefined_super_resource(project_definition: dict[str, Any]) -> b

         # convert to short form
         supers = [regex.sub(f"^{ontoname}", "", s) for s in supers]
-        invalid_references = [s for s in supers if regex.sub(":", "", s) not in resnames]
-        if invalid_references:
+        if invalid_references := [s for s in supers if regex.sub(":", "", s) not in resnames]:
             errors[f"Ontology '{ontoname}', resource '{res['name']}'"] = invalid_references

     if errors:
-        err_msg = "Your data model contains resources that are derived from an invalid super-resource:\n"
-        err_msg += "\n".join(f" - {loc}: {invalids}" for loc, invalids in errors.items())
+        err_msg = "Your data model contains resources that are derived from an invalid super-resource:\n" + "\n".join(
+            f" - {loc}: {invalids}" for loc, invalids in errors.items()
+        )
         raise BaseError(err_msg)

     return True
@@ -123,7 +123,7 @@ def _check_for_undefined_super_property(project_definition: dict[str, Any]) -> b
     Returns:
         True if the superproperties are valid
     """
-    errors: dict[str, list[str]] = dict()
+    errors: dict[str, list[str]] = {}
     for onto in project_definition["project"]["ontologies"]:
         ontoname = onto["name"]
         propnames = [p["name"] for p in onto["properties"]]
@@ -144,13 +144,13 @@ def _check_for_undefined_super_property(project_definition: dict[str, Any]) -> b

         # convert to short form
         supers = [regex.sub(f"^{ontoname}", "", s) for s in supers]
-        invalid_references = [s for s in supers if regex.sub(":", "", s) not in propnames]
-        if invalid_references:
+        if invalid_references := [s for s in supers if regex.sub(":", "", s) not in propnames]:
             errors[f"Ontology '{ontoname}', property '{prop['name']}'"] = invalid_references

     if errors:
-        err_msg = "Your data model contains properties that are derived from an invalid super-property:\n"
-        err_msg += "\n".join(f" - {loc}: {invalids}" for loc, invalids in errors.items())
+        err_msg = "Your data model contains properties that are derived from an invalid super-property:\n" + "\n".join(
+            f" - {loc}: {invalids}" for loc, invalids in errors.items()
+        )
         raise BaseError(err_msg)

     return True
@@ -169,7 +169,7 @@ def _check_for_undefined_cardinalities(project_definition: dict[str, Any]) -> bo
     Returns:
         True if all cardinalities are defined in the "properties" section
     """
-    errors: dict[str, list[str]] = dict()
+    errors: dict[str, list[str]] = {}
     for onto in project_definition["project"]["ontologies"]:
         ontoname = onto["name"]
         propnames = [prop["name"] for prop in onto["properties"]]
@@ -190,13 +190,13 @@ def _check_for_undefined_cardinalities(project_definition: dict[str, Any]) -> bo

         # convert to short form
         cardnames = [regex.sub(f"^{ontoname}:", ":", card) for card in cardnames]
-        invalid_cardnames = [card for card in cardnames if regex.sub(":", "", card) not in propnames]
-        if invalid_cardnames:
+        if invalid_cardnames := [card for card in cardnames if regex.sub(":", "", card) not in propnames]:
             errors[f"Ontology '{ontoname}', resource '{res['name']}'"] = invalid_cardnames

     if errors:
-        err_msg = "Your data model contains cardinalities with invalid propnames:\n"
-        err_msg += "\n".join(f" - {loc}: {invalids}" for loc, invalids in errors.items())
+        err_msg = "Your data model contains cardinalities with invalid propnames:\n" + "\n".join(
+            f" - {loc}: {invalids}" for loc, invalids in errors.items()
+        )
         raise BaseError(err_msg)

     return True
@@ -249,8 +249,7 @@ def validate_project(
     # expand all lists referenced in the "lists" section of the project definition,
     # and add them to the project definition
     if expand_lists:
-        new_lists = expand_lists_from_excel(project_definition["project"].get("lists", []))
-        if new_lists:
+        if new_lists := expand_lists_from_excel(project_definition["project"].get("lists", [])):
             project_definition["project"]["lists"] = new_lists

     # validate the project definition against the schema
@@ -323,9 +322,9 @@ def _collect_link_properties(project_definition: dict[Any, Any]) -> dict[str, li
     """
     ontos = project_definition["project"]["ontologies"]
    hasLinkTo_props = {"hasLinkTo", "isPartOf", "isRegionOf", "isAnnotationOf"}
-    link_properties: dict[str, list[str]] = dict()
+    link_properties: dict[str, list[str]] = {}
     for index, onto in enumerate(ontos):
-        hasLinkTo_matches = list()
+        hasLinkTo_matches = []
         # look for child-properties down to 5 inheritance levels that are derived from hasLinkTo-properties
         for _ in range(5):
             for hasLinkTo_prop in hasLinkTo_props:
@@ -336,7 +335,7 @@ def _collect_link_properties(project_definition: dict[Any, Any]) -> dict[str, li
                 )
             # make the children from this iteration to the parents of the next iteration
             hasLinkTo_props = {x.value["name"] for x in hasLinkTo_matches}
-        prop_obj_pair: dict[str, list[str]] = dict()
+        prop_obj_pair: dict[str, list[str]] = {}
         for match in hasLinkTo_matches:
             prop = onto["name"] + ":" + match.value["name"]
             target = match.value["object"]
@@ -347,8 +346,8 @@ def _collect_link_properties(project_definition: dict[Any, Any]) -> dict[str, li
         link_properties.update(prop_obj_pair)

     # in case the object of a property is "Resource", the link can point to any resource class
-    all_res_names: list[str] = list()
-    for index, onto in enumerate(ontos):
+    all_res_names: list[str] = []
+    for onto in ontos:
         matches = jsonpath_ng.ext.parse("$.resources[*].name").find(onto)
         tmp = [f"{onto['name']}:{match.value}" for match in matches]
         all_res_names.extend(tmp)
@@ -377,8 +376,8 @@ def _identify_problematic_cardinalities(
     # make 2 dicts of the following form:
     # dependencies = {"rosetta:Text": {"rosetta:hasImage2D": ["rosetta:Image2D"], ...}}
     # cardinalities = {"rosetta:Text": {"rosetta:hasImage2D": "0-1", ...}}
-    dependencies: dict[str, dict[str, list[str]]] = dict()
-    cardinalities: dict[str, dict[str, str]] = dict()
+    dependencies: dict[str, dict[str, list[str]]] = {}
+    cardinalities: dict[str, dict[str, str]] = {}
     for onto in project_definition["project"]["ontologies"]:
         for resource in onto["resources"]:
             resname: str = onto["name"] + ":" + resource["name"]
@@ -393,10 +392,8 @@ def _identify_problematic_cardinalities(
                 # For this reason, `targets` must be created with `targets = list(link_properties[cardname])`
                 targets = list(link_properties[cardname])
                 if resname not in dependencies:
-                    dependencies[resname] = dict()
-                    dependencies[resname][cardname] = targets
-                    cardinalities[resname] = dict()
-                    cardinalities[resname][cardname] = card["cardinality"]
+                    dependencies[resname] = {cardname: targets}
+                    cardinalities[resname] = {cardname: card["cardinality"]}
                 elif cardname not in dependencies[resname]:
                     dependencies[resname][cardname] = targets
                     cardinalities[resname][cardname] = card["cardinality"]
diff --git a/src/dsp_tools/commands/project/get.py b/src/dsp_tools/commands/project/get.py
index 01f166071..af74f8b90 100644
--- a/src/dsp_tools/commands/project/get.py
+++ b/src/dsp_tools/commands/project/get.py
@@ -66,8 +66,7 @@ def get_project(
     if verbose:
         print("Getting groups...")
     groups_obj: list[dict[str, Any]] = []
-    groups = Group.getAllGroupsForProject(con=con, proj_iri=str(project.iri))
-    if groups:
+    if groups := Group.getAllGroupsForProject(con=con, proj_iri=str(project.iri)):
         for group in groups:
             groups_obj.append(group.createDefinitionFileObj())
             if verbose:
@@ -99,8 +98,7 @@ def get_project(
     if verbose:
         print("Getting lists...")
     list_obj: list[dict[str, Any]] = []
-    list_roots = ListNode.getAllLists(con=con, project_iri=project.iri)
-    if list_roots:
+    if list_roots := ListNode.getAllLists(con=con, project_iri=project.iri):
         for list_root in list_roots:
             complete_list = list_root.getAllNodes()
             list_obj.append(complete_list.createDefinitionFileObj())
@@ -112,7 +110,7 @@ def get_project(
     if verbose:
         print("Getting ontologies...")
     project_obj["ontologies"] = []
-    prefixes: dict[str, str] = dict()
+    prefixes: dict[str, str] = {}
     ontologies = Ontology.getProjectOntologies(con, str(project.iri))
     ontology_ids = [onto.iri for onto in ontologies]
     for ontology_id in ontology_ids:
diff --git a/src/dsp_tools/commands/start_stack.py b/src/dsp_tools/commands/start_stack.py
index 4547f5a90..bbb023ac7 100644
--- a/src/dsp_tools/commands/start_stack.py
+++ b/src/dsp_tools/commands/start_stack.py
@@ -41,9 +41,8 @@ def __post_init__(self) -> None:
         Raises:
             UserError: if one of the parameters is invalid
         """
-        if self.max_file_size is not None:
-            if not 1 <= self.max_file_size <= 100_000:
-                raise UserError("max_file_size must be between 1 and 100000")
+        if self.max_file_size is not None and not 1 <= self.max_file_size <= 100_000:
+            raise UserError("max_file_size must be between 1 and 100000")
         if self.enforce_docker_system_prune and self.suppress_docker_system_prune:
             raise UserError('The arguments "--prune" and "--no-prune" are mutually exclusive')

diff --git a/src/dsp_tools/commands/xmlupload/ark2iri.py b/src/dsp_tools/commands/xmlupload/ark2iri.py
index b71b7cacd..a073598c1 100644
--- a/src/dsp_tools/commands/xmlupload/ark2iri.py
+++ b/src/dsp_tools/commands/xmlupload/ark2iri.py
@@ -58,4 +58,4 @@ def convert_ark_v0_to_resource_iri(ark: str) -> str:
         dsp_uuid = dsp_uuid[:-2]

     # use the new UUID to create the resource IRI
-    return "http://rdfh.ch/" + project_id + "/" + dsp_uuid
+    return f"http://rdfh.ch/{project_id}/{dsp_uuid}"
diff --git a/src/dsp_tools/commands/xmlupload/project_client.py b/src/dsp_tools/commands/xmlupload/project_client.py
index 55d82c732..759f0d0b4 100644
--- a/src/dsp_tools/commands/xmlupload/project_client.py
+++ b/src/dsp_tools/commands/xmlupload/project_client.py
@@ -90,10 +90,7 @@ def _get_ontologies_from_server(con: Connection, project_iri: str) -> list[str]:
     iri = quote_plus(project_iri)
     url = f"/v2/ontologies/metadata/{iri}"
     res: dict[str, Any] = try_network_action(con.get, route=url)
-    if "@graph" in res:
-        body = res["@graph"]
-    else:
-        body = res
+    body = res.get("@graph", res)
     match body:
         case list():
             return [o["@id"] for o in body]
diff --git a/src/dsp_tools/commands/xmlupload/read_validate_xml_file.py b/src/dsp_tools/commands/xmlupload/read_validate_xml_file.py
index 1aa8cb0ac..72e31661f 100644
--- a/src/dsp_tools/commands/xmlupload/read_validate_xml_file.py
+++ b/src/dsp_tools/commands/xmlupload/read_validate_xml_file.py
@@ -58,10 +58,9 @@ def _check_if_link_targets_exist(root: etree._Element) -> None:
     """
     resptr_errors = _check_if_resptr_targets_exist(root)
     salsah_errors = _check_if_salsah_targets_exist(root)
-    errors = resptr_errors + salsah_errors
-    if errors:
+    if errors := resptr_errors + salsah_errors:
         sep = "\n - "
-        msg = f"It is not possible to upload the XML file, because it contains invalid links:{sep}" + sep.join(errors)
+        msg = f"It is not possible to upload the XML file, because it contains invalid links:{sep}{sep.join(errors)}"
         raise UserError(msg)

diff --git a/src/dsp_tools/commands/xmlupload/resource_create_client.py b/src/dsp_tools/commands/xmlupload/resource_create_client.py
index 6d9480896..4a47cbe45 100644
--- a/src/dsp_tools/commands/xmlupload/resource_create_client.py
+++ b/src/dsp_tools/commands/xmlupload/resource_create_client.py
@@ -75,12 +75,12 @@ def _make_resource(
         if resource_iri:
             res["@id"] = resource_iri
         if resource.permissions:
-            perm = self.permissions_lookup.get(resource.permissions)
-            if not perm:
+            if perm := self.permissions_lookup.get(resource.permissions):
+                res["knora-api:hasPermissions"] = str(perm)
+            else:
                 raise BaseError(
                     f"Could not find permissions for resource {resource.id} with permissions {resource.permissions}"
                 )
-            res["knora-api:hasPermissions"] = str(perm)
         if resource.creation_date:
             res["knora-api:creationDate"] = {
                 "@type": "xsd:dateTimeStamp",
@@ -92,9 +92,7 @@ def _make_resource(

     def _make_values(self, resource: XMLResource) -> dict[str, Any]:
         def prop_name(p: XMLProperty) -> str:
-            if p.valtype == "resptr":
-                return p.name + "Value"
-            return p.name
+            return f"{p.name}Value" if p.valtype == "resptr" else p.name

         def make_values(p: XMLProperty) -> list[dict[str, Any]]:
             return [self._make_value(v, p.valtype) for v in p.values]
@@ -134,10 +132,10 @@ def _make_value(self, value: XMLValue, value_type: str) -> dict[str, Any]:
         if value.comment:
             res["knora-api:valueHasComment"] = value.comment
         if value.permissions:
-            perm = self.permissions_lookup.get(value.permissions)
-            if not perm:
+            if perm := self.permissions_lookup.get(value.permissions):
+                res["knora-api:hasPermissions"] = str(perm)
+            else:
                 raise BaseError(f"Could not find permissions for value: {value.permissions}")
-            res["knora-api:hasPermissions"] = str(perm)
         return res


@@ -203,8 +201,10 @@ def _make_color_value(value: XMLValue) -> dict[str, Any]:
 def _make_date_value(value: XMLValue) -> dict[str, Any]:
     string_value = _assert_is_string(value.value)
     date = parse_date_string(string_value)
-    res: dict[str, Any] = {"@type": "knora-api:DateValue"}
-    res["knora-api:dateValueHasStartYear"] = date.start.year
+    res: dict[str, Any] = {
+        "@type": "knora-api:DateValue",
+        "knora-api:dateValueHasStartYear": date.start.year,
+    }
     if month := date.start.month:
         res["knora-api:dateValueHasStartMonth"] = month
     if day := date.start.day:
@@ -283,11 +283,10 @@ def _make_link_value(value: XMLValue, iri_resolver: IriResolver) -> dict[str, An
     s = _assert_is_string(value.value)
     if is_resource_iri(s):
         iri = s
-    else:
-        resolved_iri = iri_resolver.get(s)
-        if not resolved_iri:
-            raise BaseError(f"Could not resolve ID {s} to IRI.")
+    elif resolved_iri := iri_resolver.get(s):
         iri = resolved_iri
+    else:
+        raise BaseError(f"Could not resolve ID {s} to IRI.")
     return {
         "@type": "knora-api:LinkValue",
         "knora-api:linkValueHasTargetIri": {
@@ -298,15 +297,15 @@ def _make_link_value(value: XMLValue, iri_resolver: IriResolver) -> dict[str, An

 def _make_list_value(value: XMLValue, iri_lookup: dict[str, str]) -> dict[str, Any]:
     s = _assert_is_string(value.value)
-    iri = iri_lookup.get(s)
-    if not iri:
+    if iri := iri_lookup.get(s):
+        return {
+            "@type": "knora-api:ListValue",
+            "knora-api:listValueAsListNode": {
+                "@id": iri,
+            },
+        }
+    else:
         raise BaseError(f"Could not resolve list node ID {s} to IRI.")
-    return {
-        "@type": "knora-api:ListValue",
-        "knora-api:listValueAsListNode": {
-            "@id": iri,
-        },
-    }


 def _make_text_value(value: XMLValue, iri_resolver: IriResolver) -> dict[str, Any]:
diff --git a/src/dsp_tools/commands/xmlupload/stash/upload_stashed_xml_texts.py b/src/dsp_tools/commands/xmlupload/stash/upload_stashed_xml_texts.py
index a3349cf68..6fd3f6382 100644
--- a/src/dsp_tools/commands/xmlupload/stash/upload_stashed_xml_texts.py
+++ b/src/dsp_tools/commands/xmlupload/stash/upload_stashed_xml_texts.py
@@ -168,11 +168,9 @@ def _get_value_iri(
     # get the IRI of the value that contains the UUID in its text
     text_and_iris = ((v["knora-api:textValueAsXml"], v["@id"]) for v in values_on_server)
     value_iri: str | None = next((iri for text, iri in text_and_iris if uuid in text), None)
-    if not value_iri:
-        # the value that contains the UUID in its text does not exist in DSP
-        # no action necessary: this resource will remain in nonapplied_xml_texts,
-        # which will be handled by the caller
-        return None
+    # in case that "value_iri" is None, the value that contains the UUID in its text does not exist in DSP
+    # no action necessary: this resource will remain in nonapplied_xml_texts,
+    # which will be handled by the caller
     return value_iri

diff --git a/src/dsp_tools/models/exceptions.py b/src/dsp_tools/models/exceptions.py
index 44d0d8c6b..d18e457ec 100644
--- a/src/dsp_tools/models/exceptions.py
+++ b/src/dsp_tools/models/exceptions.py
@@ -1,3 +1,4 @@
+import contextlib
 import json
 from typing import Optional

@@ -48,14 +49,12 @@ def __init__(
             self.status_code = status_code
         if json_content_of_api_response:
             self.json_content_of_api_response = json_content_of_api_response
-            try:
+            with contextlib.suppress(json.JSONDecodeError):
                 parsed_json = json.loads(json_content_of_api_response)
                 if "knora-api:error" in parsed_json:
                     knora_api_error = parsed_json["knora-api:error"]
                     knora_api_error = regex.sub(r"^dsp\.errors\.[A-Za-z]+?: ?", "", knora_api_error)
                     self.orig_err_msg_from_api = knora_api_error
-            except json.JSONDecodeError:
-                pass
         self.reason_from_api = reason_from_api
         self.api_route = api_route

@@ -87,4 +86,4 @@ def __init__(self, msg: str):
         self._message = msg

     def __str__(self) -> str:
-        return "XML-ERROR: " + self._message
+        return f"XML-ERROR: {self._message}"
diff --git a/src/dsp_tools/utils/connection_live.py b/src/dsp_tools/utils/connection_live.py
index c44082999..e5f1b6883 100644
--- a/src/dsp_tools/utils/connection_live.py
+++ b/src/dsp_tools/utils/connection_live.py
@@ -21,7 +21,7 @@ def check_for_api_error(response: requests.Response) -> None:
     """
     if response.status_code != 200:
         raise BaseError(
-            message="KNORA-ERROR: status code=" + str(response.status_code) + "\nMessage:" + response.text,
+            message=f"KNORA-ERROR: status code={response.status_code}\nMessage: {response.text}",
             status_code=response.status_code,
             json_content_of_api_response=response.text,
             reason_from_api=response.reason,
@@ -164,7 +164,7 @@ def post(
         # and the response of the original API call will be lost
         timeout = 60
         if not route.startswith("/"):
-            route = "/" + route
+            route = f"/{route}"
         url = self.server + route
         headers = {}
         if jsondata:
@@ -209,7 +209,7 @@ def get(
             response from server
         """
         if not route.startswith("/"):
-            route = "/" + route
+            route = f"/{route}"
         url = self.server + route
         if not headers:
             headers = {}
@@ -256,7 +256,7 @@ def put(
         # in that case, the client's retry will fail, and the response of the original API call will be lost
         timeout = 60
         if not route.startswith("/"):
-            route = "/" + route
+            route = f"/{route}"
         url = self.server + route
         headers = {}
         if jsondata:
@@ -301,7 +301,7 @@ def delete(
             response from server
         """
         if not route.startswith("/"):
-            route = "/" + route
+            route = f"/{route}"
         url = self.server + route
         headers = {}
         if self.token: