Skip to content

Commit

Permalink
chore: add packages propagation tests (#9577)
Browse files Browse the repository at this point in the history
  • Loading branch information
juanjux committed Jun 18, 2024
1 parent 3de23a1 commit c0bc1eb
Show file tree
Hide file tree
Showing 26 changed files with 868 additions and 16 deletions.
14 changes: 14 additions & 0 deletions tests/appsec/iast_packages/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest


@pytest.fixture(name="printer")
def printer(request):
terminal_reporter = request.config.pluginmanager.getplugin("terminalreporter")
capture_manager = request.config.pluginmanager.get_plugin("capturemanager")

def printer(*args, **kwargs):
with capture_manager.global_and_fixture_disabled():
if terminal_reporter is not None: # pragma: no branch
terminal_reporter.write_line(*args, **kwargs)

return printer
30 changes: 30 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_attrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,33 @@ class User:
response.result1 = str(e)

return response.json()


@pkg_attrs.route("/attrs_propagation")
def pkg_attrs_propagation_view():
import attrs

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
if not is_pyobject_tainted(response.package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

try:

@attrs.define
class UserPropagation:
name: str
age: int

user = UserPropagation(name=response.package_param, age=65)
if not is_pyobject_tainted(user.name):
response.result1 = "Error: user.name is not tainted"
return response.json()

response.result1 = "OK"
except Exception as e:
response.result1 = str(e)

return response.json()
28 changes: 27 additions & 1 deletion tests/appsec/iast_packages/packages/pkg_beautifulsoup4.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


@pkg_beautifulsoup4.route("/beautifulsoup4")
def pkg_beautifusoup4_view():
def pkg_beautifulsoup4_view():
from bs4 import BeautifulSoup

response = ResultResponse(request.args.get("package_param"))
Expand All @@ -24,3 +24,29 @@ def pkg_beautifusoup4_view():
except Exception:
pass
return response.json()


@pkg_beautifulsoup4.route("/beautifulsoup4_propagation")
def pkg_beautifulsoup4_propagation_view():
from bs4 import BeautifulSoup

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
if not is_pyobject_tainted(response.package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

try:
html = response.package_param
soup = BeautifulSoup(html, "html.parser")
html_tags = soup.find_all("html")
output = "".join(str(tag) for tag in html_tags).lstrip()
if not is_pyobject_tainted(output):
response.result1 = "Error: output is not tainted: " + str(output)
return response.json()
response.result1 = "OK"
except Exception as e:
response.result1 = "Exception: " + str(e)

return response.json()
33 changes: 33 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_cachetools.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,36 @@ def expensive_function(key):
response.result1 = f"Error: {str(e)}"

return jsonify(response.json())


@pkg_cachetools.route("/cachetools_propagation")
def pkg_cachetools_propagation_view():
import cachetools

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))

try:
param_value = request.args.get("package_param", "default-key")
if not is_pyobject_tainted(param_value):
response.result1 = "Error: package_param is not tainted"
return jsonify(response.json())

cache = cachetools.LRUCache(maxsize=2)

@cachetools.cached(cache)
def expensive_function(key):
return f"Computed value for {key}"

try:
# Access the cache with the parameter value
res = expensive_function(param_value)
result_output = "OK" if is_pyobject_tainted(res) else f"Error: result is not tainted: {res}"
except Exception as e:
result_output = f"Error: {str(e)}"
except Exception as e:
result_output = f"Error: {str(e)}"

response.result1 = result_output
return jsonify(response.json())
23 changes: 22 additions & 1 deletion tests/appsec/iast_packages/packages/pkg_chartset_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
https://pypi.org/project/charset-normalizer/
"""
from charset_normalizer import from_bytes
from flask import Blueprint
from flask import request

Expand All @@ -15,6 +14,28 @@

@pkg_chartset_normalizer.route("/charset-normalizer")
def pkg_charset_normalizer_view():
from charset_normalizer import from_bytes

response = ResultResponse(request.args.get("package_param"))
response.result1 = str(from_bytes(bytes(response.package_param, encoding="utf-8")).best())
return response.json()


@pkg_chartset_normalizer.route("/charset-normalizer_propagation")
def pkg_charset_normalizer_propagation_view():
from charset_normalizer import from_bytes

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
if not is_pyobject_tainted(response.package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

try:
res = str(from_bytes(bytes(response.package_param, encoding="utf-8")).best())
response.result1 = "OK" if is_pyobject_tainted(res) else "Error: result is not tainted: %s" % res
except Exception as e:
response.result1 = str(e)

return response.json()
35 changes: 35 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,38 @@ def pkg_cryptography_view():
response.result1 = str(e)

return response.json()


@pkg_cryptography.route("/cryptography_propagation")
def pkg_cryptography_propagation_view():
from cryptography.fernet import Fernet

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
if not is_pyobject_tainted(response.package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

try:
key = Fernet.generate_key()
fernet = Fernet(key)

encrypted_message = fernet.encrypt(response.package_param.encode())
decrypted_message = fernet.decrypt(encrypted_message).decode()

result = {
"key": key.decode(),
"encrypted_message": encrypted_message.decode(),
"decrypted_message": decrypted_message,
}

if not is_pyobject_tainted(result["decrypted_message"]):
response.result1 = "Error: result['decrypted_message'] is not tainted: %s" % result["decrypted_message"]
return response.json()

response.result1 = "OK"
except Exception as e:
response.result1 = str(e)

return response.json()
29 changes: 29 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_docutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,32 @@ def pkg_docutils_view():
response.result1 = f"Error: {str(e)}"

return jsonify(response.json())


@pkg_docutils.route("/docutils_propagation")
def pkg_docutils_propagation_view():
import docutils.core

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))

try:
rst_content = request.args.get("package_param", "Hello, **world**!")
if not is_pyobject_tainted(rst_content):
response.result1 = "Error: package_param is not tainted"
return jsonify(response.json())

try:
# Convert reStructuredText to HTML
html_output = docutils.core.publish_string(rst_content, writer_name="html").decode("utf-8")
result_output = (
"OK" if is_pyobject_tainted(html_output) else f"Error: html_output is not tainted: {html_output}"
)
except Exception as e:
result_output = f"Error: {str(e)}"
except Exception as e:
result_output = f"Error: {str(e)}"

response.result1 = result_output
return jsonify(response.json())
37 changes: 37 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_exceptiongroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,40 @@ def raise_exceptions(param):
except Exception as e:
response.result1 = f"Error: {str(e)}"
return response.json()


@pkg_exceptiongroup.route("/exceptiongroup_propagation")
def pkg_exceptiongroup_propagation_view():
from exceptiongroup import ExceptionGroup

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
try:
package_param = request.args.get("package_param", "default message")

if not is_pyobject_tainted(package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

def raise_exceptions(param):
raise ExceptionGroup(
"Multiple errors", [ValueError(f"First error with {param}"), TypeError(f"Second error with {param}")]
)

try:
raise_exceptions(package_param)
except ExceptionGroup as eg:
caught_exceptions = eg

if caught_exceptions:
result_output = "\n".join(f"{type(ex).__name__}: {str(ex)}" for ex in caught_exceptions.exceptions)
else:
result_output = "Error: No exceptions caught"

response.result1 = (
"OK" if is_pyobject_tainted(package_param) else "Error: result is not tainted: %s" % result_output
)
except Exception as e:
response.result1 = f"Error: {str(e)}"
return response.json()
17 changes: 17 additions & 0 deletions tests/appsec/iast_packages/packages/pkg_idna.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,20 @@ def pkg_idna_view():
response.result1 = idna.decode(response.package_param)
response.result2 = str(idna.encode(response.result1), encoding="utf-8")
return response.json()


@pkg_idna.route("/idna_propagation")
def pkg_idna_propagation_view():
import idna

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
if not is_pyobject_tainted(response.package_param):
response.result1 = "Error: package_param is not tainted"
return response.json()

response.result1 = idna.decode(response.package_param)
res = str(idna.encode(response.result1), encoding="utf-8")
response.result1 = "OK" if is_pyobject_tainted(res) else "Error: result is not tainted"
return response.json()
51 changes: 45 additions & 6 deletions tests/appsec/iast_packages/packages/pkg_iniconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,18 @@ def pkg_iniconfig_view():
response = ResultResponse(request.args.get("package_param"))

try:
# Not using the argument for this one because it eats the newline characters
ini_content = "[section]\nkey=value"
# ini_content = request.args.get("package_param", "[section]\nkey=value")
value = request.args.get("package_param", "test1234")
ini_content = f"[section]\nkey={value}"
ini_path = "example.ini"

try:
# Write the ini content to a file
with open(ini_path, "w") as f:
f.write(ini_content)

# Read and parse the ini file
config = iniconfig.IniConfig(ini_path)
parsed_data = {section.name: list(section.items()) for section in config}
result_output = f"Parsed INI data: {parsed_data}"

# Clean up the created ini file
if os.path.exists(ini_path):
os.remove(ini_path)
except Exception as e:
Expand All @@ -48,3 +44,46 @@ def pkg_iniconfig_view():
response.result1 = f"Error: {str(e)}"

return jsonify(response.json())


@pkg_iniconfig.route("/iniconfig_propagation")
def pkg_iniconfig_propagation_view():
import iniconfig

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted

response = ResultResponse(request.args.get("package_param"))
try:
value = request.args.get("package_param", "test1234")
if not is_pyobject_tainted(value):
response.result1 = "Error: package_param is not tainted"
return jsonify(response.json())

ini_content = f"[section]\nkey={value}"
if not is_pyobject_tainted(ini_content):
response.result1 = f"Error: combined ini_content is not tainted: {ini_content}"
return jsonify(response.json())

ini_path = "example.ini"

try:
with open(ini_path, "w") as f:
f.write(ini_content)

config = iniconfig.IniConfig(ini_path)
parsed_data = {section.name: list(section.items()) for section in config}
value = parsed_data["section"][0][1]
result_output = (
"OK" if is_pyobject_tainted(value) else f"Error: value from parsed_data is not tainted: {value}"
)

if os.path.exists(ini_path):
os.remove(ini_path)
except Exception as e:
result_output = f"Error: {str(e)}"
except Exception as e:
result_output = f"Error: {str(e)}"

response.result1 = result_output

return jsonify(response.json())
Loading

0 comments on commit c0bc1eb

Please sign in to comment.