diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index 3a1050573..f4882f669 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -16,18 +16,23 @@ on: # the merge has already happened and the coverage check has no power # to block. Hence we deliberately don't subscribe to `push:main`. # -# Serialise E2E runs per ref so a force-push (or a fast follow-up commit) -# on a PR cancels the previous run instead of racing it against shared -# warehouse state (Delta tables, UC Volume files, etc.). -# -# Merge-queue runs are NOT cancelled — each queue entry needs its own -# clean CI signal so a regression on entry N doesn't get hidden by -# entry N+1 arriving seconds later. (Concurrent queue runs can still -# collide on shared warehouse state, but that's the cost of preserving -# per-entry signal; the uuid-suffix conventions in the e2e tests are -# what keep them isolated.) +# Concurrency groups: +# - pull_request: per-ref + cancel-in-progress. A force-push or fast +# follow-up commit on a PR cancels the previous run instead of +# racing it against shared warehouse state (Delta tables, UC Volume +# files, telemetry endpoints, etc.). +# - merge_group: serialised globally with a fixed group name. The +# warehouse can't tolerate two parallel queue entries hammering +# telemetry / retry paths simultaneously — we have observed flaky +# retry-test failures (extra `/telemetry-ext` retries inflating +# mock.call_count) under that load. Running queue entries one at a +# time costs throughput (~17 min each) but keeps signal trustworthy. +# cancel-in-progress is off so a started run is never killed. NOTE: +# GitHub keeps one pending run per group; a newer one replaces it. +# - workflow_dispatch: shares the merge_group group; manual triggers +# are rare enough that serialising them with the queue is fine. 
concurrency: - group: e2e-${{ github.workflow }}-${{ github.ref }} + group: ${{ github.event_name == 'pull_request' && format('e2e-pr-{0}', github.ref) || 'e2e-mq-serial' }} cancel-in-progress: ${{ github.event_name == 'pull_request' }} jobs: diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index af635331d..08876e145 100755 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -17,6 +17,7 @@ ) from databricks.sql.telemetry.telemetry_client import ( NoopTelemetryClient, + TelemetryClient, TelemetryClientFactory, _TelemetryClientHolder, ) @@ -27,8 +28,22 @@ def _isolated_from_telemetry(): # Tests that mock urllib3 globally (via _get_conn / _validate_conn) also # intercept background telemetry pushes from the shared # TelemetryClientFactory executor — inflating mock.call_count and breaking - # assertions like `call_count == 6`. Drain the factory and force any new - # connection to use NoopTelemetryClient for the duration of the test. + # assertions like `call_count == 6`. Three layers of defence: + # + # 1. Drain TelemetryClientFactory and override initialize_telemetry_client + # so new connections install NoopTelemetryClient (which submits nothing). + # 2. Patch TelemetryClient._send_telemetry to a no-op as a backstop — covers + # any real TelemetryClient instance that slips in (e.g. a stale module- + # global, a code path that bypasses initialize_telemetry_client, or + # anything created before this context entered). + # 3. Patch TelemetryClient._export_event to a no-op so even if a real + # client receives an event, the event never reaches the queue and the + # flush logic never fires. + # + # Without layer 2/3 we have observed `/telemetry-ext` requests showing up + # in merge_group runs (concurrent CI load on the warehouse stresses paths + # that single-test runs don't hit), inflating retry-test counts and + # producing flaky AssertionErrors. 
with TelemetryClientFactory._lock: saved_clients = TelemetryClientFactory._clients saved_executor = TelemetryClientFactory._executor @@ -55,11 +70,21 @@ def _install_noop(*args, host_url=None, **kwargs): NoopTelemetryClient() ) + def _noop_send(self, events): + return None + + def _noop_export(self, event): + return None + try: with patch.object( TelemetryClientFactory, "initialize_telemetry_client", staticmethod(_install_noop), + ), patch.object( + TelemetryClient, "_send_telemetry", _noop_send + ), patch.object( + TelemetryClient, "_export_event", _noop_export ): yield finally: diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 83c2dbf81..1f5bce73c 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -1,13 +1,11 @@ """ E2E test for telemetry - verifies telemetry behavior with different scenarios """ -import time import threading import logging from contextlib import contextmanager from unittest.mock import patch import pytest -from concurrent.futures import wait import databricks.sql as sql from databricks.sql.telemetry.telemetry_client import ( @@ -91,25 +89,43 @@ def telemetry_setup_teardown(self): @pytest.fixture def telemetry_interceptors(self): - """Setup reusable telemetry interceptors as a fixture""" + """Setup reusable telemetry interceptors as a fixture. + + Captures two signals: + * captured_events — every call to TelemetryClient._export_event + (the connector intended to send this event). + * captured_submissions — every call to TelemetryClient._send_telemetry + (the connector submitted this batch to the executor). + + We deliberately do NOT capture inside _telemetry_request_callback. That + callback only fires after the HTTP round-trip completes, and the + connector's TelemetryClientFactory.close() shuts down the shared + executor with wait=False — so on the last connection close, in-flight + futures may never get to run their callbacks before the executor is + gone. 
That's intentional for connection-close latency in production, + but it means a callback-time assertion is racing the connector's + shutdown path. Capturing at submission time tests what the connector + actually controls. + """ capture_lock = threading.Lock() captured_events = [] - captured_futures = [] + captured_submissions = [] original_export = TelemetryClient._export_event - original_callback = TelemetryClient._telemetry_request_callback + original_send = TelemetryClient._send_telemetry def export_wrapper(self_client, event): with capture_lock: captured_events.append(event) return original_export(self_client, event) - def callback_wrapper(self_client, future, sent_count): + def send_wrapper(self_client, events): with capture_lock: - captured_futures.append(future) - original_callback(self_client, future, sent_count) + # Record the batch (list of events) the connector submitted. + captured_submissions.append(list(events)) + return original_send(self_client, events) - return captured_events, captured_futures, export_wrapper, callback_wrapper + return captured_events, captured_submissions, export_wrapper, send_wrapper # ==================== ASSERTION HELPERS ==================== @@ -165,24 +181,33 @@ def assert_error_info(self, event, expected_error_name=None): if expected_error_name: assert error_info.error_name == expected_error_name - def verify_events(self, captured_events, captured_futures, expected_count): - """Common verification for event count and HTTP responses""" + def verify_events(self, captured_events, captured_submissions, expected_count): + """Common verification for event count and submission count. + + Asserts on what the connector did — exported events and submitted + batches — not on what the server returned. Server-side HTTP success + is asserted via end-to-end behavior elsewhere (and would race the + connector's wait=False executor shutdown on connection close, see + the docstring on telemetry_interceptors). 
+ + Because these tests use telemetry_batch_size=1, each exported event + triggers its own batch submission, so the events summed across all + submitted batches should equal the event count. + """ if expected_count == 0: - assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}" - assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}" + assert len(captured_events) == 0, \ + f"Expected 0 events, got {len(captured_events)}" + assert len(captured_submissions) == 0, \ + f"Expected 0 submissions, got {len(captured_submissions)}" else: assert len(captured_events) == expected_count, \ f"Expected {expected_count} events, got {len(captured_events)}" + # Sum events across batches; with batch_size=1 each batch holds one. + submitted_event_count = sum(len(batch) for batch in captured_submissions) + assert submitted_event_count == expected_count, \ + f"Expected {expected_count} submitted events across batches, " \ + f"got {submitted_event_count} (batches: {[len(b) for b in captured_submissions]})" - time.sleep(2) - done, _ = wait(captured_futures, timeout=10) - assert len(done) == expected_count, \ - f"Expected {expected_count} responses, got {len(done)}" - - for future in done: - response = future.result() - assert 200 <= response.status < 300 - # Assert common fields for all events for event in captured_events: self.assert_system_config(event) @@ -199,11 +224,11 @@ def verify_events(self, captured_events, captured_futures, expected_count): def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, force_enable, expected_count, test_id): """Test telemetry behavior with different flag combinations""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, 
"_send_telemetry", send_wrapper): extra_params = {"telemetry_batch_size": 1} if enable_telemetry is not None: @@ -216,9 +241,7 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, cursor.execute("SELECT 1") cursor.fetchone() - # Give time for async telemetry submission after connection closes - time.sleep(0.5) - self.verify_events(captured_events, captured_futures, expected_count) + self.verify_events(captured_events, captured_submissions, expected_count) # Assert statement execution on latency event (if events exist) if expected_count > 0: @@ -230,11 +253,11 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, ]) def test_sql_errors(self, telemetry_interceptors, query, expected_error): """Test telemetry captures error information for different SQL errors""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -245,9 +268,6 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error): cursor.execute(query) cursor.fetchone() - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 1 # Find event with error_info @@ -261,11 +281,11 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error): def test_metadata_operation(self, telemetry_interceptors): """Test telemetry for metadata operations (getCatalogs)""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - 
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -275,9 +295,6 @@ def test_metadata_operation(self, telemetry_interceptors): catalogs = cursor.catalogs() catalogs.fetchall() - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 1 for event in captured_events: self.assert_system_config(event) @@ -285,11 +302,11 @@ def test_metadata_operation(self, telemetry_interceptors): def test_direct_results(self, telemetry_interceptors): """Test telemetry with direct results (use_cloud_fetch=False)""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -301,9 +318,6 @@ def test_direct_results(self, telemetry_interceptors): result = cursor.fetchall() assert len(result) == 1 and result[0][0] == 100 - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 2 for event in captured_events: self.assert_system_config(event) @@ -320,11 +334,11 @@ def test_direct_results(self, telemetry_interceptors): def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors, close_type): """Test telemetry with cloud fetch using different resource closing patterns""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, 
"_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): if close_type == "explicit_connection": # Test explicit connection close @@ -365,9 +379,6 @@ def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors, result = cursor.fetchall() assert len(result) == 1000 - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 2 for event in captured_events: self.assert_system_config(event)