In [4]:
from pydantic.v1 import BaseModel, Field
from kubernetes import client, config
from langchain.tools import StructuredTool, tool
from typing import List
# import json

In [2]:
# Configure the Kubernetes client: try the in-cluster configuration first and
# fall back to the local kubeconfig when that fails. Failures are only
# reported via print; nothing is re-raised from this cell.
try:
    config.load_incluster_config()
except Exception as incluster_error:
    print(f"Exception loading incluster configuration: {incluster_error}")
    try:
        config.load_kube_config()
    except Exception as kubeconfig_error:
        print(f"Exception loading local kube_config: {kubeconfig_error}")
    else:
        print("Loaded local kube_config")
else:
    print("Loaded in-cluster configuration")

Loaded in-cluster configuration


In [3]:
class OperatorInfo(BaseModel):
    """Information about an OpenShift Operator"""

    # One record per operator. Every field is a string declared without a
    # default, so all four must be supplied when constructing an instance.
    # The Field descriptions and the class docstring are left untouched
    # because pydantic includes them in the model's generated schema.
    name: str = Field(description="The name of the operator")
    namespace: str = Field(description="The namespace where the operator is present")
    version: str = Field(description="The version of the operator")
    status: str = Field(description="The status of the operator")

In [4]:
class OperatorList(BaseModel):
    """List of the OpenShift Operators"""

    # Container model wrapping a list of OperatorInfo records; this is the
    # declared return type of list_operators. The field is required (no
    # default), but an empty list is a valid value.
    operator_list: List[OperatorInfo] = Field(description="The list of the OpenShift Operator information")

In [5]:
class ToolsInputSchema(BaseModel):
    # Generic single-field input schema: the caller passes all parameters as
    # one string that is expected to look like a dictionary literal with
    # double-quoted keys (see the field description, which is the text shown
    # to whoever fills in the schema).
    # NOTE(review): not referenced by any tool in the cells visible here —
    # confirm whether it is still needed.
    # A `#` comment is used instead of a class docstring so the generated
    # pydantic schema stays exactly as it was.
    input_parameters: str = Field(description="The input parameters as a dictionary that uses double quotes for the parameter names. Do NOT enclose the dictionary in any additional quotes.")

In [6]:
class OpListInputSchema(BaseModel):
    # Argument schema for the List_OpenShift_Operators tool (passed as
    # `args_schema` when the tool is built). The single field name matches
    # the `namespace` parameter of list_operators.
    # A `#` comment is used instead of a class docstring so the generated
    # pydantic schema stays exactly as it was.
    namespace: str = Field(description="The namespace string value.")

In [7]:
# Function to list OpenShift operators in a specific namespace
def list_operators(namespace: str) -> OperatorList:
    """List the operators installed in ``namespace``.

    Queries the ``clusterserviceversions`` custom resources
    (group ``operators.coreos.com``, version ``v1alpha1``) in the given
    namespace and converts each returned item into an ``OperatorInfo``.

    :param namespace: name of the namespace to query.
    :return: an ``OperatorList`` with one entry per returned item; the list
        is empty when the response contains no ``items``.

    Exceptions raised by the Kubernetes client call are not caught here and
    propagate to the caller.
    """
    custom_objects_api = client.CustomObjectsApi()
    csv_response = custom_objects_api.list_namespaced_custom_object(
        group="operators.coreos.com",
        version="v1alpha1",
        namespace=namespace,
        plural="clusterserviceversions",
    )

    operator_list: List[OperatorInfo] = []
    for item in csv_response.get("items", []):
        # Any of these sections can be absent (for example `status` on an
        # object that has not been reconciled yet). Fall back to empty
        # mappings so one incomplete item does not abort the whole listing
        # with a KeyError.
        metadata = item.get("metadata") or {}
        spec = item.get("spec") or {}
        status = item.get("status") or {}

        operator_list.append(
            OperatorInfo(
                # Keep only the part of the resource name before the first
                # "." (e.g. "my-operator.v1.2.3" -> "my-operator").
                name=(metadata.get("name") or "").split(".")[0],
                # The query is namespaced, so the requested namespace is a
                # sound fallback when the item does not carry its own.
                namespace=metadata.get("namespace") or namespace,
                version=spec.get("version") or "unknown",
                status=status.get("phase") or "Unknown",
            )
        )
    return OperatorList(operator_list=operator_list)

In [8]:
# Text passed as the `description` of the List_OpenShift_Operators tool.
# Written as adjacent literals; the resulting string (including the leading
# and trailing newlines and the blank line before ":return:") is unchanged.
operator_list_descr = (
    "\n"
    "Lists OpenShift operators information in a given namespace.\n"
    ":param namespace: the string value of the namespace\n"
    "\n"
    ":return: an object containing the list of operator information for the "
    "available operators such as name, namespace, version and status.\n"
)

In [11]:
# Wrap list_operators as a LangChain structured tool. The tool takes a single
# `namespace` argument described by OpListInputSchema and advertises itself
# with the text in operator_list_descr.
tool_operators_list = StructuredTool(
    func=list_operators,
    name="List_OpenShift_Operators",
    description=operator_list_descr,
    args_schema=OpListInputSchema,
    # Both error-handling switches are enabled.
    # NOTE(review): per the LangChain tool docs, handle_tool_error only
    # intercepts ToolException, which list_operators does not raise itself —
    # confirm this matches the intended behavior on API failures.
    handle_validation_error=True,
    handle_tool_error=True,
)

In [5]:
@tool
def tool_print(input_str: str):
    """
    Use this function to print out any final answer. You should immediatelly stop after this.
    """
    # The docstring above is left exactly as written: LangChain's @tool
    # decorator uses it as the tool description (see the LangChain tool
    # docs), so editing it would change what the agent is told.
    # NOTE(review): "immediatelly" is misspelled there; fix it deliberately
    # if a prompt change is acceptable.
    #
    # Nothing is actually printed here; the input is handed back as a string.
    final_answer = str(input_str)
    return final_answer