Skip to content

Commit

Permalink
Merge branch 'auxiliary' into structures
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Nov 7, 2022
2 parents 37287ae + 97bdaa0 commit ab563ba
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 40 deletions.
13 changes: 7 additions & 6 deletions credsweeper/app.py
Expand Up @@ -272,10 +272,10 @@ def file_scan(self, content_provider: ContentProvider) -> List[Candidate]:
# reduce duplicated credentials
found_values = set(line_data.value for candidate in candidates
for line_data in candidate.line_data_list)
for i in extra_candidates:
for j in i.line_data_list:
if j.value not in found_values:
candidates.append(i)
for extra_candidate in extra_candidates:
for line_data in extra_candidate.line_data_list:
if line_data.value not in found_values:
candidates.append(extra_candidate)
break

# finally return result from 'file_scan'
Expand Down Expand Up @@ -350,7 +350,7 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li
except Exception as gzip_exc:
logger.error(f"{data_provider.file_path}:{gzip_exc}")

elif data_provider.is_encoded():
elif data_provider.represent_as_encoded():
decoded_data_provider = DataContentProvider(data=data_provider.decoded,
file_path=data_provider.file_path,
file_type=data_provider.file_type,
Expand All @@ -365,8 +365,9 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li
info=f"{data_provider.info}|STRUCT")
candidates.extend(self.struct_scan(struct_data_provider, depth, recursive_limit_size))

elif data_provider.is_xml():
elif data_provider.represent_as_xml():
string_data_provider = StringContentProvider(lines=data_provider.lines,
line_numbers=data_provider.line_numbers,
file_path=data_provider.file_path,
file_type=".xml",
info=f"{data_provider.info}|XML")
Expand Down
24 changes: 11 additions & 13 deletions credsweeper/file_handler/data_content_provider.py
Expand Up @@ -5,8 +5,6 @@
from typing import List, Optional

import yaml
from lxml import etree

from credsweeper.common.constants import DEFAULT_ENCODING
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.content_provider import ContentProvider
Expand Down Expand Up @@ -35,6 +33,7 @@ def __init__(
self.structure = None
self.decoded: Optional[bytes] = None
self.lines: List[str] = []
self.line_numbers: List[int] = []

@property
def data(self) -> bytes:
Expand Down Expand Up @@ -87,23 +86,22 @@ def is_structure(self) -> bool:
# # # None of above
return False

def is_xml(self) -> bool:
def represent_as_xml(self) -> bool:
"""Tries to read data as xml
Return True if reading was successful
Return:
True if reading was successful
"""
try:
xml_text = self.data.decode(encoding=DEFAULT_ENCODING).splitlines()
tree = etree.fromstringlist(xml_text)
for element in tree.iter():
tag = Util.extract_element_data(element, "tag")
text = Util.extract_element_data(element, "text")
self.lines.append(f"{tag} : {text}")
self.lines, self.line_numbers = Util.get_xml_from_lines(xml_text)
except Exception as exc:
logger.debug("Cannot parse as XML:%s %s", exc, self.data)
return False
return bool(self.lines)
return bool(self.lines and self.line_numbers)

def is_encoded(self) -> bool:
def represent_as_encoded(self) -> bool:
"""Encodes data from base64. Stores result in decoded
Return:
Expand All @@ -114,8 +112,8 @@ def is_encoded(self) -> bool:
logger.debug("Weak data to decode from base64: %s", self.data)
try:
self.decoded = base64.b64decode( #
self.data.decode(encoding='ascii', errors='strict'). #
translate(str.maketrans('', '', string.whitespace)), #
self.data.decode(encoding="ascii", errors="strict"). #
translate(str.maketrans("", "", string.whitespace)), #
validate=True) #
except Exception as exc:
logger.debug("Cannot decoded as base64:%s %s", exc, self.data)
Expand Down
8 changes: 6 additions & 2 deletions credsweeper/file_handler/string_content_provider.py
Expand Up @@ -16,11 +16,15 @@ class StringContentProvider(ContentProvider):
def __init__(
        self,  #
        lines: List[str],  #
        line_numbers: Optional[List[int]] = None,  #
        file_path: Optional[str] = None,  #
        file_type: Optional[str] = None,  #
        info: Optional[str] = None) -> None:
    """Store the lines to scan together with the line number reported for each of them.

    Args:
        lines: text lines to be scanned
        line_numbers: optional line numbers, one per item of ``lines``
        file_path: forwarded to the parent constructor
        file_type: forwarded to the parent constructor
        info: forwarded to the parent constructor

    """
    super().__init__(file_path=file_path, file_type=file_type, info=info)
    self.lines = lines
    if line_numbers and len(self.lines) == len(line_numbers):
        # the supplied numbers are trusted only when there is exactly one per line
        self.line_numbers = line_numbers
    elif self.lines:
        # otherwise fall back to the default numeration 1..N
        self.line_numbers = list(range(1, 1 + len(self.lines)))
    else:
        self.line_numbers = []

def get_analysis_target(self) -> List[AnalysisTarget]:
"""Return lines to scan.
Expand All @@ -30,6 +34,6 @@ def get_analysis_target(self) -> List[AnalysisTarget]:
"""
return [
AnalysisTarget(line, i + 1, self.lines, self.file_path, self.file_type, self.info)
for i, line in enumerate(self.lines)
AnalysisTarget(line, line_number, self.lines, self.file_path, self.file_type, self.info)
for line_number, line in zip(self.line_numbers, self.lines)
]
33 changes: 23 additions & 10 deletions credsweeper/utils/util.py
Expand Up @@ -272,6 +272,27 @@ def read_data(path: str) -> Optional[bytes]:
logger.error(f"Unexpected Error: Can not read '{path}'. Error message: '{exc}'")
return None

@staticmethod
def get_xml_from_lines(xml_lines: List[str]) -> Tuple[Optional[List[str]], Optional[List[int]]]:
    """Parse xml text given as a list of strings and describe every element of the tree.

    Parsing errors are not handled here and propagate to the caller.

    Args:
        xml_lines: list of lines of xml data

    Return:
        Pair of lists with one item per element of the parsed tree:
        formatted strings (f"{tag} : {text}") and the matching ``sourceline`` values

    """
    formatted: List[str] = []
    source_lines: List[int] = []
    for node in etree.fromstringlist(xml_lines).iter():
        # the tag is extracted before the text, then both go into one formatted line
        tag, text = (Util._extract_element_data(node, attr) for attr in ("tag", "text"))
        formatted.append(f"{tag} : {text}")
        source_lines.append(node.sourceline)
    return formatted, source_lines

@staticmethod
def get_xml_data(file_path: str) -> Tuple[Optional[List[str]], Optional[List[int]]]:
"""Read xml data and return List of str.
Expand All @@ -285,24 +306,16 @@ def get_xml_data(file_path: str) -> Tuple[Optional[List[str]], Optional[List[int
List of formatted string(f"{root.tag} : {root.text}")
"""
lines = []
line_nums = []
try:
with open(file_path, "r") as f:
xml_lines = f.readlines()
tree = etree.fromstringlist(xml_lines)
for element in tree.iter():
tag = Util.extract_element_data(element, "tag")
text = Util.extract_element_data(element, "text")
lines.append(f"{tag} : {text}")
line_nums.append(element.sourceline)
return Util.get_xml_from_lines(xml_lines)
except Exception as exc:
logger.error(f"Cannot parse '{file_path}' to xml {exc}")
return None, None
return lines, line_nums

@staticmethod
def extract_element_data(element, attr) -> str:
def _extract_element_data(element, attr) -> str:
"""Extract xml element data to string.
Try to extract the xml data and strip() the string.
Expand Down
43 changes: 34 additions & 9 deletions tests/file_handler/test_string_content_provider.py
@@ -1,22 +1,47 @@
from typing import List

import pytest

from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.string_content_provider import StringContentProvider


class TestStringContentProvider:

@pytest.mark.parametrize("lines", [["line one", "password='in_line_2'"]])
def test_get_analysis_target_p(self, lines: List[str]) -> None:
def test_get_analysis_target_p(self) -> None:
"""Evaluate that lines data correctly extracted from file"""
lines = ["line one", "password='in_line_2'"]
content_provider = StringContentProvider(lines)
analysis_targets = content_provider.get_analysis_target()

assert len(analysis_targets) == len(lines)

expected_target = AnalysisTarget(lines[0], 1, lines, "", "", "")
assert analysis_targets[0] == expected_target
# check second target and line numeration
expected_target = AnalysisTarget(lines[1], 2, lines, "", "", "")
assert analysis_targets[1] == expected_target

assert len(analysis_targets) == len(lines)
# specific line numeration
content_provider = StringContentProvider(lines, [42, -1])
analysis_targets = content_provider.get_analysis_target()
assert analysis_targets[0].line_num == 42
assert analysis_targets[1].line_num == -1

target = analysis_targets[0]
assert target == expected_target
def test_get_analysis_target_n(self) -> None:
    """Negative cases check"""
    # no lines - no targets
    assert len(StringContentProvider([]).get_analysis_target()) == 0

    # the amount of line numbers differs from the amount of lines (fewer, then more):
    # the default numeration 1..N is expected in both cases
    for wrong_numbers in ([2, 3], [5, 3, 4, 5]):
        content_provider = StringContentProvider(["a", "b", "c"], wrong_numbers)
        analysis_targets = content_provider.get_analysis_target()
        assert len(analysis_targets) == 3
        assert [target.line_num for target in analysis_targets] == [1, 2, 3]

0 comments on commit ab563ba

Please sign in to comment.