In [None]:
from langchain.tools import BaseTool
from typing import Type
from pydantic import BaseModel, Field

class DiskCapacityInput(BaseModel):
    # Argument schema for the DiskCapacity tool (used as its args_schema).
    # All three fields are required: `...` is passed as the Field default.
    host: str = Field(..., description="Hostname or IP of the server")  # SSH target
    user: str = Field(..., description="SSH username")  # remote login name, used as user@host
    ssh_key_path: str = Field(..., description="Path to private SSH key")  # local path given to `ssh -i`

class DiskCapacity(BaseTool):
    # NOTE(review): some LLM tool-calling APIs only accept names matching
    # [a-zA-Z0-9_-]; this name contains spaces. Left unchanged because callers
    # may look the tool up by this exact name — confirm against your provider.
    name: str = "Check disk capacity"
    description: str = """
Connects via SSH to a remote Linux server and checks the disk usage of the root (/) filesystem.
Required arguments:
- host: The hostname or IP address of the remote server.
- user: The SSH username used for authentication.
- ssh_key_path: The local file path to the private SSH key used for connecting.

Returns the output of the 'df -h /' command, or an error message if the connection fails.
"""
    args_schema: Type[DiskCapacityInput] = DiskCapacityInput

    def _run(
        self,
        host: str,
        user: str,
        ssh_key_path: str
    ) -> str:
        """Run ``df -h /`` on ``user@host`` over SSH and return its output.

        Args:
            host: Hostname or IP address of the remote server.
            user: SSH login name.
            ssh_key_path: Local path to the private key; ``~`` is expanded.

        Returns:
            The stripped stdout of ``df -h /`` on success. Otherwise a string
            starting with ``"Error:"``: invalid arguments, ssh exiting non-zero
            (its stderr is included), a timeout, or ssh not being runnable.
            Expected failures are reported as text rather than raised.
        """
        import os
        import subprocess

        # host/user usually come from an LLM. A value starting with "-" would be
        # parsed by ssh as an option (e.g. "-oProxyCommand=..."), which amounts
        # to arbitrary local command execution, so reject such values up front.
        for label, value in (("host", host), ("user", user)):
            if not value or value.startswith("-") or any(ch.isspace() for ch in value):
                return f"Error: invalid {label} {value!r}"

        cmd = [
            "ssh",
            "-i", os.path.expanduser(ssh_key_path),
            # SECURITY: host-key verification is disabled (MITM risk). Kept for
            # lab use; prefer a managed known_hosts file in production.
            "-o", "StrictHostKeyChecking=no",
            # Fail instead of prompting for a password/passphrase: nobody can
            # answer a prompt here, it would only stall until the timeout.
            "-o", "BatchMode=yes",
            "-o", "ConnectTimeout=5",
            f"{user}@{host}",
            "df -h /",  # root filesystem only, as the description promises
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        except subprocess.TimeoutExpired:
            return f"Error: SSH to {host} timed out"
        except OSError as exc:  # e.g. the ssh binary is not installed
            return f"Error: could not run ssh: {exc}"

        if result.returncode != 0:
            return f"Error: {result.stderr.strip()}"
        return result.stdout.strip()


# Smoke test: run the disk-capacity tool directly against node-0 as root.
# NOTE(review): this calls the private `_run` hook directly, which presumably
# skips LangChain's args_schema validation and callbacks; `test.run({...})`
# / `test.invoke({...})` would exercise the public path — confirm.
test = DiskCapacity()
rslt = test._run(host="node-0", user="root", ssh_key_path="~/.ssh/id_rsa")
rslt  # displayed as the notebook cell output

# Prometheus

In [None]:
from prometheus_api_client import PrometheusConnect
import json

def classify_target(target):
    """Classify a scrape-target summary into a component category.

    The target's ``job`` and ``container`` values are lower-cased and searched
    for known substrings. Kubernetes core components are checked first, so a
    target matching both keyword lists is reported as ``'K8sComponents'``.

    Args:
        target: Mapping with optional ``'job'`` and ``'container'`` keys. Either
            value may be ``None`` (a Prometheus target without that label).

    Returns:
        ``'K8sComponents'``, ``'ObservabilityComponents'`` or ``'others'``.
    """
    # K8s basic components keywords (substring match against job/container).
    k8s_basic_keywords = ('apiserver', 'coredns', 'kube-controller-manager',
                          'kube-proxy', 'kube-scheduler', 'kubelet')
    # Observability components keywords (Prometheus, Alertmanager, exporters, operator).
    observability_keywords = ('alertmanager', 'prometheus', 'kube-state-metrics',
                              'node-exporter', 'operator')

    # `or ''` also covers a key that is present but None; `.get(key, '')` alone
    # returns None in that case and `.lower()` then raises AttributeError.
    haystacks = (
        (target.get('job') or '').lower(),
        (target.get('container') or '').lower(),
    )

    def _matches(keywords):
        # True if any keyword occurs in the job or the container name.
        return any(keyword in text for keyword in keywords for text in haystacks)

    if _matches(k8s_basic_keywords):
        return 'K8sComponents'
    if _matches(observability_keywords):
        return 'ObservabilityComponents'
    # If not classified, categorize as 'others'
    return 'others'

def get_prometheus_targets(prometheus_url: str) -> dict:
    """Fetch Prometheus' active scrape targets, grouped by component category.

    Calls ``PrometheusConnect.get_targets()`` (the ``/api/v1/targets`` endpoint),
    reduces every active target to a small summary dict and buckets the
    summaries with ``classify_target``. Dropped targets are not included.

    Args:
        prometheus_url: The URL of the Prometheus server (e.g., "http://localhost:9090").

    Returns:
        ``{'K8sComponents': [...], 'ObservabilityComponents': [...], 'others': [...]}``,
        where each item has the keys job, container, namespace, service, pod,
        health and lastScrape (``None`` when the target lacks that field).
        On any failure the error is printed and ``{"error": "<message>"}`` is
        returned instead.
    """
    try:
        # Created inside the try so a bad URL/client error is reported through
        # the same {"error": ...} channel as a failed query.
        prom = PrometheusConnect(url=prometheus_url)
        targets_info = prom.get_targets() or {}

        result = {
            'K8sComponents': [],
            'ObservabilityComponents': [],
            'others': []
        }
        # `or []` guards a response without an activeTargets list.
        for raw_target in targets_info.get("activeTargets") or []:
            summary = _summarize_target(raw_target)
            result[classify_target(summary)].append(summary)
        return result

    except Exception as e:
        # Notebook-level boundary: report any connection/API/parsing problem
        # as an error dict rather than raising.
        print(f"Error querying Prometheus targets: {e}")
        return {"error": str(e)}


def _summarize_target(raw_target: dict) -> dict:
    """Reduce one ``activeTargets`` entry to the fields used for reporting."""
    labels = raw_target.get("labels") or {}  # a target may come without labels
    return {
        "job": labels.get("job"),
        "container": labels.get("container"),
        "namespace": labels.get("namespace"),
        "service": labels.get("service"),
        "pod": labels.get("pod"),
        "health": raw_target.get("health"),
        "lastScrape": raw_target.get("lastScrape"),
    }

# Example Usage: fetch the categorised scrape targets and print them.
# NOTE(review): hard-coded lab address; adjust for your environment.
prometheus_server_url = "http://192.168.72.59:9090"
targets_data = get_prometheus_targets(prometheus_server_url)
print(targets_data)

In [None]:
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta
import json

def query_target_metrics_timeframe(
    prometheus_url: str,
    promql_query: str,
    start_time: datetime,
    end_time: datetime,
    step_size: str = "30s"
) -> "list[dict] | dict":
    """Run a PromQL range query (``/api/v1/query_range``) over a time window.

    Args:
        prometheus_url: The URL of the Prometheus server (e.g., "http://localhost:9090").
        promql_query: The PromQL expression to execute (e.g., 'node_cpu_seconds_total').
        start_time: Start of the query range.
        end_time: End of the query range.
        step_size: Resolution step width for the query (e.g., "1m", "30s").

    Returns:
        On success, the list returned by ``PrometheusConnect.custom_query_range``:
        one dict per matching series (Prometheus matrix format, with a
        ``'metric'`` label mapping and a ``'values'`` list of samples). An empty
        list means no series matched.
        On failure the error is printed and ``{"error": "<message>"}`` is
        returned, so callers must check for a dict before indexing the result.
    """
    try:
        # Client creation is inside the try so that configuration errors are
        # reported through the same {"error": ...} channel as query failures.
        prom = PrometheusConnect(url=prometheus_url)
        # One sample per `step` between start_time and end_time.
        return prom.custom_query_range(
            query=promql_query,
            start_time=start_time,
            end_time=end_time,
            step=step_size,
        )
    except Exception as e:
        # Handle potential errors during client setup or query execution.
        print(f"Error executing range query: {e}")
        return {"error": str(e)}

In [None]:
# Example: list every pod reported by the kube_pod_info series over the last
# 3 minutes and group the pods by namespace (consumed by the next cell).
from collections import defaultdict

# Define connection details and timeframe
prometheus_server_url = "http://192.168.72.59:9090"
end_time = datetime.now()
start_time = end_time - timedelta(minutes=3)  # Querying data from the last 3 minutes

# Other handy PromQL expressions (node_exporter metrics):
# memory: `100 * (1 - ((node_memory_MemFree_bytes + node_memory_Buffers_bytes + node_memory_Cached_bytes) / node_memory_MemTotal_bytes))`
# network receive: `rate(node_network_receive_bytes_total[5m])`
# network transmit: `rate(node_network_transmit_bytes_total[5m])`
# IOPS: `rate(node_disk_reads_completed_total[5m]) + rate(node_disk_writes_completed_total[5m])`
query = 'kube_pod_info'

# Kept separate from `query` so the PromQL string is not overwritten by the result.
pod_info_series = query_target_metrics_timeframe(
    prometheus_server_url,
    query,
    start_time,
    end_time,
    step_size="1m"
)
# print(json.dumps(pod_info_series, indent=4))

# The helper returns {"error": ...} instead of raising; iterating that dict
# would walk its string keys and crash on `.get`.
if isinstance(pod_info_series, dict):
    print(f"kube_pod_info query failed: {pod_info_series.get('error')}")
    pod_info_series = []

# One record per returned series; only the series labels are used.
lstTmp = [
    {key: (series.get('metric') or {}).get(key) for key in ("namespace", "service", "pod", "node")}
    for series in pod_info_series
]

# Group lstTmp by namespace
grouped_by_namespace = defaultdict(list)
for item in lstTmp:
    # `or` rather than a .get() default: the key is always present but may be None.
    grouped_by_namespace[item["namespace"] or "unknown"].append(item)

# Convert defaultdict to regular dict for cleaner output
grouped_by_namespace = dict(grouped_by_namespace)
print(json.dumps(grouped_by_namespace, indent=4))

In [None]:
# Attach per-pod resource metrics (last minute) to every pod record in
# grouped_by_namespace. Each record gains one key per entry of
# query_conditions, holding the per-step float values ([] when no data).
prometheus_server_url = "http://192.168.72.59:9090"
end_time = datetime.now()
start_time = end_time - timedelta(minutes=1)  # Querying data from the last minute
# Reference queries for a single pod (pod name: controller-58fdf44d87-tmtqt):
# CPU, average CPU usage of the entire pod in CPU cores (CPU-seconds per second)
# query = 'sum(rate(container_cpu_usage_seconds_total{pod="controller-58fdf44d87-tmtqt", container!=""}[5m])) by (pod)'
# MEM (gauge, so no rate())
# query = 'sum(container_memory_working_set_bytes{pod="controller-58fdf44d87-tmtqt", container!=""}) by (pod)'
# Network
# query = 'sum(rate(container_network_receive_bytes_total{pod!=""}[5m])) by (pod)'
# query = 'sum(rate(container_network_transmit_bytes_total{pod!=""}[5m])) by (pod, namespace)'
# pod status
# query = 'kube_pod_status_phase{pod="controller-58fdf44d87-tmtqt", phase="Running"}'

# __SEL__ is replaced with the pod's label matchers. The namespace is included
# because pod names are only unique within a namespace (e.g. StatefulSet pods
# such as "redis-0"). container!="" excludes the pod-level cgroup series that is
# exported alongside the per-container ones and would double-count CPU/memory.
query_conditions = {
    'cpu': 'sum(rate(container_cpu_usage_seconds_total{__SEL__, container!=""}[1m])) by (pod)',
    # working_set_bytes is a gauge: sum its value; rate() of a gauge is meaningless.
    'mem': 'sum(container_memory_working_set_bytes{__SEL__, container!=""}) by (pod)',
    # Network counters are reported at pod level, so no container filter here.
    'net_in': 'sum(rate(container_network_receive_bytes_total{__SEL__}[1m])) by (pod)',
    'net_out': 'sum(rate(container_network_transmit_bytes_total{__SEL__}[1m])) by (pod, namespace)',
    'pod_status': 'kube_pod_status_phase{__SEL__, phase="Running"}',
}

for pods in grouped_by_namespace.values():
    for pod_info in pods:
        podname = pod_info.get('pod')
        if not podname:
            continue  # nothing to select on
        namespace = pod_info.get('namespace')
        selector = (f'namespace="{namespace}", pod="{podname}"'
                    if namespace else f'pod="{podname}"')
        for key, template in query_conditions.items():
            series = query_target_metrics_timeframe(
                prometheus_server_url,
                template.replace("__SEL__", selector),
                start_time,
                end_time
            )
            # An {"error": ...} dict or an empty result (no samples for this pod,
            # e.g. a completed pod) records "no data" instead of crashing the
            # whole sweep on series[0].
            if isinstance(series, dict) or not series:
                pod_info[key] = []
                continue
            pod_info[key] = [float(sample[-1]) for sample in series[0].get('values') or []]

grouped_by_namespace

# rslt = query_target_metrics_timeframe(
#             prometheus_server_url, 
#             query, 
#             start_time, 
#             end_time
#         )
# print(json.dumps(rslt, indent=4))

# Loki

In [None]:
import requests
import datetime
import json

# Configuration
LOKI_URL = "http://192.168.72.58:3100" # From kubectl get service output
# Seconds to wait for each Loki HTTP call; requests waits indefinitely by default.
LOKI_TIMEOUT_S = 10


def _loki_get(path, params=None):
    """GET ``LOKI_URL + path`` and return ``(payload, None)`` or ``(None, error)``.

    ``payload`` is the decoded JSON body: a dict whose ``'status'`` is
    ``'success'``. ``error`` is a dict with ``'error'`` (message) and
    ``'status'`` (category) keys.
    """
    try:
        response = requests.get(f"{LOKI_URL}{path}", params=params, timeout=LOKI_TIMEOUT_S)
        response.raise_for_status()
    except requests.exceptions.ConnectionError:
        return None, {"error": f"Could not connect to Loki at {LOKI_URL}. Please ensure Loki is running and accessible.", "status": "connection_error"}
    except requests.exceptions.RequestException as e:
        return None, {"error": f"An HTTP request error occurred: {e}", "status": "http_error"}

    # Decoded in its own try: the error raised by Response.json() derives from
    # ValueError (and, in requests >= 2.27, also from RequestException), so a
    # shared try/except misreported it as an HTTP or input error.
    try:
        data = response.json()
    except ValueError:
        return None, {"error": "Failed to decode JSON response from Loki.", "status": "json_decode_error"}

    if not isinstance(data, dict):
        return None, {"error": "Unexpected response format from Loki.", "status": "unknown_error"}
    if data.get('status') != 'success':
        return None, {"error": data.get('error', 'Unknown error'), "status": data.get('status', 'unknown_error')}
    return data, None


def get_loki_labels_api():
    """
    Fetches available label names from Loki.
    These labels represent the 'targets' or streams of logs.
    Returns:
        tuple: ``(labels, None)`` on success, where ``labels`` is a list of
        label-name strings, or ``([], error)`` where ``error`` is a dict with
        'error' and 'status' keys. Never raises.
    """
    try:
        data, err = _loki_get("/loki/api/v1/labels")
        if err:
            return [], err
        return data['data'], None
    except Exception as e:
        return [], {"error": f"An unexpected error occurred: {e}", "status": "unknown_error"}


def get_loki_label_values_api(label_name: str):
    """
    Fetches values for a specific label from Loki.
    Args:
        label_name (str): The name of the label (e.g., 'app', 'namespace').
    Returns:
        tuple: ``(values, None)`` on success, where ``values`` is a list of
        strings, or ``([], error)`` where ``error`` is a dict with 'error' and
        'status' keys. Never raises.
    """
    try:
        data, err = _loki_get(f"/loki/api/v1/label/{label_name}/values")
        if err:
            return [], err
        return data['data'], None
    except Exception as e:
        return [], {"error": f"An unexpected error occurred: {e}", "status": "unknown_error"}


def query_loki_logs_api(query: str, start_time_str: str, end_time_str: str, limit=None):
    """
    Queries Loki for logs matching a LogQL query within a specified time range.
    Args:
        query (str): The LogQL query string.
        start_time_str (str): Start time in 'YYYY-MM-DD HH:MM:SS' format.
        end_time_str (str): End time in 'YYYY-MM-DD HH:MM:SS' format.
            Both are naive and interpreted in the local timezone; returned
            timestamps are local ISO 8601 strings as well.
        limit (int, optional): Maximum number of log lines to retrieve. When
            omitted, no limit is sent and Loki's server-side default applies.
    Returns:
        tuple: ``(streams, None)`` on success, where each stream is a dict with
        'stream' (labels) and 'values' (list of {'timestamp', 'log'} dicts), or
        ``([], error)`` where ``error`` is a dict with 'error' and 'status'
        keys. Never raises.
    """
    invalid_input = {"error": "Invalid date/time format or limit. Please use YYYY-MM-DD HH:MM:SS for time and an integer for limit.", "status": "invalid_input"}
    # Validate inputs before any network call, so only genuine input problems
    # are reported as invalid_input.
    try:
        start_time = datetime.datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
        end_time = datetime.datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
        limit = None if limit is None else int(limit)
    except (TypeError, ValueError):
        return [], invalid_input
    if limit is not None and limit <= 0:
        return [], invalid_input
    if start_time > end_time:
        return [], {"error": "Start time must not be after end time.", "status": "invalid_input"}

    params = {
        'query': query,
        'start': int(start_time.timestamp() * 1e9), # Loki expects nanoseconds
        'end': int(end_time.timestamp() * 1e9),     # Loki expects nanoseconds
    }
    if limit is not None:
        params['limit'] = limit

    try:
        data, err = _loki_get("/loki/api/v1/query_range", params)
        if err:
            return [], err

        # Format the output for easier consumption in an API. Only log-stream
        # results are formatted; any other resultType (e.g. from a metric
        # query) yields an empty list.
        formatted_results = []
        if data['data']['resultType'] == 'streams':
            for result in data['data']['result']:
                values = []
                for entry in result['values']:
                    # entry[0] is the nanosecond timestamp string, entry[1] the log
                    # line; indexed rather than unpacked so any extra trailing
                    # fields do not break parsing.
                    timestamp_ns = int(entry[0])
                    timestamp = datetime.datetime.fromtimestamp(timestamp_ns / 1e9).isoformat() # ISO 8601
                    values.append({"timestamp": timestamp, "log": entry[1]})
                formatted_results.append({"stream": result['stream'], "values": values})
        return formatted_results, None
    except Exception as e:
        return [], {"error": f"An unexpected error occurred: {e}", "status": "unknown_error"}

# Example of how you would test these functions (without being a command-line program)
print("\n--- Testing Loki Functions ---")

# Test get_loki_labels_api
labels, err = get_loki_labels_api()
if err:
    print(f"Error fetching labels: {err}")
else:
    print("\nAvailable Loki Labels:")
    for label in labels:
        print(f"  - {label}")

# Test get_loki_label_values_api: build {label_name: [values...]}.
# On a label-fetch error `labels` is [], so this loop does nothing.
label_value = {}
for label in labels:
    values, err = get_loki_label_values_api(label)
    if err:
        print(f"Error fetching label values for '{label}': {err}")
    else:
        label_value[label] = values
print(label_value)


### --- ###
# Test query_loki_logs_api
print("\nQuerying Loki Logs:")
# Query window: the last 30 minutes, formatted as 'YYYY-MM-DD HH:MM:SS'.
end_time_example = datetime.datetime.now()
start_time_example = end_time_example - datetime.timedelta(minutes=30) # Last 30 minutes
start_time_str_example = start_time_example.strftime("%Y-%m-%d %H:%M:%S")
end_time_str_example = end_time_example.strftime("%Y-%m-%d %H:%M:%S")

# Example: query logs from one stream selector.
# You might need to adjust this query based on what labels your Loki instance has.
# A common starting point is to query based on a 'job' or 'namespace' label.
# For example: '{job="kubernetes-pods"}' or '{namespace="default"}'
# log_query_example = '{job=~".+"}' # This is a very broad query, might be slow/large
log_query_example = '{app="calico-node"}' # More specific query, adjust as needed
print(f"Attempting to query with LogQL: '{log_query_example}' for the last 30 minutes...")
logs, err = query_loki_logs_api(log_query_example, start_time_str_example, end_time_str_example)
if err:
    print(f"Error querying logs: {err}")
elif not logs:
    print("No logs found for the given query and time range.")
else:
    # Bounded preview matching the header: at most 2 streams x 3 lines.
    print("Found Logs (first 2 streams, 3 lines each):")
    for stream_group in logs[:2]:
        print(f"  Stream: {stream_group['stream']}")
        for log_entry in stream_group['values'][:3]:
            print(f"    [{log_entry['timestamp']}] {log_entry['log']}")
        if len(stream_group['values']) > 3:
            print(f"    ... and {len(stream_group['values']) - 3} more entries in this stream.")
    if len(logs) > 2:
        print(f"  ... and {len(logs) - 2} more log streams.")

# Example with a specific label (if you know one exists, e.g., 'app')
# Replace 'some_app_name' with an actual application name from your Loki setup
# log_query_specific = '{app="some_app_name"}'
# print(f"\nAttempting to query with LogQL: '{log_query_specific}'...")
# specific_logs, err_specific = query_loki_logs_api(log_query_specific, start_time_str_example, end_time_str_example, limit=5)
# if err_specific:
#     print(f"Error querying specific logs: {err_specific}")
# else:
#     if specific_logs:
#         print("Found Specific Logs:")
#         # Process specific_logs similarly
#     else:
#         print("No specific logs found.")

In [7]:
# Scratch cell: datetime <-> string / Unix-timestamp conversions.
# NOTE(review): `import datetime` rebinds the name `datetime` to the module,
# shadowing `from datetime import datetime` used by the Prometheus cells, and
# `start_time` below overwrites their query window. Re-run those cells'
# imports and time setup before using them again.
import datetime
start_time_str = "2023-10-01 00:00:00"  # Example start time
start_time = datetime.datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
int(start_time.timestamp()) # unix timestamp in seconds
date = datetime.datetime.now() - datetime.timedelta(minutes=30) # 30 minutes ago
date.strftime("%Y-%m-%d %H:%M:%S") # Format to string (shown as the cell output)

'2025-07-15 07:19:09'