From 050f2dad001d1e328daddfc0bc87a1a84d23e034 Mon Sep 17 00:00:00 2001 From: Mateo Gianolio Date: Wed, 16 Mar 2022 22:25:06 +0100 Subject: [PATCH 1/5] Add support for Timestream multi-measure records --- awswrangler/timestream.py | 41 +++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index 660b7edb5..fcdd45dc7 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -31,7 +31,9 @@ def _write_batch( database: str, table: str, cols_names: List[str], - measure_type: str, + measure_name: str, + measure_cols: List[str], + measure_types: List[str], version: int, batch: List[Any], boto3_primitives: _utils.Boto3PrimitivesType, @@ -43,6 +45,10 @@ def _write_batch( botocore_config=Config(read_timeout=20, max_pool_connections=5000, retries={"max_attempts": 10}), ) try: + time_loc = 0 + measure_cols_loc = 1 + dimensions_cols_loc = 1 + len(measure_cols) + _utils.try_it( f=client.write_records, ex=(client.exceptions.ThrottlingException, client.exceptions.InternalServerException), @@ -53,12 +59,19 @@ def _write_batch( { "Dimensions": [ {"Name": name, "DimensionValueType": "VARCHAR", "Value": str(value)} - for name, value in zip(cols_names[2:], rec[2:]) + for name, value in zip(cols_names[dimensions_cols_loc:], rec[dimensions_cols_loc:]) ], - "MeasureName": cols_names[1], - "MeasureValueType": measure_type, - "MeasureValue": str(rec[1]), - "Time": str(round(rec[0].timestamp() * 1_000)), + "MeasureName": measure_cols[0] if len(measure_cols) == 1 else None, + "MeasureValueType": measure_types[0] if len(measure_types) == 1 else "MULTI", + "MeasureValue": str(rec[measure_cols_loc]) + if len(measure_cols) == 1 + else ( + {"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type} + for measure_name, measure_value, measure_value_type in zip( + measure_cols, rec[measure_cols_loc:dimensions_cols_loc], measure_types + ) + ), + "Time": 
str(round(rec[time_loc].timestamp() * 1_000)), "TimeUnit": "MILLISECONDS", "Version": version, } @@ -149,6 +162,7 @@ def write( table: str, time_col: str, measure_col: str, + measure_cols: List[str], dimensions_cols: List[str], version: int = 1, num_threads: int = 32, @@ -168,6 +182,8 @@ def write( DataFrame column name to be used as time. MUST be a timestamp column. measure_col : str DataFrame column name to be used as measure. + measure_cols : List[str] + DataFrame column names to be used as measures. Takes precedence over measure_col. dimensions_cols : List[str] List of DataFrame column names to be used as dimensions. version : int @@ -208,9 +224,13 @@ def write( >>> assert len(rejected_records) == 0 """ - measure_type: str = _data_types.timestream_type_from_pandas(df[[measure_col]]) - _logger.debug("measure_type: %s", measure_type) - cols_names: List[str] = [time_col, measure_col] + dimensions_cols + measure_cols: List[str] = measure_cols or [measure_col] + _logger.debug("measure_cols: %s", measure_cols) + measure_types: List[str] = [ + _data_types.timestream_type_from_pandas(df[[measure_col]]) for measure_col in measure_cols + ] + _logger.debug("measure_types: %s", measure_types) + cols_names: List[str] = [time_col] + measure_cols + dimensions_cols _logger.debug("cols_names: %s", cols_names) batches: List[List[Any]] = _utils.chunkify(lst=_df2list(df=df[cols_names]), max_length=100) _logger.debug("len(batches): %s", len(batches)) @@ -221,7 +241,8 @@ def write( itertools.repeat(database), itertools.repeat(table), itertools.repeat(cols_names), - itertools.repeat(measure_type), + itertools.repeat(measure_cols), + itertools.repeat(measure_types), itertools.repeat(version), batches, itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)), From 8c6289d6c78c1c1e95691fbd0917582582152bf6 Mon Sep 17 00:00:00 2001 From: Mateo Gianolio Date: Wed, 16 Mar 2022 22:55:42 +0100 Subject: [PATCH 2/5] Move multi-measure values to correct key MeasureValues --- 
awswrangler/timestream.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index fcdd45dc7..738c48ab6 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -31,7 +31,6 @@ def _write_batch( database: str, table: str, cols_names: List[str], - measure_name: str, measure_cols: List[str], measure_types: List[str], version: int, @@ -63,14 +62,15 @@ def _write_batch( ], "MeasureName": measure_cols[0] if len(measure_cols) == 1 else None, "MeasureValueType": measure_types[0] if len(measure_types) == 1 else "MULTI", - "MeasureValue": str(rec[measure_cols_loc]) - if len(measure_cols) == 1 - else ( + "MeasureValue": str(rec[measure_cols_loc]) if len(measure_cols) == 1 else None, + "MeasureValues": [ {"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type} for measure_name, measure_value, measure_value_type in zip( measure_cols, rec[measure_cols_loc:dimensions_cols_loc], measure_types ) - ), + ] + if len(measure_cols) > 1 + else None, "Time": str(round(rec[time_loc].timestamp() * 1_000)), "TimeUnit": "MILLISECONDS", "Version": version, From 5691abe09b37c2ef211adeecd5830aeb3344f507 Mon Sep 17 00:00:00 2001 From: Mateo Gianolio Date: Wed, 16 Mar 2022 23:20:42 +0100 Subject: [PATCH 3/5] Fix variable definition error --- awswrangler/timestream.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index 738c48ab6..d5c32210b 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -31,7 +31,7 @@ def _write_batch( database: str, table: str, cols_names: List[str], - measure_cols: List[str], + measure_cols_names: List[str], measure_types: List[str], version: int, batch: List[Any], @@ -46,7 +46,7 @@ def _write_batch( try: time_loc = 0 measure_cols_loc = 1 - dimensions_cols_loc = 1 + len(measure_cols) + dimensions_cols_loc = 1 + len(measure_cols_names) 
_utils.try_it( f=client.write_records, @@ -60,16 +60,16 @@ def _write_batch( {"Name": name, "DimensionValueType": "VARCHAR", "Value": str(value)} for name, value in zip(cols_names[dimensions_cols_loc:], rec[dimensions_cols_loc:]) ], - "MeasureName": measure_cols[0] if len(measure_cols) == 1 else None, + "MeasureName": measure_cols_names[0] if len(measure_cols_names) == 1 else None, "MeasureValueType": measure_types[0] if len(measure_types) == 1 else "MULTI", - "MeasureValue": str(rec[measure_cols_loc]) if len(measure_cols) == 1 else None, + "MeasureValue": str(rec[measure_cols_loc]) if len(measure_cols_names) == 1 else None, "MeasureValues": [ {"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type} for measure_name, measure_value, measure_value_type in zip( - measure_cols, rec[measure_cols_loc:dimensions_cols_loc], measure_types + measure_cols_names, rec[measure_cols_loc:dimensions_cols_loc], measure_types ) ] - if len(measure_cols) > 1 + if len(measure_cols_names) > 1 else None, "Time": str(round(rec[time_loc].timestamp() * 1_000)), "TimeUnit": "MILLISECONDS", @@ -224,13 +224,13 @@ def write( >>> assert len(rejected_records) == 0 """ - measure_cols: List[str] = measure_cols or [measure_col] - _logger.debug("measure_cols: %s", measure_cols) + measure_cols_names: List[str] = measure_cols or [measure_col] + _logger.debug("measure_cols_names: %s", measure_cols_names) measure_types: List[str] = [ - _data_types.timestream_type_from_pandas(df[[measure_col]]) for measure_col in measure_cols + _data_types.timestream_type_from_pandas(df[[measure_col_name]]) for measure_col_name in measure_cols_names ] _logger.debug("measure_types: %s", measure_types) - cols_names: List[str] = [time_col] + measure_cols + dimensions_cols + cols_names: List[str] = [time_col] + measure_cols_names + dimensions_cols _logger.debug("cols_names: %s", cols_names) batches: List[List[Any]] = _utils.chunkify(lst=_df2list(df=df[cols_names]), max_length=100) 
_logger.debug("len(batches): %s", len(batches)) @@ -241,7 +241,7 @@ itertools.repeat(database), itertools.repeat(table), itertools.repeat(cols_names), - itertools.repeat(measure_cols), + itertools.repeat(measure_cols_names), itertools.repeat(measure_types), itertools.repeat(version), batches, From 1f2412198a5e218762043d549ceb361e5799bc47 Mon Sep 17 00:00:00 2001 From: Anton Kukushkin <3997468+kukushking@users.noreply.github.com> Date: Mon, 21 Mar 2022 12:57:53 +0000 Subject: [PATCH 4/5] Multiple measure timestream write fix --- awswrangler/timestream.py | 68 +++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index d5c32210b..88805501b 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -10,7 +10,7 @@ import pandas as pd from botocore.config import Config -from awswrangler import _data_types, _utils +from awswrangler import _data_types, _utils, exceptions _logger: logging.Logger = logging.getLogger(__name__) @@ -47,36 +47,36 @@ def _write_batch( time_loc = 0 measure_cols_loc = 1 dimensions_cols_loc = 1 + len(measure_cols_names) - + records: List[Dict[str, Any]] = [] + for rec in batch: + record: Dict[str, Any] = { + "Dimensions": [ + {"Name": name, "DimensionValueType": "VARCHAR", "Value": str(value)} + for name, value in zip(cols_names[dimensions_cols_loc:], rec[dimensions_cols_loc:]) + ], + "Time": str(round(rec[time_loc].timestamp() * 1_000)), + "TimeUnit": "MILLISECONDS", + "Version": version, + } + if len(measure_cols_names) == 1: + record["MeasureName"] = measure_cols_names[0] + record["MeasureValueType"] = measure_types[0] + record["MeasureValue"] = str(rec[measure_cols_loc]) + else: + record["MeasureValues"] = [ + {"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type} + for measure_name, measure_value, measure_value_type in zip( + measure_cols_names, rec[measure_cols_loc:dimensions_cols_loc], 
measure_types + ) + ] + records.append(record) _utils.try_it( f=client.write_records, ex=(client.exceptions.ThrottlingException, client.exceptions.InternalServerException), max_num_tries=5, DatabaseName=database, TableName=table, - Records=[ - { - "Dimensions": [ - {"Name": name, "DimensionValueType": "VARCHAR", "Value": str(value)} - for name, value in zip(cols_names[dimensions_cols_loc:], rec[dimensions_cols_loc:]) - ], - "MeasureName": measure_cols_names[0] if len(measure_cols_names) == 1 else None, - "MeasureValueType": measure_types[0] if len(measure_types) == 1 else "MULTI", - "MeasureValue": str(rec[measure_cols_loc]) if len(measure_cols_names) == 1 else None, - "MeasureValues": [ - {"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type} - for measure_name, measure_value, measure_value_type in zip( - measure_cols_names, rec[measure_cols_loc:dimensions_cols_loc], measure_types - ) - ] - if len(measure_cols_names) > 1 - else None, - "Time": str(round(rec[time_loc].timestamp() * 1_000)), - "TimeUnit": "MILLISECONDS", - "Version": version, - } - for rec in batch - ], + Records=records, ) except client.exceptions.RejectedRecordsException as ex: return cast(List[Dict[str, str]], ex.response["RejectedRecords"]) @@ -161,9 +161,9 @@ def write( database: str, table: str, time_col: str, - measure_col: str, - measure_cols: List[str], - dimensions_cols: List[str], + measure_col: Optional[str] = None, + measure_cols: Optional[List[str]] = None, + dimensions_cols: Optional[List[str]] = None, version: int = 1, num_threads: int = 32, boto3_session: Optional[boto3.Session] = None, @@ -180,11 +180,11 @@ def write( Amazon Timestream table name. time_col : str DataFrame column name to be used as time. MUST be a timestamp column. - measure_col : str + measure_col : Optional[str] DataFrame column name to be used as measure. - measure_cols : List[str] + measure_cols : Optional[List[str]] DataFrame column names to be used as measures. 
Takes precedence over measure_col. - dimensions_cols : List[str] + dimensions_cols : Optional[List[str]] List of DataFrame column names to be used as dimensions. version : int Version number used for upserts. @@ -224,7 +224,11 @@ def write( >>> assert len(rejected_records) == 0 """ - measure_cols_names: List[str] = measure_cols or [measure_col] + if not measure_col and not measure_cols: + raise exceptions.InvalidArgumentCombination("Either `measure_col` or `measure_cols` is required.") + if not dimensions_cols: + raise exceptions.InvalidArgumentValue("`dimensions_cols` is required.") + measure_cols_names: List[str] = measure_cols if measure_cols else [measure_col] if measure_col else [] _logger.debug("measure_cols_names: %s", measure_cols_names) measure_types: List[str] = [ _data_types.timestream_type_from_pandas(df[[measure_col_name]]) for measure_col_name in measure_cols_names From 26339834ef497a8f800fad33edbb287cca466b54 Mon Sep 17 00:00:00 2001 From: Anton Kukushkin <3997468+kukushking@users.noreply.github.com> Date: Tue, 22 Mar 2022 12:16:51 +0000 Subject: [PATCH 5/5] Use measure_col for multi-measure records --- awswrangler/timestream.py | 23 +++++++++-------------- tests/test_timestream.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/awswrangler/timestream.py b/awswrangler/timestream.py index 88805501b..fc89b80f7 100644 --- a/awswrangler/timestream.py +++ b/awswrangler/timestream.py @@ -10,7 +10,7 @@ import pandas as pd from botocore.config import Config -from awswrangler import _data_types, _utils, exceptions +from awswrangler import _data_types, _utils _logger: logging.Logger = logging.getLogger(__name__) @@ -63,6 +63,8 @@ def _write_batch( record["MeasureValueType"] = measure_types[0] record["MeasureValue"] = str(rec[measure_cols_loc]) else: + record["MeasureName"] = measure_cols_names[0] + record["MeasureValueType"] = "MULTI" record["MeasureValues"] = [ {"Name": measure_name, "Value": 
str(measure_value), "Type": measure_value_type} for measure_name, measure_value, measure_value_type in zip( @@ -161,9 +163,8 @@ def write( database: str, table: str, time_col: str, - measure_col: Optional[str] = None, - measure_cols: Optional[List[str]] = None, - dimensions_cols: Optional[List[str]] = None, + measure_col: Union[str, List[str]], + dimensions_cols: List[str], version: int = 1, num_threads: int = 32, boto3_session: Optional[boto3.Session] = None, @@ -180,11 +181,9 @@ def write( Amazon Timestream table name. time_col : str DataFrame column name to be used as time. MUST be a timestamp column. - measure_col : Optional[str] - DataFrame column name to be used as measure. - measure_cols : Optional[List[str]] - DataFrame column names to be used as measures. Takes precedence over measure_col. - dimensions_cols : Optional[List[str]] + measure_col : Union[str, List[str]] + DataFrame column name(s) to be used as measure. + dimensions_cols : List[str] List of DataFrame column names to be used as dimensions. version : int Version number used for upserts. 
@@ -224,11 +223,7 @@ def write( >>> assert len(rejected_records) == 0 """ - if not measure_col and not measure_cols: - raise exceptions.InvalidArgumentCombination("Either `measure_col` or `measure_cols` is required.") - if not dimensions_cols: - raise exceptions.InvalidArgumentValue("`dimensions_cols` is required.") - measure_cols_names: List[str] = measure_cols if measure_cols else [measure_col] if measure_col else [] + measure_cols_names: List[str] = measure_col if isinstance(measure_col, list) else [measure_col] _logger.debug("measure_cols_names: %s", measure_cols_names) measure_types: List[str] = [ _data_types.timestream_type_from_pandas(df[[measure_col_name]]) for measure_col_name in measure_cols_names diff --git a/tests/test_timestream.py b/tests/test_timestream.py index 2c9a01132..a2d35de6e 100644 --- a/tests/test_timestream.py +++ b/tests/test_timestream.py @@ -180,3 +180,34 @@ def test_real_csv_load_scenario(timestream_database_and_table): assert len(rejected_records) == 0 df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{name}"."{name}"') assert df["counter"].iloc[0] == 126_000 + + +def test_multimeasure_scenario(timestream_database_and_table): + df = pd.DataFrame( + { + "time": [datetime.now(), datetime.now(), datetime.now()], + "dim0": ["foo", "boo", "bar"], + "dim1": [1, 2, 3], + "measure1": [1.0, 1.1, 1.2], + "measure2": [2.0, 2.1, 2.2], + } + ) + rejected_records = wr.timestream.write( + df=df, + database=timestream_database_and_table, + table=timestream_database_and_table, + time_col="time", + measure_col=["measure1", "measure2"], + dimensions_cols=["dim0", "dim1"], + ) + assert len(rejected_records) == 0 + df = wr.timestream.query( + f""" + SELECT + * + FROM "{timestream_database_and_table}"."{timestream_database_and_table}" + ORDER BY time + DESC LIMIT 10 + """, + ) + assert df.shape == (3, 6)