Skip to content

Commit

Permalink
chore: replace bandit action by ruff's bandit ruleset (DEV-3026) (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Jan 9, 2024
1 parent 9973f7a commit 76203cf
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 94 deletions.
26 changes: 0 additions & 26 deletions .github/workflows/bandit.yml

This file was deleted.

6 changes: 6 additions & 0 deletions pyproject.toml
Expand Up @@ -161,6 +161,7 @@ select = [
"A", # flake8-builtins
"BLE", # flake8-blind-except
"ARG", # flake8-unused-arguments
"S", # flake8-bandit plugin which checks for security issues
"YTT", # flake8-2020 plugin, which checks for misuse of `sys.version` or `sys.version_info`
"ASYNC", # flake8-async plugin, which checks for bad async / asyncio practices
"ISC", # flake8-implicit-str-concat plugin, which checks for problematic string concatenation
Expand All @@ -180,6 +181,10 @@ select = [
]
ignore = [
"ISC001", # flake8-implicit-str-concat: single-line-implicit-string-concatenation # incompatible with the formatter
"S105", # flake8-bandit: hardcoded password
"S106", # flake8-bandit: hardcoded password
"S603", # flake8-bandit: subprocess-without-shell-equals-true
"S320", # flake8-bandit: xml parsing vulnerable to XML attacks
]


Expand All @@ -196,6 +201,7 @@ ignore = [
"D103", # pydocstyle: undocumented-public-function
"D102", # pydocstyle: undocumented-public-method
"D101", # pydocstyle: undocumented-public-class
"S101", # flake8-bandit: use of assert
]


Expand Down
4 changes: 2 additions & 2 deletions src/dsp_tools/cli/entry_point.py
Expand Up @@ -119,8 +119,8 @@ def _parse_arguments(


def _get_version() -> str:
result = subprocess.run("pip freeze | grep dsp-tools", check=False, shell=True, capture_output=True)
_detail_version = result.stdout.decode("utf-8")
result = subprocess.run("pip freeze".split(), check=False, capture_output=True).stdout.decode("utf-8")
_detail_version = next(x for x in result.split("\n") if "dsp-tools" in x)
# _detail_version has one of the following formats:
# - 'dsp-tools==5.0.3\n'
# - 'dsp-tools==5.6.0.post9\n'
Expand Down
56 changes: 27 additions & 29 deletions src/dsp_tools/commands/excel2xml/excel2xml_cli.py
Expand Up @@ -127,37 +127,35 @@ def _convert_rows_to_xml(

for index, row in dataframe.iterrows():
row_number = int(str(index)) + 2
# either the row is a resource-row or a property-row, but not both
if check_notna(row["id"]) == check_notna(row["prop name"]):
raise BaseError(
f"Exactly 1 of the 2 columns 'id' and 'prop name' must be filled. "
f"Excel row {row_number} has too many/too less entries:\n"
f"id: '{row['id']}'\n"
f"prop name: '{row['prop name']}'"
)

# this is a resource-row
elif check_notna(row["id"]):
# the previous resource is finished, a new resource begins: append the previous to the resulting list
# in all cases (except for the very first iteration), a previous resource exists
if resource is not None:
resources.append(resource)
resource = _convert_resource_row_to_xml(row_number=row_number, row=row)

# this is a property-row
else:
assert check_notna(row["prop name"])
if resource is None:
is_resource = check_notna(row["id"])
is_property = check_notna(row["prop name"])
match is_resource, is_property:
case (True, True) | (False, False): # either the row is a resource-row or a property-row, but not both
raise BaseError(
"The first row of your Excel/CSV is invalid. The first row must define a resource, not a property."
f"Exactly 1 of the 2 columns 'id' and 'prop name' must be filled. "
f"Excel row {row_number} has too many/too less entries:\n"
f"id: '{row['id']}'\n"
f"prop name: '{row['prop name']}'"
)
prop = _convert_property_row_to_xml(
row_number=row_number,
row=row,
max_num_of_props=max_num_of_props,
resource_id=resource.attrib["id"],
)
resource.append(prop)
case True, False: # this is a resource-row
# the previous resource is finished, a new resource begins: append the previous to the resulting list
# in all cases (except for the very first iteration), a previous resource exists
if resource is not None:
resources.append(resource)
resource = _convert_resource_row_to_xml(row_number=row_number, row=row)
case False, True: # this is a property-row
if resource is None:
raise BaseError(
"The first row of your Excel/CSV is invalid. "
"The first row must define a resource, not a property."
)
prop = _convert_property_row_to_xml(
row_number=row_number,
row=row,
max_num_of_props=max_num_of_props,
resource_id=resource.attrib["id"],
)
resource.append(prop)

# append the resource of the very last iteration of the for loop
if resource is not None:
Expand Down
12 changes: 10 additions & 2 deletions src/dsp_tools/commands/excel2xml/excel2xml_lib.py
Expand Up @@ -876,8 +876,16 @@ def make_geometry_prop(
for val in values:
try:
value_as_dict = json.loads(str(val.value))
assert value_as_dict["type"] in ["rectangle", "circle", "polygon"]
assert isinstance(value_as_dict["points"], list)
if value_as_dict["type"] not in ["rectangle", "circle", "polygon"]:
raise BaseError(
f"Failed validation in resource '{calling_resource}', property '{name}': "
f"The 'type' of the JSON geometry object must be 'rectangle', 'circle', or 'polygon'."
)
if not isinstance(value_as_dict["points"], list):
raise BaseError(
f"Failed validation in resource '{calling_resource}', property '{name}': "
f"The 'points'of the JSON geometry object must be a list of points."
)
except (json.JSONDecodeError, TypeError, IndexError, KeyError, AssertionError):
raise BaseError(
f"Failed validation in resource '{calling_resource}', property '{name}': "
Expand Down
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/fast_xmlupload/upload_files.py
Expand Up @@ -66,7 +66,7 @@ def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]:
"""
orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
orig_paths_2_processed_paths.extend(pickle.loads(pkl_file.read_bytes()))
orig_paths_2_processed_paths.extend(pickle.loads(pkl_file.read_bytes())) # noqa: S301 (deserialize untrusted data)

processed_paths: list[Path] = []
for orig_path, processed_path in orig_paths_2_processed_paths:
Expand Down
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/fast_xmlupload/upload_xml.py
Expand Up @@ -29,7 +29,7 @@ def _get_paths_from_pkl_files(pkl_files: list[Path]) -> dict[str, str]:
"""
orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
orig_path_2_processed_path.extend(pickle.loads(pkl_file.read_bytes()))
orig_path_2_processed_path.extend(pickle.loads(pkl_file.read_bytes())) # noqa: S301 (deserialize untrusted data)

orig_path_2_uuid_filename: dict[str, str] = {}
for orig_path, processed_path in orig_path_2_processed_path:
Expand Down
10 changes: 3 additions & 7 deletions src/dsp_tools/commands/rosetta.py
Expand Up @@ -21,7 +21,7 @@ def _update_possibly_existing_repo(rosetta_folder: Path) -> bool:
is_rosetta_up_to_date = True
if rosetta_folder.is_dir():
print(f"Execute 'git pull' in {rosetta_folder}...")
completed_process = subprocess.run("git pull", shell=True, cwd=rosetta_folder, check=False)
completed_process = subprocess.run("git pull".split(), cwd=rosetta_folder, check=False)
if not completed_process or completed_process.returncode != 0:
print(f"'git pull' failed. Remove '{rosetta_folder}'...")
shutil.rmtree(rosetta_folder, ignore_errors=True)
Expand All @@ -47,12 +47,8 @@ def _clone_repo(
UserError: If rosetta cannot be cloned
"""
print(f"Clone into {rosetta_folder}...")
completed_process = subprocess.run(
"git clone https://github.com/dasch-swiss/082E-rosetta-scripts.git",
shell=True,
cwd=enclosing_folder,
check=False,
)
cmd = "git clone https://github.com/dasch-swiss/082E-rosetta-scripts.git".split()
completed_process = subprocess.run(cmd, cwd=enclosing_folder, check=False)
if not completed_process or completed_process.returncode != 0:
raise UserError("There was a problem while cloning the rosetta test project")

Expand Down
17 changes: 6 additions & 11 deletions src/dsp_tools/commands/start_stack.py
Expand Up @@ -151,12 +151,8 @@ def _start_up_fuseki(self) -> None:
Raises:
UserError: if the database cannot be started
"""
completed_process = subprocess.run(
"docker compose up db -d",
shell=True,
cwd=self.__docker_path_of_user,
check=False,
)
cmd = "docker compose up db -d".split()
completed_process = subprocess.run(cmd, cwd=self.__docker_path_of_user, check=False)
if not completed_process or completed_process.returncode != 0:
msg = "Cannot start the API: Error while executing 'docker compose up db -d'"
logger.error(f"{msg}. completed_process = '{vars(completed_process)}'")
Expand Down Expand Up @@ -256,12 +252,11 @@ def _start_remaining_docker_containers(self) -> None:
"""
if self.__stack_configuration.latest_dev_version:
subprocess.run(
"docker pull daschswiss/knora-api:latest",
shell=True,
"docker pull daschswiss/knora-api:latest".split(),
cwd=self.__docker_path_of_user,
check=True,
)
subprocess.run("docker compose up -d", shell=True, cwd=self.__docker_path_of_user, check=True)
subprocess.run("docker compose up -d".split(), cwd=self.__docker_path_of_user, check=True)
print("DSP-API is now running on http://0.0.0.0:3333/ and DSP-APP on http://0.0.0.0:4200/")

def _execute_docker_system_prune(self) -> None:
Expand All @@ -281,7 +276,7 @@ def _execute_docker_system_prune(self) -> None:
"If you are unsure what that means, just type y and press Enter. [y/n]"
)
if prune_docker == "y":
subprocess.run("docker system prune -f", shell=True, cwd=self.__docker_path_of_user, check=False)
subprocess.run("docker system prune -f".split(), cwd=self.__docker_path_of_user, check=False)

def _start_docker_containers(self) -> None:
"""
Expand Down Expand Up @@ -320,5 +315,5 @@ def stop_stack(self) -> bool:
Returns:
True if everything went well, False otherwise
"""
subprocess.run("docker compose down --volumes", shell=True, cwd=self.__docker_path_of_user, check=True)
subprocess.run("docker compose down --volumes".split(), cwd=self.__docker_path_of_user, check=True)
return True
Expand Up @@ -93,6 +93,9 @@ def stash_circular_references(
Returns:
stash: an object that contains the stashed references
Raises:
ValueError: If a link property of one of the resources is not "text" or "resptr"
"""
stashed_standoff_values: list[StandoffStashItem] = []
stashed_link_values: list[LinkValueStashItem] = []
Expand All @@ -101,13 +104,14 @@ def stash_circular_references(
if res.res_id not in stash_lookup:
continue
for link_prop in res.get_props_with_links():
assert link_prop.valtype in ["text", "resptr"]
if link_prop.valtype == "text":
standoff_stash_item = _stash_standoff(res.res_id, res.restype, link_prop, stash_lookup)
stashed_standoff_values.extend(standoff_stash_item)
elif link_prop.valtype == "resptr":
link_stash_item = _stash_resptr(res.res_id, res.restype, link_prop, stash_lookup, permission_lookup)
stashed_link_values.extend(link_stash_item)
else:
raise ValueError(f"Unknown value type: '{link_prop.valtype}' (should be 'text' or 'resptr')")

if len(link_prop.values) == 0:
# if all values of a link property have been stashed, the property needs to be removed
Expand Down
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/xmlupload/upload_config.py
Expand Up @@ -27,7 +27,7 @@ def _transform_server_url_to_foldername(server: str) -> str:
r"^api\.": "",
r":\d{2,5}/?$": "",
r"/$": "",
r"0.0.0.0": "localhost",
r"0.0.0.0": "localhost", # noqa: S104 (hardcoded-bind-all-interfaces)
}
for pattern, repl in server_substitutions.items():
server = regex.sub(pattern, repl, server)
Expand Down
23 changes: 11 additions & 12 deletions test/distribution/test_cli.py
Expand Up @@ -37,8 +37,8 @@ def tearDownClass(cls) -> None:

def test_validate_lists_section_with_schema(self) -> None:
"""Test if the resource file 'src/dsp_tools/resources/schema/lists-only.json' can be accessed."""
cmd = f"dsp-tools create --lists-only --validate-only {self.test_project_systematic_file.absolute()}"
self._make_cli_call(cmd)
args = ["create", "--lists-only", "--validate-only", str(self.test_project_systematic_file.absolute())]
self._make_cli_call(args)

def test_excel_to_json_resources(self) -> None:
"""
Expand All @@ -47,7 +47,7 @@ def test_excel_to_json_resources(self) -> None:
"""
excel_file = Path("testdata/excel2json/excel2json_files/test-name (test_label)/resources.xlsx")
out_file = self.testdata_tmp / "_out_resources.json"
self._make_cli_call(f"dsp-tools excel2resources '{excel_file.absolute()}' {out_file.absolute()}")
self._make_cli_call(["excel2resources", str(excel_file.absolute()), str(out_file.absolute())])
out_file.unlink()

def test_excel_to_json_properties(self) -> None:
Expand All @@ -57,39 +57,38 @@ def test_excel_to_json_properties(self) -> None:
"""
excel_file = Path("testdata/excel2json/excel2json_files/test-name (test_label)/properties.xlsx")
out_file = self.testdata_tmp / "_out_properties.json"
self._make_cli_call(f"dsp-tools excel2properties '{excel_file.absolute()}' {out_file.absolute()}")
self._make_cli_call(["excel2properties", str(excel_file.absolute()), str(out_file.absolute())])
out_file.unlink()

def test_validate_project(self) -> None:
"""Test if the resource file 'src/dsp_tools/resources/schema/project.json' can be accessed."""
self._make_cli_call(cli_call=f"dsp-tools create --validate-only {self.test_project_minimal_file.absolute()}")
self._make_cli_call(["create", "--validate-only", str(self.test_project_minimal_file.absolute())])

def test_xml_upload(self) -> None:
"""Test if the resource file 'src/dsp_tools/resources/schema/data.xsd' can be accessed."""
self._make_cli_call(f"dsp-tools xmlupload -v --validate-only {self.test_data_minimal_file.absolute()}")
self._make_cli_call(["xmlupload", "-v", "--validate-only", str(self.test_data_minimal_file.absolute())])

def _make_cli_call(self, cli_call: str) -> None:
def _make_cli_call(self, args: list[str]) -> None:
"""
Execute a CLI call, capture its stdout and stderr,
Execute a CLI call to dsp-tools, capture its stdout and stderr,
and raise an AssertionError with stdout and stderr if it fails.
Args:
cli_call: a command that can be executed by the system shell
args: list of arguments for the dsp-tools CLI call
Raises:
AssertionError: detailed diagnostic message with stdout and stderr if the call fails
"""
try:
subprocess.run(
cli_call,
["dsp-tools", *args], # noqa: S607 (Starting a process with a partial executable path)
check=True,
shell=True,
capture_output=True,
cwd=self.cwd,
)
except subprocess.CalledProcessError as e:
msg = (
f"Failed CLI call:\n'{cli_call}'\n\n"
f"Failed CLI call:\n'dsp-tools {' '.join(args)}'\n\n"
f"Stdout:\n{e.output.decode('utf-8')}\n\n"
f"Stderr:\n{e.stderr.decode('utf-8')}"
)
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/test_00A1_import_scripts.py
Expand Up @@ -26,7 +26,7 @@ def generated_xml_file() -> Iterator[Path]:
def test_script(generated_xml_file: Path) -> None:
"""Execute the import script in its directory"""
# pull the latest state of the git submodule
subprocess.run("git submodule update --init --recursive", check=True, shell=True)
subprocess.run("git submodule update --init --recursive".split(), check=True)
from dsp_tools.import_scripts import import_script

# execute the import script in its directory
Expand Down

0 comments on commit 76203cf

Please sign in to comment.