diff --git a/README.md b/README.md
index 6e344184..54fee129 100644
--- a/README.md
+++ b/README.md
@@ -101,6 +101,7 @@ here.
 | `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often. |
 | `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
 | `add_upsert_indexes` | `["boolean", "null"]` | `True` | Whether the Target should create column indexes on the important columns used during data loading. These indexes will make data loading slightly slower but the deduplication phase much faster. Defaults to on for better baseline performance. |
+| `decimal_precision` | `["integer", "null"]` | `None`, the Python default (28) | Set the precision of decimal operations. This is required to avoid `decimal.DivisionImpossible` errors for some taps which encode data type precision using JSON Schema `minimum`/`maximum`/`multipleOf` validations. |
 | `before_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target. Useful for setup like `SET ROLE` or other connection state that is important. |
 | `after_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target. Useful for setup like `SET ROLE` or other connection state that is important. |
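For context, here is a minimal, hypothetical sketch (not code from this change) of the failure the new `decimal_precision` option avoids. The `10.1` value and `1e-38` step mirror the schema used in the new unit test below, and the remainder stands in for the kind of check a JSON Schema `multipleOf` validation performs on `Decimal` values:

```python
import decimal

# A tap that encodes numeric precision as `"multipleOf": 1e-38` forces a
# remainder check against a very small step.
value = decimal.Decimal("10.1")
step = decimal.Decimal("1e-38")

try:
    value % step  # the quotient needs ~40 significant digits; the default context allows 28
except decimal.DivisionImpossible:
    print("decimal.DivisionImpossible under the default precision")

decimal.getcontext().prec = 100  # what `decimal_precision: 100` now configures
assert value % step == 0         # the same check succeeds with enough precision
```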
diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py
index c01cc40b..82bdd447 100644
--- a/target_postgres/target_tools.py
+++ b/target_postgres/target_tools.py
@@ -1,3 +1,4 @@
+import decimal
 import http.client
 import io
 import json
@@ -53,6 +54,10 @@ def stream_to_target(stream, target, config={}):
     max_batch_size = config.get('max_batch_size', 104857600)  # 100MB
     batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50))

+    prec = config.get('decimal_precision')
+    if prec:
+        decimal.getcontext().prec = prec
+
     line_count = 0
     for line in stream:
         _line_handler(state_tracker,
diff --git a/tests/unit/test_target_tools.py b/tests/unit/test_target_tools.py
index d12450a0..c387124d 100644
--- a/tests/unit/test_target_tools.py
+++ b/tests/unit/test_target_tools.py
@@ -154,6 +154,54 @@ class TestStream(ListStream):
     assert rows_persisted == expected_rows


+def test_record_with_target_postgres_precision():
+    values = [1, 1.0, 2, 2.0, 3, 7, 10.1]
+    records = []
+    for value in values:
+        records.append({
+            "type": "RECORD",
+            "stream": "test",
+            "record": {"multipleOfKey": value},
+        })
+
+    class TestStream(ListStream):
+        stream = [
+            {
+                "type": "SCHEMA",
+                "stream": "test",
+                "schema": {
+                    "properties": {
+                        "multipleOfKey": {
+                            "type": [
+                                "null",
+                                "number"
+                            ],
+                            "exclusiveMaximum": True,
+                            "maximum": 100000000000000000000000000000000000000000000000000000000000000,
+                            "multipleOf": 1e-38,
+                            "exclusiveMinimum": True,
+                            "minimum": -100000000000000000000000000000000000000000000000000000000000000
+                        }
+                    }
+                },
+                "key_properties": []
+            }
+        ] + records
+
+    target = Target()
+
+    config = CONFIG.copy()
+    config['decimal_precision'] = 100
+    target_tools.stream_to_target(TestStream(), target, config=config)
+
+    expected_rows = len(records)
+    rows_persisted = 0
+    for call in target.calls['write_batch']:
+        rows_persisted += call['records_count']
+
+    assert rows_persisted == expected_rows
+
+
 def test_state__capture(capsys):
     stream = [
         json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}),
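One design note on the `stream_to_target` change: `decimal.getcontext()` returns the active thread's context (documented Python behavior), so, assuming the target consumes the whole Singer stream on a single thread as the `for line in stream` loop suggests, the one-time `prec` assignment covers all later `Decimal` arithmetic for that run without leaking into other threads. Also, because the guard is `if prec:`, a configured value of `0` is treated the same as leaving the option unset. A small sketch of the thread-local behavior:

```python
import decimal
import threading

# Decimal contexts are per-thread. Raising prec on the current thread's
# context (as stream_to_target now does) does not affect other threads,
# which start from a copy of decimal.DefaultContext (prec=28).
decimal.getcontext().prec = 100
print(decimal.getcontext().prec)  # 100

def report():
    print(decimal.getcontext().prec)  # 28 in a fresh thread

t = threading.Thread(target=report)
t.start()
t.join()
```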