Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,14 +1430,20 @@ async def _parse_content_object(

# CASE C: Text
elif hasattr(part, "text") and part.text:
text_len = len(part.text.encode("utf-8"))
# If max_length is set and smaller than inline limit, use it as threshold
# to prefer offloading over truncation.
offload_threshold = self.inline_text_limit
if self.max_length != -1 and self.max_length < offload_threshold:
offload_threshold = self.max_length

if self.offloader and text_len > offload_threshold:
char_len = len(part.text)
byte_len = len(part.text.encode("utf-8"))

# Decide whether to offload using each limit in its own
# unit. inline_text_limit is a byte-based storage guard;
# max_length is a character-based truncation limit.
# Comparing them in a single min() mixes units and
# produces wrong decisions for multi-byte text.
exceeds_inline_byte_limit = byte_len > self.inline_text_limit
exceeds_char_limit = (
self.max_length != -1 and char_len > self.max_length
)

if self.offloader and (exceeds_inline_byte_limit or exceeds_char_limit):
# Text is too big, treat as file
path = f"{datetime.now().date()}/{self.trace_id}/{self.span_id}_p{idx}.txt"
try:
Expand Down Expand Up @@ -2653,7 +2659,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

async def _ensure_started(self, **kwargs) -> None:
"""Ensures that the plugin is started and initialized."""
if os.getpid() != self._init_pid:
# _init_pid == 0 means the plugin was unpickled and has never been
# initialized in this process (the pickle sentinel set by
# __getstate__). Skip the fork reset in that case — no fork
# happened, and _started is already False so _lazy_setup will run.
# Real forks are caught by os.register_at_fork (line 108) and by
# this check when _init_pid is a real (non-zero) PID from a
# different process.
if self._init_pid != 0 and os.getpid() != self._init_pid:
self._reset_runtime_state()
if not self._started:
# Kept original lock name as it was not explicitly changed.
Expand All @@ -2665,6 +2678,12 @@ async def _ensure_started(self, **kwargs) -> None:
await self._lazy_setup(**kwargs)
self._started = True
self._startup_error = None
# Record the current PID so fork detection works for
# the rest of this instance's lifetime. Without this,
# an unpickled plugin would keep _init_pid == 0 forever,
# disabling the PID-based fork check.
if self._init_pid == 0:
self._init_pid = os.getpid()
except Exception as e:
self._startup_error = e
logger.error("Failed to initialize BigQuery Plugin: %s", e)
Expand Down
196 changes: 196 additions & 0 deletions tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7408,3 +7408,199 @@ async def test_view_error_still_logged(
) as plugin:
await plugin._ensure_started()
assert plugin._started


# ================================================================
# TEST CLASS: Fork detection after pickle (Issue #86)
# ================================================================
class TestForkDetectionAfterPickle:
  """Tests that unpickled plugins do not false-positive fork detection."""

  @pytest.mark.asyncio
  async def test_no_reset_after_unpickle(
      self,
      mock_auth_default,
      mock_bq_client,
      mock_write_client,
      mock_to_arrow_schema,
      mock_asyncio_to_thread,
  ):
    """Unpickled plugin does not trigger _reset_runtime_state."""
    import pickle

    config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig(
        create_views=False,
    )
    original = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin(
        PROJECT_ID, DATASET_ID, table_id=TABLE_ID, config=config
    )
    # A pickle round-trip mirrors how Vertex AI Agent Engine ships the
    # plugin to a fresh worker process.
    restored = pickle.loads(pickle.dumps(original))

    # __getstate__ stores the sentinel PID 0 on unpickled instances.
    assert restored._init_pid == 0

    with mock.patch.object(restored, "_reset_runtime_state") as reset_spy:
      await restored._ensure_started()
      # _init_pid == 0 means "unpickled, never initialized here" — the
      # fork-reset path must not fire.
      reset_spy.assert_not_called()

    assert restored._started
    # Startup must stamp the real PID so fork detection keeps working
    # for the rest of this instance's lifetime.
    assert restored._init_pid == os.getpid()
    await restored.shutdown()

  @pytest.mark.asyncio
  async def test_reset_on_real_fork(
      self,
      mock_auth_default,
      mock_bq_client,
      mock_write_client,
      mock_to_arrow_schema,
      mock_asyncio_to_thread,
  ):
    """Plugin detects real fork when _init_pid is a real non-zero PID."""
    config = bigquery_agent_analytics_plugin.BigQueryLoggerConfig(
        create_views=False,
    )
    async with managed_plugin(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        config=config,
    ) as plugin:
      await plugin._ensure_started()
      # Fake a fork: pretend startup happened in a different (non-zero)
      # process and that the parent had already marked us started.
      plugin._started = True
      plugin._init_pid = max(os.getpid() - 1, 1)

      with mock.patch.object(
          plugin, "_reset_runtime_state", wraps=plugin._reset_runtime_state
      ) as reset_spy:
        await plugin._ensure_started()
        # A genuine PID mismatch must trigger exactly one runtime reset.
        reset_spy.assert_called_once()


# ================================================================
# TEST CLASS: GCS offload unit mismatch fix (Issue #5561)
# ================================================================
class TestOffloadUnitSeparation:
  """Tests that byte-based inline limit and character-based truncation
  limit are evaluated independently for the GCS offload decision."""

  @pytest.mark.asyncio
  async def test_multibyte_text_offloaded_by_byte_limit(self):
    """Multi-byte text exceeding inline_text_limit bytes is offloaded."""
    offloader = mock.AsyncMock()
    offloader.upload_content.return_value = "gs://bucket/offloaded.txt"

    parser = bigquery_agent_analytics_plugin.HybridContentParser(
        offloader=offloader,
        trace_id="t",
        span_id="s",
        max_length=-1,  # disable character truncation entirely
    )
    # 10K emoji characters encode to ~40KB of UTF-8 — past the 32KB
    # inline byte limit even though the character count is small.
    payload = "\U0001f600" * 10000
    assert len(payload) == 10000
    assert len(payload.encode("utf-8")) > 32 * 1024

    content = types.Content(parts=[types.Part(text=payload)])
    _, parts, _ = await parser._parse_content_object(content)

    offloader.upload_content.assert_called_once()
    assert parts[0]["storage_mode"] == "GCS_REFERENCE"

  @pytest.mark.asyncio
  async def test_ascii_under_both_limits_stays_inline(self):
    """ASCII text under both byte and character limits stays inline."""
    offloader = mock.AsyncMock()

    parser = bigquery_agent_analytics_plugin.HybridContentParser(
        offloader=offloader,
        trace_id="t",
        span_id="s",
        max_length=50000,
    )
    # 1K ASCII chars == 1K bytes: comfortably under both limits.
    payload = "A" * 1000
    content = types.Content(parts=[types.Part(text=payload)])
    _, parts, _ = await parser._parse_content_object(content)

    offloader.upload_content.assert_not_called()
    assert parts[0]["storage_mode"] == "INLINE"
    assert parts[0]["text"] == payload

  @pytest.mark.asyncio
  async def test_text_exceeding_char_limit_offloaded(self):
    """ASCII text exceeding max_length characters is offloaded."""
    offloader = mock.AsyncMock()
    offloader.upload_content.return_value = "gs://bucket/big.txt"

    parser = bigquery_agent_analytics_plugin.HybridContentParser(
        offloader=offloader,
        trace_id="t",
        span_id="s",
        max_length=100,  # deliberately tiny char limit
    )
    # 200 ASCII chars: far under the 32KB byte guard, but over the
    # 100-character truncation limit — should be offloaded, not cut.
    payload = "X" * 200
    assert len(payload) > 100
    assert len(payload.encode("utf-8")) < 32 * 1024

    content = types.Content(parts=[types.Part(text=payload)])
    _, parts, _ = await parser._parse_content_object(content)

    offloader.upload_content.assert_called_once()
    assert parts[0]["storage_mode"] == "GCS_REFERENCE"

  @pytest.mark.asyncio
  async def test_no_offloader_falls_back_to_truncate(self):
    """Without offloader, text exceeding char limit is truncated inline."""
    parser = bigquery_agent_analytics_plugin.HybridContentParser(
        offloader=None,
        trace_id="t",
        span_id="s",
        max_length=50,
    )
    content = types.Content(parts=[types.Part(text="Z" * 200)])
    _, parts, is_truncated = await parser._parse_content_object(content)

    assert is_truncated
    assert parts[0]["storage_mode"] == "INLINE"
    assert "TRUNCATED" in parts[0]["text"]

  @pytest.mark.asyncio
  async def test_multibyte_under_char_and_byte_limits_stays_inline(self):
    """Multi-byte text under both char limit and byte limit stays inline.

    This is the specific regression case from #5561: with the old
    mixed-unit min(), max_length=10000 became the offload_threshold,
    and byte_len (12K) > 10000 triggered a false offload even though
    char_len (3K) < max_length and byte_len (12K) < inline_text_limit
    (32KB).
    """
    offloader = mock.AsyncMock()
    parser = bigquery_agent_analytics_plugin.HybridContentParser(
        offloader=offloader,
        trace_id="t",
        span_id="s",
        max_length=10000,
    )

    # 3K emoji chars encode to ~12K bytes: more bytes than max_length,
    # but under both of the real, correctly-unit'd limits.
    payload = "\U0001f600" * 3000
    assert len(payload) < 10000
    assert len(payload.encode("utf-8")) > 10000
    assert len(payload.encode("utf-8")) < 32 * 1024

    content = types.Content(parts=[types.Part(text=payload)])
    _, parts, _ = await parser._parse_content_object(content)

    # Under both genuine limits — must stay inline, no upload.
    offloader.upload_content.assert_not_called()
    assert parts[0]["storage_mode"] == "INLINE"