Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YAML and JSON are scanned like structures #236

Merged
merged 2 commits into from Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/fuzz.yml
Expand Up @@ -78,8 +78,9 @@ jobs:
run: |
COVERAGE=$(tail -1 report.txt | awk '{print $6}' | tr --delete '%')
# additionally check correctness of the value - should be an integer
if ! [ 75 -le ${COVERAGE} ]; then
echo "Fuzzing coverage '${COVERAGE}' does not satisfy the limit 74%"
FUZZ_COVERAGE_LIMIT=75
if ! [ ${FUZZ_COVERAGE_LIMIT} -le ${COVERAGE} ]; then
echo "Fuzzing coverage '${COVERAGE}' does not satisfy the limit ${FUZZ_COVERAGE_LIMIT}%"
exit 1
fi

Expand Down
2 changes: 1 addition & 1 deletion cicd/mypy_warnings.txt
@@ -1 +1 @@
Success: no issues found in 83 source files
Success: no issues found in 84 source files
94 changes: 90 additions & 4 deletions credsweeper/app.py
Expand Up @@ -7,11 +7,12 @@
import signal
import sys
import zipfile
from typing import List, Optional, Union
from typing import List, Optional, Union, Tuple, Any

import pandas as pd

from credsweeper.common.constants import KeyValidationOption, ThresholdPreset, RECURSIVE_SCAN_LIMITATION
from credsweeper.common.constants import KeyValidationOption, ThresholdPreset, RECURSIVE_SCAN_LIMITATION, \
DEFAULT_ENCODING
from credsweeper.config import Config
from credsweeper.credentials import Candidate, CredentialManager
from credsweeper.file_handler.byte_content_provider import ByteContentProvider
Expand All @@ -21,6 +22,7 @@
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.files_provider import FilesProvider
from credsweeper.file_handler.string_content_provider import StringContentProvider
from credsweeper.file_handler.struct_content_provider import StructContentProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner import Scanner
from credsweeper.utils import Util
Expand Down Expand Up @@ -356,13 +358,20 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li
new_limit = recursive_limit_size - len(decoded_data_provider.data)
candidates.extend(self.data_scan(decoded_data_provider, depth, new_limit))

elif data_provider.represent_as_structure():
struct_data_provider = StructContentProvider(struct=data_provider.structure,
file_path=data_provider.file_path,
file_type=data_provider.file_type,
info=f"{data_provider.info}|STRUCT")
candidates.extend(self.struct_scan(struct_data_provider, depth, recursive_limit_size))

elif data_provider.represent_as_xml():
struct_data_provider = StringContentProvider(lines=data_provider.lines,
string_data_provider = StringContentProvider(lines=data_provider.lines,
line_numbers=data_provider.line_numbers,
file_path=data_provider.file_path,
file_type=".xml",
info=f"{data_provider.info}|XML")
candidates.extend(self.file_scan(struct_data_provider))
candidates.extend(self.file_scan(string_data_provider))

else:
# finally try scan the data via byte content provider
Expand All @@ -378,6 +387,83 @@ def data_scan(self, data_provider: DataContentProvider, depth: int, recursive_li

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def struct_scan(self, struct_provider: StructContentProvider, depth: int, recursive_limit_size: int) -> \
        List[Candidate]:
    """Walk a parsed structure (dict or list) and scan every value it holds.

    Nested dicts and lists are visited recursively. ``bytes`` and ``str`` values are passed to
    ``data_scan``. A ``str`` value that belongs to a dict is additionally scanned as a single
    ``key = "value"`` line.

    Args:
        struct_provider: StructContentProvider object that keeps the structure to be scanned
        depth: maximal level of recursion
        recursive_limit_size: maximal bytes of opened files to prevent recursive zip-bomb attack

    Return:
        list of candidates collected from the structure

    """
    logger.debug("Start struct_scan: depth=%d, limit=%d, path=%s, info=%s", depth, recursive_limit_size,
                 struct_provider.file_path, struct_provider.info)
    found: List[Candidate] = []

    if depth < 0:
        # break recursion if maximal depth is reached
        logger.debug("bottom reached %s recursive_limit_size:%d", struct_provider.file_path, recursive_limit_size)
        return found

    next_depth = depth - 1

    struct = struct_provider.struct
    struct_is_dict = isinstance(struct, dict)
    pairs: List[Tuple[Union[int, str], Any]] = []
    if struct_is_dict:
        pairs = list(struct.items())
    elif isinstance(struct, list):
        # list elements are addressed by their index
        pairs = list(enumerate(struct))
    else:
        logger.error("Not supported type:%s val:%s", str(type(struct)), str(struct))

    for key, value in pairs:
        if isinstance(value, (dict, list)):
            # nested container - go one level deeper with the same size limit
            nested_provider = StructContentProvider(struct=value,
                                                    file_path=struct_provider.file_path,
                                                    file_type=struct_provider.file_type,
                                                    info=f"{struct_provider.info}|STRUCT:{key}")
            found.extend(self.struct_scan(nested_provider, next_depth, recursive_limit_size))
            continue

        if isinstance(value, bytes):
            raw_provider = DataContentProvider(data=value,
                                               file_path=struct_provider.file_path,
                                               file_type=struct_provider.file_type,
                                               info=f"{struct_provider.info}|BYTES:{key}")
            found.extend(self.data_scan(raw_provider, next_depth, recursive_limit_size - len(value)))
            continue

        if not isinstance(value, str):
            logger.debug("Not supported type:%s value(%s)", str(type(value)), str(value))
            continue

        # the string itself may be encoded or structured data - scan it as raw bytes
        encoded_provider = DataContentProvider(data=value.encode(encoding=DEFAULT_ENCODING),
                                               file_path=struct_provider.file_path,
                                               file_type=struct_provider.file_type,
                                               info=f"{struct_provider.info}|STRING:{key}")
        remaining = recursive_limit_size - len(encoded_provider.data)
        found.extend(self.data_scan(encoded_provider, next_depth, remaining))

        if not struct_is_dict:
            continue

        # use key = "value" scan for common cases like in Python code
        line_provider = StringContentProvider([f"{key} = \"{value}\""],
                                              file_path=struct_provider.file_path,
                                              file_type=".py",
                                              info=f"{struct_provider.info}|STRING:`{key} = \"{value}\"`")
        extra_candidates = self.file_scan(line_provider)
        if not extra_candidates:
            continue

        # values already reported so far; an extra candidate is kept only if it brings a new value
        known_values = {line_data.value for candidate in found for line_data in candidate.line_data_list}
        for extra in extra_candidates:
            if any(line_data.value not in known_values for line_data in extra.line_data_list):
                found.append(extra)

    return found

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def post_processing(self) -> None:
"""Machine learning validation for received credential candidates."""
if self._use_ml_validation():
Expand Down
40 changes: 40 additions & 0 deletions credsweeper/file_handler/data_content_provider.py
@@ -1,8 +1,10 @@
import base64
import json
import logging
import string
from typing import List, Optional

import yaml
from credsweeper.common.constants import DEFAULT_ENCODING
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.content_provider import ContentProvider
Expand All @@ -28,6 +30,7 @@ def __init__(
info: Optional[str] = None) -> None:
super().__init__(file_path=file_path, file_type=file_type, info=info)
self.data = data
self.structure = None
self.decoded: Optional[bytes] = None
self.lines: List[str] = []
self.line_numbers: List[int] = []
Expand All @@ -42,6 +45,43 @@ def data(self, data: bytes) -> None:
"""data setter"""
self.__data = data

def represent_as_structure(self) -> bool:
    """Tries to convert data with JSON and then YAML parsers. Stores result to internal structure

    The parsed object is kept in ``self.structure`` only when it is a non-empty dict or a non-empty list.
    In any other case ``self.structure`` is ``None`` after the call, so a value left by a previous call
    can never be mistaken for a fresh result.

    Return:
        True if a non-empty dict or list was parsed, otherwise False

    """

    def is_filled_container(obj: object) -> bool:
        # only dict and list with at least one item are worth a structured scan
        return isinstance(obj, (dict, list)) and 0 < len(obj)

    # forget the result of any previous attempt before parsing again
    self.structure = None

    try:
        text = self.data.decode(encoding='utf-8', errors='strict')
    except Exception:
        # data that cannot be decoded as UTF-8 text is not treated as a structure
        return False

    # JSON
    if "{" in text:
        try:
            parsed = json.loads(text)
            logger.debug("CONVERTED from json")
        except Exception as exc:
            logger.debug("Cannot parse as json:%s %s", exc, self.data)
            parsed = None
        if is_filled_container(parsed):
            self.structure = parsed
            return True
    else:
        logger.debug("Data do not contain { - weak JSON")

    # YAML - almost always recognized
    if ":" in text:
        try:
            # NOTE(review): the text comes from scanned files, i.e. untrusted input. PyYAML recommends
            # yaml.safe_load for such data - confirm whether FullLoader is really required here.
            parsed = yaml.load(text, Loader=yaml.FullLoader)
            logger.debug("CONVERTED from yaml")
        except Exception as exc:
            logger.debug("Cannot parse as yaml:%s %s", exc, self.data)
            parsed = None
        if is_filled_container(parsed):
            self.structure = parsed
            return True
    else:
        logger.debug("Data do not contain colon mark - weak YAML")

    # None of above
    return False

def represent_as_xml(self) -> bool:
"""Tries to read data as xml

Expand Down
45 changes: 45 additions & 0 deletions credsweeper/file_handler/struct_content_provider.py
@@ -0,0 +1,45 @@
import logging
from typing import List, Optional, Any

from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.file_handler.content_provider import ContentProvider

logger = logging.getLogger(__name__)


class StructContentProvider(ContentProvider):
    """Dummy raw provider to keep structured data

    The provider is only a holder for an already parsed object; it does not produce analysis targets.

    Parameters:
        struct: any object to be stored (the type is not checked here).
        file_path: optional string. Might be specified if you know true file name where the data were taken from.
        file_type: optional string, passed to the base provider unchanged.
        info: optional string, passed to the base provider unchanged.

    """

    def __init__(self,
                 struct: Any,
                 file_path: Optional[str] = None,
                 file_type: Optional[str] = None,
                 info: Optional[str] = None) -> None:
        super().__init__(file_path=file_path, file_type=file_type, info=info)
        # goes through the property setter below
        self.struct = struct

    @property
    def struct(self) -> Any:
        """Stored object getter"""
        return self.__struct

    @struct.setter
    def struct(self, value: Any) -> None:
        """Stored object setter"""
        self.__struct = value

    def get_analysis_target(self) -> List[AnalysisTarget]:
        """Not supported. The class provides only data storage.

        Raise:
            NotImplementedError

        """
        raise NotImplementedError()
Binary file removed fuzz/corpus/096ec2ed3a11a2c4422fe445f86fc03963adf350
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions fuzz/corpus/19521a7555bd197646dd224e3890064c2f4cf9bf
@@ -0,0 +1 @@
{"t:ire\"l#b.ls\":\"ap\n"}
1 change: 0 additions & 1 deletion fuzz/corpus/1df555ea6ab8f834626d3002c2d3eaf7746450ae

This file was deleted.

Binary file removed fuzz/corpus/2023cf6be65f362b3892de9f6f1f8b7eec51d3ef
Binary file not shown.
1 change: 1 addition & 0 deletions fuzz/corpus/248b61d9c284b005868a4c0a80854281f99a7bdb
@@ -0,0 +1 @@
PK�4
Binary file not shown.
Binary file not shown.
Binary file removed fuzz/corpus/2bc2e50780867fcce228c61c332e93862a7b8716
Binary file not shown.
1 change: 1 addition & 0 deletions fuzz/corpus/2e0ed4d19120c0095af1391325e73d763e97268d
@@ -0,0 +1 @@
<link rel="0ERgAAQA06dded84d6a99f6126968f210a526d9bb8E"/>
Binary file not shown.
Binary file not shown.
5 changes: 0 additions & 5 deletions fuzz/corpus/50127775b46b8b432f85928ab5d21cc9a5b9916a

This file was deleted.

Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions fuzz/corpus/76dadf69d4d7273035b509c4fb172df1a1e3aa42
@@ -0,0 +1,2 @@
="<?password">cackl/Cit䝉 <password name="password">ace_for_ukraine</password>
</ies>
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions fuzz/corpus/8e97bf37cf7ab00ab5b748b108e331a60394d956
@@ -0,0 +1 @@
pwd : "cace!"
Binary file not shown.
1 change: 1 addition & 0 deletions fuzz/corpus/90ad557dfb83213e837631d70ed888c7cd8bd2e0
@@ -0,0 +1 @@
{"test.domain.io/actual-configuration": "{\"apiVersion\":\"v1\",\"data\":{\"smtp-password\":\"\",\"wordpresdpress-wordpress\",\"chart\":\"wordp:\"wordpress\"},\"name\":\"wordpress-wopaque\"}\n"}
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 4 additions & 0 deletions fuzz/corpus/9e17092d3faa95169b6eb6da9ac36d4650175281
@@ -0,0 +1,4 @@
IREOGIogicr_gireAbody:
WM824c3
sk_liv,e_gireogicracklea)pGI: !!binary |
H4sICv2xH9UVPREO
2 changes: 2 additions & 0 deletions fuzz/corpus/9e5b0e206c5e57b929e482e6ae5abf8b036367ef
@@ -0,0 +1,2 @@
body:
- stringna6@^ame\":---ordpress-wordp485
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 0 additions & 1 deletion fuzz/corpus/cd4f0538db030c9794809b18f32321b125d67b28

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file removed fuzz/corpus/e245e2f884af2d01bea253673200a29f51363fa2
Binary file not shown.
Binary file removed fuzz/corpus/e5a8d3e8ac6b758fde8c6315baf28e825d029ea8
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file removed fuzz/corpus/fd8463c8426b79d944d2ff54554982c98617c3c8
Binary file not shown.
6 changes: 3 additions & 3 deletions tests/__init__.py
@@ -1,7 +1,7 @@
from pathlib import Path

# total number of files in test samples, included .gitignore
SAMPLES_FILES_COUNT: int = 52
SAMPLES_FILES_COUNT: int = 54

# credentials count after scan
SAMPLES_CRED_COUNT: int = 51
Expand All @@ -12,8 +12,8 @@

# archived credentials that not found without --depth
SAMPLES_IN_DEEP_1 = 6
SAMPLES_IN_DEEP_2 = 7
SAMPLES_IN_DEEP_3 = 8
SAMPLES_IN_DEEP_2 = 8
SAMPLES_IN_DEEP_3 = 9

SAMPLES_FILTERED_BY_POST_COUNT = 1

Expand Down
17 changes: 17 additions & 0 deletions tests/samples/binary.yaml
@@ -0,0 +1,17 @@
body:
string: !!binary |
H4sICIur8mIAA3BlbV9rZXkAbdM3kqNAAEDRnFNMTk3hEQo2oAG1sKIBASIbnLDC29OvifenP37f
338CClStL8cVv2xH9UVP+dKV19/xjZmqKoWiCkRRB28kDVBf6gclZ5eziCoev5PDXHm1v2+e1K96
xmZSRN7sYSzJJKa1KA81Qn6/3Bu/PntsazUobD6K9CqDSSU/DO7ZTMsy3T6JdAYAXRzderrZ1CLH
dGHtxxTBVPhUR/xzDnBuIa/N3ZoqfkYcRk2Ua48SqLM0tnLS60kYm5p8OGx29Ug2ijZVFpEIxA6K
t7KqO47HB3hYgkk6/vHjiOGJ47s33IFRYMy8s/7bnEeEB8pbqorO2zqa0U0gLhp0Xx+n7UBkMo2Z
e3q2qrVYprayry8pbbn0NTCh1xl1baycQWO9qvqPmylDXFfcj3jzLw2d4MnndMyAxGM+F1qHkrQz
WnbfMHhE0vlqlBxHtLH72hUJITkTNz4vVRRicKmBymZmFM3sZ0oOuqNo/Xh9spHx+y5TcKunBzxi
+lU0U+LHOhERXIMfFbecPNmf2tjm9qbClmfKBhNrRdwlg7ujmI7RyIKjGxMzaIlCsWkzOp2Hf2GO
G0sV9uRI15bn9bHIHte77WlLxxDXievxaYD7o7lhBmnJM+vW3VS94aaJt7o5HGqJiM3WqoqnqCQF
yTk3djp0+zQh+CkEDpxSRSxMMIBeoddqPY71ULkaC/mzvrhkU+nzTFefg8ZJ0p9ANiINiBqUKPPN
PY6046xN5kHpPEZ7hx0d9168EHkxekIW32vvpLO+wZ5XHyEXnS+qi0w/FEqq5YKnZ9gnfRiaCpCf
hkNhSgjArlzczq1+8mfhX0oqUWAfC0LBWeAVnUEII4y5TBqHpgCftKOz0ozZ78KahsX5vGSeQMt8
SEzJdnWiEyf4UdLYnvyF/cOjWPJ/Uf0Gdno9KXQDAAA=
secret: |
we5345d0f3da48544z1t1e275y05i161x995q485
1 change: 1 addition & 0 deletions tests/samples/struct.json
@@ -0,0 +1 @@
{"test.domain.io/actual-configuration": "{\"apiVersion\":\"v1\",\"data\":{\"smtp-password\":\"\",\"wordpress-password\":\"Axt4T0eO0lm9sS==\"},\"kind\":\"Secret\",\"metadata\":{\"annotations\":{},\"labels\":{\"app\":\"wordpress-wordpress\",\"chart\":\"wordpress-5.0.1\",\"heritage\":\"Tiller\",\"release\":\"wordpress\"},\"name\":\"wordpress-wordpress\",\"namespace\":\"argocd\"},\"type\":\"Opaque\"}\n"}
51 changes: 50 additions & 1 deletion tests/test_main.py
Expand Up @@ -362,8 +362,57 @@ def test_zip_p(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_json_p(self) -> None:
    # positive case: the credential stored in struct.json must be found when depth is given
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "struct.json"])
    # depth is passed to the constructor to enable the deep scan
    sweeper = CredSweeper(depth=5)
    sweeper.run(content_provider=provider)
    credentials = sweeper.credential_manager.get_credentials()
    assert len(credentials) == 1
    rule_names = {cred.rule_name for cred in credentials}
    assert {"Password"} == rule_names
    values = {cred.line_data_list[0].value for cred in credentials}
    assert {"Axt4T0eO0lm9sS=="} == values

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_json_n(self) -> None:
    # negative case: no credentials are expected in struct.json without depth
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "struct.json"])
    # the constructor is called without depth on purpose
    sweeper = CredSweeper()
    sweeper.run(content_provider=provider)
    credentials = sweeper.credential_manager.get_credentials()
    assert len(credentials) == 0

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_yaml_p(self) -> None:
    # positive case: both credentials stored in binary.yaml must be found when depth is given
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "binary.yaml"])
    # depth is passed to the constructor to enable the deep scan
    sweeper = CredSweeper(depth=5)
    sweeper.run(content_provider=provider)
    credentials = sweeper.credential_manager.get_credentials()
    assert len(credentials) == 2
    rule_names = {cred.rule_name for cred in credentials}
    assert {"Secret", "PEM Certificate"} == rule_names
    values = {cred.line_data_list[0].value for cred in credentials}
    assert {"we5345d0f3da48544z1t1e275y05i161x995q485\n", "-----BEGIN RSA PRIVATE"} == values

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_yaml_n(self) -> None:
    # negative case: no credentials are expected in binary.yaml without depth
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "binary.yaml"])
    # the constructor is called without depth on purpose
    sweeper = CredSweeper()
    sweeper.run(content_provider=provider)
    credentials = sweeper.credential_manager.get_credentials()
    assert len(credentials) == 0

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_encoded_p(self) -> None:
# test for finding credentials in docx
# test for finding credentials in ENCODED data
content_provider: FilesProvider = TextProvider([SAMPLES_DIR / "encoded"])
# depth must be set in constructor to remove .zip as ignored extension
cred_sweeper = CredSweeper(depth=5)
Expand Down