Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions Examples/pokemon-trainer/write-batching-flight-calloptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
import pandas as pd
import numpy as np
import random


# Reference instant for the generated rows: the current UTC time truncated to
# whole milliseconds.
# NOTE(review): despite its name, `two_days_ago` is bound to the very same
# instant as `now`; no two-day offset is applied here.
now = two_days_ago = pd.Timestamp.now(tz="UTC").floor(freq="ms")


class BatchingCallback:
    """Console reporters for the outcome of a batch write.

    Each method receives the batch configuration and the batch payload and
    prints a one-line message describing what happened to that batch.
    """

    def success(self, conf, data: str):
        """Print a confirmation for a batch that was written."""
        message = f"Written batch: {conf}, data: {data}"
        print(message)

    def error(self, conf, data: str, exception: InfluxDBError):
        """Print the batch and the exception for a batch that could not be written."""
        message = f"Cannot write batch: {conf}, data: {data} due: {exception}"
        print(message)

    def retry(self, conf, data: str, exception: InfluxDBError):
        """Print the batch and the exception for a retryable write error."""
        message = f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}"
        print(message)

# Handlers that print the outcome of every batch.
callback = BatchingCallback()

# Batching and retry settings for the write API.
write_options = WriteOptions(
    batch_size=100,
    flush_interval=10_000,
    jitter_interval=2_000,
    retry_interval=5_000,
    max_retries=5,
    max_retry_delay=30_000,
    exponential_base=2,
)

# The batching settings must travel under the keyword `write_options`: the
# client hands these keyword arguments to the write API unchanged, and that is
# the name the write API reads its options from. Passing them as
# `WriteOptions=...` leaves the settings above unused.
wco = write_client_options(
    success_callback=callback.success,
    error_callback=callback.error,
    retry_callback=callback.retry,
    write_options=write_options,
)

# NOTE: `token` is left empty in this example; supply a real API token before
# running it.
client = InfluxDBClient3(
    token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    enable_gzip=True,
    write_client_options=wco,
)

now = pd.Timestamp.now(tz='UTC').floor('ms')

# Trainers a catch can be attributed to
trainers = ["ash", "brock", "misty", "gary", "jessie", "james"]

# Pokémon reference table, loaded from a CSV file
pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv")

# Generated catch records, one dict per row
data = []

# (trainer, Pokémon name) -> number of times that trainer has caught it so far
trainer_pokemon_counts = {}

# How many catch records to generate
num_entries = 1000

for _ in range(num_entries):
    trainer = random.choice(trainers)

    # Draw one random row from the reference table
    random_pokemon = pokemon_df.sample().iloc[0]
    caught = random_pokemon['Name']

    # Advance the running count for this trainer/Pokémon pair; the updated
    # count is stored on the record as "num"
    pair = (trainer, caught)
    num = trainer_pokemon_counts.get(pair, 0) + 1
    trainer_pokemon_counts[pair] = num

    data.append({
        "trainer": trainer,
        "id": f"{0000 + random_pokemon['#']:04d}",
        "num": str(num),
        "caught": caught,
        "level": random.randint(5, 20),
        "attack": random_pokemon['Attack'],
        "defense": random_pokemon['Defense'],
        "hp": random_pokemon['HP'],
        "speed": random_pokemon['Speed'],
        "type1": random_pokemon['Type 1'],
        "type2": random_pokemon['Type 2'],
        "timestamp": two_days_ago,
    })

# Build the DataFrame from the records, indexed by their timestamp
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')

# Print the DataFrame
#print(caught_pokemon_df)





# Query the rows of the last 30 minutes from `caught` and print them.
# The query is sent with language='sql', so the time window is written as a
# SQL interval; the duration literal `30m` is InfluxQL syntax, not SQL.
# `timeout` is an extra keyword argument that the client passes on to the
# Flight call options.
try:
    table = client.query(
        query="SELECT * FROM caught WHERE time >= now() - INTERVAL '30 minutes'",
        database='pokemon-codex',
        timeout=90.0,
        language='sql',
        mode='pandas',
    )
    print(table)
except Exception as e:
    # Top-level boundary of the example script: report the failure and finish.
    print(f"Error querying points: {e}")
38 changes: 25 additions & 13 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def default_client_options(**kwargs):
def flight_client_options(**kwargs):
    """Collect Flight client options given as keyword arguments.

    The keywords are returned unchanged as a dict; a dedicated data structure
    could take the place of the plain dict if one is needed.
    """
    options = kwargs
    return options


class InfluxDBClient3:
def __init__(
self,
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(
"""
self._org = org
self._database = database
self._token = token
self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS)

# Extracting the hostname from URL if provided
Expand All @@ -55,32 +57,34 @@ def __init__(

self._client = _InfluxDBClient(
url=f"https://{host}",
token=token,
token=self._token,
org=self._org,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
self._flight_client_options = flight_client_options or {}
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)

# Create an authorization header
self._options = FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])

def write(self, record=None, **kwargs):

def write(self, record=None, database=None ,**kwargs):
"""
Write data to InfluxDB.

:param record: The data point(s) to write.
:type record: Point or list of Point objects
:param kwargs: Additional arguments to pass to the write API.
"""
if database is None:
database = self._database

try:
self._write_api.write(bucket=self._database, record=record, **kwargs)
self._write_api.write(bucket=database, record=record, **kwargs)
except InfluxDBError as e:
raise e


def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', **kwargs):
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, **kwargs):
"""
Write data from a file to InfluxDB.

Expand All @@ -94,15 +98,18 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
:type timestamp_column: str
:param kwargs: Additional arguments to pass to the write API.
"""
if database is None:
database = self._database

try:
table = upload_file(file).load_file()
df = table.to_pandas() if isinstance(table, pa.Table) else table
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column)
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, **kwargs)
except Exception as e:
raise e


def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column):
def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs):
# This function is factored out for clarity.
# It processes a DataFrame before writing to InfluxDB.

Expand All @@ -120,12 +127,12 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
print("'measurement' column not found in the dataframe.")
else:
df = df.drop(columns=['measurement'], errors='ignore')
self._write_api.write(bucket=self._database, record=df,
self._write_api.write(bucket=database, record=df,
data_frame_measurement_name=measurement_name,
data_frame_tag_columns=tag_columns,
data_frame_timestamp_column=timestamp_column)
data_frame_timestamp_column=timestamp_column, **kwargs)

def query(self, query, language="sql", mode="all"):
def query(self, query, language="sql", mode="all", database=None, **kwargs ):
"""
Query data from InfluxDB.

Expand All @@ -137,10 +144,15 @@ def query(self, query, language="sql", mode="all"):
:type mode: str
:return: The queried data.
"""
if database is None:
database = self._database

try:
ticket_data = {"database": self._database, "sql_query": query, "query_type": language}
# Create an authorization header
_options = FlightCallOptions(headers=[(b"authorization", f"Bearer {self._token}".encode('utf-8'))], **kwargs)
ticket_data = {"database": database, "sql_query": query, "query_type": language}
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
flight_reader = self._flight_client.do_get(ticket, self._options)
flight_reader = self._flight_client.do_get(ticket, _options)

mode_func = {
"all": flight_reader.read_all,
Expand Down