In [1]:
# pip install google-cloud-discoveryengine google-cloud-storage

import codecs
from itertools import islice
from typing import List, Dict, Any

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as de
from google.cloud import storage

def _read_text_head(blob: Any, head_chars: int) -> str:
    """Return a preview of a UTF-8 text blob without downloading all of it.

    The preview is the first ``head_chars`` characters, followed by "…" when
    the object holds more text than that.

    Args:
        blob: A ``google.cloud.storage`` blob handle (needs ``open`` and
            ``download_as_text``).
        head_chars: Maximum number of characters to keep in the preview.

    Returns:
        The (possibly truncated) preview string.
    """
    if head_chars < 0:
        # A negative limit has no sensible byte budget; keep the historical
        # slicing semantics by reading the whole object.
        data = blob.download_as_text(encoding="utf-8")
        return data[:head_chars] + ("…" if len(data) > head_chars else "")

    # A UTF-8 character occupies at most 4 bytes, so this many bytes always
    # contain at least head_chars + 1 complete characters (or the whole file),
    # which is enough to both build the preview and detect truncation.
    byte_budget = 4 * (head_chars + 1)
    with blob.open("rb", chunk_size=byte_budget) as reader:
        raw = reader.read(byte_budget)

    # final=False keeps a multi-byte character that was cut at the byte
    # boundary from raising; genuinely invalid bytes still raise.
    text = codecs.getincrementaldecoder("utf-8")().decode(raw, final=False)
    return text[:head_chars] + ("…" if len(text) > head_chars else "")


def search_and_fetch_txt(
    serving_config_path: str,   # e.g. "projects/<PID>/locations/us/collections/default_collection/dataStores/<DS_ID>/servingConfigs/default_config"
    location: str = "us",       # MUST match the data store / engine location ("us" or "global")
    query: str = "",
    top_k: int = 3,
    head_chars: int = 600       # how many characters of the .txt to fetch (for the preview)
) -> List[Dict[str, Any]]:
    """Search a Discovery Engine serving config and preview each hit's .txt file.

    Standard edition only (no enterprise-only features are requested).
    Each document is expected to carry ``structData.links`` with the keys
    ``txt_uri`` / ``plan_uri`` / ``transcript_uri`` / ``metrics_uri``;
    missing keys come back as ``None``.

    Args:
        serving_config_path: Full resource name of the serving config.
        location: Location of the data store / engine. ``"global"`` uses the
            default API endpoint; any other value uses
            ``"<location>-discoveryengine.googleapis.com"``.
        query: Free-text search query.
        top_k: Maximum number of hits to return. A value <= 0 applies no
            client-side cap (all results of the pager are returned).
        head_chars: Number of characters of each ``.txt`` object to include
            in ``txt_preview``.

    Returns:
        One dict per hit with the keys ``id``, ``txt_uri``, ``txt_preview``,
        ``plan_uri``, ``transcript_uri``, ``metrics_uri`` and ``structData``.

    Raises:
        Errors from the search or storage clients are not caught here and
        propagate to the caller.
    """
    # The global location is served from the default endpoint; only regional
    # or multi-regional locations need a location-prefixed endpoint.
    client_options = None
    if location and location != "global":
        client_options = ClientOptions(
            api_endpoint=f"{location}-discoveryengine.googleapis.com"
        )
    client = de.SearchServiceClient(client_options=client_options)

    req = de.SearchRequest(
        serving_config=serving_config_path,
        query=query,
        page_size=top_k,
        # no enterprise-only features; snippet_spec may work but is not required
    )

    storage_client = storage.Client()
    results: List[Dict[str, Any]] = []

    # page_size only sets the size of one page; iterating the pager would
    # transparently fetch every following page, so cap the iteration.
    pager = client.search(request=req)
    hits = islice(pager, top_k) if top_k > 0 else pager

    for r in hits:
        doc = r.document
        sdata = doc.struct_data or {}
        links = sdata.get("links") or {}
        if not hasattr(links, "get"):
            # "links" is not a mapping; treat it as absent.
            links = {}

        txt_uri = links.get("txt_uri")
        txt_preview = ""
        if isinstance(txt_uri, str) and txt_uri.startswith("gs://"):
            bkt, _, path = txt_uri[5:].partition("/")
            if bkt and path:
                blob = storage_client.bucket(bkt).blob(path)
                txt_preview = _read_text_head(blob, head_chars)

        results.append({
            "id": r.id or getattr(doc, "id", None),
            "txt_uri": txt_uri,
            "txt_preview": txt_preview,                 # content preview
            "plan_uri": links.get("plan_uri"),
            "transcript_uri": links.get("transcript_uri"),
            "metrics_uri": links.get("metrics_uri"),
            "structData": sdata,                        # full caller-defined metadata (e.g. tags, approved, ...)
        })
    return results


In [2]:
# Full resource name of the serving config to query, split per path segment
# group for readability (adjacent literals concatenate to the same string).
SERVING_CONFIG = (
    "projects/815755318672/locations/us/collections/default_collection/"
    "dataStores/external-memory-connector_1756845276280_gcs_store/"
    "servingConfigs/default_config"
)

# Run one sample query and fetch a short preview of every matching .txt file.
hits = search_and_fetch_txt(
    SERVING_CONFIG,
    location="us",
    query="retry i rollback w odkrywaniu przyczynowości",
    top_k=3,
    head_chars=600,
)

# Print a compact, human-readable report: one record per hit, "---" between.
for h in hits:
    print(f"ID: {h['id']}")
    print(f"TXT URI: {h['txt_uri']}")
    print(f"PREVIEW:\n {h['txt_preview']}")
    print(f"PLAN: {h['plan_uri']}")
    print("---")


ID: 20250829_212954_1e55a591
TXT URI: gs://external_memory/missions/2025/08/29/20250829_212954-zbuduj-adaptacyjny-system-ml-z-continuous-learning-1e55a591.txt
PREVIEW:
 # Mission: Zbuduj adaptacyjny system ML z continuous learning
ID: mission_20250829_212954_1e55a591
Timestamp: 2025-08-29T21:29:54Z
Type: general
Tags: optimized, robust
Outcome | Score | Verdict: Success | 94.9 | ZATWIERDZONY

## Executive Summary
Misja zakończona w 1 iteracji. Zastosowano: optimization, adaptive routing, rollback mechanism. Struktura: 25 węzłów, 21 ścieżek sukcesu, 10 ścieżek obsługi błędów. Kluczowe komponenty: error_handler, validate_data, optimize_performance, rollback. Plan zatwierdzony przez krytyka bez zastrzeżeń.

## Final Plan (skrót)
Entry: Load_Initial_Data
Węzły: L…
PLAN: gs://external_memory/missions/2025/08/29/20250829_212954-zbuduj-adaptacyjny-system-ml-z-continuous-learning-1e55a591.plan.json
---
ID: 20250829_212413_92ed8ebc
TXT URI: gs://external_memory/missions/2025/08/29/20250829_2124