diff --git a/.github/workflows/bandit.yml b/.github/workflows/bandit.yml deleted file mode 100644 index 9201ab81f..000000000 --- a/.github/workflows/bandit.yml +++ /dev/null @@ -1,26 +0,0 @@ -# Bandit is a security linter designed to find common security issues in Python code. - -name: Bandit -on: - schedule: - - cron: '15 0 * * 1' - -jobs: - bandit: - permissions: - contents: read # for actions/checkout to fetch code - security-events: write # for github/codeql-action/upload-sarif to upload SARIF results - actions: read # only required for a private repository by - # github/codeql-action/upload-sarif to get the Action run status - - - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Bandit Scan - uses: shundor/python-bandit-scan@main - with: # optional arguments - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # needed to get PR information - level: high - confidence: medium - exit_zero: true # exit with 0, even with results found diff --git a/pyproject.toml b/pyproject.toml index ecfbf305d..02d74973e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,6 +161,7 @@ select = [ "A", # flake8-builtins "BLE", # flake8-blind-except "ARG", # flake8-unused-arguments + "S", # flake8-bandit plugin which checks for security issues "YTT", # flake8-2020 plugin, which checks for misuse of `sys.version` or `sys.version_info` "ASYNC", # flake8-async plugin, which checks for bad async / asyncio practices "ISC", # flake8-implicit-str-concat plugin, which checks for problematic string concatenation @@ -180,6 +181,10 @@ select = [ ] ignore = [ "ISC001", # flake8-implicit-str-concat: single-line-implicit-string-concatenation # incompatible with the formatter + "S105", # flake8-bandit: hardcoded password + "S106", # flake8-bandit: hardcoded password + "S603", # flake8-bandit: subprocess-without-shell-equals-true + "S320", # flake8-bandit: xml parsing vulnerable to XML attacks ] @@ -196,6 +201,7 @@ ignore = [ "D103", # pydocstyle: undocumented-public-function 
"D102", # pydocstyle: undocumented-public-method "D101", # pydocstyle: undocumented-public-class + "S101", # flake8-bandit: use of assert ] diff --git a/src/dsp_tools/cli/entry_point.py b/src/dsp_tools/cli/entry_point.py index 46af64544..a91d712b5 100644 --- a/src/dsp_tools/cli/entry_point.py +++ b/src/dsp_tools/cli/entry_point.py @@ -119,8 +119,8 @@ def _parse_arguments( def _get_version() -> str: - result = subprocess.run("pip freeze | grep dsp-tools", check=False, shell=True, capture_output=True) - _detail_version = result.stdout.decode("utf-8") + result = subprocess.run("pip freeze".split(), check=False, capture_output=True).stdout.decode("utf-8") + _detail_version = next(x for x in result.split("\n") if "dsp-tools" in x) # _detail_version has one of the following formats: # - 'dsp-tools==5.0.3\n' # - 'dsp-tools==5.6.0.post9\n' diff --git a/src/dsp_tools/commands/excel2xml/excel2xml_cli.py b/src/dsp_tools/commands/excel2xml/excel2xml_cli.py index d8f5eb26b..5cdd503ac 100644 --- a/src/dsp_tools/commands/excel2xml/excel2xml_cli.py +++ b/src/dsp_tools/commands/excel2xml/excel2xml_cli.py @@ -127,37 +127,35 @@ def _convert_rows_to_xml( for index, row in dataframe.iterrows(): row_number = int(str(index)) + 2 - # either the row is a resource-row or a property-row, but not both - if check_notna(row["id"]) == check_notna(row["prop name"]): - raise BaseError( - f"Exactly 1 of the 2 columns 'id' and 'prop name' must be filled. 
" - f"Excel row {row_number} has too many/too less entries:\n" - f"id: '{row['id']}'\n" - f"prop name: '{row['prop name']}'" - ) - - # this is a resource-row - elif check_notna(row["id"]): - # the previous resource is finished, a new resource begins: append the previous to the resulting list - # in all cases (except for the very first iteration), a previous resource exists - if resource is not None: - resources.append(resource) - resource = _convert_resource_row_to_xml(row_number=row_number, row=row) - - # this is a property-row - else: - assert check_notna(row["prop name"]) - if resource is None: + is_resource = check_notna(row["id"]) + is_property = check_notna(row["prop name"]) + match is_resource, is_property: + case (True, True) | (False, False): # either the row is a resource-row or a property-row, but not both raise BaseError( - "The first row of your Excel/CSV is invalid. The first row must define a resource, not a property." + f"Exactly 1 of the 2 columns 'id' and 'prop name' must be filled. " + f"Excel row {row_number} has too many/too less entries:\n" + f"id: '{row['id']}'\n" + f"prop name: '{row['prop name']}'" ) - prop = _convert_property_row_to_xml( - row_number=row_number, - row=row, - max_num_of_props=max_num_of_props, - resource_id=resource.attrib["id"], - ) - resource.append(prop) + case True, False: # this is a resource-row + # the previous resource is finished, a new resource begins: append the previous to the resulting list + # in all cases (except for the very first iteration), a previous resource exists + if resource is not None: + resources.append(resource) + resource = _convert_resource_row_to_xml(row_number=row_number, row=row) + case False, True: # this is a property-row + if resource is None: + raise BaseError( + "The first row of your Excel/CSV is invalid. " + "The first row must define a resource, not a property." 
+ ) + prop = _convert_property_row_to_xml( + row_number=row_number, + row=row, + max_num_of_props=max_num_of_props, + resource_id=resource.attrib["id"], + ) + resource.append(prop) # append the resource of the very last iteration of the for loop if resource is not None: diff --git a/src/dsp_tools/commands/excel2xml/excel2xml_lib.py b/src/dsp_tools/commands/excel2xml/excel2xml_lib.py index 8a8eb7ed5..d623f10d2 100644 --- a/src/dsp_tools/commands/excel2xml/excel2xml_lib.py +++ b/src/dsp_tools/commands/excel2xml/excel2xml_lib.py @@ -876,8 +876,16 @@ def make_geometry_prop( for val in values: try: value_as_dict = json.loads(str(val.value)) - assert value_as_dict["type"] in ["rectangle", "circle", "polygon"] - assert isinstance(value_as_dict["points"], list) + if value_as_dict["type"] not in ["rectangle", "circle", "polygon"]: + raise BaseError( + f"Failed validation in resource '{calling_resource}', property '{name}': " + f"The 'type' of the JSON geometry object must be 'rectangle', 'circle', or 'polygon'." + ) + if not isinstance(value_as_dict["points"], list): + raise BaseError( + f"Failed validation in resource '{calling_resource}', property '{name}': " + f"The 'points' of the JSON geometry object must be a list of points." 
+ ) except (json.JSONDecodeError, TypeError, IndexError, KeyError, AssertionError): raise BaseError( f"Failed validation in resource '{calling_resource}', property '{name}': " diff --git a/src/dsp_tools/commands/fast_xmlupload/upload_files.py b/src/dsp_tools/commands/fast_xmlupload/upload_files.py index 24b257e26..33d364717 100644 --- a/src/dsp_tools/commands/fast_xmlupload/upload_files.py +++ b/src/dsp_tools/commands/fast_xmlupload/upload_files.py @@ -66,7 +66,7 @@ def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]: """ orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = [] for pkl_file in pkl_files: - orig_paths_2_processed_paths.extend(pickle.loads(pkl_file.read_bytes())) + orig_paths_2_processed_paths.extend(pickle.loads(pkl_file.read_bytes())) # noqa: S301 (deserialize untrusted data) processed_paths: list[Path] = [] for orig_path, processed_path in orig_paths_2_processed_paths: diff --git a/src/dsp_tools/commands/fast_xmlupload/upload_xml.py b/src/dsp_tools/commands/fast_xmlupload/upload_xml.py index 6d7fd2f5c..5a9c62945 100644 --- a/src/dsp_tools/commands/fast_xmlupload/upload_xml.py +++ b/src/dsp_tools/commands/fast_xmlupload/upload_xml.py @@ -29,7 +29,7 @@ def _get_paths_from_pkl_files(pkl_files: list[Path]) -> dict[str, str]: """ orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = [] for pkl_file in pkl_files: - orig_path_2_processed_path.extend(pickle.loads(pkl_file.read_bytes())) + orig_path_2_processed_path.extend(pickle.loads(pkl_file.read_bytes())) # noqa: S301 (deserialize untrusted data) orig_path_2_uuid_filename: dict[str, str] = {} for orig_path, processed_path in orig_path_2_processed_path: diff --git a/src/dsp_tools/commands/rosetta.py b/src/dsp_tools/commands/rosetta.py index 2e3846245..71a72e96a 100644 --- a/src/dsp_tools/commands/rosetta.py +++ b/src/dsp_tools/commands/rosetta.py @@ -21,7 +21,7 @@ def _update_possibly_existing_repo(rosetta_folder: Path) -> bool: is_rosetta_up_to_date = True if 
rosetta_folder.is_dir(): print(f"Execute 'git pull' in {rosetta_folder}...") - completed_process = subprocess.run("git pull", shell=True, cwd=rosetta_folder, check=False) + completed_process = subprocess.run("git pull".split(), cwd=rosetta_folder, check=False) if not completed_process or completed_process.returncode != 0: print(f"'git pull' failed. Remove '{rosetta_folder}'...") shutil.rmtree(rosetta_folder, ignore_errors=True) @@ -47,12 +47,8 @@ def _clone_repo( UserError: If rosetta cannot be cloned """ print(f"Clone into {rosetta_folder}...") - completed_process = subprocess.run( - "git clone https://github.com/dasch-swiss/082E-rosetta-scripts.git", - shell=True, - cwd=enclosing_folder, - check=False, - ) + cmd = "git clone https://github.com/dasch-swiss/082E-rosetta-scripts.git".split() + completed_process = subprocess.run(cmd, cwd=enclosing_folder, check=False) if not completed_process or completed_process.returncode != 0: raise UserError("There was a problem while cloning the rosetta test project") diff --git a/src/dsp_tools/commands/start_stack.py b/src/dsp_tools/commands/start_stack.py index 7c241ef0a..b2a183e9d 100644 --- a/src/dsp_tools/commands/start_stack.py +++ b/src/dsp_tools/commands/start_stack.py @@ -151,12 +151,8 @@ def _start_up_fuseki(self) -> None: Raises: UserError: if the database cannot be started """ - completed_process = subprocess.run( - "docker compose up db -d", - shell=True, - cwd=self.__docker_path_of_user, - check=False, - ) + cmd = "docker compose up db -d".split() + completed_process = subprocess.run(cmd, cwd=self.__docker_path_of_user, check=False) if not completed_process or completed_process.returncode != 0: msg = "Cannot start the API: Error while executing 'docker compose up db -d'" logger.error(f"{msg}. 
completed_process = '{vars(completed_process)}'") @@ -256,12 +252,11 @@ def _start_remaining_docker_containers(self) -> None: """ if self.__stack_configuration.latest_dev_version: subprocess.run( - "docker pull daschswiss/knora-api:latest", - shell=True, + "docker pull daschswiss/knora-api:latest".split(), cwd=self.__docker_path_of_user, check=True, ) - subprocess.run("docker compose up -d", shell=True, cwd=self.__docker_path_of_user, check=True) + subprocess.run("docker compose up -d".split(), cwd=self.__docker_path_of_user, check=True) print("DSP-API is now running on http://0.0.0.0:3333/ and DSP-APP on http://0.0.0.0:4200/") def _execute_docker_system_prune(self) -> None: @@ -281,7 +276,7 @@ def _execute_docker_system_prune(self) -> None: "If you are unsure what that means, just type y and press Enter. [y/n]" ) if prune_docker == "y": - subprocess.run("docker system prune -f", shell=True, cwd=self.__docker_path_of_user, check=False) + subprocess.run("docker system prune -f".split(), cwd=self.__docker_path_of_user, check=False) def _start_docker_containers(self) -> None: """ @@ -320,5 +315,5 @@ def stop_stack(self) -> bool: Returns: True if everything went well, False otherwise """ - subprocess.run("docker compose down --volumes", shell=True, cwd=self.__docker_path_of_user, check=True) + subprocess.run("docker compose down --volumes".split(), cwd=self.__docker_path_of_user, check=True) return True diff --git a/src/dsp_tools/commands/xmlupload/stash/stash_circular_references.py b/src/dsp_tools/commands/xmlupload/stash/stash_circular_references.py index 0a6073920..5657b2fa0 100644 --- a/src/dsp_tools/commands/xmlupload/stash/stash_circular_references.py +++ b/src/dsp_tools/commands/xmlupload/stash/stash_circular_references.py @@ -93,6 +93,9 @@ def stash_circular_references( Returns: stash: an object that contains the stashed references + + Raises: + ValueError: If a link property of one of the resources is not "text" or "resptr" """ stashed_standoff_values: 
list[StandoffStashItem] = [] stashed_link_values: list[LinkValueStashItem] = [] @@ -101,13 +104,14 @@ def stash_circular_references( if res.res_id not in stash_lookup: continue for link_prop in res.get_props_with_links(): - assert link_prop.valtype in ["text", "resptr"] if link_prop.valtype == "text": standoff_stash_item = _stash_standoff(res.res_id, res.restype, link_prop, stash_lookup) stashed_standoff_values.extend(standoff_stash_item) elif link_prop.valtype == "resptr": link_stash_item = _stash_resptr(res.res_id, res.restype, link_prop, stash_lookup, permission_lookup) stashed_link_values.extend(link_stash_item) + else: + raise ValueError(f"Unknown value type: '{link_prop.valtype}' (should be 'text' or 'resptr')") if len(link_prop.values) == 0: # if all values of a link property have been stashed, the property needs to be removed diff --git a/src/dsp_tools/commands/xmlupload/upload_config.py b/src/dsp_tools/commands/xmlupload/upload_config.py index 924134f18..b23ae101a 100644 --- a/src/dsp_tools/commands/xmlupload/upload_config.py +++ b/src/dsp_tools/commands/xmlupload/upload_config.py @@ -27,7 +27,7 @@ def _transform_server_url_to_foldername(server: str) -> str: r"^api\.": "", r":\d{2,5}/?$": "", r"/$": "", - r"0.0.0.0": "localhost", + r"0.0.0.0": "localhost", # noqa: S104 (hardcoded-bind-all-interfaces) } for pattern, repl in server_substitutions.items(): server = regex.sub(pattern, repl, server) diff --git a/test/distribution/test_cli.py b/test/distribution/test_cli.py index 94885191a..1275a641b 100644 --- a/test/distribution/test_cli.py +++ b/test/distribution/test_cli.py @@ -37,8 +37,8 @@ def tearDownClass(cls) -> None: def test_validate_lists_section_with_schema(self) -> None: """Test if the resource file 'src/dsp_tools/resources/schema/lists-only.json' can be accessed.""" - cmd = f"dsp-tools create --lists-only --validate-only {self.test_project_systematic_file.absolute()}" - self._make_cli_call(cmd) + args = ["create", "--lists-only", "--validate-only", 
str(self.test_project_systematic_file.absolute())] + self._make_cli_call(args) def test_excel_to_json_resources(self) -> None: """ @@ -47,7 +47,7 @@ def test_excel_to_json_resources(self) -> None: """ excel_file = Path("testdata/excel2json/excel2json_files/test-name (test_label)/resources.xlsx") out_file = self.testdata_tmp / "_out_resources.json" - self._make_cli_call(f"dsp-tools excel2resources '{excel_file.absolute()}' {out_file.absolute()}") + self._make_cli_call(["excel2resources", str(excel_file.absolute()), str(out_file.absolute())]) out_file.unlink() def test_excel_to_json_properties(self) -> None: @@ -57,39 +57,38 @@ def test_excel_to_json_properties(self) -> None: """ excel_file = Path("testdata/excel2json/excel2json_files/test-name (test_label)/properties.xlsx") out_file = self.testdata_tmp / "_out_properties.json" - self._make_cli_call(f"dsp-tools excel2properties '{excel_file.absolute()}' {out_file.absolute()}") + self._make_cli_call(["excel2properties", str(excel_file.absolute()), str(out_file.absolute())]) out_file.unlink() def test_validate_project(self) -> None: """Test if the resource file 'src/dsp_tools/resources/schema/project.json' can be accessed.""" - self._make_cli_call(cli_call=f"dsp-tools create --validate-only {self.test_project_minimal_file.absolute()}") + self._make_cli_call(["create", "--validate-only", str(self.test_project_minimal_file.absolute())]) def test_xml_upload(self) -> None: """Test if the resource file 'src/dsp_tools/resources/schema/data.xsd' can be accessed.""" - self._make_cli_call(f"dsp-tools xmlupload -v --validate-only {self.test_data_minimal_file.absolute()}") + self._make_cli_call(["xmlupload", "-v", "--validate-only", str(self.test_data_minimal_file.absolute())]) - def _make_cli_call(self, cli_call: str) -> None: + def _make_cli_call(self, args: list[str]) -> None: """ - Execute a CLI call, capture its stdout and stderr, + Execute a CLI call to dsp-tools, capture its stdout and stderr, and raise an AssertionError 
with stdout and stderr if it fails. Args: - cli_call: a command that can be executed by the system shell + args: list of arguments for the dsp-tools CLI call Raises: AssertionError: detailed diagnostic message with stdout and stderr if the call fails """ try: subprocess.run( - cli_call, + ["dsp-tools", *args], # noqa: S607 (Starting a process with a partial executable path) check=True, - shell=True, capture_output=True, cwd=self.cwd, ) except subprocess.CalledProcessError as e: msg = ( - f"Failed CLI call:\n'{cli_call}'\n\n" + f"Failed CLI call:\n'dsp-tools {' '.join(args)}'\n\n" f"Stdout:\n{e.output.decode('utf-8')}\n\n" f"Stderr:\n{e.stderr.decode('utf-8')}" ) diff --git a/test/e2e/test_00A1_import_scripts.py b/test/e2e/test_00A1_import_scripts.py index a2cb9b6ab..48789a11c 100644 --- a/test/e2e/test_00A1_import_scripts.py +++ b/test/e2e/test_00A1_import_scripts.py @@ -26,7 +26,7 @@ def generated_xml_file() -> Iterator[Path]: def test_script(generated_xml_file: Path) -> None: """Execute the import script in its directory""" # pull the latest state of the git submodule - subprocess.run("git submodule update --init --recursive", check=True, shell=True) + subprocess.run("git submodule update --init --recursive".split(), check=True) from dsp_tools.import_scripts import import_script # execute the import script in its directory