Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.pytestArgs": [
"tests"
]
}
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

# 0.17.0 [unreleased]

### Features

1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Add dedicated DataFrame methods for improved usability and type safety:
- `write_dataframe()`: New method for writing pandas and polars DataFrames with explicit parameters (`measurement`, `timestamp_column`, `tags`, `timestamp_timezone`).
- `query_dataframe()`: New method for querying data directly to a pandas or polars DataFrame via the `frame_type` parameter.
- Updated README with clear examples for DataFrame operations.

### Bug Fixes

1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Fix `TypeError` when writing DataFrames. Serializer-specific kwargs (e.g., `data_frame_measurement_name`) are now filtered before being passed to the HTTP layer.

### CI

1. [#164](https://github.com/InfluxCommunity/influxdb3-python/pull/164): Fix pipelines not downloading the correct python images.
Expand Down
49 changes: 45 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,46 @@ print(f'DONE writing from csv in {callback.write_count} batch(es)')

```

### Pandas DF
### Pandas DataFrame
```python
client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
import pandas as pd

# Create a DataFrame with a timestamp column
df = pd.DataFrame({
'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
'trainer': ['Ash', 'Misty', 'Brock'],
'pokemon_id': [25, 120, 74],
'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
})

# Write the DataFrame - timestamp_column is required for consistency
client.write_dataframe(
df,
measurement='caught',
timestamp_column='time',
tags=['trainer', 'pokemon_id']
)
```

### Polars DF
### Polars DataFrame
```python
client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
import polars as pl

# Create a DataFrame with a timestamp column
df = pl.DataFrame({
'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
'trainer': ['Ash', 'Misty'],
'pokemon_id': [25, 120],
'pokemon_name': ['Pikachu', 'Staryu']
})

# Write the DataFrame - same API works for both pandas and polars
client.write_dataframe(
df,
measurement='caught',
timestamp_column='time',
tags=['trainer', 'pokemon_id']
)
```

## Querying
Expand All @@ -154,6 +186,15 @@ table = reader.read_all()
print(table.to_pandas().to_markdown())
```

### Querying to DataFrame
```python
# Query directly to a pandas DataFrame (default)
df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")

# Query to a polars DataFrame
df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")
```

### Querying with influxql
```python
query = "select * from measurement"
Expand Down
122 changes: 121 additions & 1 deletion influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import importlib.util
import os
import urllib.parse
from typing import Any
from typing import Any, List, Literal, Optional, TYPE_CHECKING

import pyarrow as pa

if TYPE_CHECKING:
import pandas as pd
import polars as pl
from pyarrow import ArrowException

from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
Expand Down Expand Up @@ -385,6 +389,77 @@ def write(self, record=None, database=None, **kwargs):
except InfluxDBError as e:
raise e

def write_dataframe(
self,
df: "pd.DataFrame | pl.DataFrame",
measurement: str,
timestamp_column: str,
tags: Optional[List[str]] = None,
timestamp_timezone: Optional[str] = None,
database: Optional[str] = None,
**kwargs
):
"""
Write a DataFrame to InfluxDB.

This method supports both pandas and polars DataFrames, automatically detecting
the DataFrame type and using the appropriate serializer.

:param df: The DataFrame to write. Can be a pandas or polars DataFrame.
:type df: pandas.DataFrame or polars.DataFrame
:param measurement: The name of the measurement to write to.
:type measurement: str
:param timestamp_column: The name of the column containing timestamps.
This parameter is required for consistency between pandas and polars.
:type timestamp_column: str
:param tags: List of column names to use as tags. Remaining columns will be fields.
:type tags: list[str], optional
:param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
:type timestamp_timezone: str, optional
:param database: The database to write to. If not provided, uses the database from initialization.
:type database: str, optional
:param kwargs: Additional arguments to pass to the write API.
:raises TypeError: If df is not a pandas or polars DataFrame.
:raises InfluxDBError: If there is an error writing to the database.

Example:
>>> import pandas as pd
>>> df = pd.DataFrame({
... 'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
... 'city': ['London', 'Paris'],
... 'temperature': [15.0, 18.0]
... })
>>> client.write_dataframe(
... df,
... measurement='weather',
... timestamp_column='time',
... tags=['city']
... )
"""
if database is None:
database = self._database

# Detect DataFrame type
df_type = str(type(df))
if 'pandas' not in df_type and 'polars' not in df_type:
raise TypeError(
f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
"Please pass a valid DataFrame object."
)

try:
return self._write_api.write(
bucket=database,
record=df,
data_frame_measurement_name=measurement,
data_frame_tag_columns=tags or [],
data_frame_timestamp_column=timestamp_column,
data_frame_timestamp_timezone=timestamp_timezone,
**kwargs
)
except InfluxDBError as e:
raise e

def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None,
file_parser_options=None, **kwargs):
"""
Expand Down Expand Up @@ -467,6 +542,51 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database:
except ArrowException as e:
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

def query_dataframe(
self,
query: str,
language: str = "sql",
database: Optional[str] = None,
frame_type: Literal["pandas", "polars"] = "pandas",
**kwargs
) -> "pd.DataFrame | pl.DataFrame":
"""
Query data from InfluxDB and return as a DataFrame.

This is a convenience method that wraps query() and returns the result
directly as a pandas or polars DataFrame.

:param query: The query to execute on the database.
:type query: str
:param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
:type language: str
:param database: The database to query from. If not provided, uses the database from initialization.
:type database: str, optional
:param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
:type frame_type: Literal["pandas", "polars"]
:param kwargs: Additional arguments to pass to the query API.
:keyword query_parameters: Query parameters as a dictionary of key-value pairs.
:return: Query result as a pandas or polars DataFrame.
:rtype: pandas.DataFrame or polars.DataFrame
:raises ImportError: If polars is requested but not installed.

Example:
>>> # Query and get a pandas DataFrame
>>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
>>>
>>> # Query and get a polars DataFrame
>>> df = client.query_dataframe(
... "SELECT * FROM weather",
... frame_type="polars"
... )
"""
if frame_type == "polars" and polars is False:
raise ImportError(
"Polars is not installed. Please install it with `pip install polars`."
)

return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)

async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
"""Query data from InfluxDB asynchronously.

Expand Down
20 changes: 19 additions & 1 deletion influxdb_client_3/write_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
DEFAULT_WRITE_NO_SYNC = False
DEFAULT_WRITE_TIMEOUT = 10_000

# Kwargs consumed during serialization that should not be passed to _post_write
SERIALIZER_KWARGS = {
# DataFrame-specific kwargs
'data_frame_measurement_name',
'data_frame_tag_columns',
'data_frame_timestamp_column',
'data_frame_timestamp_timezone',
# Record-specific kwargs (dict, NamedTuple, dataclass)
'record_measurement_key',
'record_measurement_name',
'record_time_key',
'record_tag_keys',
'record_field_keys',
}

logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')

if _HAS_DATACLASS:
Expand Down Expand Up @@ -397,9 +412,12 @@ def write(self, bucket: str, org: str = None,

_async_req = True if self._write_options.write_type == WriteType.asynchronous else False

# Filter out serializer-specific kwargs before passing to _post_write
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}

def write_payload(payload):
final_string = b'\n'.join(payload[1])
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs)
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs)

results = list(map(write_payload, payloads.items()))
if not _async_req:
Expand Down
Loading
Loading