In [9]:
from langchain.prompts import PromptTemplate
import json
import pandas as pd
from pprint import pprint
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
# from langchain_core.example_selectors import SemanticSimilarityExampleSelector
# from langchain_openai import OpenAIEmbeddings
# from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI
import os
import re
# Fall back to the placeholder only when no key is configured, so a real key
# exported in the shell (or set by an earlier cell) is never overwritten.
# Do not commit a real API key here; supply it through the environment instead.
os.environ.setdefault("OPENAI_API_KEY", "TOKEN")

In [10]:
def read_sampled_policies(base_path="sampled_policies"):
    """
    Read every sampled policy file, grouped by Policies-as-Code (PaC) tool.

    Each tool is expected to have its own sub-folder of `base_path`, named
    exactly as listed in `pac_tools` below. Every file found under a tool's
    folder (recursively) is read as UTF-8 text, with undecodable bytes dropped,
    and appended to that tool's list.

    Directories and files are visited in sorted name order so that repeated
    runs return the policies in the same order, regardless of the order in
    which the filesystem lists them.

    Parameters:
    - base_path: Directory that contains one sub-folder per PaC tool.

    Returns:
    - A 9-tuple of lists of file contents (str), in this order:
      (rego, sentinel, pulumi, cedar, kyverno, custodian, aws_config,
      opagatekeeper, kubewarden).
      A tool whose folder is missing yields an empty list (a warning is
      printed); a file that cannot be read is skipped (an error is printed).
    """
    # Expected folder names; the order here is also the order of the result.
    pac_tools = [
        "Open Policy Agent (OPA)",
        "HashiCorp Sentinel",
        "Pulumi",
        "Cedar Policy Language (CPL)",
        "Kyverno OSS",
        "Cloud Custodian",
        "AWS Config",
        "OpagateKeeper",
        "Kubewarden"
    ]

    # One list of file contents per tool
    pac_contents = {tool: [] for tool in pac_tools}

    for tool in pac_tools:
        tool_path = os.path.join(base_path, tool)
        if not os.path.isdir(tool_path):
            print(f"Warning: Folder '{tool}' not found in '{base_path}'")
            continue

        for root, dirs, files in os.walk(tool_path):
            # Sorting `dirs` in place makes os.walk descend in a stable order.
            dirs.sort()
            for file in sorted(files):
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        pac_contents[tool].append(f.read())
                except Exception as e:
                    # Best effort: report the unreadable file and keep going.
                    print(f"Error reading {file_path}: {e}")

    return tuple(pac_contents[tool] for tool in pac_tools)

In [11]:
# Load the sampled policies: one list of file contents per PaC tool.
(
    rego,
    sentinel,
    pulumi,
    cedar,
    kyverno,
    custodian,
    aws_config,
    opagatekeeper,
    kubewarden,
) = read_sampled_policies()

# Sanity check: number of files read for the first three tools.
print(*map(len, (rego, sentinel, pulumi)))

234 15 17


In [12]:
def extract_valid_json(llm_response):
    """
    Extracts and parses a valid JSON value from LLM response content.

    The response text is stripped of Markdown code fences (```json ... ```)
    and parsed. If that fails and the text contains a `{ ... }` span (for
    example when the model adds prose around the JSON), the span from the
    first `{` to the last `}` is parsed instead.

    Parameters:
    - llm_response: A LangChain LLM result object (with a `content` attribute)

    Returns:
    - The parsed JSON (normally a dictionary) if parsing succeeds, else an
      error dictionary with keys "error" (the failure message) and
      "raw_content" (the response text, or None if it could not be read).
      This function never raises.
    """
    # Bound before the try so the error path can always report it, even when
    # reading `.content` itself is what fails.
    content = None
    try:
        content = llm_response.content

        # Remove Markdown code block markers ```json ... ```
        cleaned = re.sub(r"```json\s*|\s*```", "", content.strip(), flags=re.IGNORECASE)

        try:
            return json.loads(cleaned)
        except json.JSONDecodeError as first_error:
            # Fallback: the model may have wrapped the object in extra text.
            start = cleaned.find("{")
            end = cleaned.rfind("}")
            if start == -1 or end < start:
                raise
            try:
                return json.loads(cleaned[start:end + 1])
            except json.JSONDecodeError:
                # Report the error from parsing the full text, not the fallback.
                raise first_error from None
    except Exception as e:
        return {"error": str(e), "raw_content": content}

In [13]:
def analyze_pac_list(pac_code_list, language, llm):
    """
    Analyze a list of PaC policy code snippets with an LLM via LangChain.

    A prompt asking for a seven-dimension taxonomy entry is built once for the
    given language. The fully formatted prompt for the first two snippets is
    printed for inspection, then every snippet is sent to the model and the
    response is parsed with `extract_valid_json`.

    Args:
        pac_code_list (list): List of strings (each containing PaC policy code).
        language (str): The name of the PaC language (e.g., "Rego", "Sentinel").
        llm: LangChain LLM instance (ChatOpenAI).

    Returns:
        list: One entry per snippet, in input order. Each entry is whatever
        `extract_valid_json` returned, or {"error": ..., "raw_output": ...}
        if the model call or the parsing raised. "raw_output" is the model
        response for that same snippet, or None if the call itself failed.
    """
    # `language` is interpolated into the template text, so literal braces in
    # it must be doubled or PromptTemplate would read them as input variables.
    safe_language = language.replace("{", "{{").replace("}", "}}")

    base_prompt = f"""
You are an expert in policy analysis, software governance, and cloud-native security.

Given the following policy code snippet written in a Policies-as-Code (PaC) language (e.g., {safe_language}), analyze it and provide a structured taxonomy entry by addressing the following seven dimensions: start by identifying the main purpose of the script, then from the main purpose identify sub-purposes from that category and sub-category for the taxonomy. The output should be structured in JSON format:

1. Primary Purpose:
What is the high-level governance or security domain this policy addresses? Express it in simple, unambiguous sentence.

2. Sub-purposes: 
What are the specific goals or aspects of governance that the policy addresses within the broader purpose? Express it in a simple, unambiguous sentence. 

3. taxonomy category:
Express it in 1 to 4 word maximum, simple, unambiguous.

4. taxonomy sub-category:
Express it in 1 to 4 word maximum, simple, unambiguous.

5. Policy Implemented:
Describe the specific rule enforced. Express it in simple, unambiguous, actionable language.

6. Target Resource:
Specify the resource type, configuration object, or API target to which the policy applies.

7. Rationale:
Justify your interpretation by explaining how the code logic enforces the policy.

Here is the policy code snippet:
{{policy_code}}
Return only a valid JSON dictionary.
"""
    # Compose prompt and model with LangChain's pipe syntax
    prompt = PromptTemplate(
        input_variables=["policy_code"],
        template=base_prompt
    )
    chain = prompt | llm

    results = []

    # Show the fully formatted prompt for the first two snippets as a sanity check
    for i, code in enumerate(pac_code_list[:2]):
        formatted_prompt = prompt.format(policy_code=code)
        print(f"\n--- Prompt for Example {i+1} ---\n")
        print(formatted_prompt)

    for code in pac_code_list:
        # Reset per snippet so a failed call never reports the response that
        # belongs to the previous snippet.
        output = None
        try:
            output = chain.invoke({"policy_code": code})
            results.append(extract_valid_json(output))
        except Exception as e:
            results.append({"error": str(e), "raw_output": output})

    return results

In [14]:
# Model used for the taxonomy analysis.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

# Analyze the Kubewarden policies loaded by read_sampled_policies().
results = analyze_pac_list(
    pac_code_list=kubewarden,
    language="kubewarden policy library",
    llm=llm,
)


--- Prompt for Example 1 ---


You are an expert in policy analysis, software governance, and cloud-native security.

Given the following policy code snippet written in a Policies-as-Code (PaC) language (e.g., kubewarden policy library), analyze it and provide a structured taxonomy entry by addressing the following seven dimensions: start by identifying the main purpose of the script, then from the main purpose identify sub-purposes from that category and sub-category for the taxonomy. The output should be structured in JSON format:

1. Primary Purpose:
What is the high-level governance or security domain this policy addresses? Express it in simple, unambiguous sentence.

2. Sub-purposes: 
What are the specific goals or aspects of governance that the policy addresses within the broader purpose? Express it in a simple, unambiguous sentence. 

3. taxonomy category:
Express it in 1 to 4 word maximum, simple, unambiguous.

4. taxonomy sub-category:
Express it in 1 to 4 word maximum, simpl

In [15]:
# def save_results_to_excel(json_list, filename="pac_analysis_results.xlsx"):
#     """
#     Save a list of JSON dictionaries to an Excel file.

#     Parameters:
#     - json_list: List[Dict] – the list of JSON objects (e.g., output from the LLM).
#     - filename: str – the name of the Excel file to save (default: "pac_analysis_results.xlsx").

#     Returns:
#     - None (writes the Excel file to disk).
#     """
#     try:
#         df = pd.DataFrame(json_list)
#         df.to_excel(filename, index=False)
#         print(df.head(10))
#         print(f"Saved {len(df)} entries to '{filename}' successfully.")
#     except Exception as e:
#         print(f"Failed to save results: {e}")

def save_results_to_excel_with_code(json_list, pac_code_list, filename="pac_analysis_results.xlsx"):
    """
    Save a list of JSON dictionaries to an Excel file, including their associated policy code snippets.

    Results and snippets are paired by position. Each row is a copy of the
    result dictionary with an added "Code Snippet" column; the input
    dictionaries are not modified. A result that cannot be turned into a
    dictionary (e.g. the model returned a JSON list) is written as an error
    row instead of aborting the whole export.

    Parameters:
    - json_list: List[Dict] – the list of JSON objects (e.g., output from the LLM).
    - pac_code_list: List[str] – the corresponding list of code snippets.
    - filename: str – the name of the Excel file to save.

    Returns:
    - None (writes the Excel file to disk). Any failure is caught and reported
      with a printed message rather than raised.
    """
    try:
        results = list(json_list)
        snippets = list(pac_code_list)

        # zip() stops at the shorter input; make that visible instead of silent.
        if len(results) != len(snippets):
            print(
                f"Warning: {len(results)} results but {len(snippets)} code snippets; "
                f"only the first {min(len(results), len(snippets))} pairs will be saved."
            )

        enriched_results = []
        for json_obj, code in zip(results, snippets):
            try:
                row = dict(json_obj)  # Copy so the caller's object is not mutated
            except (TypeError, ValueError):
                row = {
                    "error": "Result is not a JSON object",
                    "raw_content": str(json_obj),
                }
            row["Code Snippet"] = code
            enriched_results.append(row)

        df = pd.DataFrame(enriched_results)
        df.to_excel(filename, index=False)
        print(df.head(5))
        print(f"Saved {len(df)} entries to '{filename}' successfully.")
    except Exception as e:
        print(f"Failed to save results: {e}")


In [16]:
# Write the Kubewarden analysis results to Excel, one row per policy, with the
# policy source alongside each taxonomy entry.
# (Earlier variant without code snippets: save_results_to_excel(results, "OPA_results.xlsx"))
save_results_to_excel_with_code(
    json_list=results,
    pac_code_list=kubewarden,
    filename="Kubewarden_results.xlsx",
)

                                     Primary Purpose  \
0  This policy addresses the enforcement of label...   

                                        Sub-purposes   taxonomy category  \
0  The policy aims to mandate the presence of spe...  Labeling Standards   

  taxonomy sub-category                                 Policy Implemented  \
0        Pod Compliance  Require the label 'portefaix.xyz/version' on a...   

   Target Resource                                          Rationale  \
0  Kubernetes Pods  The policy logic specifies that any pod creati...   

                                        Code Snippet  
0  # Copyright (C) Nicolas Lamirault <nicolas.lam...  
Saved 1 entries to 'Kubewarden_results.xlsx' successfully.
