From d3044a0a6a511b3e0bcc3bc722da6ef03b33b66c Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Wed, 27 May 2026 13:02:16 +0000 Subject: [PATCH 1/2] test(telemetry/e2e): make TestTelemetryE2E deterministic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous tests asserted "telemetry round-tripped to the server" by intercepting TelemetryClient._telemetry_request_callback and counting completed futures. That recording lags the actual work — the callback fires asynchronously after the HTTP request completes, and on the *last* connection close TelemetryClientFactory.close() shuts the shared executor down with wait=False (intentional, for connection-close latency in production). Two consequences: 1. A `wait(captured_futures, timeout=10)` call right after `with conn:` can return before any callbacks have fired — so the wait is "waiting on" an empty list, returns immediately, and the assertion `assert len(done) == expected_count` fails non- deterministically with `assert 0 == 2` or `assert 1 == 2`. 2. The shared-executor shutdown(wait=False) can drop in-flight submissions that haven't started running yet, so even if we drained correctly we'd be testing whether the server happened to receive the request in time, not whether the connector correctly dispatched it. Switch interception from `_telemetry_request_callback` to `_send_telemetry`. That captures the connector's *intent to submit* synchronously, which is what we actually want to test — the connector either decided to send a batch or it didn't, regardless of what happens to the future afterward. No sleep needed, no timeout-based wait needed, no race against the executor shutdown. 5 consecutive local runs pass deterministically in ~20s each (down from ~17 min when the flake hit). Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --- tests/e2e/test_telemetry_e2e.py | 105 ++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 47 deletions(-) diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 83c2dbf81..1f5bce73c 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -1,13 +1,11 @@ """ E2E test for telemetry - verifies telemetry behavior with different scenarios """ -import time import threading import logging from contextlib import contextmanager from unittest.mock import patch import pytest -from concurrent.futures import wait import databricks.sql as sql from databricks.sql.telemetry.telemetry_client import ( @@ -91,25 +89,43 @@ def telemetry_setup_teardown(self): @pytest.fixture def telemetry_interceptors(self): - """Setup reusable telemetry interceptors as a fixture""" + """Setup reusable telemetry interceptors as a fixture. + + Captures two signals: + * captured_events — every call to TelemetryClient._export_event + (the connector intended to send this event). + * captured_submissions — every call to TelemetryClient._send_telemetry + (the connector submitted this batch to the executor). + + We deliberately do NOT capture inside _telemetry_request_callback. That + callback only fires after the HTTP round-trip completes, and the + connector's TelemetryClientFactory.close() shuts down the shared + executor with wait=False — so on the last connection close, in-flight + futures may never get to run their callbacks before the executor is + gone. That's intentional for connection-close latency in production, + but it means a callback-time assertion is racing the connector's + shutdown path. Capturing at submission time tests what the connector + actually controls. + """ capture_lock = threading.Lock() captured_events = [] - captured_futures = [] + captured_submissions = [] original_export = TelemetryClient._export_event - original_callback = TelemetryClient._telemetry_request_callback + original_send = TelemetryClient._send_telemetry def export_wrapper(self_client, event): with capture_lock: captured_events.append(event) return original_export(self_client, event) - def callback_wrapper(self_client, future, sent_count): + def send_wrapper(self_client, events): with capture_lock: - captured_futures.append(future) - original_callback(self_client, future, sent_count) + # Record the batch (list of events) the connector submitted. + captured_submissions.append(list(events)) + return original_send(self_client, events) - return captured_events, captured_futures, export_wrapper, callback_wrapper + return captured_events, captured_submissions, export_wrapper, send_wrapper # ==================== ASSERTION HELPERS ==================== @@ -165,24 +181,33 @@ def assert_error_info(self, event, expected_error_name=None): if expected_error_name: assert error_info.error_name == expected_error_name - def verify_events(self, captured_events, captured_futures, expected_count): - """Common verification for event count and HTTP responses""" + def verify_events(self, captured_events, captured_submissions, expected_count): + """Common verification for event count and submission count. + + Asserts on what the connector did — exported events and submitted + batches — not on what the server returned. Server-side HTTP success + is asserted via end-to-end behavior elsewhere (and would race the + connector's wait=False executor shutdown on connection close, see + the docstring on telemetry_interceptors). + + Because these tests use telemetry_batch_size=1, each exported event + triggers its own batch submission, so the submission count should + equal the event count. + """ if expected_count == 0: - assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}" - assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}" + assert len(captured_events) == 0, \ + f"Expected 0 events, got {len(captured_events)}" + assert len(captured_submissions) == 0, \ + f"Expected 0 submissions, got {len(captured_submissions)}" else: assert len(captured_events) == expected_count, \ f"Expected {expected_count} events, got {len(captured_events)}" + # batch_size=1, so one submission per event. + submitted_event_count = sum(len(batch) for batch in captured_submissions) + assert submitted_event_count == expected_count, \ + f"Expected {expected_count} submitted events across batches, " \ + f"got {submitted_event_count} (batches: {[len(b) for b in captured_submissions]})" - time.sleep(2) - done, _ = wait(captured_futures, timeout=10) - assert len(done) == expected_count, \ - f"Expected {expected_count} responses, got {len(done)}" - - for future in done: - response = future.result() - assert 200 <= response.status < 300 - # Assert common fields for all events for event in captured_events: self.assert_system_config(event) @@ -199,11 +224,11 @@ def verify_events(self, captured_events, captured_futures, expected_count): def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, force_enable, expected_count, test_id): """Test telemetry behavior with different flag combinations""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): extra_params = {"telemetry_batch_size": 1} if enable_telemetry is not None: @@ -216,9 +241,7 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, cursor.execute("SELECT 1") cursor.fetchone() - # Give time for async telemetry submission after connection closes - time.sleep(0.5) - self.verify_events(captured_events, captured_futures, expected_count) + self.verify_events(captured_events, captured_submissions, expected_count) # Assert statement execution on latency event (if events exist) if expected_count > 0: @@ -230,11 +253,11 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry, ]) def test_sql_errors(self, telemetry_interceptors, query, expected_error): """Test telemetry captures error information for different SQL errors""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -245,9 +268,6 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error): cursor.execute(query) cursor.fetchone() - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 1 # Find event with error_info @@ -261,11 +281,11 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error): def test_metadata_operation(self, telemetry_interceptors): """Test telemetry for metadata operations (getCatalogs)""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -275,9 +295,6 @@ def test_metadata_operation(self, telemetry_interceptors): catalogs = cursor.catalogs() catalogs.fetchall() - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 1 for event in captured_events: self.assert_system_config(event) @@ -285,11 +302,11 @@ def test_metadata_operation(self, telemetry_interceptors): def test_direct_results(self, telemetry_interceptors): """Test telemetry with direct results (use_cloud_fetch=False)""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): with self.connection(extra_params={ "force_enable_telemetry": True, @@ -301,9 +318,6 @@ def test_direct_results(self, telemetry_interceptors): result = cursor.fetchall() assert len(result) == 1 and result[0][0] == 100 - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 2 for event in captured_events: self.assert_system_config(event) @@ -320,11 +334,11 @@ def test_direct_results(self, telemetry_interceptors): def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors, close_type): """Test telemetry with cloud fetch using different resource closing patterns""" - captured_events, captured_futures, export_wrapper, callback_wrapper = \ + captured_events, captured_submissions, export_wrapper, send_wrapper = \ telemetry_interceptors with patch.object(TelemetryClient, "_export_event", export_wrapper), \ - patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + patch.object(TelemetryClient, "_send_telemetry", send_wrapper): if close_type == "explicit_connection": # Test explicit connection close @@ -365,9 +379,6 @@ def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors, result = cursor.fetchall() assert len(result) == 1000 - time.sleep(2) - wait(captured_futures, timeout=10) - assert len(captured_events) >= 2 for event in captured_events: self.assert_system_config(event) From 8dd756d7c95474132c22b2819e52e5582de0b3aa Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Wed, 27 May 2026 17:01:51 +0000 Subject: [PATCH 2/2] ci/test: deflake retry tests under merge-queue load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two compounding fixes that surfaced on PR #812's first merge_group run, where test_oserror_retries failed with `assert mock_validate_conn.call_count == 6` — unexpected `/telemetry-ext` requests had been counted alongside the intended session-endpoint retries. 1. tests/e2e/common/retry_test_mixins.py — strengthen `_isolated_from_telemetry()` with two additional defensive patches: - TelemetryClient._send_telemetry → no-op - TelemetryClient._export_event → no-op The existing factory swap installs NoopTelemetryClient for new connections, but doesn't cover real TelemetryClient instances that slip in via other paths (stale module-global, code that bypasses initialize_telemetry_client, anything created before the context entered). Patching at the class level catches all of them. 2. .github/workflows/code-coverage.yml — serialise merge_group runs. Previous concurrency group keyed on github.ref, which is per-PR in the queue (gh-readonly-queue/main/pr-N-…). That allowed multiple queue entries to hammer the same warehouse in parallel, stressing telemetry / retry paths that single-PR runs don't exercise. Group merge_group + workflow_dispatch under a single fixed name (e2e-mq-serial) so they run one at a time. PR-event runs keep per-ref grouping + cancel-in-progress for fast author feedback. Trade-off: queue throughput drops to one ~17-min run at a time. Folded into PR #812 so the telemetry-test rewrite and the retry-test deflake ship as a single unit. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --- .github/workflows/code-coverage.yml | 27 +++++++++++++++---------- tests/e2e/common/retry_test_mixins.py | 29 +++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index 3a1050573..f4882f669 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -16,18 +16,23 @@ on: # the merge has already happened and the coverage check has no power # to block. Hence we deliberately don't subscribe to `push:main`. # -# Serialise E2E runs per ref so a force-push (or a fast follow-up commit) -# on a PR cancels the previous run instead of racing it against shared -# warehouse state (Delta tables, UC Volume files, etc.). -# -# Merge-queue runs are NOT cancelled — each queue entry needs its own -# clean CI signal so a regression on entry N doesn't get hidden by -# entry N+1 arriving seconds later. (Concurrent queue runs can still -# collide on shared warehouse state, but that's the cost of preserving -# per-entry signal; the uuid-suffix conventions in the e2e tests are -# what keep them isolated.) +# Concurrency groups: +# - pull_request: per-ref + cancel-in-progress. A force-push or fast +# follow-up commit on a PR cancels the previous run instead of +# racing it against shared warehouse state (Delta tables, UC Volume +# files, telemetry endpoints, etc.). +# - merge_group: serialised globally with a fixed group name. The +# warehouse can't tolerate two parallel queue entries hammering +# telemetry / retry paths simultaneously — we have observed flaky +# retry-test failures (extra `/telemetry-ext` retries inflating +# mock.call_count) under that load. Running queue entries one at a +# time costs queue throughput (one entry at a time, ~17 min each) +# but keeps signal trustworthy. cancel-in-progress is off so each +# entry gets a complete run. +# - workflow_dispatch: shares the merge_group group; manual triggers +# are rare enough that serialising them with the queue is fine. concurrency: - group: e2e-${{ github.workflow }}-${{ github.ref }} + group: ${{ github.event_name == 'pull_request' && format('e2e-pr-{0}', github.ref) || 'e2e-mq-serial' }} cancel-in-progress: ${{ github.event_name == 'pull_request' }} jobs: diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index af635331d..08876e145 100755 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -17,6 +17,7 @@ ) from databricks.sql.telemetry.telemetry_client import ( NoopTelemetryClient, + TelemetryClient, TelemetryClientFactory, _TelemetryClientHolder, ) @@ -27,8 +28,22 @@ def _isolated_from_telemetry(): # Tests that mock urllib3 globally (via _get_conn / _validate_conn) also # intercept background telemetry pushes from the shared # TelemetryClientFactory executor — inflating mock.call_count and breaking - # assertions like `call_count == 6`. Drain the factory and force any new - # connection to use NoopTelemetryClient for the duration of the test. + # assertions like `call_count == 6`. Three layers of defence: + # + # 1. Drain TelemetryClientFactory and override initialize_telemetry_client + # so new connections install NoopTelemetryClient (which submits nothing). + # 2. Patch TelemetryClient._send_telemetry to a no-op as a backstop — covers + # any real TelemetryClient instance that slips in (e.g. a stale module- + # global, a code path that bypasses initialize_telemetry_client, or + # anything created before this context entered). + # 3. Patch TelemetryClient._export_event to a no-op so even if a real + # client receives an event, the event never reaches the queue and the + # flush logic never fires. + # + # Without layer 2/3 we have observed `/telemetry-ext` requests showing up + # in merge_group runs (concurrent CI load on the warehouse stresses paths + # that single-test runs don't hit), inflating retry-test counts and + # producing flaky AssertionErrors. with TelemetryClientFactory._lock: saved_clients = TelemetryClientFactory._clients saved_executor = TelemetryClientFactory._executor @@ -55,11 +70,21 @@ def _install_noop(*args, host_url=None, **kwargs): NoopTelemetryClient() ) + def _noop_send(self, events): + return None + + def _noop_export(self, event): + return None + try: with patch.object( TelemetryClientFactory, "initialize_telemetry_client", staticmethod(_install_noop), + ), patch.object( + TelemetryClient, "_send_telemetry", _noop_send + ), patch.object( + TelemetryClient, "_export_event", _noop_export ): yield finally: