Skip to content
30 changes: 25 additions & 5 deletions src/google/adk/skills/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Utility functions for Agent Skills."""

from __future__ import annotations

import os
import logging
import pathlib
from typing import Union
Expand Down Expand Up @@ -401,20 +401,40 @@ def _load_skill_from_gcs_dir(
f" name '{skill_name_expected}'."
)

def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]:
def _load_files_in_dir(subdir: str) -> dict[str, str | bytes]:
prefix = f"{skill_dir_prefix}{subdir}/"
blobs = bucket.list_blobs(prefix=prefix)
result = {}

for blob in blobs:
relative_path = blob.name[len(prefix) :]
relative_path = blob.name[len(prefix):]
if not relative_path:
continue

# Use PurePosixPath for platform-independent GCS path validation
p = pathlib.PurePosixPath(relative_path)

# Reject absolute paths and traversal sequences
if p.is_absolute() or ".." in p.parts:
raise ValueError(
f"Unsafe path in skill resource: {relative_path!r}"
)

normalized = p.as_posix()

# Prevent silent file overwrites via path aliasing
if normalized in result:
raise ValueError(
f"Duplicate normalized path detected: {normalized!r}"
)

# NOTE: Final path safety enforced during materialization
# via realpath + commonpath checks in skill_toolset.py
try:
result[relative_path] = blob.download_as_text()
result[normalized] = blob.download_as_text()
except UnicodeDecodeError:
result[relative_path] = blob.download_as_bytes()
result[normalized] = blob.download_as_bytes()

return result

references = _load_files_in_dir("references")
Expand Down
9 changes: 8 additions & 1 deletion src/google/adk/tools/skill_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ def _build_wrapper_code(
)

# Build the boilerplate extract string

code_lines = [
"import os",
"import tempfile",
Expand All @@ -531,8 +532,14 @@ def _build_wrapper_code(
"def _materialize_and_run():",
" _orig_cwd = os.getcwd()",
" with tempfile.TemporaryDirectory() as td:",
" _real_base = os.path.realpath(td)",
" for rel_path, content in _files.items():",
" full_path = os.path.join(td, rel_path)",
" if os.path.isabs(rel_path):",
" raise ValueError(f'Absolute path rejected: {rel_path!r}')",
" _safe = os.path.realpath(os.path.join(td, rel_path))",
" if os.path.commonpath([_real_base, _safe]) != _real_base:",
" raise ValueError(f'Path traversal detected: {rel_path!r}')",
" full_path = _safe",
" os.makedirs(os.path.dirname(full_path), exist_ok=True)",
" mode = 'wb' if isinstance(content, bytes) else 'w'",
" with open(full_path, mode) as f:",
Expand Down