Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions Examples/debugging/get_trace.py

This file was deleted.

47 changes: 39 additions & 8 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ def file_parser_options(**kwargs):
return kwargs


def _deep_merge(target, source):
"""
Performs a deep merge of dictionaries or lists,
recursively merging the contents, handling nested structures, and concatenation of lists.
"""
if isinstance(target, dict) and isinstance(source, dict):
for key, value in source.items():
if key in target and isinstance(target[key], (dict, list)) and isinstance(value, (dict, list)):
# If both target and source values are dictionaries or lists, merge them recursively
target[key] = _deep_merge(target[key], value)
else:
# Otherwise, replace the target value with the source value
target[key] = value
elif isinstance(target, list) and isinstance(source, list):
# If both target and source are lists, concatenate them
target.extend(source)
else:
# For other types, simply replace the target with the source
target = source
return target

class InfluxDBClient3:
def __init__(
self,
Expand Down Expand Up @@ -100,7 +121,13 @@ def __init__(
port = query_port_overwrite
self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options)


def _merge_options(self, defaults, custom={}):
"""
Merge default option arguments with custom (user-provided) arguments.
"""
if len(custom) == 0:
return defaults
return _deep_merge(defaults, {key: value for key, value in custom.items()})

def write(self, record=None, database=None ,**kwargs):
"""
Expand Down Expand Up @@ -172,7 +199,8 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
data_frame_measurement_name=measurement_name,
data_frame_tag_columns=tag_columns,
data_frame_timestamp_column=timestamp_column, **kwargs)



def query(self, query, language="sql", mode="all", database=None,**kwargs ):
"""
Query data from InfluxDB.
Expand All @@ -185,7 +213,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ):
:type mode: str
:param database: The database to query from. If not provided, uses the database provided during initialization.
:type database: str
:param kwargs: Additional arguments for the query.
:param kwargs: FlightClientCallOptions for the query.
:return: The queried data.
"""

Expand All @@ -194,10 +222,14 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ):
database = self._database

try:
headers = [(b"authorization", f"Bearer {self._token}".encode('utf-8'))]

# Create an authorization header
_options = FlightCallOptions(headers=headers, **kwargs)
optargs = {
"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))],
"timeout": 300
}
opts = self._merge_options(optargs, kwargs)
_options = FlightCallOptions(**opts)

ticket_data = {"database": database, "sql_query": query, "query_type": language}
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
flight_reader = self._flight_client.do_get(ticket, _options)
Expand Down Expand Up @@ -225,8 +257,7 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()



__all__ = [
"InfluxDBClient3",
"Point",
Expand Down
19 changes: 0 additions & 19 deletions influxdb_client_3/debug.py

This file was deleted.