diff --git a/credsweeper/app.py b/credsweeper/app.py index ccc5e380a..6935c5e38 100644 --- a/credsweeper/app.py +++ b/credsweeper/app.py @@ -272,10 +272,10 @@ def file_scan(self, content_provider: ContentProvider) -> List[Candidate]: # reduce duplicated credentials found_values = set(line_data.value for candidate in candidates for line_data in candidate.line_data_list) - for i in extra_candidates: - for j in i.line_data_list: - if j.value not in found_values: - candidates.append(i) + for extra_candidate in extra_candidates: + for line_data in extra_candidate.line_data_list: + if line_data.value not in found_values: + candidates.append(extra_candidate) break # finally return result from 'file_scan' @@ -350,7 +350,7 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li except Exception as gzip_exc: logger.error(f"{data_provider.file_path}:{gzip_exc}") - elif data_provider.is_encoded(): + elif data_provider.represent_as_encoded(): decoded_data_provider = DataContentProvider(data=data_provider.decoded, file_path=data_provider.file_path, file_type=data_provider.file_type, @@ -365,8 +365,9 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li info=f"{data_provider.info}|STRUCT") candidates.extend(self.struct_scan(struct_data_provider, depth, recursive_limit_size)) - elif data_provider.is_xml(): + elif data_provider.represent_as_xml(): string_data_provider = StringContentProvider(lines=data_provider.lines, + line_numbers=data_provider.line_numbers, file_path=data_provider.file_path, file_type=".xml", info=f"{data_provider.info}|XML") diff --git a/credsweeper/file_handler/data_content_provider.py b/credsweeper/file_handler/data_content_provider.py index e59562dbb..ff7154edb 100644 --- a/credsweeper/file_handler/data_content_provider.py +++ b/credsweeper/file_handler/data_content_provider.py @@ -5,8 +5,6 @@ from typing import List, Optional import yaml -from lxml import etree - from credsweeper.common.constants import DEFAULT_ENCODING from credsweeper.file_handler.analysis_target import AnalysisTarget from credsweeper.file_handler.content_provider import ContentProvider @@ -35,6 +33,7 @@ def __init__( self.structure = None self.decoded: Optional[bytes] = None self.lines: List[str] = [] + self.line_numbers: List[int] = [] @property def data(self) -> bytes: @@ -87,23 +86,22 @@ def is_structure(self) -> bool: # # # None of above return False - def is_xml(self) -> bool: + def represent_as_xml(self) -> bool: """Tries to read data as xml - Return True if reading was successful + + Return: + True if reading was successful + """ try: xml_text = self.data.decode(encoding=DEFAULT_ENCODING).splitlines() - tree = etree.fromstringlist(xml_text) - for element in tree.iter(): - tag = Util.extract_element_data(element, "tag") - text = Util.extract_element_data(element, "text") - self.lines.append(f"{tag} : {text}") + self.lines, self.line_numbers = Util.get_xml_from_lines(xml_text) except Exception as exc: logger.debug("Cannot parse as XML:%s %s", exc, self.data) return False - return bool(self.lines) + return bool(self.lines and self.line_numbers) - def is_encoded(self) -> bool: + def represent_as_encoded(self) -> bool: """Encodes data from base64. Stores result in decoded Return: @@ -114,8 +112,8 @@ def is_encoded(self) -> bool: logger.debug("Weak data to decode from base64: %s", self.data) try: self.decoded = base64.b64decode( # - self.data.decode(encoding='ascii', errors='strict'). # - translate(str.maketrans('', '', string.whitespace)), # + self.data.decode(encoding="ascii", errors="strict"). # + translate(str.maketrans("", "", string.whitespace)), # validate=True) # except Exception as exc: logger.debug("Cannot decoded as base64:%s %s", exc, self.data) diff --git a/credsweeper/file_handler/string_content_provider.py b/credsweeper/file_handler/string_content_provider.py index 158390487..b5349b9c9 100644 --- a/credsweeper/file_handler/string_content_provider.py +++ b/credsweeper/file_handler/string_content_provider.py @@ -16,11 +16,15 @@ class StringContentProvider(ContentProvider): def __init__( self, # lines: List[str], # + line_numbers: Optional[List[int]] = None, # file_path: Optional[str] = None, # file_type: Optional[str] = None, # info: Optional[str] = None) -> None: super().__init__(file_path=file_path, file_type=file_type, info=info) self.lines = lines + # fill line numbers only when amounts are equal + self.line_numbers = line_numbers if line_numbers and len(self.lines) == len(line_numbers) \ + else (list(range(1, 1 + len(self.lines))) if self.lines else []) def get_analysis_target(self) -> List[AnalysisTarget]: """Return lines to scan. @@ -30,6 +34,6 @@ def get_analysis_target(self) -> List[AnalysisTarget]: """ return [ - AnalysisTarget(line, i + 1, self.lines, self.file_path, self.file_type, self.info) - for i, line in enumerate(self.lines) + AnalysisTarget(line, line_number, self.lines, self.file_path, self.file_type, self.info) + for line_number, line in zip(self.line_numbers, self.lines) ] diff --git a/credsweeper/utils/util.py b/credsweeper/utils/util.py index 9880aea3f..d99364a37 100644 --- a/credsweeper/utils/util.py +++ b/credsweeper/utils/util.py @@ -272,6 +272,27 @@ def read_data(path: str) -> Optional[bytes]: logger.error(f"Unexpected Error: Can not read '{path}'. Error message: '{exc}'") return None + @staticmethod + def get_xml_from_lines(xml_lines: List[str]) -> Tuple[Optional[List[str]], Optional[List[int]]]: + """Parse xml data from list of string and return List of str. + + Args: + xml_lines: list of lines of xml data + + Return: + List of formatted string(f"{root.tag} : {root.text}") + + """ + lines = [] + line_nums = [] + tree = etree.fromstringlist(xml_lines) + for element in tree.iter(): + tag = Util._extract_element_data(element, "tag") + text = Util._extract_element_data(element, "text") + lines.append(f"{tag} : {text}") + line_nums.append(element.sourceline) + return lines, line_nums + @staticmethod def get_xml_data(file_path: str) -> Tuple[Optional[List[str]], Optional[List[int]]]: """Read xml data and return List of str. @@ -285,24 +306,16 @@ def get_xml_data(file_path: str) -> Tuple[Optional[List[str]], Optional[List[int List of formatted string(f"{root.tag} : {root.text}") """ - lines = [] - line_nums = [] try: with open(file_path, "r") as f: xml_lines = f.readlines() - tree = etree.fromstringlist(xml_lines) - for element in tree.iter(): - tag = Util.extract_element_data(element, "tag") - text = Util.extract_element_data(element, "text") - lines.append(f"{tag} : {text}") - line_nums.append(element.sourceline) + return Util.get_xml_from_lines(xml_lines) except Exception as exc: logger.error(f"Cannot parse '{file_path}' to xml {exc}") return None, None - return lines, line_nums @staticmethod - def extract_element_data(element, attr) -> str: + def _extract_element_data(element, attr) -> str: """Extract xml element data to string. Try to extract the xml data and strip() the string. diff --git a/tests/file_handler/test_string_content_provider.py b/tests/file_handler/test_string_content_provider.py index 1570706f5..5fa885c98 100644 --- a/tests/file_handler/test_string_content_provider.py +++ b/tests/file_handler/test_string_content_provider.py @@ -1,22 +1,47 @@ -from typing import List - -import pytest - from credsweeper.file_handler.analysis_target import AnalysisTarget from credsweeper.file_handler.string_content_provider import StringContentProvider class TestStringContentProvider: - @pytest.mark.parametrize("lines", [["line one", "password='in_line_2'"]]) - def test_get_analysis_target_p(self, lines: List[str]) -> None: + def test_get_analysis_target_p(self) -> None: """Evaluate that lines data correctly extracted from file""" + lines = ["line one", "password='in_line_2'"] content_provider = StringContentProvider(lines) analysis_targets = content_provider.get_analysis_target() + assert len(analysis_targets) == len(lines) + expected_target = AnalysisTarget(lines[0], 1, lines, "", "", "") + assert analysis_targets[0] == expected_target + # check second target and line numeration + expected_target = AnalysisTarget(lines[1], 2, lines, "", "", "") + assert analysis_targets[1] == expected_target - assert len(analysis_targets) == len(lines) + # specific line numeration + content_provider = StringContentProvider(lines, [42, -1]) + analysis_targets = content_provider.get_analysis_target() + assert analysis_targets[0].line_num == 42 + assert analysis_targets[1].line_num == -1 - target = analysis_targets[0] - assert target == expected_target + def test_get_analysis_target_n(self) -> None: + """Negative cases check""" + # empty list + content_provider = StringContentProvider([]) + analysis_targets = content_provider.get_analysis_target() + assert len(analysis_targets) == 0 + + # mismatched amount of lists + content_provider = StringContentProvider(["a", "b", "c"], [2, 3]) + analysis_targets = content_provider.get_analysis_target() + assert len(analysis_targets) == 3 + assert analysis_targets[0].line_num == 1 + assert analysis_targets[1].line_num == 2 + assert analysis_targets[2].line_num == 3 + + content_provider = StringContentProvider(["a", "b", "c"], [5, 3, 4, 5]) + analysis_targets = content_provider.get_analysis_target() + assert len(analysis_targets) == 3 + assert analysis_targets[0].line_num == 1 + assert analysis_targets[1].line_num == 2 + assert analysis_targets[2].line_num == 3