Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions .github/workflows/code-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@ on:
# the merge has already happened and the coverage check has no power
# to block. Hence we deliberately don't subscribe to `push:main`.
#
# Serialise E2E runs per ref so a force-push (or a fast follow-up commit)
# on a PR cancels the previous run instead of racing it against shared
# warehouse state (Delta tables, UC Volume files, etc.).
#
# Merge-queue runs are NOT cancelled — each queue entry needs its own
# clean CI signal so a regression on entry N doesn't get hidden by
# entry N+1 arriving seconds later. (Concurrent queue runs can still
# collide on shared warehouse state, but that's the cost of preserving
# per-entry signal; the uuid-suffix conventions in the e2e tests are
# what keep them isolated.)
# Concurrency groups:
# - pull_request: per-ref + cancel-in-progress. A force-push or fast
# follow-up commit on a PR cancels the previous run instead of
# racing it against shared warehouse state (Delta tables, UC Volume
# files, telemetry endpoints, etc.).
# - merge_group: serialised globally with a fixed group name. The
# warehouse can't tolerate two parallel queue entries hammering
# telemetry / retry paths simultaneously — we have observed flaky
# retry-test failures (extra `/telemetry-ext` retries inflating
# mock.call_count) under that load. Running queue entries one at a
# time costs queue throughput (~17 min per entry) but keeps the
# signal trustworthy. cancel-in-progress is off so each entry gets
# a complete run.
# - workflow_dispatch: shares the merge_group group; manual triggers
# are rare enough that serialising them with the queue is fine.
concurrency:
group: e2e-${{ github.workflow }}-${{ github.ref }}
group: ${{ github.event_name == 'pull_request' && format('e2e-pr-{0}', github.ref) || 'e2e-mq-serial' }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
Expand Down
29 changes: 27 additions & 2 deletions tests/e2e/common/retry_test_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from databricks.sql.telemetry.telemetry_client import (
NoopTelemetryClient,
TelemetryClient,
TelemetryClientFactory,
_TelemetryClientHolder,
)
Expand All @@ -27,8 +28,22 @@ def _isolated_from_telemetry():
# Tests that mock urllib3 globally (via _get_conn / _validate_conn) also
# intercept background telemetry pushes from the shared
# TelemetryClientFactory executor — inflating mock.call_count and breaking
# assertions like `call_count == 6`. Drain the factory and force any new
# connection to use NoopTelemetryClient for the duration of the test.
# assertions like `call_count == 6`. Three layers of defence:
#
# 1. Drain TelemetryClientFactory and override initialize_telemetry_client
# so new connections install NoopTelemetryClient (which submits nothing).
# 2. Patch TelemetryClient._send_telemetry to a no-op as a backstop — covers
# any real TelemetryClient instance that slips in (e.g. a stale module-
# global, a code path that bypasses initialize_telemetry_client, or
# anything created before this context entered).
# 3. Patch TelemetryClient._export_event to a no-op so even if a real
# client receives an event, the event never reaches the queue and the
# flush logic never fires.
#
# Without layers 2 and 3 we have observed `/telemetry-ext` requests
# showing up in merge_group runs (concurrent CI load on the warehouse
# stresses paths that single-test runs don't hit), inflating retry-test
# counts and producing flaky AssertionErrors.
with TelemetryClientFactory._lock:
saved_clients = TelemetryClientFactory._clients
saved_executor = TelemetryClientFactory._executor
Expand All @@ -55,11 +70,21 @@ def _install_noop(*args, host_url=None, **kwargs):
NoopTelemetryClient()
)

def _noop_send(self, events):
return None

def _noop_export(self, event):
return None

try:
with patch.object(
TelemetryClientFactory,
"initialize_telemetry_client",
staticmethod(_install_noop),
), patch.object(
TelemetryClient, "_send_telemetry", _noop_send
), patch.object(
TelemetryClient, "_export_event", _noop_export
):
yield
finally:
Expand Down
105 changes: 58 additions & 47 deletions tests/e2e/test_telemetry_e2e.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""
E2E test for telemetry - verifies telemetry behavior with different scenarios
"""
import time
import threading
import logging
from contextlib import contextmanager
from unittest.mock import patch
import pytest
from concurrent.futures import wait

import databricks.sql as sql
from databricks.sql.telemetry.telemetry_client import (
Expand Down Expand Up @@ -91,25 +89,43 @@ def telemetry_setup_teardown(self):

@pytest.fixture
def telemetry_interceptors(self):
"""Setup reusable telemetry interceptors as a fixture"""
"""Setup reusable telemetry interceptors as a fixture.

Captures two signals:
* captured_events — every call to TelemetryClient._export_event
(the connector intended to send this event).
* captured_submissions — every call to TelemetryClient._send_telemetry
(the connector submitted this batch to the executor).

We deliberately do NOT capture inside _telemetry_request_callback. That
callback only fires after the HTTP round-trip completes, and the
connector's TelemetryClientFactory.close() shuts down the shared
executor with wait=False — so on the last connection close, in-flight
futures may never get to run their callbacks before the executor is
gone. That's intentional for connection-close latency in production,
but it means a callback-time assertion is racing the connector's
shutdown path. Capturing at submission time tests what the connector
actually controls.
"""
capture_lock = threading.Lock()
captured_events = []
captured_futures = []
captured_submissions = []

original_export = TelemetryClient._export_event
original_callback = TelemetryClient._telemetry_request_callback
original_send = TelemetryClient._send_telemetry

def export_wrapper(self_client, event):
with capture_lock:
captured_events.append(event)
return original_export(self_client, event)

def callback_wrapper(self_client, future, sent_count):
def send_wrapper(self_client, events):
with capture_lock:
captured_futures.append(future)
original_callback(self_client, future, sent_count)
# Record the batch (list of events) the connector submitted.
captured_submissions.append(list(events))
return original_send(self_client, events)

return captured_events, captured_futures, export_wrapper, callback_wrapper
return captured_events, captured_submissions, export_wrapper, send_wrapper

# ==================== ASSERTION HELPERS ====================

Expand Down Expand Up @@ -165,24 +181,33 @@ def assert_error_info(self, event, expected_error_name=None):
if expected_error_name:
assert error_info.error_name == expected_error_name

def verify_events(self, captured_events, captured_futures, expected_count):
"""Common verification for event count and HTTP responses"""
def verify_events(self, captured_events, captured_submissions, expected_count):
"""Common verification for event count and submission count.

Asserts on what the connector did — exported events and submitted
batches — not on what the server returned. Server-side HTTP success
is asserted via end-to-end behavior elsewhere; asserting it here
would race the connector's wait=False executor shutdown on
connection close (see the docstring on telemetry_interceptors).

Because these tests use telemetry_batch_size=1, each exported event
triggers its own batch submission, so the total number of events
across all submitted batches should equal the exported event count.
"""
if expected_count == 0:
assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}"
assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}"
assert len(captured_events) == 0, \
f"Expected 0 events, got {len(captured_events)}"
assert len(captured_submissions) == 0, \
f"Expected 0 submissions, got {len(captured_submissions)}"
else:
assert len(captured_events) == expected_count, \
f"Expected {expected_count} events, got {len(captured_events)}"
# batch_size=1, so one submission per event.
submitted_event_count = sum(len(batch) for batch in captured_submissions)
assert submitted_event_count == expected_count, \
f"Expected {expected_count} submitted events across batches, " \
f"got {submitted_event_count} (batches: {[len(b) for b in captured_submissions]})"

time.sleep(2)
done, _ = wait(captured_futures, timeout=10)
assert len(done) == expected_count, \
f"Expected {expected_count} responses, got {len(done)}"

for future in done:
response = future.result()
assert 200 <= response.status < 300

# Assert common fields for all events
for event in captured_events:
self.assert_system_config(event)
Expand All @@ -199,11 +224,11 @@ def verify_events(self, captured_events, captured_futures, expected_count):
def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
force_enable, expected_count, test_id):
"""Test telemetry behavior with different flag combinations"""
captured_events, captured_futures, export_wrapper, callback_wrapper = \
captured_events, captured_submissions, export_wrapper, send_wrapper = \
telemetry_interceptors

with patch.object(TelemetryClient, "_export_event", export_wrapper), \
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):

extra_params = {"telemetry_batch_size": 1}
if enable_telemetry is not None:
Expand All @@ -216,9 +241,7 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
cursor.execute("SELECT 1")
cursor.fetchone()

# Give time for async telemetry submission after connection closes
time.sleep(0.5)
self.verify_events(captured_events, captured_futures, expected_count)
self.verify_events(captured_events, captured_submissions, expected_count)

# Assert statement execution on latency event (if events exist)
if expected_count > 0:
Expand All @@ -230,11 +253,11 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
])
def test_sql_errors(self, telemetry_interceptors, query, expected_error):
"""Test telemetry captures error information for different SQL errors"""
captured_events, captured_futures, export_wrapper, callback_wrapper = \
captured_events, captured_submissions, export_wrapper, send_wrapper = \
telemetry_interceptors

with patch.object(TelemetryClient, "_export_event", export_wrapper), \
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):

with self.connection(extra_params={
"force_enable_telemetry": True,
Expand All @@ -245,9 +268,6 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):
cursor.execute(query)
cursor.fetchone()

time.sleep(2)
wait(captured_futures, timeout=10)

assert len(captured_events) >= 1

# Find event with error_info
Expand All @@ -261,11 +281,11 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):

def test_metadata_operation(self, telemetry_interceptors):
"""Test telemetry for metadata operations (getCatalogs)"""
captured_events, captured_futures, export_wrapper, callback_wrapper = \
captured_events, captured_submissions, export_wrapper, send_wrapper = \
telemetry_interceptors

with patch.object(TelemetryClient, "_export_event", export_wrapper), \
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):

with self.connection(extra_params={
"force_enable_telemetry": True,
Expand All @@ -275,21 +295,18 @@ def test_metadata_operation(self, telemetry_interceptors):
catalogs = cursor.catalogs()
catalogs.fetchall()

time.sleep(2)
wait(captured_futures, timeout=10)

assert len(captured_events) >= 1
for event in captured_events:
self.assert_system_config(event)
self.assert_connection_params(event, self.arguments["http_path"])

def test_direct_results(self, telemetry_interceptors):
"""Test telemetry with direct results (use_cloud_fetch=False)"""
captured_events, captured_futures, export_wrapper, callback_wrapper = \
captured_events, captured_submissions, export_wrapper, send_wrapper = \
telemetry_interceptors

with patch.object(TelemetryClient, "_export_event", export_wrapper), \
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):

with self.connection(extra_params={
"force_enable_telemetry": True,
Expand All @@ -301,9 +318,6 @@ def test_direct_results(self, telemetry_interceptors):
result = cursor.fetchall()
assert len(result) == 1 and result[0][0] == 100

time.sleep(2)
wait(captured_futures, timeout=10)

assert len(captured_events) >= 2
for event in captured_events:
self.assert_system_config(event)
Expand All @@ -320,11 +334,11 @@ def test_direct_results(self, telemetry_interceptors):
def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
close_type):
"""Test telemetry with cloud fetch using different resource closing patterns"""
captured_events, captured_futures, export_wrapper, callback_wrapper = \
captured_events, captured_submissions, export_wrapper, send_wrapper = \
telemetry_interceptors

with patch.object(TelemetryClient, "_export_event", export_wrapper), \
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):

if close_type == "explicit_connection":
# Test explicit connection close
Expand Down Expand Up @@ -365,9 +379,6 @@ def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
result = cursor.fetchall()
assert len(result) == 1000

time.sleep(2)
wait(captured_futures, timeout=10)

assert len(captured_events) >= 2
for event in captured_events:
self.assert_system_config(event)
Expand Down
Loading