Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions awswrangler/timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,17 @@ def write(
return [item for sublist in res for item in sublist]


def query(sql: str, boto3_session: Optional[boto3.Session] = None) -> pd.DataFrame:
def query(
sql: str, pagination_config: Dict[str, Any] = None, boto3_session: Optional[boto3.Session] = None
) -> pd.DataFrame:
"""Run a query and retrieve the result as a Pandas DataFrame.

Parameters
----------
sql: str
SQL query.
pagination_config: Dict[str, Any]
Pagination configuration dictionary of a form {'MaxItems': 10, 'PageSize': 10, 'StartingToken': '...'}
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

Expand All @@ -230,7 +234,7 @@ def query(sql: str, boto3_session: Optional[boto3.Session] = None) -> pd.DataFra
paginator = client.get_paginator("query")
rows: List[List[Any]] = []
schema: List[Dict[str, str]] = []
for page in paginator.paginate(QueryString=sql):
for page in paginator.paginate(QueryString=sql, PaginationConfig=pagination_config or {}):
if not schema:
schema = _process_schema(page=page)
for row in page["Rows"]:
Expand Down
7 changes: 5 additions & 2 deletions tests/test_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from datetime import datetime

import pandas as pd
import pytest

import awswrangler as wr

logging.getLogger("awswrangler").setLevel(logging.DEBUG)


def test_basic_scenario(timestream_database_and_table):
@pytest.mark.parametrize("pagination", [None, {}, {"MaxItems": 3, "PageSize": 2}])
def test_basic_scenario(timestream_database_and_table, pagination):
name = timestream_database_and_table
df = pd.DataFrame(
{
Expand Down Expand Up @@ -41,7 +43,8 @@ def test_basic_scenario(timestream_database_and_table):
FROM "{name}"."{name}"
ORDER BY time
DESC LIMIT 10
"""
""",
pagination_config=pagination,
)
assert df.shape == (3, 8)

Expand Down