From f571d3ec79ef448d90d17d475fe95f2bb51bcd2c Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Tue, 26 Sep 2023 17:24:25 -0500 Subject: [PATCH 1/6] fix: Allow custom headers and other arguments supported by FlightCallOptions - Allow passing FlightCallOptions as keyword args in query(). - Merges default args with custom args set by the user, and then passes them into the FlightCallOptions constructor. - Only supports headers for now. - Successfully tested with `iox-debug` header. - TODO: verify that the server is receiving the specified `trace-id` header. - TODO: test InfluxDB support for other FlightCallOptions (e.g. timeout). --- influxdb_client_3/__init__.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 699678b..b592d9f 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -41,6 +41,23 @@ def file_parser_options(**kwargs): return kwargs +def _deep_merge(target, source): + if isinstance(target, dict) and isinstance(source, dict): + for key, value in source.items(): + if key in target and isinstance(target[key], (dict, list)) and isinstance(value, (dict, list)): + # If both target and source values are dictionaries or lists, merge them recursively + target[key] = _deep_merge(target[key], value) + else: + # Otherwise, replace the target value with the source value + target[key] = value + elif isinstance(target, list) and isinstance(source, list): + # If both target and source are lists, concatenate them + target.extend(source) + else: + # For other types, simply replace the target with the source + target = source + return target + class InfluxDBClient3: def __init__( self, @@ -172,7 +189,8 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column data_frame_measurement_name=measurement_name, data_frame_tag_columns=tag_columns, data_frame_timestamp_column=timestamp_column, **kwargs) - + + def query(self, query, language="sql", mode="all", database=None,**kwargs ): """ Query data from InfluxDB. @@ -185,7 +203,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): :type mode: str :param database: The database to query from. If not provided, uses the database provided during initialization. :type database: str - :param kwargs: Additional arguments for the query. + :param kwargs: FlightClientCallOptions for the query. :return: The queried data. """ @@ -194,10 +212,14 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): database = self._database try: - headers = [(b"authorization", f"Bearer {self._token}".encode('utf-8'))] - # Create an authorization header - _options = FlightCallOptions(headers=headers, **kwargs) + args = {"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))]} + custom_args = {key: value for key, value in kwargs.items()} + # Merge defaults with user-provided arguments + _deep_merge(args, custom_args) + print(args["headers"]) + _options = FlightCallOptions(headers=args["headers"]) + print(_options) ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) @@ -225,8 +247,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - - + __all__ = [ "InfluxDBClient3", "Point", From d3922958ec85b7c79e0cee38526ba24d6a0a0d67 Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Wed, 27 Sep 2023 08:55:28 -0500 Subject: [PATCH 2/6] Update influxdb_client_3/__init__.py Co-authored-by: Jay Clifford <45856600+Jayclifford345@users.noreply.github.com> --- influxdb_client_3/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index b592d9f..b8f2327 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -219,7 +219,6 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): _deep_merge(args, custom_args) print(args["headers"]) _options = FlightCallOptions(headers=args["headers"]) - print(_options) ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) From fb3cee0d1aee6907c42fb6c484e1bd4e2150c834 Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Wed, 27 Sep 2023 08:55:34 -0500 Subject: [PATCH 3/6] Update influxdb_client_3/__init__.py Co-authored-by: Jay Clifford <45856600+Jayclifford345@users.noreply.github.com> --- influxdb_client_3/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index b8f2327..c87f294 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -217,7 +217,6 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): custom_args = {key: value for key, value in kwargs.items()} # Merge defaults with user-provided arguments _deep_merge(args, custom_args) - print(args["headers"]) _options = FlightCallOptions(headers=args["headers"]) ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) From 4abd41c710e79f85333ba81258b10daff54d05b0 Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Wed, 27 Sep 2023 10:55:53 -0500 Subject: [PATCH 4/6] fix: Refactor options merging to a client function, only merge, if kwargs has items, add query timeout option. --- influxdb_client_3/__init__.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index c87f294..247ef5d 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -117,7 +117,11 @@ def __init__( port = query_port_overwrite self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options) - + # Merge defaults with user-provided arguments + def _merge_options(self, defaults, custom={}): + if len(custom) == 0: + return defaults + return _deep_merge(defaults, {key: value for key, value in custom.items()}) def write(self, record=None, database=None ,**kwargs): """ @@ -213,11 +217,14 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): try: # Create an authorization header - args = {"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))]} - custom_args = {key: value for key, value in kwargs.items()} - # Merge defaults with user-provided arguments - _deep_merge(args, custom_args) - _options = FlightCallOptions(headers=args["headers"]) + optargs = { + "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], + "timeout": 300 + } + opts = self._merge_options(optargs, kwargs) + _options = FlightCallOptions( + headers=opts["headers"], + timeout=opts["timeout"]) ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) From bd5b55fbb567b3c10d4f7fbae974d921a6833b6e Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Wed, 4 Oct 2023 12:12:17 -0500 Subject: [PATCH 5/6] fix: Unpack all provided options and pass into FlightCallOptions. --- influxdb_client_3/__init__.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 247ef5d..a488e2e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -42,6 +42,10 @@ def file_parser_options(**kwargs): def _deep_merge(target, source): + """ + Performs a deep merge of dictionaries or lists, + recursively merging the contents, handling nested structures, and concatenation of lists. + """ if isinstance(target, dict) and isinstance(source, dict): for key, value in source.items(): if key in target and isinstance(target[key], (dict, list)) and isinstance(value, (dict, list)): @@ -117,8 +121,10 @@ def __init__( port = query_port_overwrite self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options) - # Merge defaults with user-provided arguments def _merge_options(self, defaults, custom={}): + """ + Merge default option arguments with custom (user-provided) arguments. + """ if len(custom) == 0: return defaults return _deep_merge(defaults, {key: value for key, value in custom.items()}) @@ -222,9 +228,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): "timeout": 300 } opts = self._merge_options(optargs, kwargs) - _options = FlightCallOptions( - headers=opts["headers"], - timeout=opts["timeout"]) + _options = FlightCallOptions(**opts) ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) From 8c9b9b13f8833939b6510a8ff71281e8118ae39d Mon Sep 17 00:00:00 2001 From: Jay Clifford Date: Fri, 13 Oct 2023 16:25:20 +0100 Subject: [PATCH 6/6] Removed old trace example --- Examples/debugging/get_trace.py | 28 ---------------------------- influxdb_client_3/__init__.py | 1 + influxdb_client_3/debug.py | 19 ------------------- 3 files changed, 1 insertion(+), 47 deletions(-) delete mode 100644 Examples/debugging/get_trace.py delete mode 100644 influxdb_client_3/debug.py diff --git a/Examples/debugging/get_trace.py b/Examples/debugging/get_trace.py deleted file mode 100644 index 5404135..0000000 --- a/Examples/debugging/get_trace.py +++ /dev/null @@ -1,28 +0,0 @@ -from influxdb_client_3 import InfluxDBClient3 -import pandas as pd -from influxdb_client_3.debug import TracingClientMiddleWareFactory - - - -client = InfluxDBClient3( - token="", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="pokemon-codex", - flight_client_options={"middleware": (TracingClientMiddleWareFactory(),)}) - - -sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5''' -table = client.query(query=sql, language='sql', mode='all') -print(table) - - -influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5''' -reader = client.query(query=influxql, language='influxql', mode='chunk') -try: - while True: - batch, buff = reader.read_chunk() - print("batch:") - print(buff) -except StopIteration: - print("No more chunks to read") \ No newline at end of file diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index a488e2e..5c80b2e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -229,6 +229,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): } opts = self._merge_options(optargs, kwargs) _options = FlightCallOptions(**opts) + ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) diff --git a/influxdb_client_3/debug.py b/influxdb_client_3/debug.py deleted file mode 100644 index 73cd866..0000000 --- a/influxdb_client_3/debug.py +++ /dev/null @@ -1,19 +0,0 @@ -import pyarrow.flight as flight - -class TracingClientMiddleWareFactory(flight.ClientMiddleware): - def start_call(self, info): - print("Starting new call:", info) - return TracingClientMiddleware() - -class TracingClientMiddleware(flight.ClientMiddleware): - def sending_headers(self): - print("Sending trace ID:", "traceheader") - return { - "x-tracing-id": "traceheader", - } - - def received_headers(self, headers): - if "trace-id" in headers: - trace_id = headers["trace-id"][0] - print("Found trace header with value:", trace_id) - # Don't overwrite our trace ID \ No newline at end of file