Skip to content

Commit

Permalink
Export: Use InfluxDBClient instead of DataFrameClient
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jun 6, 2021
1 parent 1d3f2c8 commit 1ecab8f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -6,6 +6,7 @@ Development

- Change cli base to click
- Add support for wetterdienst core API in cli and restapi
- Export: Use InfluxDBClient instead of DataFrameClient and improve connection handling with InfluxDB 1.x

0.19.0 (14.05.2021)
*******************
Expand Down
31 changes: 25 additions & 6 deletions wetterdienst/core/scalar/export.py
Expand Up @@ -11,6 +11,7 @@
import pytz

from wetterdienst.metadata.columns import Columns
from wetterdienst.util.pandas import chunker
from wetterdienst.util.url import ConnectionString

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -295,15 +296,15 @@ def to_target(self, target: str):
Acquire data::
wetterdienst dwd observations values --station=1048,4411 --parameter=kl --resolution=daily --period=recent --target="influxdb://localhost/?database=dwd&table=weather"
wetterdienst values --provider=dwd --kind=observation --parameter=kl --resolution=daily --period=recent --station=1048,4411 --target="influxdb://localhost/?database=dwd&table=weather"
Example queries::
http 'localhost:8086/query?db=dwd&q=SELECT * FROM weather;'
http 'localhost:8086/query?db=dwd&q=SELECT COUNT(*) FROM weather;'
"""
log.info(f"Writing to InfluxDB. database={database}, table={tablename}")
from influxdb.dataframe_client import DataFrameClient
from influxdb import InfluxDBClient

# 1. Munge the data frame.
# Use the "date" column as appropriate timestamp index.
Expand Down Expand Up @@ -348,7 +349,7 @@ def to_target(self, target: str):
tag_columns.append(column)

# Setup the connection.
c = DataFrameClient(
c = InfluxDBClient(
host=connspec.url.hostname,
port=connspec.url.port or 8086,
username=connspec.url.username,
Expand All @@ -363,11 +364,29 @@ def to_target(self, target: str):
# will fail.
# https://github.com/pandas-dev/pandas/issues/32988

points = []
for items in chunker(df, chunksize=50000):

for date, record in items.iterrows():

record = record.dropna()
if record.empty:
continue

point = {
"measurement": tablename,
"tags": {
tag: record.pop(tag)
for tag in tag_columns if tag in record
},
"time": date.isoformat(),
"fields": record.dropna().to_dict()
}
points.append(point)

# Write to InfluxDB.
c.write_points(
dataframe=df,
measurement=tablename,
tag_columns=tag_columns,
points=points,
batch_size=50000,
)
log.info("Writing to InfluxDB finished")
Expand Down
12 changes: 12 additions & 0 deletions wetterdienst/util/pandas.py
@@ -0,0 +1,12 @@
import pandas as pd


def chunker(seq: pd.DataFrame, chunksize: int):
    """
    Generate consecutive positional chunks of a pandas DataFrame or Series.

    The input is sliced with ``.iloc`` in steps of ``chunksize``, so the
    chunks preserve the original row order and index labels. The last
    chunk holds the remaining rows and may be shorter than ``chunksize``.
    An empty input yields no chunks at all.

    https://stackoverflow.com/a/61798585

    :param seq: pandas DataFrame or Series to split.
    :param chunksize: Maximum number of rows per chunk, must be >= 1.
    :return: Generator yielding slices of ``seq``.
    :raises ValueError: If ``chunksize`` is smaller than 1. As this is a
        generator, the error surfaces when iteration starts.
    """
    # A negative step would make ``range`` empty and silently drop all
    # rows, and a zero step fails with an unrelated-looking error message.
    # Reject both explicitly.
    if chunksize < 1:
        raise ValueError(f"chunksize must be a positive integer, got {chunksize}")

    for pos in range(0, len(seq), chunksize):
        yield seq.iloc[pos:pos + chunksize]

0 comments on commit 1ecab8f

Please sign in to comment.