diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..dda98d3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, + "python.testing.pytestArgs": [ + "tests" + ] +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf525d..dcb7bca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ # 0.17.0 [unreleased] +### Features + +1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Add dedicated DataFrame methods for improved usability and type safety: + - `write_dataframe()`: New method for writing pandas and polars DataFrames with explicit parameters (`measurement`, `timestamp_column`, `tags`, `timestamp_timezone`). + - `query_dataframe()`: New method for querying data directly to a pandas or polars DataFrame via the `frame_type` parameter. + - Updated README with clear examples for DataFrame operations. + +### Bug Fixes + +1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Fix `TypeError` when writing DataFrames. Serializer-specific kwargs (e.g., `data_frame_measurement_name`) are now filtered before being passed to the HTTP layer. + ### CI 1. [#164](https://github.com/InfluxCommunity/influxdb3-python/pull/164): Fix pipelines not downloading the correct python images. 
diff --git a/README.md b/README.md index 7794e32..237aaf1 100644 --- a/README.md +++ b/README.md @@ -134,14 +134,46 @@ print(f'DONE writing from csv in {callback.write_count} batch(es)') ``` -### Pandas DF +### Pandas DataFrame ```python -client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp') +import pandas as pd + +# Create a DataFrame with a timestamp column +df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']), + 'trainer': ['Ash', 'Misty', 'Brock'], + 'pokemon_id': [25, 120, 74], + 'pokemon_name': ['Pikachu', 'Staryu', 'Geodude'] +}) + +# Write the DataFrame - timestamp_column is required for consistency +client.write_dataframe( + df, + measurement='caught', + timestamp_column='time', + tags=['trainer', 'pokemon_id'] +) ``` -### Polars DF +### Polars DataFrame ```python -client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp') +import polars as pl + +# Create a DataFrame with a timestamp column +df = pl.DataFrame({ + 'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'], + 'trainer': ['Ash', 'Misty'], + 'pokemon_id': [25, 120], + 'pokemon_name': ['Pikachu', 'Staryu'] +}) + +# Write the DataFrame - same API works for both pandas and polars +client.write_dataframe( + df, + measurement='caught', + timestamp_column='time', + tags=['trainer', 'pokemon_id'] +) ``` ## Querying @@ -154,6 +186,15 @@ table = reader.read_all() print(table.to_pandas().to_markdown()) ``` +### Querying to DataFrame +```python +# Query directly to a pandas DataFrame (default) +df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'") + +# Query to a polars DataFrame +df = client.query_dataframe("SELECT * FROM caught", frame_type="polars") +``` + ### Querying with influxql 
from typing import Any, List, Literal, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    import pandas as pd
    import polars as pl


def _is_dataframe(obj) -> bool:
    """Return True if *obj* is a pandas or polars ``DataFrame`` (or a subclass of one)."""
    # Inspect the MRO rather than importing pandas/polars: neither import is
    # needed at runtime here, and DataFrame subclasses must still be accepted.
    # Matching on the exact class name rejects Series, Index, LazyFrame, etc.,
    # which a plain "'pandas' in str(type(obj))" substring test would let through.
    return any(
        klass.__name__ == "DataFrame"
        and (getattr(klass, "__module__", None) or "").partition(".")[0] in ("pandas", "polars")
        for klass in type(obj).__mro__
    )


def write_dataframe(
        self,
        df: "pd.DataFrame | pl.DataFrame",
        measurement: str,
        timestamp_column: str,
        tags: Optional[List[str]] = None,
        timestamp_timezone: Optional[str] = None,
        database: Optional[str] = None,
        **kwargs
):
    """
    Write a pandas or polars DataFrame to InfluxDB.

    The frame is handed to ``self._write_api.write`` together with the
    ``data_frame_*`` keyword arguments built from the explicit parameters below.

    :param df: The DataFrame to write. Must be a pandas or polars ``DataFrame``
        (subclasses are accepted; ``Series``, ``LazyFrame`` etc. are not).
    :type df: pandas.DataFrame or polars.DataFrame
    :param measurement: The name of the measurement to write to.
    :type measurement: str
    :param timestamp_column: The name of the column containing timestamps.
    :type timestamp_column: str
    :param tags: Column names to use as tags. ``None`` is forwarded as an empty list.
    :type tags: list[str], optional
    :param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
    :type timestamp_timezone: str, optional
    :param database: The database to write to. Defaults to the database given at initialization.
    :type database: str, optional
    :param kwargs: Additional arguments forwarded unchanged to the write API.
    :return: Whatever ``self._write_api.write`` returns.
    :raises TypeError: If df is not a pandas or polars DataFrame.

    Errors raised by the write API propagate to the caller unchanged.

    Example:
        >>> import pandas as pd
        >>> df = pd.DataFrame({
        ...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
        ...     'city': ['London', 'Paris'],
        ...     'temperature': [15.0, 18.0]
        ... })
        >>> client.write_dataframe(
        ...     df,
        ...     measurement='weather',
        ...     timestamp_column='time',
        ...     tags=['city']
        ... )
    """
    # Validate first so a wrong argument never reaches the serializer.
    if not _is_dataframe(df):
        raise TypeError(
            f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
            "Please pass a valid DataFrame object."
        )

    if database is None:
        database = self._database

    return self._write_api.write(
        bucket=database,
        record=df,
        data_frame_measurement_name=measurement,
        data_frame_tag_columns=tags or [],
        data_frame_timestamp_column=timestamp_column,
        data_frame_timestamp_timezone=timestamp_timezone,
        **kwargs
    )


def query_dataframe(
        self,
        query: str,
        language: str = "sql",
        database: Optional[str] = None,
        frame_type: Literal["pandas", "polars"] = "pandas",
        **kwargs
) -> "pd.DataFrame | pl.DataFrame":
    """
    Query data from InfluxDB and return the result as a DataFrame.

    Convenience wrapper that calls ``self.query`` with ``mode=frame_type``.

    :param query: The query to execute on the database.
    :type query: str
    :param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
    :type language: str
    :param database: The database to query from. Defaults to the database given at initialization.
    :type database: str, optional
    :param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
    :type frame_type: Literal["pandas", "polars"]
    :param kwargs: Additional arguments forwarded unchanged to ``self.query``.
    :keyword query_parameters: Query parameters as a dictionary of key-value pairs.
    :return: Query result as a pandas or polars DataFrame.
    :rtype: pandas.DataFrame or polars.DataFrame
    :raises ValueError: If frame_type is neither "pandas" nor "polars".
    :raises ImportError: If polars is requested but not installed.

    Example:
        >>> # Query and get a pandas DataFrame
        >>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
        >>>
        >>> # Query and get a polars DataFrame
        >>> df = client.query_dataframe(
        ...     "SELECT * FROM weather",
        ...     frame_type="polars"
        ... )
    """
    # Reject other values up front: forwarding them as ``mode`` could hand
    # back something that is not a DataFrame, breaking the return contract.
    if frame_type not in ("pandas", "polars"):
        raise ValueError(
            f"Unsupported frame_type {frame_type!r}; expected 'pandas' or 'polars'."
        )

    # ``polars`` is the module-level availability flag of this package
    # (falsy when the polars package could not be found).
    if frame_type == "polars" and not polars:
        raise ImportError(
            "Polars is not installed. Please install it with `pip install polars`."
        )

    return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)
diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 1c4484a..9c601eb 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -27,6 +27,21 @@ DEFAULT_WRITE_NO_SYNC = False DEFAULT_WRITE_TIMEOUT = 10_000 +# Kwargs consumed during serialization that should not be passed to _post_write +SERIALIZER_KWARGS = { + # DataFrame-specific kwargs + 'data_frame_measurement_name', + 'data_frame_tag_columns', + 'data_frame_timestamp_column', + 'data_frame_timestamp_timezone', + # Record-specific kwargs (dict, NamedTuple, dataclass) + 'record_measurement_key', + 'record_measurement_name', + 'record_time_key', + 'record_tag_keys', + 'record_field_keys', +} + logger = logging.getLogger('influxdb_client_3.write_client.client.write_api') if _HAS_DATACLASS: @@ -397,9 +412,12 @@ def write(self, bucket: str, org: str = None, _async_req = True if self._write_options.write_type == WriteType.asynchronous else False + # Filter out serializer-specific kwargs before passing to _post_write + http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} + def write_payload(payload): final_string = b'\n'.join(payload[1]) - return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs) + return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs) results = list(map(write_payload, payloads.items())) if not _async_req: diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 7bab679..87097f1 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,6 +1,6 @@ import re import unittest -from unittest.mock import patch +from unittest.mock import patch, MagicMock from pytest_httpserver import HTTPServer @@ -11,6 +11,14 @@ from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData, 
ErrorFlightServer +import pandas as pd + +try: + import polars as pl + HAS_POLARS = True +except ImportError: + HAS_POLARS = False + def http_server(): httpserver = HTTPServer() @@ -353,5 +361,185 @@ def test_get_version_fail(self): ).get_server_version() +class TestWriteDataFrame(unittest.TestCase): + """Tests for the write_dataframe() method.""" + + @patch('influxdb_client_3._InfluxDBClient') + @patch('influxdb_client_3._WriteApi') + @patch('influxdb_client_3._QueryApi') + def setUp(self, mock_query_api, mock_write_api, mock_influx_db_client): + self.mock_write_api = mock_write_api + self.client = InfluxDBClient3( + host="localhost", + org="my_org", + database="my_db", + token="my_token" + ) + + def test_write_dataframe_with_pandas(self): + """Test write_dataframe() with a pandas DataFrame.""" + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.0] + }) + + self.client.write_dataframe( + df, + measurement='weather', + timestamp_column='time', + tags=['city'] + ) + + # Verify _write_api.write was called with correct parameters + self.client._write_api.write.assert_called_once() + call_kwargs = self.client._write_api.write.call_args[1] + self.assertEqual(call_kwargs['bucket'], 'my_db') + self.assertEqual(call_kwargs['data_frame_measurement_name'], 'weather') + self.assertEqual(call_kwargs['data_frame_tag_columns'], ['city']) + self.assertEqual(call_kwargs['data_frame_timestamp_column'], 'time') + + def test_write_dataframe_with_custom_database(self): + """Test write_dataframe() with a custom database.""" + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01']), + 'value': [42.0] + }) + + self.client.write_dataframe( + df, + measurement='test', + timestamp_column='time', + database='other_db' + ) + + call_kwargs = self.client._write_api.write.call_args[1] + self.assertEqual(call_kwargs['bucket'], 'other_db') + + def test_write_dataframe_with_timezone(self): + """Test 
write_dataframe() with timestamp timezone.""" + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01']), + 'value': [42.0] + }) + + self.client.write_dataframe( + df, + measurement='test', + timestamp_column='time', + timestamp_timezone='UTC' + ) + + call_kwargs = self.client._write_api.write.call_args[1] + self.assertEqual(call_kwargs['data_frame_timestamp_timezone'], 'UTC') + + def test_write_dataframe_raises_type_error_for_invalid_input(self): + """Test write_dataframe() raises TypeError for non-DataFrame input.""" + with self.assertRaises(TypeError) as context: + self.client.write_dataframe( + [1, 2, 3], # A list, not a DataFrame + measurement='test', + timestamp_column='time' + ) + self.assertIn("Expected a pandas or polars DataFrame", str(context.exception)) + self.assertIn("list", str(context.exception)) + + def test_write_dataframe_raises_type_error_for_dict(self): + """Test write_dataframe() raises TypeError for dict input.""" + with self.assertRaises(TypeError) as context: + self.client.write_dataframe( + {'time': [1, 2], 'value': [10, 20]}, + measurement='test', + timestamp_column='time' + ) + self.assertIn("Expected a pandas or polars DataFrame", str(context.exception)) + + @unittest.skipUnless(HAS_POLARS, "Polars not installed") + def test_write_dataframe_with_polars(self): + """Test write_dataframe() with a polars DataFrame.""" + df = pl.DataFrame({ + 'time': ['2024-01-01', '2024-01-02'], + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.0] + }) + + self.client.write_dataframe( + df, + measurement='weather', + timestamp_column='time', + tags=['city'] + ) + + # Verify _write_api.write was called with correct parameters + self.client._write_api.write.assert_called_once() + call_kwargs = self.client._write_api.write.call_args[1] + self.assertEqual(call_kwargs['data_frame_measurement_name'], 'weather') + self.assertEqual(call_kwargs['data_frame_tag_columns'], ['city']) + + +class TestQueryDataFrame(unittest.TestCase): + """Tests for the 
query_dataframe() method.""" + + def test_query_dataframe_returns_pandas_by_default(self): + """Test query_dataframe() returns pandas DataFrame by default.""" + with ConstantFlightServer() as server: + client = InfluxDBClient3( + host=f"http://localhost:{server.port}", + org="my_org", + database="my_db", + token="my_token", + ) + + result = client.query_dataframe("SELECT * FROM test") + + # Should return a pandas DataFrame + self.assertIsInstance(result, pd.DataFrame) + + def test_query_dataframe_with_sql_language(self): + """Test query_dataframe() with explicit SQL language.""" + with ConstantFlightServer() as server: + client = InfluxDBClient3( + host=f"http://localhost:{server.port}", + org="my_org", + database="my_db", + token="my_token", + ) + + result = client.query_dataframe("SELECT * FROM test", language="sql") + self.assertIsInstance(result, pd.DataFrame) + + @unittest.skipUnless(HAS_POLARS, "Polars not installed") + def test_query_dataframe_returns_polars_when_requested(self): + """Test query_dataframe() returns polars DataFrame when frame_type='polars'.""" + with ConstantFlightServer() as server: + client = InfluxDBClient3( + host=f"http://localhost:{server.port}", + org="my_org", + database="my_db", + token="my_token", + ) + + result = client.query_dataframe("SELECT * FROM test", frame_type="polars") + + # Should return a polars DataFrame + self.assertIsInstance(result, pl.DataFrame) + + @patch('influxdb_client_3.polars', False) + def test_query_dataframe_raises_import_error_for_polars_when_not_installed(self): + """Test query_dataframe() raises ImportError when polars is requested but not installed.""" + with ConstantFlightServer() as server: + client = InfluxDBClient3( + host=f"http://localhost:{server.port}", + org="my_org", + database="my_db", + token="my_token", + ) + + with self.assertRaises(ImportError) as context: + client.query_dataframe("SELECT * FROM test", frame_type="polars") + self.assertIn("Polars is not installed", 
str(context.exception)) + self.assertIn("pip install polars", str(context.exception)) + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index 9e7e406..ab71ff2 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -2,6 +2,7 @@ import time from http import HTTPStatus +import pandas as pd import pytest from pytest_httpserver import HTTPServer, RequestMatcher from urllib3.exceptions import TimeoutError as urllib3_TimeoutError @@ -183,3 +184,32 @@ def test_write_with_timeout_arg(self, httpserver: HTTPServer): ), enable_gzip=True ).write(self.SAMPLE_RECORD, _request_timeout=1) + + def test_write_dataframe_does_not_raise_type_error(self, httpserver: HTTPServer): + """ + Regression test: writing a DataFrame should not raise TypeError. + + Before the fix, serializer kwargs were passed to post_write(), causing a TypeError. + """ + self.set_response_status(httpserver, 200) + + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.0] + }) + + try: + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous) + ) + ).write_dataframe( + df, + measurement='weather', + timestamp_column='time', + tags=['city'] + ) + except TypeError as e: + pytest.fail(f"write_dataframe raised TypeError: {e}")