Skip to content

Commit

Permalink
Structure scan JSON and YAML
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Oct 26, 2022
1 parent 2ddfee3 commit 9987e12
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 72 deletions.
113 changes: 42 additions & 71 deletions credsweeper/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import signal
import sys
import zipfile
from typing import List, Optional, Union
from typing import List, Optional, Union, Tuple, Any

import pandas as pd

Expand Down Expand Up @@ -406,39 +406,46 @@ def struct_scan(self, struct_provider: StructContentProvider, depth: int, recurs

depth -= 1

items: List[Tuple[Union[int, str], Any]] = []
if isinstance(struct_provider.struct, dict):
for key, value in struct_provider.struct.items():
if isinstance(value, dict) or isinstance(value, list):
val_struct_provider = StructContentProvider(struct=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STRUCT:{key}")
candidates.extend(self.struct_scan(val_struct_provider, depth, recursive_limit_size))

elif isinstance(value, bytes):
val_struct_provider = DataContentProvider(data=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|BYTES:{key}")
new_limit = recursive_limit_size - len(value)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

elif isinstance(value, str):
val_struct_provider = DataContentProvider(data=value.encode(encoding=DEFAULT_ENCODING),
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STR_DATA:`{key}`")
new_limit = recursive_limit_size - len(val_struct_provider.data)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

# use key = "value" scan for common cases like in Python code
line = f"{key} = \"{value}\""
str_provider = ByteContentProvider(line.encode(encoding=DEFAULT_ENCODING),
file_path=struct_provider.file_path,
file_type=".py",
info=f'{struct_provider.info}|STRING:`{key} = "{value}"`')
items = list(struct_provider.struct.items())
elif isinstance(struct_provider.struct, list):
items = list(enumerate(struct_provider.struct))
else:
logger.error("Not supported type:%s val:%s", str(type(struct_provider.struct)), str(struct_provider.struct))

for key, value in items:
if isinstance(value, dict) or isinstance(value, list):
val_struct_provider = StructContentProvider(struct=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STRUCT:{key}")
candidates.extend(self.struct_scan(val_struct_provider, depth, recursive_limit_size))

elif isinstance(value, bytes):
val_struct_provider = DataContentProvider(data=value,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|BYTES:{key}")
new_limit = recursive_limit_size - len(value)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

elif isinstance(value, str):
val_struct_provider = DataContentProvider(data=value.encode(encoding=DEFAULT_ENCODING),
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|STRING:{key}")
new_limit = recursive_limit_size - len(val_struct_provider.data)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

# use key = "value" scan for common cases like in Python code
if isinstance(struct_provider.struct, dict):
str_provider = StringContentProvider([f"{key} = \"{value}\""],
file_path=struct_provider.file_path,
file_type=".py",
info=f"{struct_provider.info}|STRING:`{key} = \"{value}\"`")
extra_candidates = self.file_scan(str_provider)
if extra_candidates:
found_values = set(
Expand All @@ -449,44 +456,8 @@ def struct_scan(self, struct_provider: StructContentProvider, depth: int, recurs
candidates.append(extra_candidate)
break

else:
logger.debug("Not supported type:%s value(%s)", str(type(value)), str(value))

elif isinstance(struct_provider.struct, list):
n = 0
for i in struct_provider.struct:
if isinstance(i, dict) or isinstance(i, list):
item_struct_provider = StructContentProvider(struct=i,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}{{[{n}]}}")
new_candidates = self.struct_scan(item_struct_provider, depth, recursive_limit_size)
candidates.extend(new_candidates)

elif isinstance(i, str):
val_struct_provider = DataContentProvider(data=i.encode(encoding=DEFAULT_ENCODING),
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|DATA_STR[{n}]")
new_limit = recursive_limit_size - len(val_struct_provider.data)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

elif isinstance(i, bytes):
val_struct_provider = DataContentProvider(data=i,
file_path=struct_provider.file_path,
file_type=struct_provider.file_type,
info=f"{struct_provider.info}|BYTES[{n}]")
new_limit = recursive_limit_size - len(i)
new_candidates = self.data_scan(val_struct_provider, depth, new_limit)
candidates.extend(new_candidates)

else:
logger.debug("Not supported type:%s val:%s", str(type(i)), str(i))
n += 1

else:
logger.error("Not supported type:%s val:%s", str(type(struct_provider.struct)), str(struct_provider.struct))
else:
logger.debug("Not supported type:%s value(%s)", str(type(value)), str(value))

return candidates

Expand Down
17 changes: 17 additions & 0 deletions tests/samples/binary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
body:
string: !!binary |
H4sICIur8mIAA3BlbV9rZXkAbdM3kqNAAEDRnFNMTk3hEQo2oAG1sKIBASIbnLDC29OvifenP37f
338CClStL8cVv2xH9UVP+dKV19/xjZmqKoWiCkRRB28kDVBf6gclZ5eziCoev5PDXHm1v2+e1K96
xmZSRN7sYSzJJKa1KA81Qn6/3Bu/PntsazUobD6K9CqDSSU/DO7ZTMsy3T6JdAYAXRzderrZ1CLH
dGHtxxTBVPhUR/xzDnBuIa/N3ZoqfkYcRk2Ua48SqLM0tnLS60kYm5p8OGx29Ug2ijZVFpEIxA6K
t7KqO47HB3hYgkk6/vHjiOGJ47s33IFRYMy8s/7bnEeEB8pbqorO2zqa0U0gLhp0Xx+n7UBkMo2Z
e3q2qrVYprayry8pbbn0NTCh1xl1baycQWO9qvqPmylDXFfcj3jzLw2d4MnndMyAxGM+F1qHkrQz
WnbfMHhE0vlqlBxHtLH72hUJITkTNz4vVRRicKmBymZmFM3sZ0oOuqNo/Xh9spHx+y5TcKunBzxi
+lU0U+LHOhERXIMfFbecPNmf2tjm9qbClmfKBhNrRdwlg7ujmI7RyIKjGxMzaIlCsWkzOp2Hf2GO
G0sV9uRI15bn9bHIHte77WlLxxDXievxaYD7o7lhBmnJM+vW3VS94aaJt7o5HGqJiM3WqoqnqCQF
yTk3djp0+zQh+CkEDpxSRSxMMIBeoddqPY71ULkaC/mzvrhkU+nzTFefg8ZJ0p9ANiINiBqUKPPN
PY6046xN5kHpPEZ7hx0d9168EHkxekIW32vvpLO+wZ5XHyEXnS+qi0w/FEqq5YKnZ9gnfRiaCpCf
hkNhSgjArlzczq1+8mfhX0oqUWAfC0LBWeAVnUEII4y5TBqHpgCftKOz0ozZ78KahsX5vGSeQMt8
SEzJdnWiEyf4UdLYnvyF/cOjWPJ/Uf0Gdno9KXQDAAA=
secret: |
we5345d0f3da48544z1t1e275y05i161x995q485
1 change: 1 addition & 0 deletions tests/samples/struct.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"test.domain.io/actual-configuration": "{\"apiVersion\":\"v1\",\"data\":{\"smtp-password\":\"\",\"wordpress-password\":\"Axt4T0eO0lm9sS==\"},\"kind\":\"Secret\",\"metadata\":{\"annotations\":{},\"labels\":{\"app\":\"wordpress-wordpress\",\"chart\":\"wordpress-5.0.1\",\"heritage\":\"Tiller\",\"release\":\"wordpress\"},\"name\":\"wordpress-wordpress\",\"namespace\":\"argocd\"},\"type\":\"Opaque\"}\n"}
51 changes: 50 additions & 1 deletion tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,57 @@ def test_zip_p(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_json_p(self) -> None:
    # positive case: the struct.json sample holds one password inside a JSON
    # document that is itself stored as a string value
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "struct.json"])
    # depth is passed to the constructor; with the default depth the same
    # sample yields no credentials (see test_json_n)
    sweeper = CredSweeper(depth=5)
    sweeper.run(content_provider=provider)
    found = sweeper.credential_manager.get_credentials()
    assert len(found) == 1
    rule_names = {cred.rule_name for cred in found}
    assert {"Password"} == rule_names
    values = {cred.line_data_list[0].value for cred in found}
    assert {"Axt4T0eO0lm9sS=="} == values

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_json_n(self) -> None:
    # negative case: the same struct.json sample must give no credentials
    # when CredSweeper is built without a depth argument
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "struct.json"])
    # depth is intentionally NOT passed here - that is the point of the test
    sweeper = CredSweeper()
    sweeper.run(content_provider=provider)
    found = sweeper.credential_manager.get_credentials()
    assert len(found) == 0

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_yaml_p(self) -> None:
    # positive case: the binary.yaml sample has a `!!binary` field and a plain
    # `secret` field; two credentials are expected in total
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "binary.yaml"])
    # depth is passed to the constructor; with the default depth the same
    # sample yields no credentials (see test_yaml_n)
    sweeper = CredSweeper(depth=5)
    sweeper.run(content_provider=provider)
    found = sweeper.credential_manager.get_credentials()
    assert len(found) == 2
    rule_names = {cred.rule_name for cred in found}
    assert {"Secret", "PEM Certificate"} == rule_names
    values = {cred.line_data_list[0].value for cred in found}
    assert {"we5345d0f3da48544z1t1e275y05i161x995q485\n", "-----BEGIN RSA PRIVATE"} == values

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_yaml_n(self) -> None:
    # negative case: the same binary.yaml sample must give no credentials
    # when CredSweeper is built without a depth argument
    provider: FilesProvider = TextProvider([SAMPLES_DIR / "binary.yaml"])
    # depth is intentionally NOT passed here - that is the point of the test
    sweeper = CredSweeper()
    sweeper.run(content_provider=provider)
    found = sweeper.credential_manager.get_credentials()
    assert len(found) == 0

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_encoded_p(self) -> None:
# test for finding credentials in docx
# test for finding credentials in ENCODED data
content_provider: FilesProvider = TextProvider([SAMPLES_DIR / "encoded"])
# depth must be set in constructor to remove .zip as ignored extension
cred_sweeper = CredSweeper(depth=5)
Expand Down

0 comments on commit 9987e12

Please sign in to comment.