Skip to content

Conversation

jstirnaman
Copy link
Contributor

…Options

  • Allow passing FlightCallOptions as keyword args in query().
  • Merges default args with custom args set by the user, and then passes them into the FlightCallOptions constructor.
  • Only supports headers for now.
  • Successfully tested with iox-debug header.
  • TODO: verify that the server is receiving the specified trace-id header.
  • TODO: test InfluxDB support for other FlightCallOptions (e.g. timeout).

…Options

- Allow passing FlightCallOptions as keyword args in query().
- Merges default args with custom args set by the user, and then passes them into the FlightCallOptions constructor.
- Only supports headers for now.
- Successfully tested with `iox-debug` header.
- TODO: verify that the server is receiving the specified `trace-id` header.
- TODO: test InfluxDB support for other FlightCallOptions (e.g. timeout).
@jstirnaman
Copy link
Contributor Author

@Jayclifford345 As noted in the description, this needs a bit more testing. I haven't verified the trace-id yet. I can probably use btasker's callback example to test that. I really need to setup a good grpc sniffer.... Oh, nvm...
@reidkaufmann to the rescue.

jstirnaman and others added 3 commits September 27, 2023 08:55
Co-authored-by: Jay Clifford <45856600+Jayclifford345@users.noreply.github.com>
Co-authored-by: Jay Clifford <45856600+Jayclifford345@users.noreply.github.com>
@jstirnaman
Copy link
Contributor Author

jstirnaman commented Sep 27, 2023

  • Added the check for user-provided headers before merging them.
  • Factored out a _merge_options() function.
  • Got a successful test sending an influx-debug-id header and getting a trace-id in the response:
...
TracingClientMiddlewareFactory
...
class TracingClientMiddleware(flight.ClientMiddleware):
    def __init__(self, method, callback_obj):
        self._method = method
        self.callback = callback_obj
    def call_completed(self, exception):
      print('callback: call_completed')
      if(exception):
        print(f"  ...with exception: {exception}")
      
    def sending_headers(self):
      print('callback: sending_headers')
      
    def received_headers(self, headers):
        print('callback: received_headers')
        print(headers)

def test_custom_headers():
    print('test_custom_headers')
    res = TracingClientMiddleWareFactory()
    client = InfluxDBClient3(token = DATABASE_TOKEN,
                        host = HOSTNAME,
                        org = '',
                        database=DATABASE_NAME,
                        flight_client_options={"middleware": (res,)})
    client.query('select * from home',
                        headers=[(b"influx-debug-id", b"JasTraceId")],
                        timeout=60)
    print("client attrs: ", vars(client))
    reader = client.query('''SELECT query_type, completed_duration, trace_id
                          FROM system.queries
                          WHERE issue_time >= now() - INTERVAL '1 day'
                          AND trace_id != 'None'
                          AND query_text LIKE '%select * from home%'
                        ''',
                        language='sql',
                        headers=[(b"iox-debug", b"true")],
                        mode="reader")
    print(reader.read_all().to_pandas())

test_custom_headers()
test_custom_headers
{'headers': [(b'authorization', b'Bearer <Token>'), (b'influx-debug-id', b'JasTraceId')], 'timeout': 60}
callback: sending_headers
callback: received_headers
{'date': ['Wed, 27 Sep 2023 18:53:51 GMT'], 'server': ['envoy'], 'trace-id': ['<redacted>'], 'x-envoy-upstream-service-time': ['3']}
callback: call_completed
client attrs:  {'_org': '', '_database': 'get-started', '_token': '<Token>', '_write_client_options': {'write_options': <influxdb_client_3.write_client.client.write_api.WriteOptions object at 0x112a76910>}, '_client': <influxdb_client_3.write_client.client.influxdb_client.InfluxDBClient object at 0x17fd87b10>, '_write_api': <influxdb_client_3.write_client.client.write_api.WriteApi object at 0x17dc6e350>, '_flight_client_options': {'middleware': (<__main__.TracingClientMiddleWareFactory object at 0x17fd878d0>,)}, '_flight_client': <pyarrow._flight.FlightClient object at 0x112a8a280>}
{'headers': [(b'authorization', b'Bearer <Token>'), (b'iox-debug', b'true')], 'timeout': 300}
callback: sending_headers
callback: received_headers
{'date': ['Wed, 27 Sep 2023 18:53:51 GMT'], 'server': ['envoy'], 'x-envoy-upstream-service-time': ['2']}
callback: call_completed
  query_type        completed_duration                          trace_id
0        sql 0 days 00:00:00.002831083  <redacted>
1        sql 0 days 00:00:00.003215176  <redacted>
2        sql 0 days 00:00:00.002598998  <redacted>
3        sql 0 days 00:00:00.002932482  <redacted>
4        sql 0 days 00:00:00.005362857  <redacted>
5        sql 0 days 00:00:00.002949846 <redacted>

@jstirnaman
Copy link
Contributor Author

jstirnaman commented Sep 29, 2023

  • Added the check for user-provided headers before merging them.
  • Factored out a _merge_options() function.
  • Got a successful test sending an influx-debug-id header and getting a trace-id in the response:

Test output for extracting trace-id response header and using it query system.queries table:

test_get_system_query_info
Received headers {'date': ['Mon, 02 Oct 2023 16:04:50 GMT'], 'server': ['envoy'], 'trace-id': ['REDACTED'], 'x-envoy-> upstream-service-time': ['3']} from FlightMethod.DO_GET
0 days 00:00:00.002047636

Test output for timeout=0:

query_timeout_option
callback: sending_headers
callback: received_headers
{}
callback: call_completed
 ...with exception: Flight returned timeout error, with message: Deadline Exceeded

@Jayclifford345 can you review this again?

@Jayclifford345 Jayclifford345 merged commit 436930d into InfluxCommunity:main Oct 13, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants