diff --git a/awscrt/cbor.py b/awscrt/cbor.py index 4f2fdcf9..33121d02 100644 --- a/awscrt/cbor.py +++ b/awscrt/cbor.py @@ -55,6 +55,7 @@ def get_encoded_data(self) -> bytes: return _awscrt.cbor_encoder_get_encoded_data(self._binding) def write_int(self, val: int): + # TODO: maybe not support bignum for now. Not needed? """Write an int as cbor formatted, val less than -2^64 will be encoded as Negative bignum for CBOR val between -2^64 to -1, inclusive, will be encode as negative integer for CBOR @@ -156,46 +157,13 @@ def write_null(self): def write_bool(self, val: bool): return _awscrt.cbor_encoder_write_bool(self._binding, val) - def write_data_item(self, data_item: Any): - """Generic API to write any type of an data_item as cbor formatted. - TODO: timestamp <-> datetime?? Decimal fraction <-> decimal?? - - Args: - data_item (Any): any type of data_item. If the type is not supported to be converted to cbor format, ValueError will be raised. - """ - if isinstance(data_item, str): - self.write_string(data_item) - elif isinstance(data_item, bytes): - self.write_bytes(data_item) - elif isinstance(data_item, int): - self.write_int(data_item) - elif isinstance(data_item, float): - self.write_float(data_item) - elif isinstance(data_item, dict): - self.write_dict(data_item) - elif isinstance(data_item, list): - self.write_list(data_item) - elif isinstance(data_item, bool): - self.write_bool(data_item) - elif data_item is None: - self.write_null() - else: - raise ValueError(f"not supported type for data_item: {data_item}") - def write_list(self, val: list): - # return _awscrt.cbor_encoder_write_py_list(self._binding, val) - self.write_array_start(len(val)) - for data_item in val: - self.write_data_item(data_item) + return _awscrt.cbor_encoder_write_py_list(self._binding, val) def write_dict(self, val: dict): - # return _awscrt.cbor_encoder_write_py_dict(self._binding, val) - self.write_map_start(len(val)) - for key, value in val.items(): - self.write_data_item(key) - 
self.write_data_item(value) + return _awscrt.cbor_encoder_write_py_dict(self._binding, val) - def write_data_item_2(self, data_item: Any): + def write_data_item(self, data_item: Any): """Generic API to write any type of an data_item as cbor formatted. TODO: timestamp <-> datetime?? Decimal fraction <-> decimal?? @@ -253,133 +221,11 @@ def pop_next_map_start(self) -> int: def pop_next_tag_val(self) -> int: return _awscrt.cbor_decoder_pop_next_tag_val(self._binding) - def pop_next_numeric(self) -> Union[int, float]: - type = _awscrt.cbor_decoder_peek_type(self._binding) - if type == AwsCborElementType.UnsignedInt: - return self.pop_next_unsigned_int() - elif type == AwsCborElementType.NegativeInt: - return self.pop_next_negative_int() - elif type == AwsCborElementType.Float: - return self.pop_next_double() - # TODO: support bignum? - # TODO: Instead of ValueError, probably raise the same error from C with the same AWS_ERROR_CBOR_UNEXPECTED_TYPE - raise ValueError("the cbor src is not a numeric type to decode") - - def pop_next_inf_bytes(self) -> bytes: - type = _awscrt.cbor_decoder_peek_type(self._binding) - if type != AwsCborElementType.InfBytes: - raise ValueError("the cbor src is not an indefinite bytes to decode") - result = b"" - # Consume the inf_bytes - self.consume_next_element() - while type != AwsCborElementType.Break: - result += self.pop_next_bytes() - type = _awscrt.cbor_decoder_peek_type(self._binding) - # Consume the break - self.consume_next_element() - return result - - def pop_next_inf_str(self) -> bytes: - type = _awscrt.cbor_decoder_peek_type(self._binding) - if type != AwsCborElementType.InfStr: - raise ValueError("the cbor src is not an indefinite string to decode") - result = "" - # Consume the inf_str - self.consume_next_element() - while type != AwsCborElementType.Break: - result += self.pop_next_str() - type = _awscrt.cbor_decoder_peek_type(self._binding) - # Consume the break - self.consume_next_element() - return result - def 
pop_next_list(self) -> list: - # return _awscrt.cbor_decoder_pop_next_py_list(self._binding) - type = _awscrt.cbor_decoder_peek_type(self._binding) - return_val = [] - if type == AwsCborElementType.InfArray: - # Consume the inf_array - self.consume_next_element() - while type != AwsCborElementType.Break: - return_val.append(self.pop_next_data_item()) - type = _awscrt.cbor_decoder_peek_type(self._binding) - # Consume the break - self.consume_next_element() - return return_val - elif type == AwsCborElementType.ArrayStart: - number_elements = self.pop_next_array_start() - for i in range(number_elements): - return_val.append(self.pop_next_data_item()) - return return_val - else: - raise ValueError("the cbor src is not a list to decode") + return _awscrt.cbor_decoder_pop_next_py_list(self._binding) def pop_next_map(self) -> dict: - # return _awscrt.cbor_decoder_pop_next_py_dict(self._binding) - type = _awscrt.cbor_decoder_peek_type(self._binding) - return_val = {} - if type == AwsCborElementType.InfMap: - # Consume the inf_map - self.consume_next_element() - while type != AwsCborElementType.Break: - return_val[self.pop_next_data_item()] = self.pop_next_data_item() - type = _awscrt.cbor_decoder_peek_type(self._binding) - # Consume the break - self.consume_next_element() - return return_val - elif type == AwsCborElementType.MapStart: - number_elements = self.pop_next_map_start() - for i in range(number_elements): - key = self.pop_next_data_item() - value = self.pop_next_data_item() - return_val[key] = value - return return_val - else: - raise ValueError("the cbor src is not a map to decode") + return _awscrt.cbor_decoder_pop_next_py_dict(self._binding) def pop_next_data_item(self) -> Any: - # TODO: timestamp, decimal fraction - # TODO: maybe wrote all those if elif in the binding level, so that we can use switch at least??? - # And possible to avoid some call cross language boundary??? - # TODO: If it fails in the middle, with bunch of stuff already popped. 
Do we want a way to resume?? - type = _awscrt.cbor_decoder_peek_type(self._binding) - if type == AwsCborElementType.UnsignedInt or \ - type == AwsCborElementType.NegativeInt or \ - type == AwsCborElementType.Float: - return self.pop_next_numeric() - elif type == AwsCborElementType.Bytes: - return self.pop_next_bytes() - elif type == AwsCborElementType.String: - return self.pop_next_str() - elif type == AwsCborElementType.Bool: - return self.pop_next_bool() - elif type == AwsCborElementType.Null or \ - type == AwsCborElementType.Undefined: - # Treat both NULL and Undefined as None. - self.consume_next_element() - return None - elif type == AwsCborElementType.ArrayStart or \ - type == AwsCborElementType.InfArray: - return self.pop_next_list() - elif type == AwsCborElementType.MapStart or \ - type == AwsCborElementType.InfMap: - return self.pop_next_map() - elif type == AwsCborElementType.InfBytes: - return self.pop_next_inf_bytes() - elif type == AwsCborElementType.InfStr: - return self.pop_next_inf_str() - elif type == AwsCborElementType.Tag: - tag_val = self.pop_next_tag_val() - if tag_val == AwsCborTags.NegativeBigNum: - bytes_val = self.pop_next_bytes() - return -1 - int.from_bytes(bytes_val, "big") - elif tag_val == AwsCborTags.UnsignedBigNum: - bytes_val = self.pop_next_bytes() - return int.from_bytes(bytes_val, "big") - else: - raise ValueError(f"unsupported tag value: {tag_val}") - else: - raise ValueError(f"unsupported type: {type.name}") - - def pop_next_data_item_2(self) -> Any: return _awscrt.cbor_decoder_pop_next_data_item(self._binding) diff --git a/benchmark_cbor.py b/benchmark_cbor.py deleted file mode 100644 index 1486b375..00000000 --- a/benchmark_cbor.py +++ /dev/null @@ -1,108 +0,0 @@ -from awscrt.cbor import * -import random -import time -import cbor2 - - -def ns_to_secs(ns: int) -> float: - return ns / 1_000_000_000.0 - - -def bytes_to_MiB(bytes: int) -> float: - return bytes / float(1024**2) - - -class TestData: - # generate predictable, but 
variable test values of different types - @staticmethod - def random_value(i=0, seed=0): - r = random.Random(i + seed) # use the index as the seed for predictable results - random_number = TestData.random_number(r, 5) - if random_number == 0: - return f"Some String value {i}" - elif random_number == 1: - return r.random() # a float value - elif random_number == 2: - return TestData.random_number(r, 100000) # a large integer - elif random_number == 3: - return list(range(TestData.random_number(r, 100))) # an array - elif random_number == 4: - return {"a": 1, "b": 2, "c": 3} # a hash - else: - return "generic string" - - # generate a predictable, but variable hash with a range of data types - @staticmethod - def test_hash(n_keys=5, seed=0): - return {f"key{i}": TestData.random_value(i, seed) for i in range(n_keys)} - - @staticmethod - def random_number(r, n): - return int(r.random() * n) - - -t = TestData.test_hash(100000) - - -# print(t) - -print("cbor2 -- encode") -run_start_ns = time.perf_counter_ns() -cbor2_encoded = cbor2.dumps(t) -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"encoded MB: {bytes_to_MiB(len(cbor2_encoded))}") -print(f"time passed: {run_secs} secs") - - -print("CRT -- encode") -encoder = AwsCborEncoder() - -run_start_ns = time.perf_counter_ns() -encoder.write_data_item(t) -encoded = encoder.get_encoded_data() -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"encoded MB: {bytes_to_MiB(len(encoded))}") -print(f"time passed: {run_secs} secs") - - -print("CRT -- encode 2") -encoder_2 = AwsCborEncoder() -run_start_ns = time.perf_counter_ns() -try: - encoder_2.write_data_item_2(t) - encoded_2 = encoder_2.get_encoded_data() -except Exception as e: - print(e) -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"encoded MB: {bytes_to_MiB(len(encoded_2))}") -print(f"time passed: {run_secs} secs") - - -print(cbor2_encoded == encoded) -print(cbor2_encoded == encoded_2) - -print("cbor2 -- decode") 
-run_start_ns = time.perf_counter_ns() -decoded = cbor2.loads(encoded) -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"time passed: {run_secs} secs") - -print("CRT -- decode") -run_start_ns = time.perf_counter_ns() -decoder = AwsCborDecoder(encoded) -crt_decoded = decoder.pop_next_data_item() - -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"time passed: {run_secs} secs") - - -print("CRT -- decode 2") -run_start_ns = time.perf_counter_ns() -decoder_2 = AwsCborDecoder(encoded) -crt_decoded_2 = decoder_2.pop_next_data_item_2() - -run_secs = ns_to_secs(time.perf_counter_ns() - run_start_ns) -print(f"time passed: {run_secs} secs") - -print(crt_decoded == t) -print(crt_decoded_2 == t) diff --git a/source/cbor.c b/source/cbor.c index fc754c36..33ea4e39 100644 --- a/source/cbor.c +++ b/source/cbor.c @@ -642,7 +642,7 @@ static PyObject *s_cbor_decoder_pop_next_tag_to_pyobject(struct aws_cbor_decoder case AWS_CBOR_TAG_NEGATIVE_BIGNUM: case AWS_CBOR_TAG_DECIMAL_FRACTION: default: - PyErr_SetString(PyExc_ValueError, "Unsupported tag value: %" PRIu64 ".", out_tag_val); + PyErr_Format(PyExc_ValueError, "Unsupported tag value: %" PRIu64 ".", out_tag_val); return NULL; } Py_RETURN_NONE; diff --git a/test/test_cbor.py b/test/test_cbor.py index c3b16233..d9e4abc7 100644 --- a/test/test_cbor.py +++ b/test/test_cbor.py @@ -30,7 +30,7 @@ def test_cbor_encode_decode_int(self): self.assertTrue(False) for val in val_to_write: - t = decoder.pop_next_numeric() + t = decoder.pop_next_data_item() self.assertEqual(t, val) self.assertEqual(decoder.get_remaining_bytes_len(), 0) @@ -39,7 +39,7 @@ def test_cbor_encode_decode_data_item(self): encoder = AwsCborEncoder() numerics = [-100.12, 100.0, -100, 100, 2**64 - 1, -2**64, 18446744073709551616.0] another_map = { - # "bignum": 2**65, TODO: big number are not supported from C impl yet. + # "bignum": 2**65, # TODO: big numbers are not supported by the C impl yet.
# "negative bignum": -2**75, 2**6: [1, 2, 3], -2**6: [1, ["2", b"3"], {"most complicated": numerics}, 2**6, -2**7] @@ -57,10 +57,12 @@ def test_cbor_encode_decode_data_item(self): "empty str": "", "empty bytes": b"", } - encoder.write_data_item_2(val_to_write) + encoder.write_data_item(val_to_write) decoder = AwsCborDecoder(encoder.get_encoded_data()) # Temp val only for easier to debug. t = decoder.pop_next_data_item() self.assertEqual(val_to_write, t) + +# TODO: Add more tests for indefinite-length (inf) str/bytes/array/map