diff --git a/Examples/debugging/get_trace.py b/Examples/debugging/get_trace.py deleted file mode 100644 index 5404135..0000000 --- a/Examples/debugging/get_trace.py +++ /dev/null @@ -1,28 +0,0 @@ -from influxdb_client_3 import InfluxDBClient3 -import pandas as pd -from influxdb_client_3.debug import TracingClientMiddleWareFactory - - - -client = InfluxDBClient3( - token="", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="pokemon-codex", - flight_client_options={"middleware": (TracingClientMiddleWareFactory(),)}) - - -sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5''' -table = client.query(query=sql, language='sql', mode='all') -print(table) - - -influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5''' -reader = client.query(query=influxql, language='influxql', mode='chunk') -try: - while True: - batch, buff = reader.read_chunk() - print("batch:") - print(buff) -except StopIteration: - print("No more chunks to read") \ No newline at end of file diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 699678b..5c80b2e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -41,6 +41,27 @@ def file_parser_options(**kwargs): return kwargs +def _deep_merge(target, source): + """ + Performs a deep merge of dictionaries or lists, + recursively merging the contents, handling nested structures, and concatenation of lists. + """ + if isinstance(target, dict) and isinstance(source, dict): + for key, value in source.items(): + if key in target and isinstance(target[key], (dict, list)) and isinstance(value, (dict, list)): + # If both target and source values are dictionaries or lists, merge them recursively + target[key] = _deep_merge(target[key], value) + else: + # Otherwise, replace the target value with the source value + target[key] = value + elif isinstance(target, list) and isinstance(source, list): + # If both target and source are lists, concatenate them + target.extend(source) + else: + # For other types, simply replace the target with the source + target = source + return target + class InfluxDBClient3: def __init__( self, @@ -100,7 +121,13 @@ def __init__( port = query_port_overwrite self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options) - + def _merge_options(self, defaults, custom={}): + """ + Merge default option arguments with custom (user-provided) arguments. + """ + if len(custom) == 0: + return defaults + return _deep_merge(defaults, {key: value for key, value in custom.items()}) def write(self, record=None, database=None ,**kwargs): """ @@ -172,7 +199,8 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column data_frame_measurement_name=measurement_name, data_frame_tag_columns=tag_columns, data_frame_timestamp_column=timestamp_column, **kwargs) - + + def query(self, query, language="sql", mode="all", database=None,**kwargs ): """ Query data from InfluxDB. @@ -185,7 +213,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): :type mode: str :param database: The database to query from. If not provided, uses the database provided during initialization. :type database: str - :param kwargs: Additional arguments for the query. + :param kwargs: FlightClientCallOptions for the query. :return: The queried data. """ @@ -194,10 +222,14 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): database = self._database try: - headers = [(b"authorization", f"Bearer {self._token}".encode('utf-8'))] - # Create an authorization header - _options = FlightCallOptions(headers=headers, **kwargs) + optargs = { + "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], + "timeout": 300 + } + opts = self._merge_options(optargs, kwargs) + _options = FlightCallOptions(**opts) + ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) @@ -225,8 +257,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - - + __all__ = [ "InfluxDBClient3", "Point", diff --git a/influxdb_client_3/debug.py b/influxdb_client_3/debug.py deleted file mode 100644 index 73cd866..0000000 --- a/influxdb_client_3/debug.py +++ /dev/null @@ -1,19 +0,0 @@ -import pyarrow.flight as flight - -class TracingClientMiddleWareFactory(flight.ClientMiddleware): - def start_call(self, info): - print("Starting new call:", info) - return TracingClientMiddleware() - -class TracingClientMiddleware(flight.ClientMiddleware): - def sending_headers(self): - print("Sending trace ID:", "traceheader") - return { - "x-tracing-id": "traceheader", - } - - def received_headers(self, headers): - if "trace-id" in headers: - trace_id = headers["trace-id"][0] - print("Found trace header with value:", trace_id) - # Don't overwrite our trace ID \ No newline at end of file