In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Companion Code for SOL310: Adding Agentic Aware Security Next '25

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/hugoselbie/Companion_SOL310_Next25_Agentic-Aware-Security/blob/main/notebook.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fhugoselbie%2FCompanion_SOL310_Next25_Agentic-Aware-Security%2Fmain%2Fnotebook.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/hugoselbie/Companion_SOL310_Next25_Agentic-Aware-Security/main/notebook.ipynb">
      <img src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/hugoselbie/Companion_SOL310_Next25_Agentic-Aware-Security">
      <img width="32px" src="https://www.svgrepo.com/download/217753/github.svg" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>
<div style="clear: both;"></div>

# Setting up the environment

In [1]:
%pip install --upgrade --quiet \
    "google-cloud-aiplatform[agent_engines,langchain]" \
    langgraph \
    langchain \
    google-cloud-aiplatform google-cloud-discoveryengine \
    cloudpickle==3.0.0 \
    "pydantic>=2.10" \
    requests


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import hashlib
import json
import os

import subprocess
import time
import operator
from copy import deepcopy
from typing import TypedDict, Annotated, List, Literal, Optional
from dataclasses import dataclass
import google.auth
import google.auth.transport.requests
import requests
import vertexai
from langgraph.graph import START, StateGraph
from langchain_core.messages import SystemMessage
from langgraph.graph.message import add_messages
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition, InjectedState
from langchain_core.runnables.config import RunnableConfig
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import LangchainAgent

In [3]:
from google.colab import auth
auth.authenticate_user()

ModuleNotFoundError: No module named 'google.colab'

# Project Setup
To execute the usecases outlined in SOL310: Adding Agentic Aware Security presented at Next '25, there are a few setup parameters to go through. The case study is using vertex ai search with an associated document datastore derived from documents that are stored in GCS. 

To execute the example below we're expecting you to have already configured a VAIS search app with associated datastore and have documents that are stored in a known GCS bucket that you have read/write permissions on. 


In [3]:
PROJECT_NUMBER = 1077649599081
PROJECT_ID = 'big-data-379417' #@param {type:"string"}
BUCKET_NAME = 'data_storage_aiml_hugo' #@param {type:"string"}
DATA_STORE_ID = 'test-subfolder_1697051023694'  #@param {type:"string"} 
SEARCH_APP = 'test-deloitte_1697050551691'  #@param {type:"string"} 
LOCATION = 'global' #@param {type:"string"} 
API_DOMAIN = 'discoveryengine' #@param {type:"string"}

SEARCH_URL = f'https://{API_DOMAIN}.googleapis.com/v1alpha/projects/{PROJECT_NUMBER}/locations/{LOCATION}/collections/default_collection/engines/{SEARCH_APP}/servingConfigs/default_search:search'
LIST_DOCUMENTS_URL = f'https://{API_DOMAIN}.googleapis.com/v1alpha/projects/{PROJECT_NUMBER}/locations/{LOCATION}/collections/default_collection/dataStores/{DATA_STORE_ID}/branches/default_branch/documents'
CREATE_DOCUMENT_URL_TEMPLATE = f'https://{API_DOMAIN}.googleapis.com/v1alpha/projects/{PROJECT_NUMBER}/locations/{LOCATION}/collections/default_collection/dataStores/{DATA_STORE_ID}/branches/default_branch/documents?documentId={{}}'
DOCUMENT_URL_TEMPLATE = f'https://{API_DOMAIN}.googleapis.com/v1alpha/projects/{PROJECT_NUMBER}/locations/{LOCATION}/collections/default_collection/dataStores/{DATA_STORE_ID}/branches/default_branch/documents/{{}}'

In [4]:
os.environ['LOCAL'] = 'true'

# Defining access control and getting permission information

In [13]:
@dataclass
class Access:
    scope: str
    specifier: str

    def __lt__(self, other):
        return (self.scope, self.specifier) < (other.scope, other.specifier)

    def __hash__(self):
        return hash((self.scope, self.specifier))

    def match(self, other):
        if self.scope != other.scope:
            return None
        if self.specifier == '*':
            return other
        if other.specifier == '*':
            return self
        if self.specifier == other.specifier:
            return self
        return None

    @property
    def name(self):
        return f'{self.scope}_{self.specifier}'


class Restriction:

    HASH_OUTPUT_BYTES = 16

    def __init__(self, access):
        self.accesses = [access]

    def update(self, access):
        if access in self.accesses:
            raise ValueError('this access is already present')
        if access.scope in (x.scope for x in self.accesses):
            raise ValueError('access with the same scope is already present')
        self.accesses.append(access)
        self.accesses.sort()  # we want to have a stable order

    def __str__(self):
        return ' & '.join(str(x) for x in self.accesses)

    def __hash__(self):
        return hash(tuple(self.accesses))  # already sorted

    def __eq__(self, other):
        return self.accesses == other.accesses

    def __repr__(self):
        return str(self)

    @property
    def hash(self):
        string = str(self)
        return hashlib.shake_128(string.encode()).hexdigest(self.HASH_OUTPUT_BYTES)

    def match(self, other):
        if len(self.accesses) != len(other.accesses):
            return None

        accesses = []
        for access in self.accesses:
            matched = [access.match(x) for x in other.accesses]
            matched = [x for x in matched if x]
            if not matched:
                break
            assert len(matched) == 1
            accesses.extend(matched)

        if not accesses:
            return None
        restriction = Restriction(accesses.pop())
        while accesses:
            restriction.update(accesses.pop())
        return restriction

    def has_wildcard(self):
        return any(x for x in self.accesses if x.specifier == '*')

In [14]:
class Principal:

    def __new__(cls, id_):
        instance = cls.instances.get(id_)
        if instance is None:
            instance = super().__new__(cls)
            cls.instances[id_] = instance
        return instance

    def __init__(self, id_, *args, **kwargs):
        if hasattr(self, 'id'):  # is initialized?
            return
        self.id = id_
        self.read_restriction = None
        self.write_restriction = None
        self.membership = set()

    def __repr__(self):
        return f'{self.__class__.__name__}({self.id!r})'

    def __hash__(self):
        return hash(self.id)

    def __lt__(self, other):
        return self.id < other.id

    @property
    def membership_expanded(self):
        expanded = set()
        stack = list(self.membership)
        while stack:
            group = stack.pop()
            expanded.add(group)
            stack.extend(group.membership)
        return expanded

    def restrict_read(self, access):
        if self.read_restriction is None:
            self.read_restriction = Restriction(access)
        else:
            self.read_restriction.update(access)

    def restrict_write(self, access):
        if self.write_restriction is None:
            self.write_restriction = Restriction(access)
        else:
            self.write_restriction.update(access)

    @classmethod
    def all(cls):
        return cls.instances

    @classmethod
    def clear_all(cls):
        cls.instances.clear()

    @property
    def read_restrictions(self):
        restrictions = {group.read_restriction for group in self.membership_expanded}
        restrictions.add(self.read_restriction)
        return {x for x in restrictions if x}

    @property
    def write_restrictions(self):
        restrictions = {group.write_restriction for group in self.membership_expanded}
        restrictions.add(self.write_restriction)
        return {x for x in restrictions if x}


class Individual(Principal):
    pass


class User(Individual):

    instances = {}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if hasattr(self, 'initialized'):
            return
        access = Access(scope='user', specifier=self.id)
        self.restrict_read(access)
        self.restrict_write(access)
        self.initialized = ...


class Agent(Individual):

    instances = {}


class Group(Principal):

    instances = {}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if hasattr(self, 'initialized'):
            return
        self.members = set()
        self.initialized = ...

    @property
    def members_expanded(self):
        expanded = set()
        stack = list(self.members)
        while stack:
            member = stack.pop()
            expanded.add(member)
            if isinstance(member, Group):
                stack.extend(member.members)
        return expanded

    def add_member(self, member):
        if member in self.members:
            raise ValueError('already a member')
        if isinstance(member, Group):
            if self.id == member.id:
                raise ValueError('cannot be own member')
            if self in member.members_expanded:
                raise ValueError('cycle detected')
        self.members.add(member)
        member.membership.add(self)

    def remove_member(self, member):
        if member not in self.members:
            raise ValueError('not a member')
        self.members.remove(member)
        member.membership.remove(self)

In [15]:
def reconcile_restrictions(*restriction_collections):
    if len(restriction_collections) < 2:
        raise ValueError('at least 2 collections has to be provided')
    stack = list(restriction_collections)
    first = stack.pop()
    while stack:
        second = stack.pop()
        merged = set()
        for first_restriction in first:
            for second_restriction in second:
                match = first_restriction.match(second_restriction)
                if match is not None:
                    merged.add(match)
        first = merged
    # wildcard cannot be present in the output collection
    without_wildcard = {x for x in merged if not x.has_wildcard()}
    return without_wildcard

def get_filter(restrictions):
    if not restrictions:
        return None
    hashes = [x.hash for x in restrictions]
    filter_template = 'restrictions:ANY({})'
    li = ','.join(f'"{x}"' for x in hashes)
    the_filter = filter_template.format(li)
    return the_filter

In [16]:
def upload_choices(principal):
    return {
        ' '.join(x.name for x in restriction.accesses): restriction
        for restriction in principal.write_restrictions
    }

# Interacting with Vertex AI data store's documents

In [11]:
def get_token():
    if os.environ.get('LOCAL'):
        output = subprocess.run(['gcloud', 'auth', 'print-access-token'], capture_output=True, text=True)
        token = output.stdout.strip()
        return token
    creds, _ = google.auth.default()
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    token = creds.token
    return token

In [12]:
class Resource:
    pass

class Row(Resource):
    pass

@dataclass
class Document(Resource):
    id: str
    restrictions: List[Restriction]
    metadata: dict
    mime: str
    uri: str

    @property
    def title(self):
        return self.uri.rsplit('/', maxsplit=1)[-1].split('.')[0]

    @staticmethod
    def _headers():
        token = get_token()
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }
        return headers

    @classmethod
    def list_all(cls):
        response = requests.get(LIST_DOCUMENTS_URL, headers=cls._headers())
        return response.json()

    def as_resource(self):
        metadata = deepcopy(self.metadata)
        metadata['restrictions'] = [
            x.hash if isinstance(x, Restriction) else x
            for x in self.restrictions
        ]
        resource = {
            "name": f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/collections/default_collection/dataStores/{DATA_STORE_ID}/branches/0/documents/{self.id}",
            "id": self.id,
            "schemaId": "default_schema",
            "structData": metadata,
            "parentDocumentId": self.id,
            "content": {
                "mimeType": self.mime,
                "uri": self.uri
            }
        }
        return resource

    def create(self):
        response = requests.post(
            CREATE_DOCUMENT_URL_TEMPLATE.format(self.id),
            headers=self._headers(),
            json=self.as_resource()
        )
        return response.json()

    @classmethod
    def from_id(cls, doc_id):
        resource = cls.get(doc_id)
        metadata = resource['structData']
        restrictions = metadata.pop('restrictions', [])
        mime = resource['content']['mimeType']
        uri = resource['content']['uri']
        return cls(doc_id, restrictions, metadata, mime, uri)

    @staticmethod
    def get(doc_id):
        response = requests.get(
            DOCUMENT_URL_TEMPLATE.format(doc_id),
            headers=Document._headers()
        )
        if response.status_code != 200:
            raise Exception(f'status_code: {response.status_code}')
        return response.json()

    def wait_till_indexed(self):
        sleep_interval = 5
        max_waiting_time = sleep_interval * 60
        start = time.perf_counter()
        while time.perf_counter() - start < max_waiting_time:
            print('checking index status...')
            response = self.get(self.id)
            if 'indexTime' in response:
                break
            time.sleep(sleep_interval)
        return response

    def patch(self):
        response = requests.patch(
            DOCUMENT_URL_TEMPLATE.format(self.id),
            headers=self._headers(),
            json=self.as_resource()
        )
        return response.json()

    def delete(self):
        response = requests.delete(
            DOCUMENT_URL_TEMPLATE.format(self.id),
            headers=self._headers()
        )
        return response.json()

    def update(self):
        # updating could be done with:
        # print(self.patch())
        # ---
        # alternatively with import method (re-importing using in place documents in the API request)
        # ---
        # at the moment using "delete and re-create" workaround
        # as patching and importing currently doesn't work correctly due to an internal error:
        # "Path: does not start with gs://"
        print(self.delete())
        print(self.create())

# Searching in a data store

In [17]:
def search(query, search_filter):
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {get_token()}'
    }
    payload = {
        "query": query,
        "pageSize": 100,
        "contentSearchSpec": {
            "searchResultMode": "CHUNKS"
        },
        "filter": search_filter,
        "relevanceThreshold": "MEDIUM",
    }
    response = requests.post(SEARCH_URL, headers=headers, json=payload)
    return response.json()

In [18]:
def search_without_filter(query):
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {get_token()}'
    }
    payload = {
        "query": query,
        "pageSize": 100,
        "contentSearchSpec": {
            "searchResultMode": "CHUNKS"
        },
        "relevanceThreshold": "MEDIUM",
    }
    response = requests.post(SEARCH_URL, headers=headers, json=payload)
    return response.json()

# Building a LangGraph-based multi-agent application

In [19]:
@tool
def detect_tense(tense: Literal['future', 'past', 'n/a']):
    """used to indicate if the given phrase is about future, past or none of them (n/a)"""

def tools_or_another_agent(state):
    print('redirecting to a different agent, calling the tools or finishing...')
    return state.get('next_agent') or tools_condition(state)

def last_agent(state):
    print('redirecting to the last agent...')
    return state['agent_history'][-1]

In [20]:
def make_agent(name, llm, tools):
    def agent(state):
        print(name)
        messages = state['messages']

        # check if should route to another agent
        last_the_same = state['agent_history'] and state['agent_history'][-1] == name
        if state.get('next_agent') != name and not last_the_same:
            llm_with_tools = llm.bind_tools([detect_tense], tool_choice=True)
            system = SystemMessage(
                """Given this conversation, detect if the next step is to deal with something to do with HVAC docs or Security based docs
                if it's not clear then use 'n/a')"""
            )
            response = llm_with_tools.invoke(
                [system] + [m for m in messages if not isinstance(m, SystemMessage)]
            )
            tense = response.tool_calls[0]['args']['tense']
            if tense != 'n/a':
                return {'agent_history': [name], 'next_agent': tense}

        # proceed normally
        llm_with_tools = llm.bind_tools(tools)
        response = llm_with_tools.invoke(messages)
        return {'messages': [response], 'agent_history': [name], 'next_agent': None, 'retrieved_documents': []}
    return agent

class State(TypedDict):
    messages: Annotated[List[str], add_messages]
    agent_history: Annotated[List[str], operator.add]
    next_agent: Optional[str]
    retrieved_documents: Annotated[List[list], operator.add]

@tool
def search_tool(query: str, config: RunnableConfig, state: Annotated[dict, InjectedState]):
    """this search tool has to be called for any queries on specific companies"""
    current_agent = state['agent_history'][-1]
    agent_to_filter = config['configurable']['user_info']['agent_to_filter']
    search_filter = agent_to_filter[current_agent]
    resp = search(query, search_filter)
    titles = set()
    chunks = []
    try:
        for result in resp['results']:
            titles.add(result['chunk']['documentMetadata']['title'])
            chunks.append(result['chunk']['content'])
    except KeyError:
        print(resp)
    state['retrieved_documents'].append(sorted(titles))  # workaround for debug purposes as LangGraph doesn't support state modifications with tools atm
    return list(dict.fromkeys(chunks))  # dedup as the chunks here come from identical docs

In [21]:
def runnable_builder(model, *args, **kwargs):
    workflow = StateGraph(State)

    toolkit = [search_tool]
    tool_node = ToolNode(toolkit)

    workflow.add_node('coordinator', make_agent('coordinator', model, toolkit))
    workflow.add_node('hvacdoc', make_agent('hvacdoc', model, toolkit))
    workflow.add_node('secdoc', make_agent('secdoc', model, toolkit))
    workflow.add_node('tools', tool_node)

    workflow.add_edge(START, 'coordinator')
    workflow.add_conditional_edges('coordinator', tools_or_another_agent)
    workflow.add_conditional_edges('hvacdoc', tools_or_another_agent)
    workflow.add_conditional_edges('secdoc', tools_or_another_agent)
    workflow.add_conditional_edges('tools', last_agent)

    graph = workflow.compile()
    return graph

# Agent Engine deployment

In [22]:
vertexai.init(
    project=PROJECT_ID, 
    location="us-central1",
    staging_bucket=f'gs://{BUCKET_NAME}' 
)

re_agent = LangchainAgent(
    model='gemini-2.0-flash',
    runnable_builder=runnable_builder
    
)

In [23]:
remote_agent = agent_engines.create(
    re_agent,
    requirements=[
        "google-cloud-aiplatform[agent_engines,langchain]",
        "cloudpickle==3.0.0",
        "pydantic>=2.10",
        "requests",
        "google-auth",
        "langgraph",
        "langchain-core",
    ],
)

Identified the following requirements: {'cloudpickle': '3.0.0', 'google-cloud-aiplatform': '1.87.0'}
The final list of requirements: ['google-cloud-aiplatform[agent_engines,langchain]', 'cloudpickle==3.0.0', 'pydantic>=2.10', 'requests', 'google-auth', 'langgraph', 'langchain-core']
Using bucket data_storage_aiml_hugo
Wrote to gs://data_storage_aiml_hugo/agent_engine/agent_engine.pkl
Writing to gs://data_storage_aiml_hugo/agent_engine/requirements.txt
Creating in-memory tarfile of extra_packages
Writing to gs://data_storage_aiml_hugo/agent_engine/dependencies.tar.gz
Creating AgentEngine
Create AgentEngine backing LRO: projects/1077649599081/locations/us-central1/reasoningEngines/6559079440893083648/operations/161345737873948672
View progress and logs at https://console.cloud.google.com/logs/query?project=big-data-379417


InternalServerError: 500 Please refer to our documentation (https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/troubleshooting/deploy) for checking logs and other troubleshooting tips. 13: Please refer to our documentation (https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/troubleshooting/deploy) for checking logs and other troubleshooting tips.

In [None]:
remote_app = agent_engines.AgentEngine('projects/1077649599081/locations/us-central1/reasoningEngines/4354708160783581184')

# Helper functions for testing

In [24]:
def show_environment():
    print('environment:')
    print('\t', Document.list_all())
    print('\t', Group.all())
    print('\t', User.all())
    print('\t', Agent.all())

In [25]:
def clear_environment():
    for doc_id in [x['id'] for x in Document.list_all().get('documents', [])]:
        Document.from_id(doc_id).delete()
    Group.clear_all()
    User.clear_all()
    Agent.clear_all()


# def clear_environment():
#     all_docs_result = Document.list_all()
#     doc_list = all_docs_result. # Use .get() for safety
#     if doc_list: # Check if the list is not empty
#          for doc_id in [x['id'] for x in doc_list]:
#              try: # Add try/except for robustness during deletion
#                 doc = Document.from_id(doc_id)
#                 if doc: # Ensure from_id didn't return None
#                     doc.delete()
#              except Exception as e:
#                  print(f"Warning: Failed to delete doc {doc_id}: {e}")
#     Group.clear_all()
#     User.clear_all()
#     Agent.clear_all()

In [26]:
def final_response(state):
    return state['messages'][-1]['kwargs']['content']

def retrieved_documents(state_or_search_results):
    if 'retrieved_documents' in state_or_search_results:
        return state_or_search_results['retrieved_documents'][-1]
    docs = {x['chunk']['documentMetadata']['title'] for x in state_or_search_results.get('results', [])}
    return sorted(docs)

In [27]:
def confirm_retrievable(query, doc_title):
    sleep_interval = 10
    max_waiting_time = sleep_interval * 60
    start = time.perf_counter()
    successful_attempts = 0
    want_successful = 5
    while time.perf_counter() - start < max_waiting_time:
        print('retrieving... ', end='')
        search_results = search_without_filter(query)
        retrieved = retrieved_documents(search_results)
        if doc_title in retrieved:
            print('retrieved')
            successful_attempts += 1
        else:
            print('not retrieved')
            successful_attempts = 0
        if successful_attempts == want_successful:
            break
        time.sleep(sleep_interval)

In [28]:
def confirm_restrictions(query, doc):
    sleep_interval = 10
    max_waiting_time = sleep_interval * 60
    start = time.perf_counter()
    successful_attempts = 0
    want_successful = 5
    while time.perf_counter() - start < max_waiting_time:
        print('checking restrictions... ', end='')
        search_results = search_without_filter(query)
        retrieved_restrictions = (
            x['chunk']['documentMetadata']['structData']['restrictions']
            for x in search_results.get('results', [])
            if x['chunk']['documentMetadata']['title'] == doc.title
        )
        retrieved_restrictions = sorted(next(retrieved_restrictions, []))
        document_restrictions = sorted(
            x.hash if isinstance(x, Restriction) else x
            for x in doc.restrictions
        )
        if retrieved_restrictions == document_restrictions:
            print('matching')
            successful_attempts += 1
        else:
            print('not matching')
            successful_attempts = 0
        if successful_attempts == want_successful:
            break
        time.sleep(sleep_interval)

# Test scenarios

## Scenario 1

In [29]:
Group('CustomerA').restrict_read(Access(scope='HVAC_Documents', specifier='HVAC_Users'))
Group('CustomerA').add_member(User('U1'))

In [30]:
for doc_id in [x['id'] for x in Document.list_all()['documents']]:
    Document.from_id(doc_id).delete()
Group.clear_all()
User.clear_all()
Agent.clear_all()


# Access Control System - Scenario 1

## Scenario Overview

This scenario demonstrates how different users access disjoint sets of documents based on their permissions.

## Setup

- **User U1** is a member of customer organization HVAC_Users
- **User U2** is a member of customer organization C2
- **User U1** has read access to resources in the HVAC_Documents scope with specifier HVAC_Users
- **User U2** has read access to resources belonging to customer organization C2
- **Document D1** belongs to HVAC_Documents with specifier HVAC_Users
- **Document D2** belongs to customer organization C2
- Both documents are relevant for the same question Q1
- **Agent A1** has read access to both organizations' resources

## Flow Process

### Step 1: Environment Configuration
- The system establishes groups, permissions, and document restrictions
- Users are assigned to their respective groups
- Documents are created with appropriate access restrictions
- The agent is given access to both groups

### Step 2: Agent Access Verification
- A direct search by Agent A1 confirms it can access all documents
- This verifies that the agent has proper permissions to both document sets

### Step 3: User U1 Query
- User U1 asks question Q1
- The system calculates the intersection of User U1's permissions and Agent A1's permissions
- Agent A1 searches using this combined permission filter
- Results show that User U1 can access Document D1 (HVAC_Users document)
- Document D2 is filtered out as User U1 lacks access to C2 resources

### Step 4: User U2 Query
- User U2 asks the same question Q1
- The system calculates the intersection of User U2's permissions and Agent A1's permissions
- Agent A1 searches using this combined permission filter
- Results show that User U2 can access Document D2 (C2 document)
- Document D1 is filtered out as User U2 lacks access to HVAC_Users resources

## Key Concepts Demonstrated

- Users can only access documents from their organization
- The same agent can serve multiple users with different access levels
- Document visibility is dynamically determined based on user permissions
- The system enforces proper information boundaries between organizations

In [31]:
def scenario_1(re_app):
    """Users with different permissions accessing disjoint sets of documents"""

    Group('HVAC_Users').restrict_read(Access(scope='HVAC_Documents', specifier='HVAC_Users'))
    Group('HVAC_Users').add_member(User('U1'))
    Group('C2').restrict_read(Access(scope='customer', specifier='C2'))
    Group('C2').add_member(User('U2'))
    documents = [
        Document(
            id='D1',
            restrictions=[Restriction(Access(scope='HVAC_Documents', specifier='HVAC_Users'))],
            metadata={},
            mime='application/pdf',
            uri='gs://data_storage_aiml_hugo/adaptation_of_foundation_models_whitepaper_google_cloud.pdf'
        ),
        Document(
            id='D2',
            restrictions=[Restriction(Access(scope='customer', specifier='C2'))],
            metadata={},
            mime='application/pdf',
            uri='gs://data_storage_aiml_hugo/ai_adoption_framework_whitepaper.pdf'
        )
    ]
    for doc in documents:
        print(doc.create())
        print(doc.wait_till_indexed())
    agent_A1 = 'coordinator'
    Group('HVAC_Users').add_member(Agent(agent_A1))
    Group('C2').add_member(Agent(agent_A1))
    question_Q1 = 'what is koenig and bauer?'
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q1}
    ]
    show_environment()

    # == step 2
    # Direct search attempt, using the permissions of Agent A1, retrieves all relevant documents for question Q1.
    print('\nstep #2')
    search_filter = get_filter(Agent(agent_A1).read_restrictions)
    assert search_filter
    search_results = search(question_Q1, search_filter)
    retrieved = retrieved_documents(search_results)
    print(retrieved)
    # assert 'document_01' in retrieved
    # assert 'document_02' in retrieved

    # == step 3
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is NOT present in the search results.
    print('\nstep #3')
    user_info = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'adaptation_of_foundation_models_whitepaper_google_cloud' in retrieved
    assert 'ai_adoption_framework_whitepaper' not in retrieved

    # == step 4
    # User U2 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D2 is present in the search results.
    # Document D1 is NOT present in the search results.
    print('\nstep #4')
    user_info = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U2').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'ai_adoption_framework_whitepaper' in retrieved
    assert 'adaptation_of_foundation_models_whitepaper_google_cloud' not in retrieved

In [33]:
scenario_1(re_agent)

{'name': 'projects/1077649599081/locations/global/collections/default_collection/dataStores/test-subfolder_1697051023694/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['f9b857ba14374d8c5f7fe05067f6d210']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'application/pdf', 'uri': 'gs://data_storage_aiml_hugo/adaptation_of_foundation_models_whitepaper_google_cloud.pdf'}}
checking index status...
checking index status...
checking index status...
{'name': 'projects/1077649599081/locations/global/collections/default_collection/dataStores/test-subfolder_1697051023694/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['f9b857ba14374d8c5f7fe05067f6d210']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'application/pdf', 'uri': 'gs://data_storage_aiml_hugo/adaptation_of_foundation_models_whitepaper_google_cloud.pdf'}, 'indexTime': '2025-04-07T23:16:07.290079Z', 'indexStatus': {'pendingMes

IndexError: list index out of range

## Scenario 2

In [None]:
def scenario_2(re_app):
    """Agents with different permissions accessing disjoint sets of documents"""

    print('scenario #2')
    # == step 1
    # User U1 is a member of customer organization C1.
    # User U1 has read access to resources belonging to customer organization C1.
    # Document D1 is related to machine type M1.
    # Document D2 is related to machine type M2.
    # Customer organization C1 has read access to resources related to machine type M1.
    # Customer organization C1 has read access to resources related to machine type M2.
    # Agent A1 has read access to resources related to machine type M1.
    # Agent A2 has read access to resources related to machine type M2.
    # Document D1 is relevant for question Q1.
    # Document D2 is relevant for question Q1.
    # Document D1 is relevant for question Q2.
    # Document D2 is relevant for question Q2.
    print('\nstep #1')
    clear_environment()
    show_environment()
    Group('C1').restrict_read(Access(scope='customer', specifier='C1'))
    Group('C1').add_member(User('U1'))
    documents = [
        Document(
            id='D1',
            restrictions=[Restriction(Access(scope='machine_type', specifier='M1'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_01.txt'
        ),
        Document(
            id='D2',
            restrictions=[Restriction(Access(scope='machine_type', specifier='M2'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_02.txt'
        )
    ]
    for doc in documents:
        print(doc.create())
        print(doc.wait_till_indexed())
    Group('M1').restrict_read(Access(scope='machine_type', specifier='M1'))
    Group('M2').restrict_read(Access(scope='machine_type', specifier='M2'))
    Group('M1').add_member(Group('C1'))
    Group('M2').add_member(Group('C1'))
    agent_A1 = 'coordinator'
    agent_A2 = 'past'
    Group('M1').add_member(Agent(agent_A1))
    Group('M2').add_member(Agent(agent_A2))
    question_Q1 = 'what is koenig and bauer?'
    question_Q2 = 'what was koenig and bauer in 19th century?'
    user_info = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    show_environment()

    # == step 2
    # Direct search attempt, using the permissions of User U1, retrieves all relevant documents for question Q1.
    # Direct search attempt, using the permissions of User U1, retrieves all relevant documents for question Q2.
    print('\nstep #2')
    search_filter = get_filter(User('U1').read_restrictions)
    assert search_filter
    search_results = search(question_Q1, search_filter)
    retrieved = retrieved_documents(search_results)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved
    search_results = search(question_Q2, search_filter)
    retrieved = retrieved_documents(search_results)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 3
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is NOT present in the search results.
    print('\nstep #3')
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q1}
    ]
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' not in retrieved

    # == step 4
    # User U1 asks question Q2.
    # Agent A2 uses the search tool.
    # Document D1 is NOT present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #4')
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q2}
    ]
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' not in retrieved
    assert 'document_02' in retrieved

In [None]:
scenario_2(remote_app)

scenario #2

step #1
environment:
	 {}
	 {}
	 {}
	 {}
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['93d144ae9dd8e66e079dbe67c0c58c58']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}}
checking index status...
checking index status...
checking index status...
checking index status...
checking index status...
checking index status...
checking index status...
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['93d144ae9dd8e66e079dbe67c0c58c58']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}, 'indexTime'

## Scenario 3

In [None]:
def scenario_3(re_app):
    """Changing permissions for a user reflected in search results"""

    print('scenario #3')
    # == step 1
    # User U1 is a member of customer organization C1.
    # User U1 has read access to resources belonging to customer organization C1.
    # Document D1 belongs to customer organization C1.
    # Document D2 is related to machine type M1.
    # Subscription group S1 has read access to resources related to machine type M1.
    # Document D1 is relevant for question Q1.
    # Document D2 is relevant for question Q1.
    # Agent A1 has read access to resources belonging to customer organization C1.
    # Agent A1 has read access to resources related to machine type M1.
    print('\nstep #1')
    # clear_environment()
    show_environment()
    Group('C1').restrict_read(Access(scope='customer', specifier='C1'))
    Group('C1').add_member(User('U1'))
    documents = [
        Document(
            id='D1',
            restrictions=[Restriction(Access(scope='customer', specifier='C1'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_01.txt'
        ),
        Document(
            id='D2',
            restrictions=[Restriction(Access(scope='machine_type', specifier='M1'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_02.txt'
        )
    ]
    for doc in documents:
        print(doc.create())
        print(doc.wait_till_indexed())
    Group('M1').restrict_read(Access(scope='machine_type', specifier='M1'))
    Group('M1').add_member(Group('S1'))
    agent_A1 = 'future'
    Group('C1').add_member(Agent(agent_A1))
    Group('M1').add_member(Agent(agent_A1))
    question_Q1 = 'what will be koenig and bauer in the future?'
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q1}
    ]
    show_environment()

    # == step 2
    # Direct search attempt, using the permissions of Agent A1, retrieves all relevant documents for question Q1.
    print('\nstep #2')
    search_filter = get_filter(Agent(agent_A1).read_restrictions)
    assert search_filter
    search_results = search(question_Q1, search_filter)
    retrieved = retrieved_documents(search_results)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 3
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is NOT present in the search results.
    print('\nstep #3')
    user_info = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' not in retrieved

    # == step 4
    # Customer organization C1 becomes a member of subscription group S1.
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #4')
    Group('S1').add_member(Group('C1'))
    user_info = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

In [None]:
scenario_3(remote_app)

scenario #3

step #1
environment:
	 {}
	 {}
	 {}
	 {}
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['d6000c6f6bbbbd310c98dd5484524c59']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}}
checking index status...
checking index status...
checking index status...
checking index status...
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['d6000c6f6bbbbd310c98dd5484524c59']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}, 'indexTime': '2024-11-19T11:42:00.795007Z'}
{'name': 'projects/200232053296/locations/

## Scenario 4

In [None]:
Fdef scenario_4(re_app):
    """Uploading new documents with a selected scope and retrieving them afterwards"""

    print('scenario #4')
    # == step 1
    # User U1 is a member of customer organization C1.
    # User U2 is a member of customer organization C1.
    # User U1 has read access to resources belonging to customer orFganization C1.
    # User U2 has read access to resources belonging to customer organization C1.
    # Document D1 belongs to customer organization C1.
    # Document D1 is relevant for question Q1.
    # Agent A1 has read access to resources belonging to customer organization C1.
    # Agent A1 has read access to resources belonging to all users.
    # User U1 has write access to resources belonging to customer organization C1.
    print('\nstep #1')
    clear_environment()
    show_environment()
    Group('C1_readers').restrict_read(Access(scope='customer', specifier='C1'))
    Group('C1_readers').add_member(User('U1'))
    Group('C1_readers').add_member(User('U2'))
    document = Document(
        id='D1',
        restrictions=[Restriction(Access(scope='customer', specifier='C1'))],
        metadata={},
        mime='text/plain',
        uri='gs://experiments-200232053296/kb_articles/document_01.txt'
    )
    print(document.create())
    print(document.wait_till_indexed())
    agent_A1 = 'past'
    Group('C1_readers').add_member(Agent(agent_A1))
    Group('any_user_readers').restrict_read(Access(scope='user', specifier='*'))
    Group('any_user_readers').add_member(Agent(agent_A1))
    Group('C1_writers').restrict_write(Access(scope='customer', specifier='C1'))
    Group('C1_writers').add_member(User('U1'))
    question_Q1 = 'what was koenig and bauer in 19th century?'
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q1}
    ]
    show_environment()

    # == step 2
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    print('\nstep #2')
    user_info_u1 = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved

    # == step 3
    # User U2 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    print('\nstep #3')
    user_info_u2 = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U2').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u2}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved

    # == step 4
    # User U1 begins a document upload process.
    # User U1 is prompted to choose document access scope.
    # User U1 access scope is present in the available options.
    # Customer organization C1 access scope is present in the available options.
    # User U1 selects customer organization C1 access scope.
    # User U1 uploads document D2.
    # Document D2 is relevant for question Q1.
    print('\nstep #4')
    options = upload_choices(User('U1'))
    print(options)
    assert 'user_U1' in options
    assert 'customer_C1' in options
    selection = 'customer_C1'
    restriction = options.get(selection)
    document = Document(
        id='D2',
        restrictions=[restriction],
        metadata={},
        mime='text/plain',
        uri='gs://experiments-200232053296/kb_articles/document_02.txt'
    )
    print(document.create())
    print(document.wait_till_indexed())
    confirm_retrievable(question_Q1, 'document_02')

    # == step 5
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #5')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 6
    # User U2 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #6')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u2}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 7
    # User U2 begins a document upload process.
    # User U2 is prompted to choose document access scope.
    # User U1 access scope is present in the available options.
    # Customer organization C1 access scope is NOT present in the available options.
    # User U2 selects User U2 access scope.
    # User U2 uploads document D3.
    # Document D3 is relevant for question Q1.
    print('\nstep #7')
    options = upload_choices(User('U2'))
    print(options)
    assert 'user_U2' in options
    assert 'customer_C1' not in options
    selection = 'user_U2'
    restriction = options.get(selection)
    document = Document(
        id='D3',
        restrictions=[restriction],
        metadata={},
        mime='text/plain',
        uri='gs://experiments-200232053296/kb_articles/document_03.txt'
    )
    print(document.create())
    print(document.wait_till_indexed())
    confirm_retrievable(question_Q1, 'document_03')

    # == step 8
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    # Document D3 is NOT present in the search results.
    print('\nstep #8')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved
    assert 'document_03' not in retrieved

    # == step 9
    # User U2 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    # Document D3 is present in the search results.
    print('\nstep #9')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u2}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved
    assert 'document_03' in retrieved

In [None]:
scenario_4(remote_app)

scenario #4

step #1
environment:
	 {}
	 {}
	 {}
	 {}
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['d6000c6f6bbbbd310c98dd5484524c59']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}}
checking index status...
checking index status...
checking index status...
checking index status...
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['d6000c6f6bbbbd310c98dd5484524c59']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}, 'indexTime': '2024-11-19T11:46:00.078335Z'}
environment:
	 {'documents': [{'name': 'pr

## Scenario 5

In [None]:
def scenario_5(re_app):
    """Changing permissions for a document reflected in search results"""

    print('scenario #5')
    # == step 1
    # User U1 is a member of customer organization C1.
    # Document D1 is related to machine type M1.
    # Document D2 is related to machine type M2.
    # Subscription group S1 has read access to resources related to machine type M1.
    # Customer organization C1 is a member of subscription group S1.
    # Document D1 is relevant for question Q1.
    # Document D2 is relevant for question Q1.
    # Agent A1 has read access to resources related to machine type M1.
    # Agent A1 has read access to resources related to machine type M2.
    # Internal users have write access to resources related to machine type M1.
    # Internal users have write access to resources related to machine type M2.
    # User U2 is an internal user.
    print('\nstep #1')
    clear_environment()
    show_environment()
    Group('C1').add_member(User('U1'))
    documents = [
        Document(
            id='D1',
            restrictions=[Restriction(Access(scope='machine_type', specifier='M1'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_01.txt'
        ),
        Document(
            id='D2',
            restrictions=[Restriction(Access(scope='machine_type', specifier='M2'))],
            metadata={},
            mime='text/plain',
            uri='gs://experiments-200232053296/kb_articles/document_02.txt'
        )
    ]
    for doc in documents:
        print(doc.create())
        print(doc.wait_till_indexed())
    question_Q1 = 'what is koenig and bauer?'
    confirm_retrievable(question_Q1, 'document_01')
    confirm_retrievable(question_Q1, 'document_02')
    agent_A1 = 'coordinator'
    Group('M1_readers').restrict_read(Access(scope='machine_type', specifier='M1'))
    Group('M2_readers').restrict_read(Access(scope='machine_type', specifier='M2'))
    Group('M1_readers').add_member(Group('S1'))
    Group('S1').add_member(Group('C1'))
    Group('M1_readers').add_member(Agent(agent_A1))
    Group('M2_readers').add_member(Agent(agent_A1))
    Group('M1_writers').restrict_write(Access(scope='machine_type', specifier='M1'))
    Group('M2_writers').restrict_write(Access(scope='machine_type', specifier='M2'))
    Group('M1_writers').add_member(Group('internal'))
    Group('M2_writers').add_member(Group('internal'))
    Group('internal').add_member(User('U2'))
    messages = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': question_Q1}
    ]
    user_info_u1 = {
        'agent_to_filter': {
            agent_id: get_filter(reconcile_restrictions(User('U1').read_restrictions, agent.read_restrictions))
            for agent_id, agent in Agent.all().items()
        }
    }
    show_environment()

    # == step 2
    # Direct search attempt, using the permissions of Agent A1, retrieves all relevant documents for question Q1.
    print('\nstep #2')
    search_filter = get_filter(Agent(agent_A1).read_restrictions)
    assert search_filter
    search_results = search(question_Q1, search_filter)
    retrieved = retrieved_documents(search_results)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 3
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is NOT present in the search results.
    print('\nstep #3')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' not in retrieved

    # == step 4
    # User U2 begins a process of changing access scope of Document D2.
    # User U2 has write access matching the current access scope of Document D2.
    # User U2 is prompted to choose target document access scope.
    # Machine type M1 access scope is present in the available options.
    # User U2 selects Machine type M1 access scope.
    # User U2 changes access scope of Document D2.
    print('\nstep #4')
    document = Document.from_id('D2')
    assert any(
        restriction.hash in document.restrictions
        for restriction in User('U2').write_restrictions
    )
    options = upload_choices(User('U2'))
    print(options)
    assert 'machine_type_M1' in options
    selection = 'machine_type_M1'
    restriction = options.get(selection)
    document.restrictions = [restriction]
    document.update()
    print(document.wait_till_indexed())
    confirm_retrievable(question_Q1, 'document_02')
    confirm_restrictions(question_Q1, document)

    # == step 5
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #5')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' in retrieved
    assert 'document_02' in retrieved

    # == step 6
    # User U2 begins a process of changing access scope of Document D1.
    # User U2 has write access matching the current access scope of Document D1.
    # User U2 is prompted to choose target document access scope.
    # Machine type M2 access scope is present in the available options.
    # User U2 selects Machine type M2 access scope.
    # User U2 changes access scope of Document D1.
    print('\nstep #6')
    document = Document.from_id('D1')
    assert any(
        restriction.hash in document.restrictions
        for restriction in User('U2').write_restrictions
    )
    options = upload_choices(User('U2'))
    print(options)
    assert 'machine_type_M2' in options
    selection = 'machine_type_M2'
    restriction = options.get(selection)
    document.restrictions = [restriction]
    document.update()
    print(document.wait_till_indexed())
    confirm_retrievable(question_Q1, 'document_01')
    confirm_restrictions(question_Q1, document)

    # == step 7
    # User U1 asks question Q1.
    # Agent A1 uses the search tool.
    # Document D1 is NOT present in the search results.
    # Document D2 is present in the search results.
    print('\nstep #7')
    state = re_app.query(input={'messages': messages}, config={'configurable': {'user_info': user_info_u1}})
    print(final_response(state))
    print(state['agent_history'])
    retrieved = retrieved_documents(state)
    print(retrieved)
    assert 'document_01' not in retrieved
    assert 'document_02' in retrieved

In [None]:
scenario_5(remote_app)

scenario #5

step #1
environment:
	 {}
	 {}
	 {}
	 {}
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['93d144ae9dd8e66e079dbe67c0c58c58']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}}
checking index status...
checking index status...
checking index status...
checking index status...
{'name': 'projects/200232053296/locations/eu/collections/default_collection/dataStores/kb_demo_1731593512611/branches/0/documents/D1', 'id': 'D1', 'schemaId': 'default_schema', 'structData': {'restrictions': ['93d144ae9dd8e66e079dbe67c0c58c58']}, 'parentDocumentId': 'D1', 'content': {'mimeType': 'text/plain', 'uri': 'gs://experiments-200232053296/kb_articles/document_01.txt'}, 'indexTime': '2024-11-19T11:49:30.516943Z'}
{'name': 'projects/200232053296/locations/

# Backend logic example

## Possible backend functions

In [None]:
def get_agent_filters(user):
    return {
        agent_id: get_filter(reconcile_restrictions(user.read_restrictions, agent.read_restrictions))
        for agent_id, agent in Agent.all().items()
    }

In [None]:
def get_write_restrictions(user):
    return list(upload_choices(user))  # a list of strings

In [None]:
def validate_write_restriction(user, restriction):
    options = upload_choices(user)
    if restriction not in options:
        raise Exception('the user cannot use this write restriction')
    else:
        return options[restriction]

In [None]:
def upload_document(doc_id, restriction, metadata, mime, gcs_uri):
    document = Document(
        id=doc_id,
        restrictions=[restriction],
        metadata=metadata,
        mime=mime,
        uri=gcs_uri
    )
    document.create()
    document.wait_till_indexed()
    # confirm_retrievable?

In [None]:
def can_update_document(user, document):
    return any(
        restriction.hash in document.restrictions
        for restriction in user.write_restrictions
    )

In [None]:
def update_document_restrictions(document, restricions):
    document.restrictions = restricions
    document.update()
    print(document.wait_till_indexed())
    # confirm_retrievable and/or confirm_restrictions?

## DB design matching the class design

## Alternative DB design (without groups)