Skip to content

Commit

Permalink
Export: Add support for InfluxDB 3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Oct 11, 2023
1 parent 06c4c97 commit 14af9cc
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 253 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Development

- Remove direct tzdata dependency
- Replace pandas read_fwf calls by polars substitutes
- Export: Add support for InfluxDB 3.x

0.63.0 (08.10.2023)
*******************
Expand Down
386 changes: 186 additions & 200 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ httpx = {version = ">=0.24,<0.26", optional = true}
h5py = { version = ">=3.1,<4", optional = true } # Radar feature.
influxdb = { version = ">=5.3,<6", optional = true } # Export feature.
influxdb-client = { version = ">=1.18,<2", optional = true } # Export feature.
influxdb3-python = { version = ">=0.3.0,<1", optional = true } # Export feature.
matplotlib = { version = ">=3.3,<3.9", optional = true }
mysqlclient = { version = ">=2,<3", optional = true } # Export feature.
openpyxl = { version = ">=3,<4", optional = true }
Expand Down Expand Up @@ -218,7 +219,7 @@ cratedb = ["crate"]
duckdb = ["duckdb"]
explorer = ["dash", "dash-bootstrap-components", "dash-leaflet", "geojson", "plotly"]
export = ["openpyxl", "sqlalchemy", "xarray", "zarr"]
influxdb = ["influxdb", "influxdb-client"]
influxdb = ["influxdb", "influxdb-client", "influxdb3-python"]
interpolation = ["scipy", "shapely", "utm"]
ipython = ["ipython", "matplotlib"]
mpl = ["matplotlib"]
Expand Down
76 changes: 76 additions & 0 deletions tests/core/timeseries/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,3 +795,79 @@ def test_export_influxdb2_tidy(settings_si_false):
df = request.values.all().df
ExportMixin(df=df).to_target("influxdb2://orga:token@localhost/?database=dwd&table=weather")
mock_connect.assert_called_once_with(url="http://localhost:8086", org="orga", token="token") # noqa: S106


@surrogate("influxdb_client_3.InfluxDBClient3")
@surrogate("influxdb_client_3.Point")
@surrogate("influxdb_client_3.WriteOptions")
@surrogate("influxdb_client_3.write_client_options")
@surrogate("influxdb_client_3.write_client.client.write_api.WriteType")
@pytest.mark.remote
def test_export_influxdb3_tabular(settings_si_false):
"""Test export of DataFrame to influxdb v3"""
request = DwdObservationRequest(
parameter=DwdObservationDataset.CLIMATE_SUMMARY,
resolution=DwdObservationResolution.DAILY,
period=DwdObservationPeriod.RECENT,
settings=settings_si_false,
).filter_by_station_id(station_id=[1048])
with mock.patch(
"influxdb_client_3.InfluxDBClient3",
) as mock_client, mock.patch(
"influxdb_client_3.Point",
), mock.patch(
"influxdb_client_3.WriteOptions",
) as mock_write_options, mock.patch(
"influxdb_client_3.write_client_options",
) as mock_write_client_options, mock.patch(
"influxdb_client_3.write_client.client.write_api.WriteType",
) as mock_write_type:
df = request.values.all().df
ExportMixin(df=df).to_target("influxdb3://orga:token@localhost/?database=dwd&table=weather")
mock_write_options.assert_called_once_with(write_type=mock_write_type.synchronous)
mock_write_client_options.assert_called_once_with(WriteOptions=mock_write_options())
mock_client.assert_called_once_with(
host="localhost",
org="orga",
database="dwd",
token="token", # noqa: S106
write_client_options=mock_write_client_options(),
)


@surrogate("influxdb_client_3.InfluxDBClient3")
@surrogate("influxdb_client_3.Point")
@surrogate("influxdb_client_3.WriteOptions")
@surrogate("influxdb_client_3.write_client_options")
@surrogate("influxdb_client_3.write_client.client.write_api.WriteType")
@pytest.mark.remote
def test_export_influxdb3_tidy(settings_si_false):
"""Test export of DataFrame to influxdb v3"""
request = DwdObservationRequest(
parameter=DwdObservationDataset.CLIMATE_SUMMARY,
resolution=DwdObservationResolution.DAILY,
period=DwdObservationPeriod.RECENT,
settings=settings_si_false,
).filter_by_station_id(station_id=[1048])
with mock.patch(
"influxdb_client_3.InfluxDBClient3",
) as mock_client, mock.patch(
"influxdb_client_3.Point",
), mock.patch(
"influxdb_client_3.WriteOptions",
) as mock_write_options, mock.patch(
"influxdb_client_3.write_client_options",
) as mock_write_client_options, mock.patch(
"influxdb_client_3.write_client.client.write_api.WriteType",
) as mock_write_type:
df = request.values.all().df
ExportMixin(df=df).to_target("influxdb3://orga:token@localhost/?database=dwd&table=weather")
mock_write_options.assert_called_once_with(write_type=mock_write_type.synchronous)
mock_write_client_options.assert_called_once_with(WriteOptions=mock_write_options())
mock_client.assert_called_once_with(
host="localhost",
org="orga",
database="dwd",
token="token", # noqa: S106
write_client_options=mock_write_client_options(),
)
20 changes: 16 additions & 4 deletions tests/util/test_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,35 @@
def test_connectionstring_database_from_path():
url = "foobar://host:1234/dbname"
cs = ConnectionString(url)
assert cs.get_database() == "dbname"
assert cs.protocol == "foobar"
assert cs.host == "host"
assert cs.port == 1234
assert cs.database == "dbname"
assert cs.table == "weather"


def test_connectionstring_database_from_query_param():
url = "foobar://host:1234/?database=dbname"
cs = ConnectionString(url)
assert cs.get_database() == "dbname"
assert cs.database == "dbname"


def test_connectionstring_username_password_host():
    """Credentials and host are parsed from the authority part of the URL."""
    cs = ConnectionString("foobar://username:password@host/?database=dbname")
    assert cs.username == "username"
    assert cs.password == "password"  # noqa: S105
    assert cs.host == "host"


def test_connectionstring_table_from_query_param():
url = "foobar://host:1234/?database=dbname&table=tablename"
cs = ConnectionString(url)
assert cs.get_table() == "tablename"
assert cs.table == "tablename"


def test_connectionstring_temporary_file(tmp_path):
filepath = tmp_path.joinpath("foobar.txt")
url = f"file://{filepath}"
cs = ConnectionString(url)
assert cs.get_path() == str(filepath)
assert cs.path == str(filepath)
112 changes: 82 additions & 30 deletions wetterdienst/core/timeseries/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import polars as pl

from wetterdienst.metadata.columns import Columns
from wetterdienst.util.polars_util import chunker
from wetterdienst.util.url import ConnectionString

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -131,12 +130,12 @@ def to_target(self, target: str):
log.info(f"Exporting records to {target}\n{self.df.select(pl.count())}")

connspec = ConnectionString(target)
protocol = connspec.url.scheme
database = connspec.get_database()
tablename = connspec.get_table()
protocol = connspec.protocol
database = connspec.database
tablename = connspec.table

if target.startswith("file://"):
filepath = connspec.get_path()
filepath = connspec.path

if target.endswith(".xlsx"):
log.info(f"Writing to spreadsheet file '{filepath}'")
Expand Down Expand Up @@ -295,6 +294,28 @@ def to_target(self, target: str):
alias fetch="wetterdienst values --provider=dwd --network=observation --parameter=kl --resolution=daily --period=recent --station=1048,4411"
fetch --target="influxdb2://${INFLUXDB_ORGANIZATION}:${INFLUXDB_TOKEN}@localhost/?database=dwd&table=weather"
Example queries::
influx query 'from(bucket:"dwd") |> range(start:-2d) |> limit(n: 10)'
==========================
InfluxDB 3.x database sink
==========================
Install Python driver::
pip install influxdb3-python
Acquire data::
INFLUXDB_ORGANIZATION=acme
INFLUXDB_TOKEN=t5PJry6TyepGsG7IY_n0K4VHp5uPvt9iap60qNHIXL4E6mW9dLmowGdNz0BDi6aK_bAbtD76Z7ddfho6luL2LA==
INFLUXDB_HOST="eu-central-1-1.aws.cloud2.influxdata.com"
alias fetch="wetterdienst values --provider=dwd --network=observation --parameter=kl --resolution=daily --period=recent --station=1048,4411"
fetch --target="influxdb3://${INFLUXDB_ORGANIZATION}:${INFLUXDB_TOKEN}@${INFLUXDB_HOST}/?database=dwd&table=weather"
Example queries (InfluxDB 3.x is queried with SQL, not Flux)::
SELECT * FROM weather WHERE time >= now() - INTERVAL '2 days' LIMIT 10
Expand All @@ -304,52 +325,81 @@ def to_target(self, target: str):
version = 1
elif protocol in ["influxdb2", "influxdb2s"]:
version = 2
elif protocol in ["influxdb3", "influxdb3s"]:
version = 3
else:
raise KeyError(f"Unknown protocol variant '{protocol}' for InfluxDB")

log.info(f"Writing to InfluxDB version {version}. database={database}, table={tablename}")

# Setup the connection.
# Set up the connection.
if version == 1:
from influxdb import InfluxDBClient

client = InfluxDBClient(
host=connspec.url.hostname,
port=connspec.url.port or 8086,
username=connspec.url.username,
password=connspec.url.password,
host=connspec.host,
port=connspec.port or 8086,
username=connspec.username,
password=connspec.password,
database=database,
ssl=protocol.endswith("s"),
)
client.create_database(database)
elif version == 2:
from influxdb_client import InfluxDBClient, Point
from influxdb_client import InfluxDBClient as InfluxDBClientV2
from influxdb_client import Point as PointV2
from influxdb_client.client.write_api import SYNCHRONOUS

ssl = protocol.endswith("s")
url = f"http{ssl and 's' or ''}://{connspec.url.hostname}:{connspec.url.port or 8086}"
client = InfluxDBClient(url=url, org=connspec.url.username, token=connspec.url.password)
client = InfluxDBClientV2(url=url, org=connspec.username, token=connspec.password)
write_api = client.write_api(write_options=SYNCHRONOUS)
elif version == 3:
from influxdb_client_3 import (
InfluxDBClient3 as InfluxDBClientV3,
)
from influxdb_client_3 import (
Point as PointV3,
)
from influxdb_client_3 import (
WriteOptions,
write_client_options,
)
from influxdb_client_3.write_client.client.write_api import WriteType

write_options = WriteOptions(write_type=WriteType.synchronous)
wco = write_client_options(WriteOptions=write_options)
client_v3 = InfluxDBClientV3(
host=connspec.host,
org=connspec.username,
token=connspec.password,
write_client_options=wco,
database=database,
)

points = []
for items in chunker(self.df, chunksize=50000):
for record in items.iter_rows(named=True):
time = record.pop("date").isoformat()
fields = {k: v for k, v in record.items() if v is not None}
if not fields:
continue
if version == 1:
point = {
"measurement": tablename,
"time": time,
"fields": fields,
}
elif version == 2:
point = Point(tablename).time(time)
for field, value in fields.items():
point = point.field(field, value)

points.append(point)
for record in self.df.iter_rows(named=True):
# for record in items.iter_rows(named=True):
time = record.pop("date").isoformat()
fields = {k: v for k, v in record.items() if v is not None}
if not fields:
continue
if version == 1:
point = {
"measurement": tablename,
"time": time,
"fields": fields,
}
elif version == 2:
point = PointV2(tablename).time(time)
for field, value in fields.items():
point = point.field(field, value)
elif version == 3:
point = PointV3(tablename).time(time)
for field, value in fields.items():
point = point.field(field, value)

points.append(point)

# Write to InfluxDB.
if version == 1:
Expand All @@ -360,6 +410,8 @@ def to_target(self, target: str):
elif version == 2:
write_api.write(bucket=database, record=points)
write_api.close()
elif version == 3:
client_v3.write(record=points)

log.info("Writing to InfluxDB finished")

Expand Down
15 changes: 2 additions & 13 deletions wetterdienst/util/polars_util.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2022, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
from typing import Generator, Tuple
from typing import List, Tuple

import polars as pl


def chunker(seq: pl.DataFrame, chunksize: int) -> Generator[pl.DataFrame, None, None]:
    """
    Yield consecutive row-slices of *seq*, each at most *chunksize* rows long.

    Works for anything sliceable with a length (DataFrames, Series, lists).
    https://stackoverflow.com/a/61798585
    """
    for offset in range(0, len(seq), chunksize):
        chunk = seq[offset : offset + chunksize]
        yield chunk


def read_fwf_from_df(df: pl.DataFrame, column_specs: Tuple[Tuple[int, int], ...], header: bool = False) -> pl.DataFrame:
"""Function to split a column of a polars DataFrame into multiple columns by given column specs
:param df: the polars DataFrame of which a column is split
Expand All @@ -25,7 +14,7 @@ def read_fwf_from_df(df: pl.DataFrame, column_specs: Tuple[Tuple[int, int], ...]
:return: polars DataFrame with split columns
"""

def _get_columns(column: str):
def _get_columns(column: str) -> List[str]:
cols = []
for col_start, col_end in column_specs:
col = column[col_start : (col_end + 1)]
Expand Down
33 changes: 28 additions & 5 deletions wetterdienst/util/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,44 @@ def __init__(self, url):
self.url_raw = url
self.url = urlparse(url)

def get_database(self):
@property
def protocol(self):
    """Scheme of the URL, e.g. ``influxdb2`` in ``influxdb2://host/db``."""
    return self.url.scheme

@property
def host(self):
    """Hostname component of the URL, or ``None`` if absent."""
    return self.url.hostname

@property
def port(self):
    """Port number as ``int``, or ``None`` when the URL specifies none."""
    return self.url.port

@property
def username(self):
    """User name from the URL authority part, or ``None``."""
    return self.url.username

@property
def password(self):
    """Password from the URL authority part, or ``None``."""
    return self.url.password

@property
def database(self):
# Try to get database name from query parameter.
database = self.get_query_param("database")
database = self.get_query_param("database") or self.get_query_param("bucket")

# Try to get database name from URL path.
if database is None:
if not database:
if self.url.path.startswith("/"):
database = self.url.path[1:]

return database or "dwd"

def get_table(self):
@property
def table(self):
    """Table name from the ``table`` query parameter, defaulting to ``weather``."""
    return self.get_query_param("table") or "weather"

def get_path(self):
@property
def path(self):
    """Filesystem path of a ``file://`` URL."""
    # netloc fallback: presumably tolerates relative forms like file://foo.txt,
    # where urlparse puts the first segment into netloc — confirm against callers.
    return self.url.path or self.url.netloc

def get_query_param(self, name):
Expand Down

0 comments on commit 14af9cc

Please sign in to comment.