Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions awswrangler/timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def _write_batch(
table: str,
cols_names: List[str],
measure_type: str,
version: int,
batch: List[Any],
boto3_primitives: _utils.Boto3PrimitivesType,
) -> List[Dict[str, str]]:
Expand Down Expand Up @@ -59,6 +60,7 @@ def _write_batch(
"MeasureValue": str(rec[1]),
"Time": str(round(rec[0].timestamp() * 1_000)),
"TimeUnit": "MILLISECONDS",
"Version": version,
}
for rec in batch
],
Expand Down Expand Up @@ -117,6 +119,7 @@ def write(
time_col: str,
measure_col: str,
dimensions_cols: List[str],
version: int = 1,
num_threads: int = 32,
boto3_session: Optional[boto3.Session] = None,
) -> List[Dict[str, str]]:
Expand All @@ -136,6 +139,9 @@ def write(
DataFrame column name to be used as measure.
dimensions_cols : List[str]
List of DataFrame column names to be used as dimensions.
version : int
Version number used for upserts.
Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.
num_threads : int
Number of thread to be used for concurrent writing.
boto3_session : boto3.Session(), optional
Expand Down Expand Up @@ -185,6 +191,7 @@ def write(
itertools.repeat(table),
itertools.repeat(cols_names),
itertools.repeat(measure_type),
itertools.repeat(version),
batches,
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
)
Expand Down
53 changes: 53 additions & 0 deletions tests/test_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,59 @@ def test_basic_scenario(timestream_database_and_table):
assert df.shape == (3, 8)


def test_versioned(timestream_database_and_table):
    """Verify Timestream upsert semantics driven by the ``version`` parameter.

    Writes the same (time, dim0, dim1) keys three times:
      1. version=1, original measures      -> all accepted (new records)
      2. version=1, one changed measure    -> the changed record is rejected
         (same version, different value is not allowed by WriteRecords)
      3. version=2, same changed measure   -> accepted (higher version upserts)

    Finally checks the table still holds exactly the 3 logical records.
    """
    name = timestream_database_and_table
    # Reuse identical timestamps across all three writes so the records
    # collide on the same (time, dimensions) key and exercise versioning.
    time = [datetime.now(), datetime.now(), datetime.now()]
    dfs = [
        pd.DataFrame(
            {
                "time": time,
                "dim0": ["foo", "boo", "bar"],
                "dim1": [1, 2, 3],
                "measure": [1.0, 1.1, 1.2],
            }
        ),
        pd.DataFrame(
            {
                "time": time,
                "dim0": ["foo", "boo", "bar"],
                "dim1": [1, 2, 3],
                "measure": [1.0, 1.1, 1.9],  # "bar" measure changed vs. write #1
            }
        ),
        pd.DataFrame(
            {
                "time": time,
                "dim0": ["foo", "boo", "bar"],
                "dim1": [1, 2, 3],
                "measure": [1.0, 1.1, 1.9],  # same payload as #2, higher version
            }
        ),
    ]
    versions = [1, 1, 2]
    # Expected rejection counts: write #2 re-sends a changed measure at the
    # same version, so exactly that one record batch is rejected.
    rejected_rec_nums = [0, 1, 0]
    for df, version, rejected_rec_num in zip(dfs, versions, rejected_rec_nums):
        rejected_records = wr.timestream.write(
            df=df,
            database=name,
            table=name,
            time_col="time",
            measure_col="measure",
            dimensions_cols=["dim0", "dim1"],
            version=version,
        )
        assert len(rejected_records) == rejected_rec_num
        # Fix: "DESC" without an ORDER BY clause is invalid Timestream SQL;
        # order by the built-in `time` column.
        df_out = wr.timestream.query(
            f"""
            SELECT
                *
            FROM "{name}"."{name}"
            ORDER BY time DESC
            LIMIT 10
            """
        )
        # 3 rows (upserts replace, never duplicate), 5 columns:
        # time, dim0, dim1, measure_value::double, measure_name.
        assert df_out.shape == (3, 5)


def test_real_csv_load_scenario(timestream_database_and_table):
name = timestream_database_and_table
df = pd.read_csv(
Expand Down