Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions awswrangler/timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,199 @@ def query(sql: str, boto3_session: Optional[boto3.Session] = None) -> pd.DataFra
if col["type"] == "VARCHAR":
df[col["name"]] = df[col["name"]].astype("string")
return df


def create_database(
database: str,
kms_key_id: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> str:
"""Create a new Timestream database.

Note
----
If the KMS key is not specified, the database will be encrypted with a
Timestream managed KMS key located in your account.

Parameters
----------
database: str
Database name.
kms_key_id: Optional[str]
The KMS key for the database. If the KMS key is not specified,
the database will be encrypted with a Timestream managed KMS key located in your account.
tags: Optional[Dict[str, str]]
Key/Value dict to put on the database.
Tags enable you to categorize databases and/or tables, for example,
by purpose, owner, or environment.
e.g. {"foo": "boo", "bar": "xoo"})
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Returns
-------
str
The Amazon Resource Name that uniquely identifies this database. (ARN)

Examples
--------
Creating a database.

>>> import awswrangler as wr
>>> arn = wr.timestream.create_database("MyDatabase")

"""
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
args: Dict[str, Any] = {"DatabaseName": database}
if kms_key_id is not None:
args["KmsKeyId"] = kms_key_id
if tags is not None:
args["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
response: Dict[str, Dict[str, Any]] = client.create_database(**args)
return cast(str, response["Database"]["Arn"])


def delete_database(
database: str,
boto3_session: Optional[boto3.Session] = None,
) -> None:
"""Delete a given Timestream database. This is an irreversible operation.

After a database is deleted, the time series data from its tables cannot be recovered.

All tables in the database must be deleted first, or a ValidationException error will be thrown.

Due to the nature of distributed retries,
the operation can return either success or a ResourceNotFoundException.
Clients should consider them equivalent.

Parameters
----------
database: str
Database name.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Returns
-------
None
None.

Examples
--------
Deleting a database

>>> import awswrangler as wr
>>> arn = wr.timestream.delete_database("MyDatabase")

"""
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
client.delete_database(DatabaseName=database)


def create_table(
database: str,
table: str,
memory_retention_hours: int,
magnetic_retention_days: int,
tags: Optional[Dict[str, str]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> str:
"""Create a new Timestream database.

Note
----
If the KMS key is not specified, the database will be encrypted with a
Timestream managed KMS key located in your account.

Parameters
----------
database: str
Database name.
table: str
Table name.
memory_retention_hours: int
The duration for which data must be stored in the memory store.
magnetic_retention_days: int
The duration for which data must be stored in the magnetic store.
tags: Optional[Dict[str, str]]
Key/Value dict to put on the table.
Tags enable you to categorize databases and/or tables, for example,
by purpose, owner, or environment.
e.g. {"foo": "boo", "bar": "xoo"})
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Returns
-------
str
The Amazon Resource Name that uniquely identifies this database. (ARN)

Examples
--------
Creating a table.

>>> import awswrangler as wr
>>> arn = wr.timestream.create_table(
... database="MyDatabase",
... table="MyTable",
... memory_retention_hours=3,
... magnetic_retention_days=7
... )

"""
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
args: Dict[str, Any] = {
"DatabaseName": database,
"TableName": table,
"RetentionProperties": {
"MemoryStoreRetentionPeriodInHours": memory_retention_hours,
"MagneticStoreRetentionPeriodInDays": magnetic_retention_days,
},
}
if tags is not None:
args["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
response: Dict[str, Dict[str, Any]] = client.create_table(**args)
return cast(str, response["Table"]["Arn"])


def delete_table(
database: str,
table: str,
boto3_session: Optional[boto3.Session] = None,
) -> None:
"""Delete a given Timestream table.

This is an irreversible operation.

After a Timestream database table is deleted, the time series data stored in the table cannot be recovered.

Due to the nature of distributed retries,
the operation can return either success or a ResourceNotFoundException.
Clients should consider them equivalent.

Parameters
----------
database: str
Database name.
table: str
Table name.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Returns
-------
None
None.

Examples
--------
Deleting a table

>>> import awswrangler as wr
>>> arn = wr.timestream.delete_table("MyDatabase", "MyTable")

"""
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
client.delete_table(DatabaseName=database, TableName=table)
6 changes: 5 additions & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ Amazon Timestream
.. autosummary::
:toctree: stubs

write
create_database
create_table
delete_database
delete_table
query
write

Amazon EMR
----------
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,14 @@ def mysql_table():
with con.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS test.{name}")
con.close()


@pytest.fixture(scope="function")
def timestream_database_and_table():
name = f"tbl_{get_time_str_with_random_suffix()}"
print(f"Timestream name: {name}")
wr.timestream.create_database(name)
wr.timestream.create_table(name, name, 1, 1)
yield name
wr.timestream.delete_table(name, name)
wr.timestream.delete_database(name)
57 changes: 29 additions & 28 deletions tests/test_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
logging.getLogger("awswrangler").setLevel(logging.DEBUG)


def test_write_simple():
def test_basic_scenario(timestream_database_and_table):
name = timestream_database_and_table
df = pd.DataFrame(
{
"time": [datetime.now(), datetime.now(), datetime.now()],
Expand All @@ -19,16 +20,34 @@ def test_write_simple():
)
rejected_records = wr.timestream.write(
df=df,
database="sampleDB",
table="sampleTable",
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["dim0", "dim1"],
)
assert len(rejected_records) == 0
df = wr.timestream.query(
f"""
SELECT
1 as col_int,
try_cast(now() as time) as col_time,
TRUE as col_bool,
current_date as col_date,
'foo' as col_str,
measure_value::double,
measure_name,
time
FROM "{name}"."{name}"
ORDER BY time
DESC LIMIT 10
"""
)
assert df.shape == (3, 8)


def test_write_real():
def test_real_csv_load_scenario(timestream_database_and_table):
name = timestream_database_and_table
df = pd.read_csv(
"https://raw.githubusercontent.com/awslabs/amazon-timestream-tools/master/sample_apps/data/sample.csv",
names=[
Expand All @@ -52,39 +71,21 @@ def test_write_real():
df_memory = df[df.measure_kind == "memory_utilization"]
rejected_records = wr.timestream.write(
df=df_cpu,
database="sampleDB",
table="sampleTable",
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
rejected_records = wr.timestream.write(
df=df_memory,
database="sampleDB",
table="sampleTable",
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0


def test_query():
df = wr.timestream.query(
"""
SELECT
1 as col_int,
try_cast(now() as time) as col_time,
TRUE as col_bool,
current_date as col_date,
'foo' as col_str,
measure_value::double,
measure_name,
time
FROM "sampleDB"."sampleTable"
ORDER BY time
DESC LIMIT 10
"""
)
assert df.shape == (10, 8)
df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{name}"."{name}"')
assert df["counter"].iloc[0] == 126_000
Loading