In [41]:
!pip install setuptools==67.2.0 -q
!pip install --no-cache-dir mercury -q
# Load extensions and environment, then capture a snapshot (summary) of the cluster
%reload_ext autoreload
%autoreload 2

%reload_ext dotenv
%dotenv
    
from docbooks.lib.sql import Cluster
import pandas as pd
# Look up the cluster to analyze.
# NOTE(review): `id` is an empty string here — presumably a redacted placeholder;
# confirm the real cluster ID is supplied before running.
c = Cluster.from_id(id="")
# Keep the snapshot in `snap`: the pod-grouping cell at the end of the notebook reads `snap.pods`.
snap = c.snapshot
# Dump the snapshot to disk; the analyzer cells open this exact file name.
snap.dump_to_file("cluster_snap.json")

cannot find .env file


<h3>Run the cell above first, then allow some time for the snapshot JSON file (cluster_snap.json) to be generated before running the analyzers below.</h3>

<h2>Deployment Analyzer</h2>

In [49]:
import json
import logging
import pandas as pd
import mercury as mr

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Helper to evaluate a Kubernetes label selector against a set of pod labels
def _selector_matches(selector, labels):
    """Return True when a Kubernetes label selector selects ``labels``.

    Args:
        selector: The ``spec.selector`` mapping of a resource (may contain
            ``matchLabels`` and/or ``matchExpressions``), or ``None``.
        labels: Mapping of label key -> value taken from the pod template.

    Returns:
        bool: ``False`` for a missing/null selector (it selects nothing);
        ``True`` for an empty selector (it selects everything); otherwise
        ``True`` only if every ``matchLabels`` pair and every
        ``matchExpressions`` requirement is satisfied.
    """
    if not isinstance(selector, dict):
        return False

    for key, value in (selector.get('matchLabels') or {}).items():
        if key not in labels or labels[key] != value:
            return False

    for requirement in selector.get('matchExpressions') or []:
        key = requirement.get('key')
        operator = requirement.get('operator')
        values = requirement.get('values') or []
        if operator == 'In':
            satisfied = key in labels and labels[key] in values
        elif operator == 'NotIn':
            # A pod without the key satisfies NotIn.
            satisfied = key not in labels or labels[key] not in values
        elif operator == 'Exists':
            satisfied = key in labels
        elif operator == 'DoesNotExist':
            satisfied = key not in labels
        else:
            # Unknown operator: refuse to claim a match.
            satisfied = False
        if not satisfied:
            return False
    return True


# Helper function to match a deployment with PDB or HPA
def get_resource_for_deployment(deployment, resources, resource_type):
    """Find the first PDB or HPA that applies to ``deployment``.

    Args:
        deployment: Deployment object as a dict (Kubernetes JSON layout).
        resources: List of PDB or HPA dicts to search; ``None`` or empty is allowed.
        resource_type: ``'pdb'`` to match by label selector against the pod
            template labels, ``'hpa'`` to match by ``spec.scaleTargetRef``.

    Returns:
        The first matching resource dict from the same namespace, or the
        string ``'N/A'`` when nothing matches.
    """
    if not resources:
        return 'N/A'

    # `or {}` guards against keys that are present but explicitly null in the JSON.
    metadata = deployment.get('metadata') or {}
    name = metadata.get('name', 'N/A')
    namespace = metadata.get('namespace', 'N/A')
    template = (deployment.get('spec') or {}).get('template') or {}
    pod_labels = (template.get('metadata') or {}).get('labels') or {}

    for resource in resources:
        res_namespace = (resource.get('metadata') or {}).get('namespace', 'N/A')
        if res_namespace != namespace:
            continue
        res_spec = resource.get('spec') or {}
        if resource_type == 'pdb':
            # Evaluate the whole selector (matchLabels and matchExpressions); a PDB
            # without a selector must not be attributed to this deployment.
            if _selector_matches(res_spec.get('selector'), pod_labels):
                return resource
        elif resource_type == 'hpa':
            hpa_target = res_spec.get('scaleTargetRef') or {}
            # The target kind must be a Deployment, otherwise an HPA that scales a
            # same-named StatefulSet would be reported here. A missing kind is tolerated.
            if (hpa_target.get('name', 'N/A') == name
                    and hpa_target.get('kind', 'Deployment') == 'Deployment'):
                return resource
    return 'N/A'

# Function to get PVCs for a given deployment
def get_pvcs_for_deployment(deployment, pvcs):
    """Return the PVC objects referenced by a deployment's pod template volumes.

    Args:
        deployment: Deployment object as a dict (Kubernetes JSON layout).
        pvcs: List of PVC dicts to search; ``None`` or empty is allowed.

    Returns:
        list: PVC dicts whose name equals a volume's
        ``persistentVolumeClaim.claimName`` and whose namespace equals the
        deployment's namespace, in volume order. Empty when nothing matches.
    """
    if not pvcs:
        return []

    # `or {}` / `or []` guard against keys that are present but explicitly null.
    namespace = (deployment.get('metadata') or {}).get('namespace')
    template = (deployment.get('spec') or {}).get('template') or {}
    volumes = (template.get('spec') or {}).get('volumes') or []

    matched_pvcs = []
    for volume in volumes:
        claim_name = (volume.get('persistentVolumeClaim') or {}).get('claimName')
        if claim_name is None:
            # Not a PVC-backed volume (configMap, secret, emptyDir, ...): nothing to look up.
            continue
        for pvc in pvcs:
            pvc_meta = pvc.get('metadata') or {}
            if pvc_meta.get('name') == claim_name and pvc_meta.get('namespace') == namespace:
                matched_pvcs.append(pvc)
    return matched_pvcs

# Function to extract deployment data and return it as a DataFrame plus a list of row dicts
def extract_and_save(json_data):
    """Build the deployment analysis table from a cluster snapshot.

    Despite the name, nothing is written to disk: the function only parses the
    snapshot and returns the extracted rows.

    Args:
        json_data: Snapshot contents as a JSON string.

    Returns:
        tuple: ``(df, rows)`` where ``rows`` is a list with one dict per
        deployment (scheduling settings, matching PDB/HPA spec, PVCs and PVs)
        and ``df`` is the same data as a DataFrame. On invalid JSON, a
        non-object top-level value, or a non-list ``deploymentList.items``,
        returns an empty DataFrame and an empty list.
    """
    # Only json.loads can fail with a decode error, so keep the try block that narrow.
    try:
        data = json.loads(json_data)
    except json.JSONDecodeError as e:
        logging.error(f"An error occurred: {e}")
        return pd.DataFrame(), []  # Return an empty DataFrame and JSON list on error

    if not isinstance(data, dict):
        logging.error("An error occurred: expected a JSON object at the top level of the snapshot")
        return pd.DataFrame(), []

    # `or {}` / `or []` guard against sections that are present but explicitly null.
    deployments = (data.get('deploymentList') or {}).get('items', [])
    if not isinstance(deployments, list):
        return pd.DataFrame(), []
    resources = {
        'pdbs': (data.get('podDisruptionBudgetList') or {}).get('items') or [],
        'hpas': (data.get('horizontalPodAutoscalerList') or {}).get('items') or [],
        'pvcs': (data.get('persistentVolumeClaimList') or {}).get('items') or [],
        'pvs': (data.get('persistentVolumeList') or {}).get('items') or [],
    }

    csv_columns = [
        'Deployment Name', 'Namespace', 'Affinity', 'Topology Spread Constraints',
        'Node Selector', 'Tolerations',
        'PDB Spec',
        'HPA Spec', 'PVC', 'PV'
    ]

    json_data_list = []

    for deployment in deployments:
        metadata = deployment.get('metadata') or {}
        template = (deployment.get('spec') or {}).get('template') or {}
        spec = template.get('spec') or {}

        pdb = get_resource_for_deployment(deployment, resources['pdbs'], 'pdb')
        hpa = get_resource_for_deployment(deployment, resources['hpas'], 'hpa')
        matched_pvcs = get_pvcs_for_deployment(deployment, resources['pvcs'])

        # A PV belongs to a PVC when its claimRef names that PVC in the PVC's namespace.
        matched_pvs = []
        for pvc in matched_pvcs:
            pvc_meta = pvc.get('metadata') or {}
            pvc_name = pvc_meta.get('name')
            pvc_namespace = pvc_meta.get('namespace')
            for pv in resources['pvs']:
                claim_ref = (pv.get('spec') or {}).get('claimRef') or {}
                if claim_ref.get('name') == pvc_name and claim_ref.get('namespace') == pvc_namespace:
                    matched_pvs.append(pv)

        json_data_list.append({
            'Deployment Name': metadata.get('name', 'N/A'),
            'Namespace': metadata.get('namespace', 'N/A'),
            'Affinity': spec.get('affinity', {}),
            'Topology Spread Constraints': spec.get('topologySpreadConstraints', []),
            'Node Selector': spec.get('nodeSelector', {}),
            'Tolerations': spec.get('tolerations', []),
            # The lookup helper returns the string 'N/A' when nothing matched.
            'PDB Spec': pdb.get('spec') if pdb != 'N/A' else 'N/A',
            'HPA Spec': hpa.get('spec') if hpa != 'N/A' else 'N/A',
            'PVC': matched_pvcs,
            'PV': matched_pvs
        })

    # Build the DataFrame straight from the row dicts; `columns` fixes the column order
    # and keeps the headers even when there are no deployments.
    df = pd.DataFrame(json_data_list, columns=csv_columns)
    return df, json_data_list

# Entry point for the Deployment Analyzer cell
def main():
    """Load ``cluster_snap.json`` and show the deployment table with Mercury.

    The file is read in full, handed to ``extract_and_save`` and the resulting
    DataFrame is passed to ``mr.Table``. A missing snapshot file is logged as
    an error instead of being raised.
    """
    try:
        with open('cluster_snap.json', 'r') as snapshot_file:
            snapshot_text = snapshot_file.read()

        # Only the DataFrame is rendered; the row dicts are available for an
        # mr.JSON view if one is ever wanted.
        deployments_df, _row_dicts = extract_and_save(snapshot_text)
        mr.Table(deployments_df)

    except FileNotFoundError as e:
        logging.error(f"File not found: {e}")


if __name__ == '__main__':
    main()

<h2>StatefulSet Analyzer (Configurations)</h2>

In [48]:
import json
import logging
import pandas as pd
import mercury as mr

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Helper to evaluate a Kubernetes label selector against a set of pod labels
def _selector_matches(selector, labels):
    """Return True when a Kubernetes label selector selects ``labels``.

    Args:
        selector: The ``spec.selector`` mapping of a resource (may contain
            ``matchLabels`` and/or ``matchExpressions``), or ``None``.
        labels: Mapping of label key -> value taken from the pod template.

    Returns:
        bool: ``False`` for a missing/null selector (it selects nothing);
        ``True`` for an empty selector (it selects everything); otherwise
        ``True`` only if every ``matchLabels`` pair and every
        ``matchExpressions`` requirement is satisfied.
    """
    if not isinstance(selector, dict):
        return False

    for key, value in (selector.get('matchLabels') or {}).items():
        if key not in labels or labels[key] != value:
            return False

    for requirement in selector.get('matchExpressions') or []:
        key = requirement.get('key')
        operator = requirement.get('operator')
        values = requirement.get('values') or []
        if operator == 'In':
            satisfied = key in labels and labels[key] in values
        elif operator == 'NotIn':
            # A pod without the key satisfies NotIn.
            satisfied = key not in labels or labels[key] not in values
        elif operator == 'Exists':
            satisfied = key in labels
        elif operator == 'DoesNotExist':
            satisfied = key not in labels
        else:
            # Unknown operator: refuse to claim a match.
            satisfied = False
        if not satisfied:
            return False
    return True


# Helper function to match a stateful set with PDB or HPA
def get_resource_for_statefulset(statefulset, resources, resource_type):
    """Find the first PDB or HPA that applies to ``statefulset``.

    Args:
        statefulset: StatefulSet object as a dict (Kubernetes JSON layout).
        resources: List of PDB or HPA dicts to search; ``None`` or empty is allowed.
        resource_type: ``'pdb'`` to match by label selector against the pod
            template labels, ``'hpa'`` to match by ``spec.scaleTargetRef``.

    Returns:
        The first matching resource dict from the same namespace, or the
        string ``'N/A'`` when nothing matches.
    """
    if not resources:
        return 'N/A'

    # `or {}` guards against keys that are present but explicitly null in the JSON.
    metadata = statefulset.get('metadata') or {}
    name = metadata.get('name', 'N/A')
    namespace = metadata.get('namespace', 'N/A')
    template = (statefulset.get('spec') or {}).get('template') or {}
    pod_labels = (template.get('metadata') or {}).get('labels') or {}

    for resource in resources:
        res_namespace = (resource.get('metadata') or {}).get('namespace', 'N/A')
        if res_namespace != namespace:
            continue
        res_spec = resource.get('spec') or {}
        if resource_type == 'pdb':
            # Evaluate the whole selector (matchLabels and matchExpressions); a PDB
            # without a selector must not be attributed to this stateful set.
            if _selector_matches(res_spec.get('selector'), pod_labels):
                return resource
        elif resource_type == 'hpa':
            hpa_target = res_spec.get('scaleTargetRef') or {}
            # The target kind must be a StatefulSet, otherwise an HPA that scales a
            # same-named Deployment would be reported here. A missing kind is tolerated.
            if (hpa_target.get('name', 'N/A') == name
                    and hpa_target.get('kind', 'StatefulSet') == 'StatefulSet'):
                return resource
    return 'N/A'

# Function to extract the volume claim templates declared by a stateful set
def get_volume_claim_templates(statefulset):
    """Return ``spec.volumeClaimTemplates`` of a StatefulSet dict.

    Args:
        statefulset: StatefulSet object as a dict (Kubernetes JSON layout).

    Returns:
        list: The templates as found in the object, or an empty list when
        ``spec`` or ``volumeClaimTemplates`` is missing or explicitly null.
    """
    # `or` normalizes an explicit JSON null to the same empty value as a missing key.
    return (statefulset.get('spec') or {}).get('volumeClaimTemplates') or []

# Function to extract StatefulSet configuration data, returning a DataFrame and a list of row dicts
def extract_statefulset_data(json_data):
    """Build the StatefulSet configuration table from a cluster snapshot.

    Nothing is written to disk: the function only parses the snapshot and
    returns the extracted rows.

    Args:
        json_data: Snapshot contents as a JSON string.

    Returns:
        tuple: ``(df, rows)`` where ``rows`` is a list with one dict per
        stateful set (scheduling settings, volume claim templates, service
        name, matching PDB/HPA spec) and ``df`` is the same data as a
        DataFrame. On invalid JSON, a non-object top-level value, or a
        non-list ``statefulSetList.items``, returns an empty DataFrame and an
        empty list.
    """
    # Only json.loads can fail with a decode error, so keep the try block that narrow.
    try:
        data = json.loads(json_data)
    except json.JSONDecodeError as e:
        logging.error(f"An error occurred: {e}")
        return pd.DataFrame(), []  # Return an empty DataFrame and JSON list on error

    if not isinstance(data, dict):
        logging.error("An error occurred: expected a JSON object at the top level of the snapshot")
        return pd.DataFrame(), []

    # `or {}` / `or []` guard against sections that are present but explicitly null.
    statefulsets = (data.get('statefulSetList') or {}).get('items', [])
    if not isinstance(statefulsets, list):
        return pd.DataFrame(), []
    resources = {
        'pdbs': (data.get('podDisruptionBudgetList') or {}).get('items') or [],
        'hpas': (data.get('horizontalPodAutoscalerList') or {}).get('items') or [],
    }

    csv_columns = [
        'StatefulSet Name', 'Namespace', 'Affinity', 'Topology Spread Constraints',
        'Node Selector', 'Tolerations', 'Volume Claim Templates', 'Service Name',
        'PDB Spec', 'HPA Spec'
    ]

    json_data_list = []

    for statefulset in statefulsets:
        metadata = statefulset.get('metadata') or {}
        statefulset_spec = statefulset.get('spec') or {}
        spec = (statefulset_spec.get('template') or {}).get('spec') or {}

        pdb = get_resource_for_statefulset(statefulset, resources['pdbs'], 'pdb')
        hpa = get_resource_for_statefulset(statefulset, resources['hpas'], 'hpa')

        json_data_list.append({
            'StatefulSet Name': metadata.get('name', 'N/A'),
            'Namespace': metadata.get('namespace', 'N/A'),
            'Affinity': spec.get('affinity', {}),
            'Topology Spread Constraints': spec.get('topologySpreadConstraints', []),
            'Node Selector': spec.get('nodeSelector', {}),
            'Tolerations': spec.get('tolerations', []),
            'Volume Claim Templates': get_volume_claim_templates(statefulset),
            'Service Name': statefulset_spec.get('serviceName', 'N/A'),
            # The lookup helper returns the string 'N/A' when nothing matched.
            'PDB Spec': pdb.get('spec') if pdb != 'N/A' else 'N/A',
            'HPA Spec': hpa.get('spec') if hpa != 'N/A' else 'N/A'
        })

    # Build the DataFrame straight from the row dicts; `columns` fixes the column order
    # and keeps the headers even when there are no stateful sets.
    df = pd.DataFrame(json_data_list, columns=csv_columns)
    return df, json_data_list

# Entry point for the StatefulSet configuration cell
def main():
    """Load ``cluster_snap.json`` and show the StatefulSet configuration table.

    The file is read in full, handed to ``extract_statefulset_data`` and the
    resulting DataFrame is passed to ``mr.Table``. A missing snapshot file is
    logged as an error instead of being raised.
    """
    try:
        with open('cluster_snap.json', 'r') as snapshot_file:
            snapshot_text = snapshot_file.read()

        # Only the DataFrame is rendered; the row dicts are available for an
        # mr.JSON view if one is ever wanted.
        statefulsets_df, _row_dicts = extract_statefulset_data(snapshot_text)
        mr.Table(statefulsets_df)

    except FileNotFoundError as e:
        logging.error(f"File not found: {e}")


if __name__ == '__main__':
    main()

<h2>StatefulSet Analyzer (Storage Details)</h2>

In [47]:
import json
import logging
import pandas as pd
import mercury as mr

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Helper function to collect the PVC objects mounted by a pod
def get_pvc_for_pod(pod, all_pvcs):
    """Return the PVCs a pod references through its volumes.

    For every volume of the pod that carries a ``persistentVolumeClaim`` entry,
    each PVC in ``all_pvcs`` is kept when its name equals the volume's
    ``claimName`` and its namespace equals the pod's namespace. Results follow
    volume order first, then the order of ``all_pvcs``.
    """
    pod_namespace = pod.get('metadata', {}).get('namespace', 'N/A')

    found = []
    for volume in pod.get('spec', {}).get('volumes', []):
        if 'persistentVolumeClaim' not in volume:
            continue  # not backed by a PVC
        for candidate in all_pvcs:
            candidate_meta = candidate.get('metadata', {})
            if candidate_meta.get('name') != volume['persistentVolumeClaim'].get('claimName', 'N/A'):
                continue
            if candidate_meta.get('namespace') == pod_namespace:
                found.append(candidate)
    return found



# Helper function to collect the PVs bound to a PVC
def get_pv_for_pvc(pvc, pvs):
    """Return the PVs whose ``spec.claimRef`` points at the given PVC.

    A PV is kept when its claimRef name and namespace both equal the PVC's
    name and namespace. The order of ``pvs`` is preserved.
    """
    pvc_meta = pvc.get('metadata', {})
    wanted_namespace = pvc_meta.get('namespace', 'N/A')
    wanted_name = pvc_meta.get('name', 'N/A')

    bound = []
    for volume in pvs:
        claim_ref = volume.get('spec', {}).get('claimRef', {})
        if claim_ref.get('name') != wanted_name:
            continue
        if claim_ref.get('namespace') == wanted_namespace:
            bound.append(volume)
    return bound

# Function to extract StatefulSet, PVC, and PV data
def extract_statefulset_storage_data(json_data):
    try:
        data = json.loads(json_data)
        statefulsets = data.get('statefulSetList', {}).get('items', [])
        if not isinstance(statefulsets, list):
            return pd.DataFrame(), []
        
        pvcs = data.get('persistentVolumeClaimList', {}).get('items', [])
        pvs = data.get('persistentVolumeList', {}).get('items', [])
        pod_list = data.get('podList', {}).get('items', [])

        csv_columns = ['Pod Name', 'StatefulSet Name', 'Namespace', 'PVC', 'PV']
        csv_data = []
        json_data_output = []

        for statefulset in statefulsets:
            statefulset_name = statefulset.get('metadata', {}).get('name', 'N/A')
            namespace = statefulset.get('metadata', {}).get('namespace', 'N/A')

            for pod in pod_list:
                pod_name = pod.get('metadata', {}).get('name', 'N/A')
                if pod_name.startswith(statefulset_name):
                    matched_pvcs = get_pvc_for_pod(pod, pvcs)
                    matched_pvs = [get_pv_for_pvc(pvc, pvs) for pvc in matched_pvcs]  # Get PVs for each matched PVC
                    matched_pvs_flat = [pv for sublist in matched_pvs for pv in sublist]  # Flatten the list of lists

                    row_data = {
                        'Pod Name': pod_name,
                        'StatefulSet Name': statefulset_name,
                        'Namespace': namespace,
                        'PVC': matched_pvcs,
                        'PV': matched_pvs_flat  # Use the flattened list of PVs
                    }
                    csv_data.append(list(row_data.values()))
                    json_data_output.append(row_data)

        # Convert to DataFrame and JSON format
        df = pd.DataFrame(csv_data, columns=csv_columns)
        return df, json_data_output

    except (json.JSONDecodeError, FileNotFoundError) as e:
        logging.error(f"An error occurred: {e}")
        return pd.DataFrame(), []  # Return an empty DataFrame and JSON list on error

# Entry point for the StatefulSet storage cell
def main():
    """Load ``cluster_snap.json`` and show the StatefulSet storage table.

    The file is read in full, handed to ``extract_statefulset_storage_data``
    and the resulting DataFrame is passed to ``mr.Table``. A missing snapshot
    file is logged as an error instead of being raised.
    """
    try:
        with open('cluster_snap.json', 'r') as snapshot_file:
            snapshot_text = snapshot_file.read()

        # Only the DataFrame is rendered; the row dicts are available for an
        # mr.JSON view if one is ever wanted.
        storage_df, _row_dicts = extract_statefulset_storage_data(snapshot_text)
        mr.Table(storage_df)
    except FileNotFoundError as e:
        logging.error(f"File not found: {e}")


if __name__ == "__main__":
    main()


<h2>Pod Details in the Cluster, Grouped by Owner Reference</h2>

In [46]:
import pandas as pd
from collections import defaultdict
import mercury as mr

# Per-owner accumulator: pod names, the set of node names seen, and the scheduling
# settings copied from that owner's pods
def _new_owner_group():
    """Create the empty record stored for each owner reference."""
    return {
        "Pods": [], "Nodes": set(), "ControllerKind": "", "Affinity": None,
        "NodeSelector": None, "TopologySpreadConstraints": None, "Tolerations": None
    }


owner_reference_groups = defaultdict(_new_owner_group)

# Walk the snapshot's pods and file each one under its owner; DaemonSet pods are skipped.
# NOTE(review): groups are keyed by owner name only — presumably same-named controllers
# in different namespaces would be merged; confirm against the snapshot model.
for p in snap.pods:
    if p.controller_kind == "DaemonSet":
        continue

    owner_name = p.controller or 'Unknown'  # pods without a controller share one bucket
    group = owner_reference_groups[owner_name]

    group["Pods"].append(p.name)
    # NOTE(review): if node_name is empty/None for unscheduled pods it would still be
    # counted as a node below — verify.
    group["Nodes"].add(p.node_name)

    # The settings below are overwritten on every pod, so the group ends up with the
    # values of the last pod seen for that owner.
    group["Affinity"] = p.affinity
    group["NodeSelector"] = p.node_selector
    group["TopologySpreadConstraints"] = p.spec.get('topologySpreadConstraints', None)
    group["Tolerations"] = p.tolerations
    group["ControllerKind"] = p.controller_kind

# One record per owner; key order defines the column order (ControllerKind second)
pod_data = [
    {
        "OwnerReference": owner,
        "ControllerKind": data["ControllerKind"],
        "PodCount": len(data["Pods"]),
        "NodeCount": len(data["Nodes"]),  # number of distinct node names recorded
        "Affinity": data["Affinity"],
        "NodeSelector": data["NodeSelector"],
        "TopologySpreadConstraints": data["TopologySpreadConstraints"],
        "Tolerations": data["Tolerations"],
    }
    for owner, data in owner_reference_groups.items()
]

# Tabulate the records
pod_df = pd.DataFrame(pod_data)

# Render the table with mercury
mr.Table(pod_df)
