Skip to content

Commit

Permalink
Merge branch 'main' into gnufede/fix-requests-framework-test
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufede committed May 23, 2024
2 parents e7866f2 + f251e5b commit 11321de
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 74 deletions.
4 changes: 4 additions & 0 deletions ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectIndex.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "AspectIndex.h"
#include "Helpers.h"

/**
* @brief Index aspect
Expand Down Expand Up @@ -49,6 +50,9 @@ api_index_aspect(PyObject* self, PyObject* const* args, const Py_ssize_t nargs)
PyObject* idx = args[1];

PyObject* result_o = PyObject_GetItem(candidate_text, idx);
if (has_pyerr()) {
return nullptr;
}

if (not ctx_map or ctx_map->empty()) {
return result_o;
Expand Down
116 changes: 49 additions & 67 deletions ddtrace/appsec/_iast/_taint_tracking/aspects.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,13 @@ def join_aspect(orig_function: Optional[Callable], flag_added_args: int, *args:


def index_aspect(candidate_text: Text, index: int) -> Text:
result = candidate_text[index]
if isinstance(candidate_text, IAST.TEXT_TYPES) and isinstance(index, int):
try:
return _index_aspect(candidate_text, index)
except Exception as e:
iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e))

if not isinstance(candidate_text, IAST.TEXT_TYPES) or not isinstance(index, int):
return result

try:
return _index_aspect(candidate_text, index)
except Exception as e:
iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e))
return result
return candidate_text[index]


def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text:
Expand All @@ -230,15 +227,12 @@ def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text
or (step is not None and not isinstance(step, int))
):
return candidate_text[start:stop:step]
result = candidate_text[start:stop:step]

try:
new_result = _slice_aspect(candidate_text, start, stop, step)
if new_result != result:
raise Exception("Propagation result %r is different to candidate_text[slice] %r" % (new_result, result))
return new_result
return _slice_aspect(candidate_text, start, stop, step)
except Exception as e:
iast_taint_log_error("IAST propagation error. slice_aspect. {}".format(e))
return result
return candidate_text[start:stop:step]


def bytearray_extend_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> Any:
Expand Down Expand Up @@ -319,29 +313,25 @@ def ljust_aspect(
candidate_text = args[0]
args = args[flag_added_args:]

result = candidate_text.ljust(*args, **kwargs)

if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
ranges_new = get_ranges(candidate_text)
fillchar = parse_params(1, "fillchar", " ", *args, **kwargs)
fillchar_ranges = get_ranges(fillchar)
if ranges_new is None or (not ranges_new and not fillchar_ranges):
if isinstance(candidate_text, IAST.TEXT_TYPES):
try:
ranges_new = get_ranges(candidate_text)
fillchar = parse_params(1, "fillchar", " ", *args, **kwargs)
fillchar_ranges = get_ranges(fillchar)
if ranges_new is None or (not ranges_new and not fillchar_ranges):
return candidate_text.ljust(*args, **kwargs)

if fillchar_ranges:
# Can only be one char, so we create one range to cover from the start to the end
ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))]

result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar)
taint_pyobject_with_ranges(result, ranges_new)
return result
except Exception as e:
iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e))

if fillchar_ranges:
# Can only be one char, so we create one range to cover from the start to the end
ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))]

new_result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar)
taint_pyobject_with_ranges(new_result, ranges_new)
return new_result
except Exception as e:
iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e))

return result
return candidate_text.ljust(*args, **kwargs)


def zfill_aspect(
Expand Down Expand Up @@ -410,16 +400,14 @@ def format_aspect(
if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
params = tuple(args) + tuple(kwargs.values())
new_result = _format_aspect(candidate_text, params, *args, **kwargs)
if new_result != result:
raise Exception("Propagation result %r is different to candidate_text.format %r" % (new_result, result))
return new_result
except Exception as e:
iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e))
if isinstance(candidate_text, IAST.TEXT_TYPES):
try:
params = tuple(args) + tuple(kwargs.values())
return _format_aspect(candidate_text, params, *args, **kwargs)
except Exception as e:
iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e))

return result
return candidate_text.format(*args, **kwargs)


def format_map_aspect(
Expand Down Expand Up @@ -684,18 +672,15 @@ def decode_aspect(
self = args[0]
args = args[(flag_added_args or 1) :]
# Assume we call decode method of the first argument
result = self.decode(*args, **kwargs)

if not is_pyobject_tainted(self) or not isinstance(self, bytes):
return result

try:
codec = args[0] if args else "utf-8"
inc_dec = codecs.getincrementaldecoder(codec)(**kwargs)
return incremental_translation(self, inc_dec, inc_dec.decode, "")
except Exception as e:
iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e))
return result
if is_pyobject_tainted(self) and isinstance(self, bytes):
try:
codec = args[0] if args else "utf-8"
inc_dec = codecs.getincrementaldecoder(codec)(**kwargs)
return incremental_translation(self, inc_dec, inc_dec.decode, "")
except Exception as e:
iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e))
return self.decode(*args, **kwargs)


def encode_aspect(
Expand All @@ -708,18 +693,15 @@ def encode_aspect(

self = args[0]
args = args[(flag_added_args or 1) :]
# Assume we call encode method of the first argument
result = self.encode(*args, **kwargs)

if not is_pyobject_tainted(self) or not isinstance(self, str):
return result

try:
codec = args[0] if args else "utf-8"
inc_enc = codecs.getincrementalencoder(codec)(**kwargs)
return incremental_translation(self, inc_enc, inc_enc.encode, b"")
except Exception as e:
iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e))
if is_pyobject_tainted(self) and isinstance(self, str):
try:
codec = args[0] if args else "utf-8"
inc_enc = codecs.getincrementalencoder(codec)(**kwargs)
return incremental_translation(self, inc_enc, inc_enc.encode, b"")
except Exception as e:
iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e))
result = self.encode(*args, **kwargs)
return result


Expand Down
26 changes: 24 additions & 2 deletions tests/appsec/iast/aspects/test_encode_decode_aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_encode_and_add_aspect(infix, args, kwargs, should_be_tainted, prefix, s
assert list_ranges[0].length == len_infix


def test_encode_error_and_no_log_metric(telemetry_writer):
def test_encode_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject="abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -141,11 +141,22 @@ def test_encode_error_and_no_log_metric(telemetry_writer):
with pytest.raises(LookupError):
mod.do_encode(string_input, "encoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 1
assert "message" in list_metrics_logs[0]
assert "IAST propagation error. encode_aspect. " in list_metrics_logs[0]["message"]


def test_encode_error_with_not_tainted_gives_no_log_metric(telemetry_writer):
string_input = "abcde"
with pytest.raises(LookupError):
mod.do_encode(string_input, "encoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0


def test_dencode_error_and_no_log_metric(telemetry_writer):
def test_dencode_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject=b"abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -155,5 +166,16 @@ def test_dencode_error_and_no_log_metric(telemetry_writer):
with pytest.raises(LookupError):
mod.do_decode(string_input, "decoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 1
assert "message" in list_metrics_logs[0]
assert "IAST propagation error. decode_aspect. " in list_metrics_logs[0]["message"]


def test_dencode_error_with_not_tainted_gives_no_log_metric(telemetry_writer):
string_input = b"abcde"
with pytest.raises(LookupError):
mod.do_decode(string_input, "decoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
5 changes: 3 additions & 2 deletions tests/appsec/iast/aspects/test_index_aspect_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_string_index(input_str, index_pos, expected_result, tainted):


@pytest.mark.skipif(sys.version_info < (3, 9, 0), reason="Python version not supported by IAST")
def test_index_error_and_no_log_metric(telemetry_writer):
def test_index_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject="abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -75,7 +75,8 @@ def test_index_error_and_no_log_metric(telemetry_writer):
mod.do_index(string_input, 100)

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
assert len(list_metrics_logs) == 1
assert "IAST propagation error. index_aspect" in list_metrics_logs[0]["message"]


@pytest.mark.skip_iast_check_logs
Expand Down
6 changes: 3 additions & 3 deletions tests/appsec/iast/aspects/test_str_aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,13 @@ def test_aspect_ljust_str_tainted(self):
ljusted = mod.do_ljust(string_input, 4) # pylint: disable=no-member
assert as_formatted_evidence(ljusted) == ":+-foo-+: "

def test_aspect_ljust_error_and_no_log_metric(self, telemetry_writer):
def test_aspect_ljust_error_with_tainted_gives_one_log_metric(self, telemetry_writer):
string_input = create_taint_range_with_format(":+-foo-+:")
with pytest.raises(TypeError):
mod.do_ljust(string_input, "aaaaa")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
assert len(list_metrics_logs) == 1

def test_zfill(self):
# Not tainted
Expand Down Expand Up @@ -572,4 +572,4 @@ def test_format(self):
result = mod.do_format_fill(string_input) # pylint: disable=no-member
# TODO format with params doesn't work correctly the assert should be
# assert as_formatted_evidence(result) == ":+-foo -+:"
assert as_formatted_evidence(result) == "foo "
assert as_formatted_evidence(result) == ":+-foo-+:"

0 comments on commit 11321de

Please sign in to comment.