From 93a9ffeea363df3a6ba5e80779604df3c453c338 Mon Sep 17 00:00:00 2001 From: Jay Clifford Date: Thu, 1 Jun 2023 14:15:34 +0100 Subject: [PATCH] added write_client_option --- Examples/batching-example.py | 44 ++++++++++++++++++++++++++--------- influxdb_client_3/__init__.py | 25 +++++++++++--------- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/Examples/batching-example.py b/Examples/batching-example.py index 329ad5a..c3c63ac 100644 --- a/Examples/batching-example.py +++ b/Examples/batching-example.py @@ -5,11 +5,23 @@ import influxdb_client_3 as InfluxDBClient3 import pandas as pd import numpy as np -from influxdb_client_3 import write_options, WritePrecision +from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError import datetime import time +class BatchingCallback(object): + + def success(self, conf, data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf, data: str, exception: InfluxDBError): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf, data: str, exception: InfluxDBError): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + + # Creating 5.000 gatewayId values as MongoDB ObjectIDs gatewayIds = [ObjectId() for x in range(0, 100)] @@ -18,27 +30,37 @@ # Setting timestamp for first sensor reading now = datetime.datetime.now() -now = now - datetime.timedelta(days=366) +now = now - datetime.timedelta(days=30) teststart = datetime.datetime.now() # InfluxDB connection details token = "" org = "" -bucket = "" -url = "" +database = "" +url = "eu-central-1-1.aws.cloud2.influxdata.com" + +callback = BatchingCallback() + +write_options = WriteOptions(batch_size=5_000, + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) +wco = write_client_options(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) # Opening InfluxDB client with a batch size of 5k points or flush interval # of 10k ms and gzip compression with InfluxDBClient3.InfluxDBClient3(token=token, host=url, org=org, - database="solarmanager", enable_gzip=True, write_options=write_options(batch_size=5_000, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2, write_type='batching')) as _client: + database=database, enable_gzip=True, write_client_options=wco) as _client: + # Creating iterator for one hour worth of data (6 sensor readings per # minute) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 56c7ae0..33d6705 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -3,17 +3,17 @@ from pyarrow import csv from pyarrow.flight import FlightClient, Ticket, FlightCallOptions from influxdb_client import InfluxDBClient as _InfluxDBClient -from influxdb_client import WriteOptions as _WriteOptions +from influxdb_client import WriteOptions as WriteOptions from influxdb_client.client.write_api import WriteApi as _WriteApi -from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS -from influxdb_client.client.write_api import PointSettings +from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings from influxdb_client.domain.write_precision import WritePrecision +from influxdb_client.client.exceptions import InfluxDBError from influxdb_client import Point import json -def write_options(**kwargs): - return _WriteOptions(**kwargs) +def write_client_options(**kwargs): + return kwargs def flight_client_options(**kwargs): @@ -27,7 +27,7 @@ def __init__( org=None, database=None, token=None, - write_options=None, + write_client_options=None, flight_client_options=None, **kwargs): """ @@ -36,24 +36,26 @@ def __init__( * org (str, optional): The InfluxDB organization name to be used for operations. Defaults to None. * database (str, optional): The database to be used for InfluxDB operations. Defaults to None. * token (str, optional): The authentication token for accessing the InfluxDB server. Defaults to None. - * write_options (enum, optional): Specifies the write mode (synchronous or asynchronous) to use when writing data points to InfluxDB. Defaults to SYNCHRONOUS. + * write_options (ANY, optional): Exposes InfuxDB WriteAPI options. * **kwargs: Additional arguments to be passed to the InfluxDB Client. """ self._org = org self._database = database - self.write_options = write_options if write_options is not None else SYNCHRONOUS + self.write_client_options = write_client_options if write_client_options is not None else write_client_options(write_options=SYNCHRONOUS) self._client = _InfluxDBClient( url=f"https://{host}", token=token, org=self._org, **kwargs) + self._write_api = _WriteApi( - self._client, write_options=self.write_options) + self._client, **self.write_client_options) self._flight_client_options = flight_client_options if flight_client_options is not None else {} self._flight_client = FlightClient( f"grpc+tls://{host}:443", **self._flight_client_options) + # create an authorization header self._options = FlightCallOptions( headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))]) @@ -140,6 +142,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): "PointSettings", "SYNCHRONOUS", "ASYNCHRONOUS", - "write_options", + "write_client_options", "WritePrecision", - "flight_client_options"] + "flight_client_options", + "WriteOptions"]