# Testing script

For each manifest we make a fresh k8s cluster using kind.
Then we deploy the manifest and execute some health checks.
Finally we destroy the cluster before moving on to the next manifest.

In [3]:
# Imports
import subprocess
import os
import time
import base64
import re

import yaml
import json

from kubernetes import client, config, utils
from kubernetes.client.rest import ApiException

from openai import OpenAI, OpenAIError

In [3]:
#Global variables
OPENAI_API_KEY = "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"
OPENAI_ORG = "CnSwXGBvKfezdZ4ka9Pyy54o"
OPENAI_PROJECT = "proj_mJBD3GAWDyRnilAI5pSl0Msx"

In [105]:
# Send OpenAI request
def request_yaml(output, model="gpt-4", n=1):
    client = OpenAI(
        api_key=OPENAI_API_KEY
    )

    messages = [
        {
            "role": "system",
            "content": "You are an expert Kubernetes YAML generator, that only generates valid Kubernetes YAML manifests. You should never provide any explanations. You should always output raw YAML only, and always wrap the raw YAML with ```yaml."
        },
        {
            "role": "user",
            "content": "Create a kubernetes manifest for a wordpress website using a mysql database as back-end. Name the wordpress container wordpress and the mysql container mysql."
        }
    ]

    # Send a request to OpenAI with the specified prompt
    response = client.chat.completions.create(
    model=model,
    messages=messages,
    temperature=0.9,
    max_tokens=2048,
    top_p=1,
    n=n
    )

    # Save each completion in a separate .txt file
    for i, completion in enumerate(response.choices):
        yaml_content = completion.message.content
        
        # Ensure output folder exists
        os.makedirs(output, exist_ok=True)

        file_path = output + "/response_" + str(i) + ".yaml"
        
        # Write each response to a separate .txt file
        with open(file_path, "w") as file:
            file.write(yaml_content)

In [106]:
request_yaml("testyaml", n=5)

In [133]:
def create_yaml(input, output="test.yaml"):
    # Result
    not_wrapped_yaml = False
    explanation = False

    # Load content from input file as plain text
    with open(input, 'r') as file:
        content = file.read()

    # Trim every thin outside of ```yaml and ``` (removes explanation if the LLM gave it even though we asked him not to)
    yaml_start = content.find("```yaml")
    yaml_end = content.find("```", yaml_start + 7)

    # Check if LLM wrapped YAML code in ```yaml and ```
    if yaml_start == -1 or yaml_end == -1:
        not_wrapped_yaml = True
        yaml_code = content
        print("LLM response did not wrap code in ```yaml and ```.")
    else:
        # Check if LLM gave extra explanation outside of the YAML code
        if yaml_start != 0 or yaml_end != len(content) - 3:
            explanation = True
            print("LLM responded with extra explanation.")
        
        # Extract YAML code
        yaml_code = content[7:yaml_end].strip()

    # Parse the YAML code
    yaml_documents = list(yaml.safe_load_all(yaml_code))

    # Save the parsed YAML code to a file
    with open(output, 'w') as file:
        yaml.dump_all(yaml_documents, file)
    print(f"YAML content saved to {output}")

    return not_wrapped_yaml, explanation

In [131]:
create_yaml('testyaml/response_4.yaml')

LLM response did not wrap code in ```yaml and ```.
YAML content saved to test.yaml


(True, False, False)

In [9]:
# Create kind cluster
def create_cluster(cluster_name):
    command = ['kind', 'create', 'cluster', '--name', cluster_name, '--config', 'kind-cluster-config.yaml']
    # Run the command and capture the output
    result = subprocess.run(command, capture_output=True, text=True, check=True)
        
    print("Cluster created successfully")

# Delete kind cluster
def delete_cluster(cluster_name):
    command = ['kind', 'delete', 'cluster', '--name', cluster_name]
    # Run the command and capture the output
    result = subprocess.run(command, capture_output=True, text=True, check=True)
        
    print("Cluster deleted successfully")

In [135]:
create_cluster('test1')


Cluster created successfully


In [134]:
delete_cluster('test')

Cluster deleted successfully


In [2]:
def validate_k8s_manifest(yaml_file):
    try:
        # Run kubeconform command and capture the output
        result = subprocess.run(
            ["kubeconform", "--verbose", "--output=json", yaml_file],
            check=False,
            capture_output=True,
            text=True
        )
        
        # Parse the JSON output
        output = json.loads(result.stdout)

        resources = []
        invalid_resources = []
        is_valid = True

        # Iterate through resources to gather all resources and check for invalid ones
        for resource in output["resources"]:
            resources.append({
                "kind": resource["kind"],
                "name": resource["name"]
            })
            
            # Check if the resource is valid or not
            if resource["status"] != "statusValid":
                is_valid = False
                invalid_resources.append({
                    "kind": resource["kind"],
                    "name": resource["name"],
                    "reason": resource["msg"]
                })

        # Return appropriate result based on validation status
        if is_valid:
            return True, resources, None
        else:
            return False, resources, invalid_resources

    except json.JSONDecodeError as e:
        print(f"Error parsing JSON output: {e}")
        return False, None, [{"error": "Invalid JSON output"}]

In [4]:
result, all_resources, invalid_resources = validate_k8s_manifest("test7.yaml")

if result:
    print("All resources are valid.")
else:
    print(f"Invalid resources: {invalid_resources}")
    print(f"All resources: {all_resources}")

All resources are valid.


In [73]:
def analyze_k8s_manifest(manifest_file):
    """
    Analyzes a Kubernetes manifest file containing multiple documents and returns information about secrets, persistent volumes/claims, and services.
    
    Parameters:
    manifest_file (str): Path to the Kubernetes manifest file.
    
    Returns:
    dict: A dictionary containing the following keys:
        - has_secrets (bool): Whether the manifest contains secrets.
        - persistent_volumes (list): List of persistent volumes, including the type (RWO, RWX, etc.).
        - persistent_volume_claims (list): List of persistent volume claims, including the type.
        - services (list): List of services, including the name and type (ClusterIP, NodePort, LoadBalancer, Ingress).
    """
    has_secrets = False
    namespaces = []
    secrets = []
    persistent_volumes = []
    persistent_volume_claims = []
    services = []
    
    with open(manifest_file, 'r') as file:
        documents = yaml.safe_load_all(file)
        for doc in documents:
            if doc['kind'] == 'Namespace':
                namespaces.append({
                    'name': doc['metadata']['name']
                })
            elif doc['kind'] == 'Secret':
                secrets.append({
                    'name': doc['metadata']['name']
                })
            elif doc['kind'] == 'PersistentVolume':
                persistent_volumes.append({
                    'name': doc['metadata']['name'],
                    'type': doc['spec']['accessModes'][0]
                })
            elif doc['kind'] == 'PersistentVolumeClaim':
                persistent_volume_claims.append({
                    'name': doc['metadata']['name'],
                    'type': doc['spec']['accessModes'][0]
                })
            elif doc['kind'] == 'Service':
                service_type = spec = doc.get('spec', {}).get('type', 'ClusterIP')
                service_name = doc['metadata']['name']
                services.append({
                    'name': service_name,
                    'type': service_type
                })
            elif doc['kind'] == 'Ingress':
                service_name = doc['metadata']['name']
                services.append({
                    'name': service_name,
                    'type': "Ingress"
                })
    
    return namespaces, secrets, persistent_volumes, persistent_volume_claims, services

In [74]:
result = analyze_k8s_manifest('test1.yaml')
print(result)

([], [{'name': 'mysql-pass'}], [], [], [{'name': 'wordpress', 'type': 'ClusterIP'}, {'name': 'mysql', 'type': 'ClusterIP'}])


In [119]:
def encode_secret(input_yaml, output_yaml=None):
    """
        Base64-encodes any plain-text values in the 'data' section of each Secret in a Kubernetes YAML manifest.

        Raises:
            FileNotFoundError: If the YAML file is not found.
            ValueError: If there are no secrets in the YAML manifest.
            yaml.YAMLError: If there is an issue with parsing the YAML file.
    """
    try:
        # Load the YAML file
        with open(input_yaml, 'r') as file:
            yaml_documents = list(yaml.safe_load_all(file))
        
        secrets_found = False

        # Process each document individually
        for doc in yaml_documents:
            # Check if the document is a Secret resource
            if doc.get("kind") == "Secret" and "data" in doc:
                for key, value in doc["data"].items():
                    # Check if the value is not base64-encoded (valid base64 strings have a length multiple of 4)
                    if not re.fullmatch(r'[A-Za-z0-9+/=]+', value) or len(value) % 4 != 0:
                        # Encode plain text key to base64 and update the secret data field
                        doc["data"][key] = base64.b64encode(key.encode()).decode('utf-8')
                        secrets_found = True  # Mark that we found at least one Secret where base64-encoding was needed

        # Check if no Secret resources were found with non base64encoded values, print log
        if not secrets_found:
            print(f"No Secrets that needed base64encoding needed in the YAML file: {input_yaml}")

        # If output_file_path is not provided, overwrite the original file
        if output_yaml is None:
            output_yaml = input_yaml

        # Write the updated YAML back to the file with all documents
        with open(output_yaml, 'w') as file:
            yaml.dump_all(yaml_documents, file)
        
        return output_yaml
    
    except FileNotFoundError:
        raise FileNotFoundError(f"Error YAML file '{input_yaml}' not found.")
    except yaml.YAMLError as e:
        raise yaml.YAMLError(f"Error while parsing YAML file: {e}")
    except Exception as e:
        raise Exception(f"Unexpected error: {e}")

In [None]:
try:
    modified_yaml_content = encode_secret('updated1.yaml')
    print(f"Secrets base64-encoded successfully. Updated file: {modified_yaml_content}")
except ValueError as e:
    print(e)

In [13]:
def add_probes_to_container(input_yaml, container_name, liveness_probe, readiness_probe, output_yaml=None):
    """
        Adds a probes to a container in a Kubernetes YAML manifest.

        Raises:
            FileNotFoundError: If the YAML file is not found.
            KeyError: If the specified container is not found in the YAML manifest.
            yaml.YAMLError: If there is an issue with parsing the YAML file.
    """
    try:
        # Load the YAML file
        with open(input_yaml, 'r') as file:
            yaml_documents = list(yaml.safe_load_all(file))
        
        container_found = False

        # Loop through each document and find the container
        for doc in yaml_documents:
            containers = doc.get('spec', {}).get('template', {}).get('spec', {}).get('containers', [])
            for container in containers:
                if container['name'] == container_name:
                    # Add startup Probe to the container
                    container['livenessProbe'] = liveness_probe
                    container['readinessProbe'] = readiness_probe
                    container_found = True
                    break
        
        if not container_found:
            raise KeyError(f"Container with name {container_name} not found in the YAML file.")

        # If output_file_path is not provided, overwrite the original file
        if output_yaml is None:
            output_yaml = input_yaml

        # Write the updated YAML back to the file with all documents
        with open(output_yaml, 'w') as file:
            yaml.dump_all(yaml_documents, file)
        
        return output_yaml
    
    except FileNotFoundError:
        raise FileNotFoundError(f"Error YAML file '{input_yaml}' not found.")
    
    except yaml.YAMLError as e:
        raise yaml.YAMLError(f"Error while parsing YAML file: {e}")

    except Exception as e:
        raise Exception(f"Unexpected error: {e}")

In [14]:
liveness_probe_wordpress = {
    'tcpSocket': {
        'port': 80
    },
    'initialDelaySeconds': 30,
    'timeoutSeconds': 5,
    'periodSeconds': 10,
    'failureThreshold': 3
}

readiness_probe_wordpress = {
    'httpGet': {
        'path': "/wp-admin/install.php",
        'port': 80
    },
    'initialDelaySeconds': 30,
    'timeoutSeconds': 5,
    'periodSeconds': 5
}

liveness_probe_mysql = {
    'tcpSocket': {
        'port': 3306
    },
    'initialDelaySeconds': 20,
    'timeoutSeconds': 5,
    'periodSeconds': 10,
    'failureThreshold': 3
}

readiness_probe_mysql = {
    'exec': {
        'command': ["mysqladmin", "ping", "-h", "localhost"]
    },
    'initialDelaySeconds': 20,
    'timeoutSeconds': 5,
    'periodSeconds': 5
}

In [None]:
try:
    add_probes_to_container('yaml_indentation_issue.yaml', 'mysql', liveness_probe_mysql, readiness_probe_mysql, 'updated1.yaml')
    updated_file_path = add_probes_to_container('updated1.yaml', 'wordpress', liveness_probe_wordpress, readiness_probe_wordpress)
    print(f"Startup probes added successfully. Updated file: {updated_file_path}")
except Exception as e:
    print(f"Failed to add readiness probe: {e}")

In [5]:
# Deploy YAML manifest
def deploy_manifest(context, yaml_file):
    config.load_kube_config(context=context)

    k8s_client = client.ApiClient()

    # Apply the manifest
    utils.create_from_yaml(k8s_client, yaml_file)

In [6]:
deploy_manifest("kind-test", 'yaml_indentation_issue.yaml')

ConfigException: Invalid kube-config file. Expected object with name kind-test in /home/cavefeet/.kube/config/contexts list

In [81]:
def check_pv_health(context):
    # Load the Kubernetes configuration (assumes the script is running in a pod or has access to kubeconfig)
    config.load_kube_config(context=context)  # Use load_incluster_config() if running inside a Kubernetes pod
    
    v1 = client.CoreV1Api()
    all_bound = True
    issues = []

    # Check PV status
    pvs = v1.list_persistent_volume().items
    for pv in pvs:
        if pv.status.phase != 'Bound':
            all_bound = False
            issues.append({
                'status': pv.status.phase,
                'reason': pv.status.message or 'No message provided'
            })

    # Return True if all PVs are bound, else return False and the issues list
    return all_bound, issues

In [80]:
def check_pvc_health(context, namespace="default"):
    # Load the Kubernetes configuration (assumes the script is running in a pod or has access to kubeconfig)
    config.load_kube_config(context=context)  # Use load_incluster_config() if running inside a Kubernetes pod
    
    v1 = client.CoreV1Api()
    all_bound = True
    issues = []

    # Check PVC status
    pvcs = v1.list_persistent_volume_claim(namespace).items
    for pvc in pvcs:
        if pvc.status.phase != 'Bound':
            all_bound = False
            issues.append({
                'name': pvc.metadata.name,
                'status': pvc.status.phase,
                'reason': pvc.status.message or 'No message provided'
            })

    # Return True if all PVCs are bound, else return False and the issues list
    return all_bound, issues

In [None]:
def check_pods_health(context, namespace="default"):
    """
    Check detailed pod and container health status including container states and events.
    All containers in a pod must be running for the pod to be considered healthy.
    
    :param context: The kubernetes context to use
    :param namespace: The namespace to check. Defaults to 'default'.
    :return: A tuple containing a boolean (True if all pods and their containers are running, False otherwise) 
            and a list of problematic pod details including container states and events.
    """
    try:
        config.load_kube_config(context=context)
        v1 = client.CoreV1Api()
        v1_events = client.CoreV1Api()
        # Get all pods in the specified namespace
        pods = v1.list_namespaced_pod(namespace=namespace)
        problematic_pods = []

        for pod in pods.items:
            pod_name = pod.metadata.name
            pod_namespace = pod.metadata.namespace
            pod_status = pod.status.phase
            
            # Initialize pod_has_problems flag
            pod_has_problems = False
            
            # Check if pod phase itself is not Running
            if pod_status != "Running":
                pod_has_problems = True
                DEBUG 
            
            # Check container statuses even if pod is "Running"
            if pod.status.container_statuses:
                for container in pod.status.container_statuses:
                    # If any container is not ready or has restarts, mark pod as problematic
                    if not container.ready or container.restart_count > 0:
                        pod_has_problems = True
                        break

            if pod_has_problems:
                pod_details = {
                    'name': pod_name,
                    'status': pod_status,
                    'containers': []
                }
                
                # Get container statuses
                if pod.status.container_statuses:
                    for container in pod.status.container_statuses:
                        container_status = {
                            'name': container.name,
                            'ready': container.ready,
                            'restart_count': container.restart_count,
                        }
                
                problematic_pods.append(pod_details)

        if not problematic_pods:
            return True, None
        else:
            return False, problematic_pods

    except ApiException as e:
        raise ApiException(f"Kubernetes API Error: {e}")
    except Exception as e:
        raise Exception(f"Unexpected error: {e}")

In [17]:
def check_pods_readiness(context, namespace="default"):
    """
    Check if all pods in the specified namespace have passed their readiness probes.
    If not, retrieve the latest event related to readiness probe failure.
    
    :param namespace: The namespace to check. Defaults to 'default'.
    :return: A tuple containing a boolean (True if all pods are ready, False otherwise)
             and a list of non-ready pods with reasons (event details or generic message).
    """
    try:
        config.load_kube_config(context=context)  # Or use config.load_incluster_config() if running inside a cluster

        v1 = client.CoreV1Api()
        v1_events = client.CoreV1Api()

        # Get all pods in the specified namespace
        # pods = v1.list_namespaced_pod(namespace)
        pods = v1.list_pod_for_all_namespaces()
    
        non_ready_pods = []

        # Iterate over the pods to check their readiness
        for pod in pods.items:
            pod_name = pod.metadata.name
            pod_ready = True  # Assume the pod is ready unless proven otherwise

            # Check the readiness condition for the pod
            for condition in pod.status.conditions:
                if condition.type == "Ready" and condition.status != "True":
                    pod_ready = False

                    # If the pod is not ready, fetch the related events
                    # events = v1_events.list_namespaced_event(namespace=namespace, field_selector=f"involvedObject.name={pod_name}")
                    events = v1_events.list_event_for_all_namespaces(field_selector=f"involvedObject.name={pod_name}")
                    readiness_probe_failure_event = None

                    # Find the latest event mentioning readiness probe failure
                    for event in events.items:
                        if "Readiness probe failed" in event.message:
                            readiness_probe_failure_event = event.message
                            break
                
                    # If an event explaining the readiness probe failure exists, use it
                    if readiness_probe_failure_event:
                        non_ready_pods.append({
                            'name': pod_name,
                            'status': 'Readiness probe failed',
                            'reason': readiness_probe_failure_event
                        })
                    else:
                        # If no readiness probe failure event
                        non_ready_pods.append({
                            'name': pod_name,
                            'status': 'ContainersNotReady',
                            'reason': 'Readiness probe not run'
                        })
                    break  # No need to check further conditions if pod isn't ready
                
    except ApiException as e:
        raise ApiException(f"Kubernetes API Error: {e}")
    
    except Exception as e:
        raise Exception(f"Unexpected error deploying: {e}")

    # Return True if all pods are ready, otherwise return False with details
    if not non_ready_pods:
        return True, None
    else:
        return False, non_ready_pods

In [132]:
all_pods_running, result = check_pods_health("kind-test")
if all_pods_running:
    print("Health check passed: All pods are running.")
else:
    print(f"Health check failed: Non-running pods found: {result}")

all_pods_ready, result = check_pods_readiness("kind-test")
if all_pods_ready:
    print("Health check passed: All pods are ready.")
else:
    print(f"Health check failed: Non-ready pods found: {result}")

Health check failed: Non-running pods found: [{'name': 'mysql-5c8d5db747-jn7jk', 'status': 'Pending', 'containers': [{'name': 'mysql', 'ready': False, 'restart_count': 0, 'state': 'Waiting', 'reason': 'CreateContainerConfigError', 'message': 'secret "db-password" not found'}]}, {'name': 'wordpress-8699f6c4b4-btwjj', 'status': 'Pending', 'containers': [{'name': 'wordpress', 'ready': False, 'restart_count': 0, 'state': 'Waiting', 'reason': 'CreateContainerConfigError', 'message': 'secret "db-password" not found'}]}]
Health check failed: Non-ready pods found: [{'name': 'mysql-5c8d5db747-jn7jk', 'status': 'ContainersNotReady', 'reason': 'Readiness probe not run'}, {'name': 'wordpress-8699f6c4b4-btwjj', 'status': 'ContainersNotReady', 'reason': 'Readiness probe not run'}]


In [None]:
json_result = {
    "info": {
        "namespaces": None,
        "secrets": None,
        "pv": None,
        "pvc": None,
        "services": None
    },
    "LLM_response_issues": {
        "not_wrapped": False,
        "extra_explanation": False,
        "base64_encoding_needed": False,
    },
    "health": {
        "pv_health": None,
        "pvc_health": None,
        "pods_health": None
    },
    "failed": {
        "reason": None,
        "explanation": None # health failed
    }
}

In [None]:
def testing(input_path, cluster_name, index, dir):

    output_path = dir + "/manifests/manifest_" + str(index) + ".yaml"

    context = "kind-" + cluster_name
    namespace = "default"

    json_result = {}
    json_result["file"] = output_path

    failed = {
        "reason": None,
        "explanation": None,
    }

    # Create YAML file from LLM response
    try: 
        not_wrapped_yaml, explanation = create_yaml(input_path, output_path)
    except yaml.YAMLError as e:
        failed["reason"] = 
        json_result["failed"] = "YAML error"
        return json_result
        print("Invalid YAML:", e)        
    
    # Validate YAML file against k8s scheme and get info about the resources in the file
    valid, all_resources, invalid_resources = validate_k8s_manifest(output_path)

    if (not valid):
        print('manifest failed conform')
        print(invalid_resources)
        json_result["k8s_validation"] = invalid_resources
        json_result["failed"] = invalid_resources
        return json_result
    
    # Get info about the manifest (are there namespaces, secrets, service, PV or PVC defined)
    namespaces, secrets, pvs, pvcs, services = analyze_k8s_manifest(output_path)

    json_info = {
        "namespaces": None,
        "secrets": None,
        "pv": None,
        "pvc": None,
        "services": None
    }

    if not namespaces:
        namespace = namespace[0]
        json_info["namespaces"] = namespaces
    
    if not secrets:
        json_info["secrets"] = secrets

        # Base64-encode secrets if needed
        encode_secret(output_path)
        #TODO: update json result if secrets needed base64 encoding

    if not pvs:
        json_info["pv"] = pvs

    if not pvcs:
        json_info["pvc"] = pvcs

    if not services:
        json_info["services"] = services

    # Add probes
    try:
        add_probes_to_container(output_path, 'mysql', liveness_probe_mysql, readiness_probe_mysql)
    except KeyError as e:
        print(e)
        json_result["failed"] = "No Container found named mysql."
        return json_result # Stop testing

    try:
        add_probes_to_container(output_path, 'wordpress', liveness_probe_wordpress, readiness_probe_wordpress)
    except KeyError as e:
        print(e)
        json_result["failed"] = "No Container found named wordpress."
        return json_result # Stop testing
    
    # Create cluster
    try:
        # TODO: change to ingress/loadbalancer cluster / RWX cluster if needed
        create_cluster(cluster_name)

    except subprocess.CalledProcessError as e:
        # Handle the case where the command fails
        print(f"Failed to create the cluster: {e}")
        print(e.stdout)
        print(e.stderr)
        json_result["failed"] = "Failed to create cluster."
        return json_result # Stop testing
    
    # Deploy manifest TODO: catch errors
    try:
        deploy_manifest(context, output_path)
    except ApiException as e:
        print(e)
        json_result["failed"] = e
        return json_result # Stop testing
    
    # Check health of cluster
    timeout = 100
    periode = 5
    start_time = time.time()
    healthy = False
    result_ready = None
    # As long as timeout is not reached and cluster is not healthy check cluster
    while (time.time() - start_time < timeout) and (not healthy):
        bound_pvc, issues_pvc = check_pvc_health(context, namespace)
        bound_pv, issues_pv = check_pv_health(context, namespace)
        all_pods_running, result_health = check_pods_health(context)
        if all_pods_running and bound_pv and bound_pv:
            healthy = True
            break    
        time.sleep(periode)
    
    if healthy:
        print('healthy')
    else:
        print('unhealthy')
        print(result_health)
        print(result_ready)
        json_result["pods_health"] = result_health

    # Delete cluster crash if cluster could not be deleted
    # TODO: change to ingress/loadbalancer cluster / RWX cluster if needed
    delete_cluster(cluster_name)
        
    return json_result

In [90]:
def test_procedure(file_path, cluster_name):
    #Test Results, if None, the test passed
    json_result = {
        "file": file_path,
        "LLM_request": None,
        "create_cluster": None,
        "delete_cluster": None,
        "yaml_error" : None, # Valid YAML file
        "k8s_validation": None, # Valid k8s manifest (dry run on client)
        "containers": None, # Find wordpress and mysql container (by name)
        "deploy": None, # Deploy to cluster (~= dry run on server)
        "pods_health": None, # Health check if pods are running and container are ready
        "pv_check": None,
        "pvc_check": None,
        "secrets_check": None, # Check if secrets needed base64 encoding
        "info": None # Info about the yaml file (from analyze k8s manifest)
    }
    context = "kind-" + cluster_name
    json_result["file"] = file_path
    namespace = "default"

    # Request YAML from LLM
    try:
        yaml_content = request_yaml()
    except OpenAIError as e:
        print("OpenAI API error:", e)
        json_result["LLM_request"] = e
        return json_result # Stop testing
    except Exception as e:
        print("An unexpected error occurred:", e)
        return json_result # Stop testing

    # Create YAML file
    try:
        create_yaml(yaml_content, file_path) #TODO: yaml parsing error = LLM made YAML mistake
    # YAML conten contained non YAML code
    except yaml.scanner.ScannerError as e:
        print(e)
        json_result["yaml_error"] = e
        json_result["failed"] = "LLM responded with non YAML code"
        return json_result # Stop testing
    except yaml.YAMLError as e:
        print(e)
        json_result["yaml_error"] = e
        json_result["failed"] = "YAML error"
        return json_result # Stop testing
    except Exception as e:
        json_result["yaml_error"] = e
        json_result["failed"] = "Unexpected YAML error"
        print(f"Unexpected error creating YAML file: {e}")
        return json_result # Stop testing

    

    # Validate YAML file against k8s scheme and get info about the resources in the file
    valid, all_resources, invalid_resources = validate_k8s_manifest(file_path)

    if (not valid):
        print('manifest failed conform')
        print(invalid_resources)
        json_result["k8s_validation"] = invalid_resources
        return json_result # Stop testing
    
    # Get info about the manifest (are there namespaces, secrets, service, PV or PVC defined)
    namespaces, secrets, pvs, pvcs, services = analyze_k8s_manifest(file_path)

    json_info = {
        "namespaces": None,
        "secrets": None,
        "pv": None,
        "pvc": None,
        "services": None
    }

    if not namespaces:
        namespace = namespace[0]
        json_info["namespaces"] = namespaces
    
    if not secrets:
        json_info["secrets"] = secrets

        # Base64-encode secrets if needed
        encode_secret(file_path)

        #TODO: update json result if secrets needed base64 encoding

    if not pvs:
        json_info["pv"] = pvs

        # Check PVs
        bound, issues = check_pv_health(context)
        if not bound:
            json_result["pv_check"] = issues

    if not pvcs:
        json_info["pvc"] = pvcs

        # Check PVCs
        bound, issues = check_pvc_health(context, namespace)
        if not bound:
            json_result["pvc_check"] = issues

    if not services:
        json_info["services"] = services


    try:
        # Create cluster TODO: change to ingress/loadbalancer cluster / RWX cluster if needed
        create_cluster(cluster_name)

    except subprocess.CalledProcessError as e:
        # Handle the case where the command fails
        print(f"Failed to create the cluster: {e}")
        print(e.stdout)
        print(e.stderr)
        json_result["create_cluster"] = e
        return json_result # Stop testing
    

    # Add probes
    try:
        add_probes_to_container(file_path, 'mysql', liveness_probe_mysql, readiness_probe_mysql)
        add_probes_to_container(file_path, 'wordpress', liveness_probe_wordpress, readiness_probe_wordpress)
    except KeyError as e:
        print(e)
        json_result["containers"] = e
        return json_result # Stop testing
    
    print('Added probes')
    # Deploy manifest TODO: catch errors
    try:
        deploy_manifest(context, file_path)
    except ApiException as e:
        print(e)
        json_result["deploy"] = e
        return json_result # Stop testing
    
    print('Deployed manifest')
    
    # Check health of cluster
    timeout = 100
    periode = 5
    start_time = time.time()
    healthy = False
    result_ready = None
    # As long as timeout is not reached and cluster is not healthy check cluster
    while (time.time() - start_time < timeout) and (not healthy):
        all_pods_running, result_health = check_pods_health(context)
        bound_pv, issues_pv = check_pv_health(context)
        bound_pvc, issues_pvc = check_pvc_health(context)
        if all_pods_running:
            if bound_pv:
                if bound_pvc:
                    healthy = True
            healthy = True
            # Check PV/PVC
            bound, issues = check_pv_health(context)
            if not bound:
                json_result["pv_check"] = issues
            bound, issues = check_pvc_health(context, namespace)
            if not bound:
                json_result["pvc_check"] = issues
            break
        time.sleep(periode)
    
    if healthy:
        print('healthy')
    else:
        print('unhealthy')
        print(result_health)
        print(result_ready)
        json_result["pods_health"] = result_health

    # Delete cluster crash if cluster could not be deleted
    try:
        # Create cluster TODO: change to ingress/loadbalancer cluster / RWX cluster if needed
        delete_cluster(cluster_name)

    except subprocess.CalledProcessError as e:
        # Handle the case where the command fails
        print(f"Failed to delete the cluster: {e}")
        print(e.stdout)
        print(e.stderr)
        json_result["delete_cluster"] = e
        return json_result # Stop testing
    
    return json_result

In [84]:
delete_cluster("test1")

Cluster deleted successfully


In [87]:
# Valid YAML input
yaml_content = """
- name: example
  value: 42
"""


# Non-YAML input
non_yaml_content = "This is not valid YAML."


# Invalid YAML input
invalid_yaml_content = """
- name: example
  value: 42
invalid_key: value
"""


try:
    create_yaml(yaml_content, "example.yaml")
    # YAML conten contained non YAML code
except yaml.scanner.ScannerError as e:
    print(f"Scanner error creating YAML file: {e}")
except yaml.YAMLError as e:
    print(f"YAML error creating YAML file: {e}")
except Exception as e:
    print(f"Unexpected error creating YAML file: {e}")

try:
    create_yaml(non_yaml_content, "non_yaml.yaml")
    # YAML conten contained non YAML code
except yaml.scanner.ScannerError as e:
    print(f"Scanner error creating YAML file: {e}")
except yaml.YAMLError as e:
    print(f"YAML error creating YAML file: {e}")
except Exception as e:
    print(f"Unexpected error creating YAML file: {e}")

try:
    create_yaml(invalid_yaml_content, "invalid_yaml.yaml")
    # YAML conten contained non YAML code
except yaml.scanner.ScannerError as e:
    print(f"Scanner error creating YAML file: {e}")
except yaml.YAMLError as e:
    print(f"YAML error creating YAML file: {e}")
except Exception as e:
    print(f"Unexpected error creating YAML file: {e}")

YAML content saved to example.yaml
YAML content saved to non_yaml.yaml
YAML error creating YAML file: while parsing a block collection
  in "<unicode string>", line 2, column 1:
    - name: example
    ^
expected <block end>, but found '?'
  in "<unicode string>", line 4, column 1:
    invalid_key: value
    ^


In [91]:
results = []
for i in range(10): 
    results.append(test_procedure("test" + str(i) + ".yaml", "test" + str(i)))
print(results)

Request successful
while scanning a simple key
  in "<unicode string>", line 124, column 1:
    Replace `YOUR_BASE64_ENCODED_PAS ... 
    ^
could not find expected ':'
  in "<unicode string>", line 124, column 84:
     ... tual password encoded in base64.
                                         ^
Request successful
YAML content saved to test1.yaml
Request successful
YAML content saved to test2.yaml
Request successful
while scanning a simple key
  in "<unicode string>", line 112, column 1:
    Replace YOUR_BASE64_ENCODED_SECR ... 
    ^
could not find expected ':'
  in "<unicode string>", line 112, column 73:
     ... your password encoded in base64.
                                         ^
Request successful
YAML content saved to test4.yaml
Request successful
YAML content saved to test5.yaml
Request successful
YAML content saved to test6.yaml
Request successful
while scanning a simple key
  in "<unicode string>", line 113, column 1:
    Please update the `mysql-pass` K ... 
    ^
co

In [85]:
results = []
for i in range(10): 
    results.append(test_procedure("test" + str(i) + ".yaml", "test" + str(i)))
print(results)

Request successful
while scanning a simple key
  in "<unicode string>", line 106, column 1:
    You'll also need to create a sec ... 
    ^
could not find expected ':'
  in "<unicode string>", line 106, column 102:
     ... rd` for the MYSQL root password.
                                         ^
Request successful
YAML content saved to test1.yaml
Cluster created successfully
Added probes
No Secrets that needed base64encoding needed in the YAML file: test1.yaml
Secret fix
Deployed manifest
unhealthy
[{'name': 'mysql-656cc8fbd4-7t79t', 'status': 'Pending', 'containers': [{'name': 'mysql', 'ready': False, 'restart_count': 0, 'state': 'Waiting', 'reason': 'CreateContainerConfigError', 'message': 'secret "mysql-pass" not found'}]}, {'name': 'wordpress-7dbfbf5c66-9kgqt', 'status': 'Pending', 'containers': [{'name': 'wordpress', 'ready': False, 'restart_count': 0, 'state': 'Waiting', 'reason': 'CreateContainerConfigError', 'message': 'secret "mysql-pass" not found'}]}]
None
Cluster deleted

In [86]:
# Save the JSON results to a file
with open("results.json", "w") as json_file:
    json.dump(results, json_file, indent=4)

TypeError: Object of type ScannerError is not JSON serializable

In [27]:
create_cluster("test")

Cluster created successfully
