From 067e6499eabcd9a47156968ea20b6b1c72bd9b8b Mon Sep 17 00:00:00 2001 From: Jay Clifford Date: Thu, 13 Jul 2023 12:21:45 +0100 Subject: [PATCH] added flight option and database paramter --- .../write-batching-flight-calloptions.py | 109 ++++++++++++++++++ influxdb_client_3/__init__.py | 38 +++--- 2 files changed, 134 insertions(+), 13 deletions(-) create mode 100644 Examples/pokemon-trainer/write-batching-flight-calloptions.py diff --git a/Examples/pokemon-trainer/write-batching-flight-calloptions.py b/Examples/pokemon-trainer/write-batching-flight-calloptions.py new file mode 100644 index 0000000..de11010 --- /dev/null +++ b/Examples/pokemon-trainer/write-batching-flight-calloptions.py @@ -0,0 +1,109 @@ +from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options +import pandas as pd +import numpy as np +import random + + +now = pd.Timestamp.now(tz='UTC').floor('ms') +two_days_ago = now + + +class BatchingCallback(object): + + def success(self, conf, data: str): + print(f"Written batch: {conf}, data: {data}") + + def error(self, conf, data: str, exception: InfluxDBError): + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") + + def retry(self, conf, data: str, exception: InfluxDBError): + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + +callback = BatchingCallback() + + +write_options = WriteOptions(batch_size=100, + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) + +wco = write_client_options(success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +client = InfluxDBClient3( + token="", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco) + +now = pd.Timestamp.now(tz='UTC').floor('ms') + +# Lists of possible trainers +trainers = ["ash", "brock", "misty", "gary", "jessie", "james"] + +# Read the CSV into a DataFrame +pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") + +# Creating an empty list to store the data +data = [] + +# Dictionary to keep track of the number of times each trainer has caught each Pokémon +trainer_pokemon_counts = {} + +# Number of entries we want to create +num_entries = 1000 + +# Generating random data +for i in range(num_entries): + trainer = random.choice(trainers) + + # Randomly select a row from pokemon_df + random_pokemon = pokemon_df.sample().iloc[0] + caught = random_pokemon['Name'] + + # Count the number of times this trainer has caught this Pokémon + if (trainer, caught) in trainer_pokemon_counts: + trainer_pokemon_counts[(trainer, caught)] += 1 + else: + trainer_pokemon_counts[(trainer, caught)] = 1 + + # Get the number for this combination of trainer and Pokémon + num = trainer_pokemon_counts[(trainer, caught)] + + entry = { + "trainer": trainer, + "id": f"{0000 + random_pokemon['#']:04d}", + "num": str(num), + "caught": caught, + "level": random.randint(5, 20), + "attack": random_pokemon['Attack'], + "defense": random_pokemon['Defense'], + "hp": random_pokemon['HP'], + "speed": random_pokemon['Speed'], + "type1": random_pokemon['Type 1'], + "type2": random_pokemon['Type 2'], + "timestamp": two_days_ago + } + data.append(entry) + +# Convert the list of dictionaries to a DataFrame +caught_pokemon_df = pd.DataFrame(data).set_index('timestamp') + +# Print the DataFrame +#print(caught_pokemon_df) + + + + + +# Query +try: + table = client.query(query='''SELECT * FROM caught WHERE time >= now() - 30m''', database='pokemon-codex', timeout=90.0, language='sql', mode='pandas') + print(table) +except Exception as e: + print(f"Error querying points: {e}") diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 530a4d2..f2de362 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -18,6 +18,7 @@ def default_client_options(**kwargs): def flight_client_options(**kwargs): return kwargs # You can replace this with a specific data structure if needed + class InfluxDBClient3: def __init__( self, @@ -47,6 +48,7 @@ def __init__( """ self._org = org self._database = database + self._token = token self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS) # Extracting the hostname from URL if provided @@ -55,7 +57,7 @@ def __init__( self._client = _InfluxDBClient( url=f"https://{host}", - token=token, + token=self._token, org=self._org, **kwargs) @@ -63,10 +65,9 @@ def __init__( self._flight_client_options = flight_client_options or {} self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options) - # Create an authorization header - self._options = FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))]) - def write(self, record=None, **kwargs): + + def write(self, record=None, database=None ,**kwargs): """ Write data to InfluxDB. @@ -74,13 +75,16 @@ def write(self, record=None, **kwargs): :type record: Point or list of Point objects :param kwargs: Additional arguments to pass to the write API. """ + if database is None: + database = self._database + try: - self._write_api.write(bucket=self._database, record=record, **kwargs) + self._write_api.write(bucket=database, record=record, **kwargs) except InfluxDBError as e: raise e - def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', **kwargs): + def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, **kwargs): """ Write data from a file to InfluxDB. @@ -94,15 +98,18 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co :type timestamp_column: str :param kwargs: Additional arguments to pass to the write API. """ + if database is None: + database = self._database + try: table = upload_file(file).load_file() df = table.to_pandas() if isinstance(table, pa.Table) else table - self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column) + self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, **kwargs) except Exception as e: raise e - def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column): + def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs): # This function is factored out for clarity. # It processes a DataFrame before writing to InfluxDB. @@ -120,12 +127,12 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column print("'measurement' column not found in the dataframe.") else: df = df.drop(columns=['measurement'], errors='ignore') - self._write_api.write(bucket=self._database, record=df, + self._write_api.write(bucket=database, record=df, data_frame_measurement_name=measurement_name, data_frame_tag_columns=tag_columns, - data_frame_timestamp_column=timestamp_column) + data_frame_timestamp_column=timestamp_column, **kwargs) - def query(self, query, language="sql", mode="all"): + def query(self, query, language="sql", mode="all", database=None, **kwargs ): """ Query data from InfluxDB. @@ -137,10 +144,15 @@ def query(self, query, language="sql", mode="all"): :type mode: str :return: The queried data. """ + if database is None: + database = self._database + try: - ticket_data = {"database": self._database, "sql_query": query, "query_type": language} + # Create an authorization header + _options = FlightCallOptions(headers=[(b"authorization", f"Bearer {self._token}".encode('utf-8'))], **kwargs) + ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) - flight_reader = self._flight_client.do_get(ticket, self._options) + flight_reader = self._flight_client.do_get(ticket, _options) mode_func = { "all": flight_reader.read_all,