Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions Examples/batching-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,23 @@
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_options, WritePrecision
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
import datetime
import time


class BatchingCallback(object):

def success(self, conf, data: str):
print(f"Written batch: {conf}, data: {data}")

def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


# Creating 5.000 gatewayId values as MongoDB ObjectIDs
gatewayIds = [ObjectId() for x in range(0, 100)]

Expand All @@ -18,27 +30,37 @@

# Setting timestamp for first sensor reading
now = datetime.datetime.now()
now = now - datetime.timedelta(days=366)
now = now - datetime.timedelta(days=30)
teststart = datetime.datetime.now()

# InfluxDB connection details
token = ""
org = ""
bucket = ""
url = ""
database = ""
url = "eu-central-1-1.aws.cloud2.influxdata.com"

callback = BatchingCallback()

write_options = WriteOptions(batch_size=5_000,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)

wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
# Opening InfluxDB client with a batch size of 5k points or flush interval
# of 10k ms and gzip compression
with InfluxDBClient3.InfluxDBClient3(token=token,
host=url,
org=org,
database="solarmanager", enable_gzip=True, write_options=write_options(batch_size=5_000,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2, write_type='batching')) as _client:
database=database, enable_gzip=True, write_client_options=wco) as _client:


# Creating iterator for one hour worth of data (6 sensor readings per
# minute)
Expand Down
25 changes: 14 additions & 11 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
from pyarrow import csv
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
from influxdb_client import InfluxDBClient as _InfluxDBClient
from influxdb_client import WriteOptions as _WriteOptions
from influxdb_client import WriteOptions as WriteOptions
from influxdb_client.client.write_api import WriteApi as _WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from influxdb_client.client.write_api import PointSettings
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
from influxdb_client.domain.write_precision import WritePrecision
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client import Point
import json


def write_options(**kwargs):
return _WriteOptions(**kwargs)
def write_client_options(**kwargs):
return kwargs


def flight_client_options(**kwargs):
Expand All @@ -27,7 +27,7 @@ def __init__(
org=None,
database=None,
token=None,
write_options=None,
write_client_options=None,
flight_client_options=None,
**kwargs):
"""
Expand All @@ -36,24 +36,26 @@ def __init__(
* org (str, optional): The InfluxDB organization name to be used for operations. Defaults to None.
* database (str, optional): The database to be used for InfluxDB operations. Defaults to None.
* token (str, optional): The authentication token for accessing the InfluxDB server. Defaults to None.
* write_options (enum, optional): Specifies the write mode (synchronous or asynchronous) to use when writing data points to InfluxDB. Defaults to SYNCHRONOUS.
* write_options (ANY, optional): Exposes InfuxDB WriteAPI options.
* **kwargs: Additional arguments to be passed to the InfluxDB Client.
"""
self._org = org
self._database = database
self.write_options = write_options if write_options is not None else SYNCHRONOUS
self.write_client_options = write_client_options if write_client_options is not None else write_client_options(write_options=SYNCHRONOUS)
self._client = _InfluxDBClient(
url=f"https://{host}",
token=token,
org=self._org,
**kwargs)

self._write_api = _WriteApi(
self._client, write_options=self.write_options)
self._client, **self.write_client_options)

self._flight_client_options = flight_client_options if flight_client_options is not None else {}
self._flight_client = FlightClient(
f"grpc+tls://{host}:443",
**self._flight_client_options)

# create an authorization header
self._options = FlightCallOptions(
headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])
Expand Down Expand Up @@ -140,6 +142,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"PointSettings",
"SYNCHRONOUS",
"ASYNCHRONOUS",
"write_options",
"write_client_options",
"WritePrecision",
"flight_client_options"]
"flight_client_options",
"WriteOptions"]