# LLM For the Generation of Rego Policies starting from natural language queries

# K= 10 Retrieved Policies, Configuration A-DP, random_state=42.

This configuration uses the random seed 42 (`random_state=42`) to split the policy dataset between the knowledge base and the test set (Configuration A).

The number of retrieved policies provided to the LLM is 10.

All the data products to which the test policies apply are present in the knowledge base (KB).


### The following code can be divided into three main parts:

#### 1. Knowledge base generation and vector database setup

#### 2. Policy retrieval and LLM-based Rego code generation

#### 3. Validation against the gold standard

# 1. Knowledge base generation and vector database setup

Note: in the case of k = 0, historical policies will not be used.




---

## Structure of the Knowledge Base

A knowledge base must be set up to provide the LLM with the full context of the systems.

The structure of the knowledge base is shown below as a Python dictionary.

### Knowledge base structure
```

knowledge_base = {
    "actions" : [],
    "user_specified_functions" : [
        {
            "description" : "",
            "definition" : ""
        },
        {
            "description" : "",
            "definition" : ""
        }
    ],
    "roles" : [],
    "policies" : [
        {
            "nlp" : "",
            "rego" : "",
        },
        {
            "nlp" : "",
            "rego" : "",
        }
    ],
    "user_specified_actions" : [],
    "specified_actions" : [
        {
            "decription" : "",
            "definition" : ""
        },
        {
            "decription" : "",
            "definition" : ""
        }
    ],
    "prefixies" : {
        "describe" : "",
        "prefix" : []
    }

}

```
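
As a minimal sketch of how this structure could be sanity-checked before use, the snippet below verifies that a dictionary has the expected top-level keys and that every policy carries both `nlp` and `rego`. The helper `check_knowledge_base` and the sample data are hypothetical and are not part of the notebook.

```python
# Hypothetical helper: the notebook itself does not validate the knowledge base.
EXPECTED_KEYS = {
    "actions": list,
    "user_specified_functions": list,
    "roles": list,
    "policies": list,
    "user_specified_actions": list,
    "specified_actions": list,
    "prefixies": dict,  # key spelled exactly as in the structure above
}


def check_knowledge_base(kb):
    """Return a list of problems; an empty list means the shape is as expected."""
    problems = []
    for key, expected_type in EXPECTED_KEYS.items():
        if key not in kb:
            problems.append(f"missing key: {key}")
        elif not isinstance(kb[key], expected_type):
            problems.append(f"{key} should be a {expected_type.__name__}")
    policies = kb.get("policies", [])
    if isinstance(policies, list):
        for i, policy in enumerate(policies):
            if not isinstance(policy, dict) or not {"nlp", "rego"} <= policy.keys():
                problems.append(f"policy {i} lacks 'nlp' or 'rego'")
    return problems


# Made-up sample data, only to exercise the check
example_kb = {
    "actions": ["read", "write"],
    "user_specified_functions": [],
    "roles": ["admin"],
    "policies": [{"nlp": "Admins can read.", "rego": "allow { action == read }"}],
    "user_specified_actions": [],
    "specified_actions": [],
    "prefixies": {"describe": "", "prefix": []},
}
broken_kb = {k: v for k, v in example_kb.items() if k != "roles"}

print(check_knowledge_base(example_kb))
print(check_knowledge_base(broken_kb))
```

A check of this kind is mostly useful right after parsing, because the knowledge base is produced by an LLM and its shape is not guaranteed.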

### Test Queries
The policies used to test the generation are saved in a separate structure, where each policy includes a natural language version and its corresponding Rego version.


```

test_query = [
    {
        "nlp" : "",
        "rego" : ""
    },
    {
        "nlp" : "",
        "rego" : ""
    }
]

```

### Validation Queries
Finally, a dedicated structure stores each test policy in natural language, its generated Rego counterpart, and the corresponding gold standard reference.

```

evaluation_results = [
    {
        "query_nlp": "...",
        "generated_rego": "...",
        "expected_rego": "..."
    },
    ...
]
```
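
The snippet below is a small sketch of how one such record could be assembled and compared. The notebook's actual validation step is not shown in this part, so both `make_result` and the whitespace-insensitive comparison `same_modulo_whitespace` are assumptions made for illustration only.

```python
# Hypothetical helpers: not the notebook's validation code.
def make_result(query_nlp, generated_rego, expected_rego):
    """Build one record in the evaluation_results format shown above."""
    return {
        "query_nlp": query_nlp,
        "generated_rego": generated_rego,
        "expected_rego": expected_rego,
    }


def same_modulo_whitespace(a, b):
    """Compare two Rego snippets while ignoring differences in spaces, tabs and newlines."""
    return " ".join(a.split()) == " ".join(b.split())


# Made-up example: the two snippets differ only in indentation
record = make_result(
    "Admins can read.",
    "allow {\n\t action == read\n}",
    "allow {\n\taction == read\n}",
)
print(same_modulo_whitespace(record["generated_rego"], record["expected_rego"]))
```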
---

#### 1.1 Vector Database

Data retrieval is performed using a vector database hosted on Pinecone.

As a first step, we save the access key needed to retrieve the data.

In [1]:
# Get the API key
from google.colab import userdata  # Colab secrets helper; must be imported before calling userdata.get

PINECONE = userdata.get('MY_PINECONE_KEY')

# Extracting the policies and creating the knowledge base

Note: in the case of k = 0, historical policies will not be used.


##### 1.2 Function to parse the elements of the template using an LLM

In [2]:
# Function to parse the elements of the template
def blobParser(text):

  # Prompt Writing
  prompt = (
      f"""{text}"""
  )

  # Send request
  response = openai.chat.completions.create(
      model="gpt-4o-mini",
      messages=[
          {"role": "system", "content": """You are a parser specialized in Open Policy Agent (Rego) policy files and you are given a file written in the Rego policy language.

              Your task is to parse it and extract structured information into a JSON file with the exact format below (do not change the structure):

              {
                  "actions": [],
                  "user_specified_functions": [
                      {
                          "description": "",
                          "definition": ""
                      }
                  ],
                  "roles": [],
                  "policies": [
                      {
                          "nlp": "",
                          "rego": ""
                      }
                  ],
                  "user_specified_actions": [],
                  "specified_actions": [
                      {
                          "decription": "",
                          "definition": ""
                      }
                  ],
                  "prefixies": {
                      "describe": "",
                      "prefix": []
                  }
              }

              Instructions:

              1. **actions**: Populate with basic actions defined in the format `name := "action"` under the `# Actions` comment (e.g., `read`, `write`, etc.).

              2. **user_specified_actions**: Include only those actions specifically marked in the comments as user-defined (e.g., under `# User specific actions` like `write_role`, `change_password`).

              3. **specified_actions**: For any action (especially prefixed or specialized ones), include:
                - `"decription"`: a comment directly describing that action.
                - `"definition"`: the full line that defines the action (e.g., `selective_read := "selective_read"`), using `\n` for line breaks and `\t` for tabs.

              4. **roles**: Include the roles listed under `# Roles`, such as `admin`, `observer`, etc.

              5. **user_specified_functions**: Include all Rego functions (like `team_role`, `allowed_list_roles`, etc.) with:
                - `"description"`: the full comment block directly above the function.
                - `"definition"`: the full function code (including conditions), using `\n` for newlines and `\t` for tabs.

              6. **policies**: Each `allow { ... }` block becomes a separate policy:
                - `"nlp"`: use the comment directly above the block, if any. DO NOT invent anything.
                - `"rego"`: the full Rego block content, formatted with `\n` for line breaks and `\t` for tabs.

              7. **prefixies**: If any prefixed actions or conditional logic for them appear (e.g., `selective_read`, `selective_list`), then:
                - `"describe"`: use the comment that explains these prefixes.
                - `"prefix"`: include all such prefix terms.

              STRICT RULES:
              - DO NOT invent any data or add descriptions that aren't explicitly in the file.
              - Preserve formatting: every line break must be `\n`, every indentation must be `\t`.
              - Only use what exists in the file. If any field has no information, leave it as an empty string or list.

              Your output should be the completed JSON file, and nothing else."""},

          {"role": "user",
           "content": prompt}
      ]
  )
  answer = response.choices[0].message.content
  return answer




##### 1.3 Function to parse the policies, each one in natural language and rego

In [3]:

def onlyPoliciesParser(policies):

  # Prompt Writing
  prompt = (
      f"""{policies}"""
  )

  # Send request
  response = openai.chat.completions.create(
      model="gpt-4o-mini",
      messages=[
          {"role": "system", "content": """You are a parser specialized in Open Policy Agent (Rego) policy files and You are given a Rego policy file.

Your task is to extract all `allow { ... }` policy blocks and return them in the following JSON format:

{
  "policies": [
    {
      "nlp": "",
      "rego": ""
    }
  ]
}

Instructions:

1. Each `allow { ... }` block must be included as a separate item in the `"policies"` list.

2. For each policy:
   - "nlp": Capture the single-line or multi-line comment directly above the `allow {` block **only if it is a descriptive comment**, such as those starting with `#`, describing the policy logic in plain language.
     - Ignore high-level section headers or delineators (such as lines with `##`, `###`, or blocks like `## # Queries ##`).
     - If no valid comment is found above the block, leave `"nlp"` as an empty string.
   - "rego": Include the full Rego code of the policy block (including `allow {` and its closing brace `}`), encoded as a string with:
     - `\\n ` (two backslashes + 'n' + a space) for each newline
     - `\\t ` (two backslashes + 't' + a space) for each tab
   - Every line break converted to `\\n ` (newline followed by space)
   - Every indentation converted to `\\t ` (tab followed by space)

3. Preserve the original formatting exactly:
   - Convert each newline in the code to a `\n`
   - Convert each indentation to `\t` (a tab character)
   - Do not change variable names or structure

4. Do not include any policies that are not part of an `allow { ... }` block.

5. Output only the JSON structure — no explanations, no extra text.

IMPORTANT:
- Do NOT invent or modify content.
- Preserve the Rego block exactly as it is, but encode line breaks and tabs correctly.
- If there is no comment above a policy, set `"nlp"` to an empty string.

The output should be valid JSON, containing the list of extracted policies.



"""},

          {"role": "user",
           "content": prompt}
      ]
  )
  answer = response.choices[0].message.content
  return answer

Get the Rego file to extract the template and policies from

Extract the file as raw text

In [4]:
# Download the GitHub file

import requests

link = "https://raw.githubusercontent.com/fleetdm/fleet/refs/heads/main/server/authz/policy.rego"
r = requests.get(link)
blob = r.text

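Split the text into three chunks for parsing:

- lines ~0-350 (everything before the `# Queries` header) -> chunk 1

- lines ~350-700 (from the `# Queries` header up to the `# Software` header) -> chunk 2

- lines ~700-1050 (from the `# Software` header onwards; that header is first renamed to `# Queries` so that a single delimiter splits both) -> chunk 3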

In [5]:
# Create 3 chunks of ~ 350 lines

blob = blob.replace('##\n# Software\n##', '##\n# Queries\n##')
chunks = blob.split('##\n# Queries\n##')
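
The rename-then-split step can be seen on a toy string. This is only a sketch: the text in `toy` is made up and is far shorter than the real policy file.

```python
# Toy stand-in for the Rego file: three sections separated by two different headers
toy = (
    "# template and first policies\n"
    "##\n# Queries\n##\n"
    "# query policies\n"
    "##\n# Software\n##\n"
    "# software policies\n"
)

# Rename the second header so that both boundaries use the same delimiter
normalised = toy.replace('##\n# Software\n##', '##\n# Queries\n##')

# A single split now yields one chunk per section
toy_chunks = normalised.split('##\n# Queries\n##')

for i, chunk in enumerate(toy_chunks):
    print(i, repr(chunk))
```

Note that the split removes the header lines themselves, so the chunks carry only the content between headers.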

The first chunk is parsed using the LLM, via a connection to OpenAI, to extract the template and the first set of policies.

In [6]:
# Connect to OpenAI
import openai
from google.colab import userdata


openai.api_key = userdata.get('OPENAI_API_KEY')

# Parse first chunk
generated_text = blobParser(chunks[0])


Printing output for debugging purposes

In [None]:
print(generated_text)

{
    "actions": [
        "read",
        "list",
        "write",
        "write_host_label",
        "cancel_host_activity",
        "run",
        "run_new",
        "selective_read",
        "selective_list"
    ],
    "user_specified_functions": [
        {
            "description": "# team_role gets the role that the subject has for the team, returning undefined\n# if the user has no explicit role for that team.",
            "definition": "team_role(subject, team_id) = role {\n\tsubject_team := subject.teams[_]\n\tsubject_team.id == team_id\n\trole := subject_team.role\n}"
        },
        {
            "description": "# allowed_read_roles evaulates which roles are allowed for read based on the given action.",
            "definition": "allowed_read_roles(action, base_roles, extra_roles) = result {\n\taction == selective_read\n\tresult := base_roles | extra_roles\n} else = result {\n\taction == read\n\tresult := base_roles\n} else = result {\n\tresult := null\n}"
        },


Parse second and third chunks to extract the policies only

In [None]:
# Parse second and third chunks
chunk_1 = onlyPoliciesParser(chunks[1])
chunk_2 = onlyPoliciesParser(chunks[2])

In [None]:
print(chunk_1)

{
  "policies": [
    {
      "nlp": "Global admins, maintainers and gitops can write queries.",
      "rego": "allow {\n\t object.type == \"query\"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == write\n}"
    },
    {
      "nlp": "Global admins, maintainers, gitops, observer_plus and observers can read queries.",
      "rego": "allow {\n\t object.type == \"query\"\n\t subject.global_role == [admin, maintainer, gitops, observer_plus, observer][_]\n\t action == read\n}"
    },
    {
      "nlp": "Team admin, maintainers and gitops can write queries for their teams.",
      "rego": "allow {\n\t object.type == \"query\"\n\t not is_null(object.team_id)\n\t team_role(subject, object.team_id) == [admin, maintainer, gitops][_]\n\t action == write\n}"
    },
    {
      "nlp": "Team admins, maintainers, gitops, observer_plus and observers can read queries for their teams.",
      "rego": "allow {\n\t object.type == \"query\"\n\t not is_null(object.team_id)\n\t team_ro

In [None]:
print(chunk_2)

{
  "policies": [
    {
      "nlp": "Global admins, maintainers, observers and observer_plus can read all software.",
      "rego": "allow {\n\t object.type == \"software_inventory\"\n\t subject.global_role == [admin, maintainer, observer, observer_plus][_]\n\t action == read\n}"
    },
    {
      "nlp": "Only global admins can modify software inventory (specifically software title names)",
      "rego": "allow {\n\t object.type == \"software_inventory\"\n\t subject.global_role == admin\n\t action == write\n}"
    },
    {
      "nlp": "Team admins, maintainers, observers and observer_plus can read all software in their teams.",
      "rego": "allow {\n\t not is_null(object.team_id)\n\t object.type == \"software_inventory\"\n\t team_role(subject, object.team_id) == [admin, maintainer, observer, observer_plus][_]\n\t action == read\n}"
    },
    {
      "nlp": "Global admins and maintainers can read all maintained apps.",
      "rego": "allow {\n\t object.type == \"maintained_app\"\n

Remove the Markdown code-fence markers that GPT wraps around its JSON output

In [None]:
def clean_output(text):
  # Remove useless characters added by GPT
  text = text.replace("```json\n", "").replace("```", "")
  return text
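
As a hedged alternative, the sketch below strips a surrounding code fence only when one is actually present and then parses the JSON in one step. The helper name `extract_json` is hypothetical, and the fence marker is built with `"`" * 3` purely to avoid writing three literal backticks inside this example.

```python
import json
import re

TICKS = "`" * 3  # the Markdown code-fence marker

# Matches an optional language tag after the opening fence and captures the body
FENCE = re.compile(
    r"^" + TICKS + r"[a-zA-Z]*\s*\n(.*?)\n?" + TICKS + r"\s*$",
    re.DOTALL,
)


def extract_json(text):
    """Strip one surrounding code fence, if any, and parse the remainder as JSON."""
    text = text.strip()
    match = FENCE.match(text)
    if match:
        text = match.group(1)
    return json.loads(text)


fenced = TICKS + 'json\n{"policies": []}\n' + TICKS
print(extract_json(fenced))
print(extract_json('{"policies": []}'))
```

Unlike a plain `replace`, this leaves backticks inside the JSON payload untouched, and `json.loads` raises an error if the model returned anything other than valid JSON.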

In [None]:
import json

# Clean the outputs
chunk_0 = json.loads(clean_output(generated_text))
chunk_1 = json.loads(clean_output(chunk_1))
chunk_2 = json.loads(clean_output(chunk_2))

Finally, the policies are merged into a single variable called generated_text.

Since generated_text is a Python dictionary, all policies from the chunks are concatenated into the list stored under the 'policies' key.

In [None]:
# Merge the policies in a single variable
generated_text = chunk_0
generated_text['policies'] += chunk_1['policies']
generated_text['policies'] += chunk_2['policies']

The first policy of the dictionary is indeed the first policy of the Rego file.

In [None]:
generated_text['policies'][0]

{'nlp': '# Global admin, gitops, maintainer, observer_plus and observer can read global config.',
 'rego': 'allow {\n\tobject.type == "app_config"\n\tsubject.global_role == [admin, gitops, maintainer, observer_plus, observer][_]\n\taction == read\n}'}

The last policy of the dictionary is indeed the last policy of the Rego file --> **114 policies were successfully extracted.**

In [None]:
generated_text['policies'][113]

{'nlp': 'Global admins can connect enterprise.',
 'rego': 'allow {\n\t object.type == "android_enterprise"\n\t subject.global_role == admin\n\t action == [read, write][_]\n}'}

Divide 80% of the policies into training data and reserve 20% for testing. The random_state can be modified to obtain different test sets.

In [None]:
from sklearn.model_selection import train_test_split

# Split training data
train_data, test_query = train_test_split(generated_text['policies'], test_size=0.2, random_state=42)
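
To see what sizes this split produces, the arithmetic can be sketched without scikit-learn. The helper `split_sizes` is hypothetical, and it assumes that a fractional `test_size` is rounded up to the next whole sample, which is our reading of scikit-learn's behaviour.

```python
import math


def split_sizes(n_samples, test_size):
    """Approximate the train/test sizes for a fractional test_size (test share rounded up)."""
    n_test = math.ceil(test_size * n_samples)
    n_train = n_samples - n_test
    return n_train, n_test


# 114 extracted policies, 20% reserved for testing
print(split_sizes(114, 0.2))  # (91, 23)
```

Under that assumption the knowledge base holds 91 policies and the test set 23, which is consistent with the indices `knowledge_base["policies"][90]` and `test_query[22]` printed later in the notebook.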

Create a copy of generated_text to set up the knowledge base with only 80% of the policies.

generated_text remains unchanged.

In [None]:
# Create the Knowledge Base
import copy

knowledge_base = copy.deepcopy(generated_text)
knowledge_base['policies'] = train_data

# Create the Vector Database

Do not rerun!

In [1]:
!pip install pinecone



Import the SentenceTransformer model for generating embeddings

In [2]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

Connecting to Pinecone and creating the index for the vector database using cosine similarity for retrieval.

In [3]:
from google.colab import userdata
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone

pc = Pinecone(api_key= userdata.get('PINECONE'))

index_name = "policies-a"

# Create the index, if not already present
if index_name not in pc.list_indexes().names():
  pc.create_index(
    name=index_name,
    dimension=model.get_sentence_embedding_dimension(),
    metric="cosine",
    spec=ServerlessSpec(
      cloud="aws",
      region="us-east-1"
    )
  )
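
The index above is created with `metric="cosine"`. As a minimal illustration of what that metric computes, the sketch below implements cosine similarity in plain Python on toy vectors; it is not how Pinecone computes scores internally, and the real embeddings are far higher-dimensional than these two-element lists.

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical direction
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal vectors
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))  # same direction, different length
```

Because only the direction of the vectors matters, two policies whose embeddings point the same way score close to 1 regardless of vector length.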

In [4]:
#Connect to the Index
index = pc.Index(index_name)

Since the vector database is empty, it must be populated with the policies. For retrieval, only the natural language part of each policy is used and embedded.

In [5]:
# Encode the natural language policies and insert them into the vector database

vectors = []

for i, policy in enumerate(knowledge_base["policies"]):
  embedding = model.encode(policy["nlp"])
  vectors.append((f"""id-{i}""", embedding, {"type":"nlp"}))

index.upsert(vectors=vectors)



Test the embedding and retrieval using the first policy.

In [7]:
# Test Query
embedding_query = model.encode(knowledge_base["policies"][0]["nlp"])

results = index.query(vector=embedding_query, top_k=10)

print("Top matches:")
for match in results["matches"]:
    print(f"ID: {match['id']}, Score: {match['score']}")


# Store and Retrieve locally saved KB


Save the KB and the test set (20% of the policies) as two pickle files on Google Drive

In [13]:
import pickle
import os
import subprocess
from google.colab import drive

drive.mount('/content/drive')


def storeOnDrive(variable, name):

    filename = f'{name}.pickle'
    folder_path = '/content/drive/MyDrive/PicklesSOC4AI/Generation/ConfigurationA-DP'

    # Save the pickle file locally
    with open(f'{name}.pickle', 'wb') as handle:
        pickle.dump(variable, handle, protocol=pickle.HIGHEST_PROTOCOL)

    # Create the folder if it doesn't exist
    os.makedirs(folder_path, exist_ok=True)

    # Move the file using subprocess (handles filenames safely)
    subprocess.run(['mv', filename, folder_path])


def retriveFromDrive(name):
  filepath = f'/content/drive/MyDrive/PicklesSOC4AI/Generation/ConfigurationA-DP/{name}.pickle'

  # Check if the file exists before loading
  if not os.path.exists(filepath):
      raise FileNotFoundError(f"No such file: {filepath}")

  with open(filepath, 'rb') as handle:
      return pickle.load(handle)
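
The same store-and-load round trip can be tried outside Colab. The sketch below writes to a temporary directory instead of Google Drive; the helper names `store_pickle` and `load_pickle` and the sample data are hypothetical.

```python
import os
import pickle
import tempfile


def store_pickle(variable, name, folder_path):
    """Write the variable straight into folder_path as <name>.pickle."""
    os.makedirs(folder_path, exist_ok=True)
    target = os.path.join(folder_path, f"{name}.pickle")
    with open(target, "wb") as handle:
        pickle.dump(variable, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return target


def load_pickle(name, folder_path):
    """Load <name>.pickle from folder_path, failing loudly if it is missing."""
    filepath = os.path.join(folder_path, f"{name}.pickle")
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"No such file: {filepath}")
    with open(filepath, "rb") as handle:
        return pickle.load(handle)


# Made-up sample in the test_query format
sample = [{"nlp": "Admins can read.", "rego": "allow { action == read }"}]

with tempfile.TemporaryDirectory() as tmp:
    store_pickle(sample, "test_query", tmp)
    restored = load_pickle("test_query", tmp)

print(restored == sample)
```

Writing directly into the target folder avoids the intermediate local file and the `mv` subprocess; whether that is preferable on a mounted Drive is a judgment call.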


In [None]:
storeOnDrive(knowledge_base, "knowledge_base")

In [None]:
storeOnDrive(test_query, "test_query")

Retrieve the KB and the test queries to run the RAG pipeline (to execute the pipeline, start from this cell)

In [15]:
knowledge_base = retriveFromDrive("knowledge_base")
test_query = retriveFromDrive("test_query")

Print for debugging

In [12]:
print(knowledge_base["policies"][90])

{'nlp': '# Admins can read/write all user sessions', 'rego': 'allow {\n\tobject.type == "session"\n\tsubject.global_role == admin\n\taction == [read, write][_]\n}'}


In [13]:
print(test_query[22])

{'nlp': 'Global admins and maintainers can read and write macos setup assistants.', 'rego': 'allow {\n\t object.type == "mdm_apple_setup_assistant"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}


# 2. RAG pipeline

Some test policies need to be corrected to more accurately match the corresponding Rego code.

In [16]:
print(test_query)

[{'nlp': 'Global admins, maintainers, and gitops can read and write policies.', 'rego': 'allow {\n\t object.type == "policy"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == [read, write][_]\n}'}, {'nlp': 'Global admins and maintainers can read and write bootstrap packages.', 'rego': 'allow {\n\t object.type == "mdm_apple_bootstrap_package"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}, {'nlp': 'Global admins, maintainers, observer_plus and observers can read teams.', 'rego': 'allow {\n\tobject.type == "team"\n\tobject.id != 0\n\tsubject.global_role == [admin, maintainer, observer, observer_plus][_]\n\taction == read\n}'}, {'nlp': 'Team admin, maintainer and observer_plus running a non-observers_can_run query that belongs to their team when no target teams are specified.', 'rego': 'allow {\n\t object.type == "targeted_query"\n\t object.observer_can_run == false\n\t is_null(subject.global_role)\n\t action == run\n\n\t team_r

In [1]:
!pip install langchain langchain-community langchain-openai langchain-pinecone pinecone



Retrieve the policies

In [2]:
from langchain_pinecone import PineconeVectorStore
from langchain_community.embeddings import HuggingFaceEmbeddings
import os
from google.colab import userdata
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone



In [4]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

In [5]:
os.environ["PINECONE"] = userdata.get("PINECONE")
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")

In [6]:
pc = Pinecone(api_key= userdata.get('PINECONE'))

index_name = "policies-a"

In [7]:
#Connect to the Index
index = pc.Index(index_name)

The vector index is ready. Test one of the policies to check that retrieval works.

In [None]:
# Test a sample query to verify if the vector database is working correctly

query = knowledge_base["policies"][10]["nlp"]

print("\n TEST QUERY:\n")
print(query)

# Encode the natural language query into an embedding
query_embedding = model.encode(query)

# Retrieve the top 5 most similar entries from the vector database
results = index.query(vector=query_embedding, top_k=5)

print("\n ---> RETRIEVAL RESULTS:\n")
for match in results["matches"]:
    # Extract the numeric index from the ID (e.g., "id-10" -> 10)
    idx = int(match["id"].split("-")[1])
    nlp_text = knowledge_base["policies"][idx]["nlp"]
    rego_code = knowledge_base["policies"][idx]["rego"]

    print(f"ID: {match['id']} - Score: {round(match['score']*100, 2)}%")
    print(f"**NLP**: {nlp_text}")
    print(f"**REGO**:\n{rego_code}")
    print("\n" + "-"*50 + "\n")


 TEST QUERY:

Team admins, maintainers, observers and observer_plus can read MDM commands on hosts of their teams.

 ---> RETRIEVAL RESULTS:

ID: id-10 - Score: 100.04%
**NLP**: Team admins, maintainers, observers and observer_plus can read MDM commands on hosts of their teams.
**REGO**:
allow {
	 not is_null(object.team_id)
	 object.type == "mdm_command"
	 team_role(subject, object.team_id) == [admin, maintainer, observer, observer_plus][_]
	 action == read
}

--------------------------------------------------

ID: id-58 - Score: 91.64%
**NLP**: Team admins and maintainers can write (execute) MDM commands on hosts of their teams.
**REGO**:
allow {
	 not is_null(object.team_id)
	 object.type == "mdm_command"
	 team_role(subject, object.team_id) == [admin, maintainer, gitops][_]
	 action == write
}

--------------------------------------------------

ID: id-55 - Score: 90.56%
**NLP**: Global admins, maintainers, observers and observer_plus can read MDM commands.
**REGO**:
allow {
	 obj

## Retrieve Function

Since retrieval works correctly, we can define a function that takes as input the natural language policy to compare and the number of similar policies to retrieve.

In [8]:
def retrieve_relevant_policies(query, top_k):
    """
    Searches for the policies that are semantically most similar to the query (using cosine similarity) and returns the NLP text along with the corresponding Rego code.

    Args:
        query (str): The policy written in natural language.
        top_k (int): Number of results (most similar policies) to retrieve.

    Returns:
        list of dict: Every dict contains 'nlp', 'rego' and 'score'.
    """
    # Encode the natural language query (policy) into an embedding
    query_embedding = model.encode(query)

    # Retrieve the top k most similar entries from the vector database
    results = index.query(vector=query_embedding, top_k=top_k)

    retrieved_policies = []
    for match in results["matches"]:
        idx = int(match["id"].split("-")[1])
        nlp_text = knowledge_base["policies"][idx]["nlp"]
        rego_code = knowledge_base["policies"][idx]["rego"]
        score = match["score"]
        # The retrieved policies are appended in the same variable
        retrieved_policies.append({
            "nlp": nlp_text,
            "rego": rego_code,
            "score": score
        })

    # The retrieved policies are sorted in descending order
    retrieved_policies = sorted(retrieved_policies, key=lambda x: x["score"], reverse=True)

    return retrieved_policies
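
The post-processing inside this function (mapping each match ID back to a policy and sorting by score) can be exercised without Pinecone. In the sketch below, `attach_policies`, the toy policies and the toy matches are all hypothetical stand-ins for the real query results.

```python
def attach_policies(matches, policies):
    """Map IDs such as 'id-2' back to list positions and sort by descending score."""
    retrieved = []
    for match in matches:
        idx = int(match["id"].split("-")[1])  # "id-2" -> 2
        retrieved.append({**policies[idx], "score": match["score"]})
    return sorted(retrieved, key=lambda item: item["score"], reverse=True)


# Made-up data in the same shape as the knowledge base and the query matches
toy_policies = [
    {"nlp": "A", "rego": "allow { a }"},
    {"nlp": "B", "rego": "allow { b }"},
    {"nlp": "C", "rego": "allow { c }"},
]
toy_matches = [{"id": "id-2", "score": 0.71}, {"id": "id-0", "score": 0.93}]

top = attach_policies(toy_matches, toy_policies)
print([(p["nlp"], p["score"]) for p in top])
```

Since the IDs are positions in `knowledge_base["policies"]`, the lookup is only correct when the loaded knowledge base is the same one (in the same order) that was used to populate the index.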


## Function to generate Rego from the natural language policy

The function:

- extracts information from the KB (actions, roles, ...)
- builds the prompt
- passes the prompt to the LLM
- returns the generated output

In [9]:
def generate_rego_from_query(query, retrieved_policies, llm_chain, prompt_template, verbose=True):
    """
    Use the natural language query and the retrieved policies to generate a new Rego policy with the LLM.

    Args:
        query (str): The natural language policy.
        retrieved_policies (list of dict): Retrieved policies with 'nlp', 'rego', 'score'.
        llm_chain (LLMChain): The LLM chain used for generation.
        prompt_template (PromptTemplate): Prompt template to fill with the information.
        verbose (bool): If True, print the complete formatted prompt.

    Returns:
        str: The generated Rego code.
    """
    # Extract information from the knowledge base to provide as input to the LLM

    # List of Actions
    actions = ", ".join(knowledge_base.get("actions", [])) or "None"

    # List of Roles
    roles = ", ".join(knowledge_base.get("roles", [])) or "None"

    # User Specified Functions (dict or string)
    user_specified_functions_data = knowledge_base.get("user_specified_functions", [])
    if user_specified_functions_data and isinstance(user_specified_functions_data[0], dict):
        user_specified_functions = "\n".join([
            f"- Description: {f['description']}\n  Definition: {f['definition']}"
            for f in user_specified_functions_data
        ]) or "None"
    else:
        user_specified_functions = ", ".join(user_specified_functions_data) or "None"

    # User Specified Actions (dict or string)
    user_specified_actions_data = knowledge_base.get("user_specified_actions", [])
    if user_specified_actions_data and isinstance(user_specified_actions_data[0], dict):
        user_specified_actions = "\n".join([
            f"- Description: {a['description']}\n  Definition: {a['definition']}"
            for a in user_specified_actions_data
        ]) or "None"
    else:
        user_specified_actions = ", ".join(user_specified_actions_data) or "None"

    # Specified Actions (dict or string)
    # The parser template spells the key "decription", so fall back to it when "description" is absent
    specified_actions_data = knowledge_base.get("specified_actions", [])
    if specified_actions_data and isinstance(specified_actions_data[0], dict):
        specified_actions = "\n".join([
            f"- Description: {a.get('description') or a.get('decription', '')}\n  Action Definition: {a.get('definition', '')}"
            for a in specified_actions_data
        ]) or "None"
    else:
        specified_actions = ", ".join(specified_actions_data) or "None"

    # Prefixies
    prefixies_data = knowledge_base.get("prefixies", {})
    describe = prefixies_data.get("describe", "")
    prefixes_list = prefixies_data.get("prefix", [])
    prefixes_joined = ", ".join(prefixes_list) if isinstance(prefixes_list, list) else str(prefixes_list)
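    # A non-empty f-string is always truthy, so test the parts to make the "None" fallback reachable
    prefixies = (
        f"Describe: {describe}\nPrefixes: {prefixes_joined}"
        if (describe or prefixes_joined) else "None"
    )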

    # Retrieved policies
    retrieved_policies_text = "\n\n".join([
        f"- NLP: {doc['nlp']}\n  REGO:\n{doc['rego']}" for doc in retrieved_policies
    ]) or "None"


    # Construct the prompt by formatting the input prompt_template with information from the knowledge base and retrieved policies
    formatted_prompt = prompt_template.format(
        query=query,
        retrieved_policies=retrieved_policies_text,
        actions=actions,
        user_specified_functions=user_specified_functions,
        roles=roles,
        user_specified_actions=user_specified_actions,
        specified_actions=specified_actions,
        prefixies=prefixies
    )

    if verbose:
        print("\n--- FULL PROMPT ---\n")
        print(formatted_prompt)

    # Finally after the prompt is ready we can call the llm, and get the response passing the prompt's elements
    result = llm_chain.invoke({
        "query": query,
        "retrieved_policies": retrieved_policies_text,
        "actions": actions,
        "user_specified_functions": user_specified_functions,
        "roles": roles,
        "user_specified_actions": user_specified_actions,
        "specified_actions": specified_actions,
        "prefixies": prefixies
    })

    # Finally we can extract the output, as the rego policy
    generated_rego = result["text"].strip()

    return generated_rego
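
The three near-identical formatting branches above could be folded into one helper. The sketch below is an assumption about how that might look, not the notebook's code: `format_entries` is a hypothetical name, and it accepts both the `description` key and the `decription` spelling used in the parser template.

```python
def format_entries(entries, label="Definition"):
    """Render a KB list (plain strings or description/definition dicts) as prompt text."""
    if not entries:
        return "None"
    if isinstance(entries[0], dict):
        lines = []
        for entry in entries:
            # Accept both spellings of the description key
            description = entry.get("description") or entry.get("decription", "")
            lines.append(
                f"- Description: {description}\n  {label}: {entry.get('definition', '')}"
            )
        return "\n".join(lines)
    return ", ".join(entries)


# Made-up inputs covering the three cases
print(format_entries([]))
print(format_entries(["read", "write"]))
print(format_entries(
    [{"decription": "# selective read", "definition": 'selective_read := "selective_read"'}],
    label="Action Definition",
))
```

Returning the literal string "None" for empty lists mirrors the fallback used in the function above, so the prompt never contains a blank section.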


## Generation of the policy

Prompt for the LLM, containing the KB elements and the rules to follow during generation.

In [17]:
# Fixed

# Instantiate the LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Build the prompt template from the different elements of the dictionary
prompt_template = PromptTemplate(
    input_variables=[
        "query",
        "actions",
        "user_specified_functions",
        "roles",
        "user_specified_actions",
        "specified_actions",
        "prefixies",
        "retrieved_policies"
    ],
    template="""
You are an expert assistant specialized in generating Rego policies (Open Policy Agent).

You are provided with:
- A complete policy knowledge base describing available elements of the system.
- A user request which is a policy in natural language you must convert to rego.
- A set of reference policies retrieved from a knowledge base, to be used to understand the attributes and values you must use.

A policy typically has three essential components:
1. SUBJECT: Who or what the rule applies to (e.g., user, global admin, team member).
2. OBJECT: The resource type the policy targets (e.g., team, host, user) as object.type.
3. ACTION: The action permitted or denied by the policy (e.g., read, write, write_role, change_password).


Here the elements you're provided with:
---

NEW POLICY QUERY (Natural Language):
{query}

---
Here are important information about the system and its elements you must adapt to:
Available Actions:
{actions}

User-Specified Functions:
{user_specified_functions}

Roles:
{roles}

User-Specified Actions:
{user_specified_actions}

Specified Actions:
{specified_actions}

Prefix Rules:
{prefixies}

---

RETRIEVED POLICIES (for reference):
{retrieved_policies}

---

Additional information:


Your task involves several clear steps:

Step 1: Identify and clearly state in natural language the SUBJECT, OBJECT, and ACTION from the new policy query in natural language.

Step 2: For each retrieved policy(1 to 10), state the natural language formulation then extract and state in natural language and in rego the SUBJECT, OBJECT, and ACTION.

Step 3: Find a match between the new policy OBJECT and one of the retrieved policies OBJECT. To find the match, look at the new policy OBJECT's natural language and the retrieved OBJECT in natural language and in rego and if one of them is syntactically similar (minor small changes, ignoring minor lexical variations such as underscores, spaces, or plurals) copy for the new policy's "object.type" (or the corresponding rego regarding the OBJECT) the object.type from the retrieved policy.
A. Example of the "OBJECT" match reasoning (natural language-natural language match):
1. New Policy Breakdown:
  - New policy nl: "Admin can write my data product in teams"
   - SUBJECT: "Admin"
   - OBJECT: "Data Product in teams"
   - ACTION: "write"

2. Retrieved Policies Breakdown (nl and rego):
   - Policy nl: "Researcher can read my data products in teams"
     - SUBJECT: "Researcher"
     - OBJECT: "Data Products in teams"
     - ACTION: "read"
     - Rego OBJECT: `object.type == "DP-group"`
Since the OBJECTs of 1 and 2 match in their natural language formulation (Data Product in teams), the new policy's rego will take the corresponding rego of the retrieved policy's OBJECT, so in the generation you should use `object.type == "DP-group"`.

IMPORTANT: When you choose the OBJECT match, consider only the object.type or the OBJECT expressed in natural language of the retrieved policy; do not consider SUBJECT or ACTION to find the OBJECT match!
Do not invent the value of object.type for the new policy, use the most relevant object.type comparing the natural language new policy's OBJECT with the natural language retrieved.

Step 4: Generate a new Rego policy: use the OBJECT's rego found, and generate the rego for SUBJECT and ACTION, using all the information retrieved from the knowledge base and the retrieved policies.
Always preserve the retrieved "object.type" and never invent new object types. The SUBJECT and ACTION do not necessarily have to correspond to the same policy from which the OBJECT.TYPE was taken; instead, consider all retrieved policies and adapt the code to the new request.

!!FOR STEP 4 and 5 you must follow these RULES!!!:
  1. RULES FOR YOUR GENERATION:
    - Generate syntactically valid and properly formatted Rego code.
    - Produce only ONE allow block, do not use formulation with multiple blocks (ex. DO NOT USE MORE THAN 1 allow OR ANY else/or).
    - Copy object.type from a retrieved policy when the OBJECT matches the OBJECT of the new policy's natural language (as shown before).
    - Do not invent object.type's value but use exactly the rego value retrieved. Minor changes ARE NOT permitted.
    - IF YOU FIND A POLICY WITH THE MATCHED OBJECT COPY THE REGO VALUE , EVEN IF SUBJECT OR ACTION DON'T MATCH.
    - Maintain consistency in spacing, indentation, and formatting with the retrieved policies.
    - Do not invent any new fields, attributes, or values.
    - Do not invent new types and do not use the object.type directly from the natural language query.
    - DO NOT ADD ANY COMMENTS IN THE GENERATED REGO
    - DO not skip any roles.
    - Do not use negation, DO NOT USE "action !=read", negation on attributes is not permitted; use only positive expressions like "action == write" even if the natural language specifies a negation or an exception.
    - Do not repeat the same attribute.
    - Don't use array when the value of one attribute is just one, use just the direct value.


  2. RULES SPECIFIC TO THE SYSTEM, YOU MUST FOLLOW THEM FOR THE GENERATION:
    - Global admins are just "admin", they do not belong to any team. Do not use the function `team_role` for global admins. Instead, use `subject.global_role`.
    - The valid actions that Team Admins can perform on team users are: `read`, `write`, `write_role`, and `change_password`. If this request is present, consider these actions. In a case like "Team admins can perform all operations on the team users (except delete)", do not use negation; represent the concept only in a positive way. Omitting "delete" from the array is the only correct solution: do not use negation like "action!=delete" if the natural language specifies that someone can't delete; instead use action == [change_password, write, write_role, read][_]. The same goes for any action.
    - "extra_roles" attribute sometimes is part of the "SUBJECT" and should be used in the rego if the natural language mentions an extra role.
    - "base_roles" attribute sometimes is part of the "SUBJECT" and should be used in the rego if the natural language mentions a base role.
    - Sometimes "ACTION" can be "list", in that case instead of "action" attribute use the function allowed_list_roles.
    - IMPORTANT: Use team_role only when "Team Admin" or "Team Admins" are present (not just "team" as object.type or just "admin"; in that case use `subject.global_role` instead).
    - IMPORTANT: Use "allowed_read_roles" only when the natural language ACTION is selective_read.
    - IMPORTANT: use "allowed_list_roles" only when the natural language ACTION is list and/or selective_list
    - IMPORTANT: use "team_role" only if (one of) the SUBJECT is "Team Admin", do not use team_role for any other role! (with Global admin, observer, admin etc DO NOT USE team_role)
    - IMPORTANT: If you include "team_role" with "object.team_id" in the generation, you must include the check on "not is_null(object.team_id)" attribute.
    - IMPORTANT: if object.type == "team", YOU DON'T USE "team_role", use "team_role" only when SUBJECT is "Team Admin"!!
    - IMPORTANT: if object.type == "team" add a check on "object.id != 0"
    - IMPORTANT: use "not is_null(object.team_id)" only and always if you use team_role() and "object.team_id" is present as argument.
    - IMPORTANT: if there is "target teams" in natural language you must add the check is_null(object.host_targets.teams)
    - IMPORTANT: do not include the check on is_null(object.team_id) if object.team_id is not an argument of team_role.



Step 5:
- Output ONLY the corresponding rego code, use ONLY one block of allow (no else/or), follow all the rules specified before (1. RULES FOR YOUR GENERATION and 2. RULES SPECIFIC TO THE SYSTEM) - DO NOT SKIP THEM!!
- !! Remember to add the checks if needed and to use the functions properly.
- For the formulation, stay as close as possible to the retrieved policies (just one block of allow, no repeated attributes, object.type match, same structures for attributes and functions).






"""

)
#Choose the shortest and simplest formulation, staying as close as possible to the retrieved policies.
#- IMPORTANT: if the value (of whatever attribute) is just one element, and not a list (array), use only the element and don't include the array, example:
        #"team_role(subject, object.teams[_].id) == admin" ok
        #"team_role(subject, object.team_id) == [admin][_]" not ok
        #"subject.global_role == [observer][_]" no ok
        #"subject.global_role == observer" ok
    #-IMPORTANT: Instead if you find multiple values for the same attributes (roles as admin and maintainers), use the array:
          #"subject.global_role == admin, subject.global_role == maintainer" NOT ok
          #"subject.global_role == [admin, maintainer][_]" ok

chain = LLMChain(
    llm=llm,
    prompt=prompt_template
)

Clean up the gold standard where the Rego is not correct

Some policies in the dataset (the gold standard) are not expressed in correct Rego, so we correct them to ensure a fair validation.
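Before correcting entries by hand, a quick automated pass can help spot candidates. The sketch below is only a heuristic, not a Rego parser: the function name `lint_gold_policy` and its three checks (balanced braces, an `allow` rule at the start, and a single `=` where the other policies use `==`) are assumptions made for illustration and are not part of the original notebook. Note that a single `=` is still valid Rego (unification); it is flagged only as a stylistic inconsistency.

```python
import re

def lint_gold_policy(rego: str) -> list:
    """Return a list of human-readable warnings for one gold-standard policy."""
    issues = []
    # A well-formed policy should open and close the same number of braces.
    if rego.count("{") != rego.count("}"):
        issues.append("unbalanced braces")
    # Every policy in this dataset is expected to be a single allow rule.
    if not rego.lstrip().startswith("allow"):
        issues.append("does not start with an allow rule")
    # Match '=' that is not part of '==', '!=', ':=', '<=' or '>='.
    if re.search(r"(?<![=!:<>])=(?!=)", rego):
        issues.append("single '=' used where the other policies use '=='")
    return issues

# Illustrative inputs (not taken from the dataset).
ok = 'allow {\n\t object.type == "query"\n\t action == run\n}'
odd = 'allow {\n\t object.type == "query"\n\t action = run\n'

print(lint_gold_policy(ok))
print(lint_gold_policy(odd))
```

Entries flagged this way still need manual review, since the checks say nothing about whether the Rego matches the natural language description.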

In [18]:
for cell in test_query:
    print(cell)
    print()

{'nlp': 'Global admins, maintainers, and gitops can read and write policies.', 'rego': 'allow {\n\t object.type == "policy"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == [read, write][_]\n}'}

{'nlp': 'Global admins and maintainers can read and write bootstrap packages.', 'rego': 'allow {\n\t object.type == "mdm_apple_bootstrap_package"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}

{'nlp': 'Global admins, maintainers, observer_plus and observers can read teams.', 'rego': 'allow {\n\tobject.type == "team"\n\tobject.id != 0\n\tsubject.global_role == [admin, maintainer, observer, observer_plus][_]\n\taction == read\n}'}

{'nlp': 'Team admin, maintainer and observer_plus running a non-observers_can_run query that belongs to their team when no target teams are specified.', 'rego': 'allow {\n\t object.type == "targeted_query"\n\t object.observer_can_run == false\n\t is_null(subject.global_role)\n\t action == run\n\n\t team_ro

###Corrections of any discrepancies between the natural language description and the gold standard to ensure accurate validation.

In [44]:
test_query[0] = {'nlp': 'Global admins, maintainers, and gitops can read and write policies.', 'rego': 'allow {\n\t object.type == "policy"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == [read, write][_]\n}'}

test_query[1] = {'nlp': 'Global admins and maintainers can read and write bootstrap packages.', 'rego': 'allow {\n\t object.type == "mdm_apple_bootstrap_package"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}

test_query[2] = {'nlp': 'Global admins, maintainers, observer_plus and observers can read teams.', 'rego': 'allow {\n\tobject.type == "team"\n\tobject.id != 0\n\tsubject.global_role == [admin, maintainer, observer, observer_plus][_]\n\taction == read\n}'}

test_query[3] = {'nlp': 'Team admin, maintainer and observer_plus running a non-observers_can_run query that belongs to their team when no target teams are specified.', 'rego': 'allow {\n\t object.type == "targeted_query"\n\t object.observer_can_run == false\n\t is_null(subject.global_role)\n\t action == run\n\n\t team_role(subject, object.team_id) == [admin, maintainer, observer_plus][_]\n\n\t # there are no team targets\n\t is_null(object.host_targets.teams)\n}'}

test_query[4] = {'nlp': 'Team admins and maintainers can write to hosts of their own team', 'rego': 'allow {\n\tobject.type == "host"\n\tteam_role(subject, object.team_id) == [admin, maintainer][_]\n\taction == write\n}'}

test_query[5] = {'nlp': 'Global admins and gitops can configure, read and list certificate Authorities', 'rego': 'allow {\n\t object.type == "certificate_authority"\n\t subject.global_role == [admin, gitops][_]\n\t action == [read, write, list][_]\n}'}

test_query[6] = {'nlp': 'Team admins, maintainers, and gitops can write any installable entity (software installer or VPP app) in their teams.', 'rego': 'allow {\n\t not is_null(object.team_id)\n\t object.type == "installable_entity"\n\t team_role(subject, object.team_id) == [admin, maintainer, gitops][_]\n\t action == write\n}'}

test_query[7]={'nlp': 'Team admins can perform all operations on the team users (except changing their password).', 'rego': 'allow {\n\tobject.type == "user"\n\tteam_role(subject, object.team_id) == admin\n\taction == [read, write, write_role][_]\n}'}

test_query[8] = {'nlp': 'Global gitops can write bootstrap packages.', 'rego': 'allow {\n\t object.type == "mdm_apple_bootstrap_package"\n\t subject.global_role == gitops\n\t action == write\n}'}

test_query[9] = {'nlp': 'Team admin and maintainers, observer and oberserver_plus can read software install results on hosts for their\n teams (not gitops as this is not something that relates to fleetctl apply).', 'rego': 'allow {\n\t object.type == "host_software_installer_result"\n\t not is_null(object.host_team_id)\n\t team_role(subject, object.host_team_id) == [admin, maintainer, observer, observer_plus][_]\n\t action == read\n}'}

test_query[10] = {'nlp': 'Global admins, maintainers, observer_plus and observers and extra role gitops can list hosts.', 'rego': 'allow {\n\tobject.type == "host"\n\tbase_roles := {admin, maintainer, observer_plus, observer}\n\textra_roles := {gitops}\n\tallowed_list_roles(action, base_roles, extra_roles)[_] == subject.global_role\n}'}

test_query[11] = {'nlp': 'Any type of role can read secret variables.\n\n Read permission here is just about being able to read the names and ids, not the content (value).', 'rego': 'allow {\n\t object.type == "secret_variable"\n\t subject.global_role == [admin, maintainer, gitops, observer_plus, observer][_]\n\t action == read\n}'}

# "invite" is not an object that appears in any other policy in the KB, so this test case is replaced with one on a different object
test_query[12] = {'nlp': 'Global admins may read/write host', 'rego': 'allow {\n\tobject.type == "host"\n\tsubject.global_role == admin\n\taction == [read,write][_]\n}'}

test_query[13] = {'nlp': 'Gitops can write macos setup assistants.', 'rego': 'allow {\n\t object.type == "mdm_apple_setup_assistant"\n\t subject.global_role == gitops\n\t action == write\n}'}

test_query[14] = {'nlp': 'Team admin, maintainer and observer_plus running a global non-observers_can_run query must have the targets filtered to only teams that they maintain.', 'rego': 'allow {\n\t object.type == "targeted_query"\n\t object.observer_can_run == false\n\t is_null(subject.global_role)\n\t action == run\n\n\t is_null(object.team_id)\n\n\t not is_null(object.host_targets.teams)\n\t ok_teams := { tmid | tmid := object.host_targets.teams[_]; team_role(subject, tmid) == [admin, maintainer, observer_plus][_] }\n\t count(ok_teams) == count(object.host_targets.teams)\n}'}

test_query[15] = {'nlp': 'Global admins, maintainers, observer_plus, observers and gitops can read labels.', 'rego': 'allow {\n\tobject.type == "label"\n\tsubject.global_role == [admin, maintainer, observer_plus, observer, gitops][_]\n\taction == read\n}'}

test_query[16] = {'nlp': 'Team gitops can write MDM Apple Settings of their teams.', 'rego': 'allow {\n\t not is_null(object.team_id)\n\t object.type == "mdm_apple_settings"\n\t team_role(subject, object.team_id) == gitops\n\t action == write\n}'}

test_query[17] = {'nlp': 'Global admin, gitops, maintainer, observer_plus and observer can read global config.', 'rego': 'allow {\n\tobject.type == "app_config"\n\tsubject.global_role == [admin, gitops, maintainer, observer_plus, observer][_]\n\taction == read\n}'}

test_query[18] = {'nlp': 'Team admins and maintainers can read and write macos setup assistants on their teams.', 'rego': 'allow {\n\t not is_null(object.team_id)\n\t object.team_id != 0\n\t object.type == "mdm_apple_setup_assistant"\n\t team_role(subject, object.team_id) == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}

test_query[19] = {'nlp': 'Team admins, maintainers, observer_plus and observers can read scripts for their teams.', 'rego': 'allow {\n\t object.type == "script"\n\t not is_null(object.team_id)\n\t team_role(subject, object.team_id) == [admin, maintainer, observer_plus, observer][_]\n\t action == read\n}'}

test_query[20] = {'nlp': 'Global admins and maintainers can read all maintained apps.', 'rego': 'allow {\n\t object.type == "maintained_app"\n\t subject.global_role == [admin, maintainer][_]\n\t action == read\n}'}

test_query[21] = {'nlp': 'Global admins, maintainers and observer_plus can run any query saved query.', 'rego': 'allow {\n\t object.type == "query"\n\t subject.global_role == [admin, maintainer, observer_plus][_]\n\t action = run\n}'}

test_query[22] = {'nlp': 'Global admins and maintainers can read and write macos setup assistants.', 'rego': 'allow {\n\t object.type == "mdm_apple_setup_assistant"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}

test_query[23] = {'nlp': 'Team admin, maintainer, observer_plus and observer can read global config.', 'rego': 'allow {\n\t object.type == "app_config"\n\t # If role is admin, maintainer, observer_plus or observer on any team.\n\t team_role(subject, subject.teams[_].id) == [admin, maintainer, observer_plus, observer][_]\n\t action == read\n}'}


In [45]:
print(test_query)

[{'nlp': 'Global admins, maintainers, and gitops can read and write policies.', 'rego': 'allow {\n\t object.type == "policy"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == [read, write][_]\n}'}, {'nlp': 'Global admins and maintainers can read and write bootstrap packages.', 'rego': 'allow {\n\t object.type == "mdm_apple_bootstrap_package"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}'}, {'nlp': 'Global admins, maintainers, observer_plus and observers can read teams.', 'rego': 'allow {\n\tobject.type == "team"\n\tobject.id != 0\n\tsubject.global_role == [admin, maintainer, observer, observer_plus][_]\n\taction == read\n}'}, {'nlp': 'Team admin, maintainer and observer_plus running a non-observers_can_run query that belongs to their team when no target teams are specified.', 'rego': 'allow {\n\t object.type == "targeted_query"\n\t object.observer_can_run == false\n\t is_null(subject.global_role)\n\t action == run\n\n\t team_r

In [46]:
### simple test from test query

sample_test = test_query[22]

query = sample_test["nlp"]
expected_rego = sample_test["rego"]


retrieved_policies = retrieve_relevant_policies(query, top_k=10)
print("Policy natural language:")
print(query)


generated_rego = generate_rego_from_query(query, retrieved_policies, chain, prompt_template, verbose=False)



print("\n Rego Generated by the LLM:\n")
print(generated_rego)
print("\n REGO Gold Standard:\n")
print(expected_rego)
#print("NATURAL LANGUAGE:")
#print(test_query[0])



Policy natural language:
Global admins and maintainers can read and write macos setup assistants.

 Rego Generated by the LLM:

```rego
allow {
    object.type == "mdm_apple_setup_assistant"
    subject.global_role == [admin, maintainer][_]
    action == [read, write][_]
}
```

 REGO Gold Standard:

allow {
	 object.type == "mdm_apple_setup_assistant"
	 subject.global_role == [admin, maintainer][_]
	 action == [read, write][_]
}
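The generated policy above is wrapped in a Markdown `rego` code fence, whereas the gold standard is not. A minimal sketch of a helper that removes such a fence before comparison is shown below. The name `strip_code_fence` is hypothetical and not part of the notebook (the notebook refers to an `extract_rego_block` helper in a commented-out line, but its definition is not shown here).

```python
import re

def strip_code_fence(text: str) -> str:
    """Remove one leading fence line (with optional language tag) and one trailing fence."""
    text = text.strip()
    # Drop the opening fence line, e.g. three backticks followed by "rego".
    text = re.sub(r"^```[a-zA-Z]*\s*\n", "", text)
    # Drop the closing fence at the very end of the string.
    text = re.sub(r"\n\s*```$", "", text)
    return text.strip()

# Illustrative input mimicking the shape of the LLM output.
fenced = "```rego\nallow {\n    action == read\n}\n```"
print(strip_code_fence(fenced))
```

Text without a fence passes through unchanged, so the helper can be applied to every generated policy regardless of how the model formatted it.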


In [47]:
def build_evaluation_results(test_query, llm_chain, prompt_template, top_k=10):
    """
    Generate evaluation results for each test example in the test_query list.

    Args:
        test_query (list of dict): List of tests with fields {"nlp", "rego"}.
        llm_chain (LLMChain): An initialized LLMChain instance with model and prompt.
        prompt_template (PromptTemplate): Prompt template forwarded to generate_rego_from_query.
        top_k (int): Number of policies to retrieve for each query.

    Returns:
        list of dict: A list of evaluation results, each containing:
                      - the natural language query
                      - the Rego code generated by the LLM
                      - the expected Rego code
    """
    evaluation_results = []

    for sample_test in test_query:
        query = sample_test["nlp"]
        expected_rego = sample_test["rego"]

        retrieved_policies = retrieve_relevant_policies(query, top_k=top_k)

        generated_rego = generate_rego_from_query(query, retrieved_policies, llm_chain, prompt_template, verbose=False)

        #generated_rego = extract_rego_block(generated_rego)

        print(generated_rego)


        evaluation_results.append({
            "query_nlp": query,
            "generated_rego": generated_rego,
            "expected_rego": expected_rego
        })

    return evaluation_results


In [48]:
# Populate evaluation_results from test_query
evaluation_results = build_evaluation_results(test_query, chain, prompt_template)


print(f"\n✅ Successfully created {len(evaluation_results)} evaluation examples.\n")
print(evaluation_results)

```rego
allow {
    object.type == "policy"
    subject.global_role == [admin, maintainer, gitops][_]
    action == [read, write][_]
}
```
```rego
allow {
    object.type == "mdm_apple_bootstrap_package"
    subject.global_role == [admin, maintainer][_]
    action == [read, write][_]
}
```
```rego
allow {
    object.type == "team"
    object.id != 0
    subject.global_role == [admin, maintainer, observer_plus, observer][_]
    action == read
}
```
```rego
allow {
    object.type == "targeted_query"
    object.observer_can_run == false
    is_null(subject.global_role)
    action == run

    team_role(subject, object.team_id) == [admin, maintainer, observer_plus][_]

    not is_null(object.team_id)
    is_null(object.host_targets.teams)
}
```
```rego
allow {
    not is_null(object.team_id)
    object.type == "host"
    team_role(subject, object.team_id) == [admin, maintainer][_]
    action == write
}
```
```rego
allow {
    object.type == "certificate_authority"
    subject.global_role =

In [49]:
storeOnDrive(evaluation_results, "evaluation_results")
storeOnDrive(test_query, "test_query")

# Validation Phase

## Upload of the output of the pipeline

In [50]:
from google.colab import drive
drive.mount('/content/drive')

import pickle
import re
import pprint
import json

# Path to the pickle file
pickle_path = '/content/drive/MyDrive/PicklesSOC4AI/Generation/ConfigurationA-DP/evaluation_results.pickle'

# Load the evaluation results
with open(pickle_path, 'rb') as f:
    evaluation_results = pickle.load(f)

# Print nicely if you want to inspect
print(json.dumps(evaluation_results, indent=2))



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[
  {
    "query_nlp": "Global admins, maintainers, and gitops can read and write policies.",
    "generated_rego": "```rego\nallow {\n    object.type == \"policy\"\n    subject.global_role == [admin, maintainer, gitops][_]\n    action == [read, write][_]\n}\n```",
    "expected_rego": "allow {\n\t object.type == \"policy\"\n\t subject.global_role == [admin, maintainer, gitops][_]\n\t action == [read, write][_]\n}"
  },
  {
    "query_nlp": "Global admins and maintainers can read and write bootstrap packages.",
    "generated_rego": "```rego\nallow {\n    object.type == \"mdm_apple_bootstrap_package\"\n    subject.global_role == [admin, maintainer][_]\n    action == [read, write][_]\n}\n```",
    "expected_rego": "allow {\n\t object.type == \"mdm_apple_bootstrap_package\"\n\t subject.global_role == [admin, maintainer][_]\n\t action == [read, write][_]\n}"
  }

# Semantic Equivalence

##Function to validate a single policy with LLM - Semantic Equivalence

In [51]:
def are_policies_equivalent_via_llm(nl_query, compared_rego, expected_rego, llm_model):
    """
    Compare the original generated policy to the gold standard using LLM.

    Returns:
        equiv (bool): True if equivalent.
        feedback (str): Full LLM raw response.
    """
    prompt = f"""
You are an expert in Open Policy Agent (OPA) and Rego policy logic.

User request (NL):
{nl_query}

Compare these two Rego policies. Determine whether they allow/deny
the same requests under the same conditions.

STRICT RULES TO FOLLOW:

- The order of elements in subject.global_role does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- The order of attributes does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- Differences in formatting, indentation, or line breaks
  do NOT count as inequivalence.

- Only differences in the actual logic of access control (subject, object, action, conditions)
  should be considered as a valid reason to say the policies are not equivalent.

- An additional check on not is_null(object.team_id) must be considered correct if object.team_id is an argument of team_role or any other function, even if the check is not present in the gold standard.

If the policies are logically equivalent, i.e. both aim to allow access under the same conditions, answer "YES" on the first line. Do not consider differences in formatting and syntax, line breaks, indentation, order of roles or order of variables as affecting the logical interpretation of the conditions. If there is an additional check on not is_null(object.team_id), it must be considered correct if object.team_id is an argument of team_role or any other function, even if the check is not present in the gold standard; if that check is the only difference, answer YES.


If the policies differ in logic, answer "NO" and explain briefly. Do not consider differences in formatting and syntax, line breaks, indentation, order of roles or order of variables as affecting the logical interpretation of the conditions. An additional check on not is_null(object.team_id) must be considered correct, and not a sign of inequivalence, if object.team_id is an argument of team_role or any other function, even if the check is not present in the gold standard.

---
Original Generated Rego Policy:
{compared_rego}

Expected (Gold Standard) Rego Policy:
{expected_rego}
"""
    print(">>> SEMANTIC EQUIVALENCE PROMPT TO LLM:\n", prompt)
    response = llm_model.invoke(prompt)
    raw = response.content
    print("<<< SEMANTIC EQUIVALENCE LLM RESPONSE:\n", raw)

    equiv = raw.strip().lower().startswith("yes")
    return equiv, raw
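Because an LLM judge can be inconsistent between runs, a deterministic cross-check may help flag verdicts worth reviewing by hand. The sketch below is a heuristic and not a Rego parser: it assumes a single `allow { ... }` block with one condition per line, drops comment lines, removes whitespace, and sorts the elements of inline arrays such as `[admin, maintainer][_]`, so that neither condition order nor role order matters. The names `condition_set` and `same_conditions` are hypothetical, and the tolerance for an extra `not is_null(object.team_id)` check described in the prompt is deliberately not modelled.

```python
import re

def _canonical_condition(line: str) -> str:
    """Strip whitespace and sort the items of inline arrays like [a, b][_]."""
    line = re.sub(r"\s+", "", line)

    def sort_array(match):
        items = match.group(1).split(",")
        if items == ["_"]:          # leave the iteration index [_] untouched
            return match.group(0)
        return "[" + ",".join(sorted(items)) + "]"

    return re.sub(r"\[([^\[\]]+)\]", sort_array, line)

def condition_set(rego: str) -> frozenset:
    """Return the body of an allow block as an order-independent set of conditions."""
    body = rego[rego.index("{") + 1 : rego.rindex("}")]
    conditions = set()
    for line in body.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        conditions.add(_canonical_condition(line))
    return frozenset(conditions)

def same_conditions(a: str, b: str) -> bool:
    return condition_set(a) == condition_set(b)

generated = (
    'allow {\n    object.type == "team"\n    object.id != 0\n'
    "    subject.global_role == [admin, maintainer, observer_plus, observer][_]\n"
    "    action == read\n}"
)
gold = (
    'allow {\n\tobject.type == "team"\n\tobject.id != 0\n'
    "\tsubject.global_role == [admin, maintainer, observer, observer_plus][_]\n"
    "\taction == read\n}"
)
print(same_conditions(generated, gold))
```

A disagreement between this check and the LLM verdict does not prove either one wrong; it only marks the pair for manual inspection.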


In [28]:
from difflib import SequenceMatcher

def calculate_text_similarity(a, b):
    """
    Calculate text similarity ratio between two Rego code blocks.
    """
    return SequenceMatcher(None, a, b).ratio()
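`SequenceMatcher.ratio()` compares raw characters, so indentation differences (tabs in the gold standard, spaces in the generated code) lower the score even when two policies are otherwise identical. A minimal sketch of a whitespace-normalised variant follows; `normalize_rego` and `normalized_similarity` are hypothetical names that are not part of the notebook.

```python
from difflib import SequenceMatcher

def normalize_rego(code: str) -> str:
    """Collapse runs of whitespace inside each line and drop empty lines."""
    lines = (" ".join(line.split()) for line in code.splitlines())
    return "\n".join(line for line in lines if line)

def normalized_similarity(a: str, b: str) -> float:
    """Text similarity that ignores indentation and blank-line differences."""
    return SequenceMatcher(None, normalize_rego(a), normalize_rego(b)).ratio()

# Same policy, indented with spaces vs. tab + space.
generated = 'allow {\n    object.type == "policy"\n    action == read\n}'
gold = 'allow {\n\t object.type == "policy"\n\t action == read\n}'

print(SequenceMatcher(None, generated, gold).ratio())
print(normalized_similarity(generated, gold))
```

Reporting both the raw and the normalised score makes it easier to tell formatting noise apart from real textual differences.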


##Function to do the full validation of a single policy (semantic + text similarity)

In [29]:
def validate_and_print_policy(sample, llm_model):
    """
    Validate a single policy:
      1) Semantic equivalence check using the ORIGINAL generated policy
      2) Calculate text similarity between original and gold policy

    Returns a dict with validation details.
    """
    nl_query = sample["query_nlp"]
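    # Remove a surrounding Markdown code fence, if any. str.strip(chars) strips a
    # character set rather than a prefix/suffix, so a regex is used instead
    # (re is imported in the cell that loads the pickle).
    original = re.sub(r"^```(?:rego)?\s*|\s*```$", "", sample["generated_rego"].strip())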
    expected = sample["expected_rego"]

    print(f"\n=== Policy #{sample.get('index','?')} ===\n")
    print("NL Query:\n", nl_query, "\n")


    # Semantic equivalence check (on original policy)
    equiv, eq_feedback = are_policies_equivalent_via_llm(nl_query, original, expected, llm_model)
    print("\nSemantically Equivalent? ", equiv)
    print("Equivalence Feedback:\n", eq_feedback, "\n")

    # Text similarity score
    similarity = calculate_text_similarity(original, expected)
    print(f"Text Similarity (original vs gold): {similarity:.2f}")

    return {
        "query_nlp": nl_query,
        "original_generated_rego": original,
        "expected_rego": expected,
        "semantic_equivalent": equiv,
        "equivalence_feedback": eq_feedback,
        "text_similarity_score": similarity
    }


Test first policies (small sample)

In [52]:
from google.colab import userdata
import os

# Set OpenAI API key
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")

from langchain_openai import ChatOpenAI

# Initialize the LLM (e.g., GPT-4o-mini model)
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0
)


print(">>> Testing the first policy...\n")

test_results = []

for i, sample in enumerate(evaluation_results[:1], start=1):
    sample["index"] = i
    result = validate_and_print_policy(sample, llm)
    test_results.append(result)


>>> Testing the first policy...


=== Policy #1 ===

NL Query:
 Global admins, maintainers, and gitops can read and write policies. 

>>> SEMANTIC EQUIVALENCE PROMPT TO LLM:
 
You are an expert in Open Policy Agent (OPA) and Rego policy logic.

User request (NL):
Global admins, maintainers, and gitops can read and write policies.

Compare these two Rego policies. Determine whether they allow/deny
the same requests under the same conditions.

STRICT RULES TO FOLLOW:

- The order of elements in subject.global_role does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- The order of attributes does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- Differences in formatting, indentation, or line breaks
  do NOT count as inequivalence.

- Only differences in the actual logic of access control (subject, object, action, conditions)
  should be considered as a valid reason to say the policies are not equ

In [53]:

policy_index = 1

print(f">>> Testing policy at index {policy_index}...\n")

test_results = []

sample = evaluation_results[policy_index]
sample["index"] = policy_index

result = validate_and_print_policy(sample, llm)
test_results.append(result)

>>> Testing policy at index 1...


=== Policy #1 ===

NL Query:
 Global admins and maintainers can read and write bootstrap packages. 

>>> SEMANTIC EQUIVALENCE PROMPT TO LLM:
 
You are an expert in Open Policy Agent (OPA) and Rego policy logic.

User request (NL):
Global admins and maintainers can read and write bootstrap packages.

Compare these two Rego policies. Determine whether they allow/deny
the same requests under the same conditions.

STRICT RULES TO FOLLOW:

- The order of elements in subject.global_role does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- The order of attributes does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- Differences in formatting, indentation, or line breaks
  do NOT count as inequivalence.

- Only differences in the actual logic of access control (subject, object, action, conditions)
  should be considered as a valid reason to say the policies are not 

##Validate all the policies

In [54]:
print(">>> Running full validation on all policies...\n")

all_validated = []

for i, sample in enumerate(evaluation_results, start=1):
    sample["index"] = i
    result = validate_and_print_policy(sample, llm)
    all_validated.append(result)



print("\n All policies validated.\n")




>>> Running full validation on all policies...


=== Policy #1 ===

NL Query:
 Global admins, maintainers, and gitops can read and write policies. 

>>> SEMANTIC EQUIVALENCE PROMPT TO LLM:
 
You are an expert in Open Policy Agent (OPA) and Rego policy logic.

User request (NL):
Global admins, maintainers, and gitops can read and write policies.

Compare these two Rego policies. Determine whether they allow/deny
the same requests under the same conditions.

STRICT RULES TO FOLLOW:

- The order of elements in subject.global_role does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- The order of attributes does NOT affect equivalence.
  Differences in the order of roles are NOT a sign of inequivalence.

- Differences in formatting, indentation, or line breaks
  do NOT count as inequivalence.

- Only differences in the actual logic of access control (subject, object, action, conditions)
  should be considered as a valid reason to say the polic

## Print the validation summary

In [55]:
def print_validation_summary(results):
    """
    Print summary statistics about the validation process.
    """
    total = len(results)
    semantic_ok = sum(r["semantic_equivalent"] for r in results)
    avg_similarity = sum(r["text_similarity_score"] for r in results) / total

    print("\n=== Validation Summary ===")
    print(f"Total policies: {total}")
    # Optional manual override, in case the LLM's semantic analysis was not accurate.
    #semantic_ok=20
    print(f"Semantically equivalent: {semantic_ok}/{total} ({semantic_ok/total:.1%})")
    print(f"Average text similarity to gold: {avg_similarity:.2f}")
    print("="*30)


Manual adjustments can be applied in the function above in case the LLM's semantic analysis was not accurate.

In [56]:
print_validation_summary(all_validated)


=== Validation Summary ===
Total policies: 24
Semantically equivalent: 23/24 (95.8%)
Average text similarity to gold: 0.90
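The summary reports 23 of 24 policies as semantically equivalent (23/24 ≈ 95.8%) but does not say which one failed. A minimal sketch for listing the non-equivalent entries is given below, assuming result dictionaries with the keys returned by `validate_and_print_policy`. The helper name `non_equivalent_policies` and the sample data are illustrative only.

```python
def non_equivalent_policies(results):
    """Return (position, natural-language query) for every non-equivalent result."""
    return [
        (i, r["query_nlp"])
        for i, r in enumerate(results)
        if not r["semantic_equivalent"]
    ]

# Illustrative data; in the notebook this would be all_validated.
sample_results = [
    {"query_nlp": "policy A", "semantic_equivalent": True},
    {"query_nlp": "policy B", "semantic_equivalent": False},
]

for index, query in non_equivalent_policies(sample_results):
    print(f"test_query[{index}]: {query}")
```

The positions are zero-based, so they line up with the indices used when `test_query` is corrected earlier in the notebook.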


In [57]:
# --- 6. Save validation results to Google Drive ---
storeOnDrive(all_validated, "validated_results_full")
print("\n All policies validated and saved to 'validated_results_full.pickle'\n")


 All policies validated and saved to 'validated_results_full.pickle'

