In [21]:
import os
from openai import OpenAI
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
from typing import List
import tiktoken

# Set up OpenAI API client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Connect to Milvus
uri = "https://in03-773b56d268b0981.api.gcp-us-west1.zillizcloud.com"
token = os.getenv("ZILLIZ_API_KEY")
connections.connect("default", uri=uri, token=token)

# Define collection schema
dim = 1536  # dimension for text-embedding-3-small
collection_name = "k8s_logs_events"

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=10),  # 'log' or 'event'
    FieldSchema(name="namespace", dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name="chunk_index", dtype=DataType.INT64),  # New field to track chunks
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "K8s logs and events embeddings")

# Create collection
collection = Collection(collection_name, schema)

# Create an IVF_FLAT index for the embedding field
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index("embedding", index_params)

def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Returns the number of tokens in a text string."""
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens

def split_text(text: str, max_tokens: int = 8000) -> List[str]:
    """Split the text into chunks of maximum token length."""
    chunks = []
    current_chunk = ""
    current_token_count = 0
    
    for line in text.split('\n'):
        line_token_count = num_tokens_from_string(line, "cl100k_base")
        if current_token_count + line_token_count > max_tokens:
            chunks.append(current_chunk.strip())
            current_chunk = line
            current_token_count = line_token_count
        else:
            current_chunk += line + '\n'
            current_token_count += line_token_count
    
    if current_chunk:
        chunks.append(current_chunk.strip())
    
    return chunks

def generate_embedding(text: str) -> List[float]:
    result = client.embeddings.create(input=[text], model="text-embedding-3-small")
    return result.data[0].embedding

def process_events(events_content: str) -> List[dict]:
    events = []
    lines = events_content.strip().split('\n')[1:]  # Skip the header
    for line in lines:
        parts = line.split('\t')
        if len(parts) == 6:
            namespace, last_seen, event_type, reason, obj, message = parts
            event_text = f"Namespace: {namespace}, Type: {event_type}, Reason: {reason}, Object: {obj}, Message: {message}, Last Seen: {last_seen}"
            events.append({
                "text": event_text,
                "type": "event",
                "namespace": namespace
            })
    return events

def process_logs(logs_content: str) -> List[dict]:
    logs = []
    current_pod = ""
    current_log = ""
    for line in logs_content.strip().split('\n'):
        if line.startswith("Logs for "):
            if current_pod and current_log:
                logs.append({
                    "text": current_log.strip(),
                    "type": "log",
                    "namespace": current_pod.split('/')[0]
                })
            current_pod = line.split("Logs for ")[1].strip(':')
            current_log = ""
        elif line == "-----------------------":
            continue
        else:
            current_log += line + "\n"
    
    if current_pod and current_log:
        logs.append({
            "text": current_log.strip(),
            "type": "log",
            "namespace": current_pod.split('/')[0]
        })
    
    return logs

def insert_data(data: List[dict]):
    entities = []
    
    for item in data:
        chunks = split_text(item["text"])
        for i, chunk in enumerate(chunks):
            embedding = generate_embedding(chunk)
            entity = {
                "text": chunk,
                "type": item["type"],
                "namespace": item["namespace"],
                "chunk_index": i,
                "embedding": embedding
            }
            entities.append(entity)
    
    collection.insert(entities)
    collection.flush()

# Process events
with open("../core-deployed-events.txt", "r") as f:
    events_content = f.read()
events = process_events(events_content)

# Process logs
with open("../core-deployed-logs.txt", "r") as f:
    logs_content = f.read()
logs = process_logs(logs_content)

# Combine events and logs
all_data = events + logs

# Insert data into Milvus
insert_data(all_data)

# Load the collection
collection.load()

print(f"Inserted {len(all_data)} K8s events and logs into Milvus.")

Inserted 23 K8s events and logs into Milvus.

Search Results:
ID: 451951540723338732, Distance: 0.0
Type: log
Namespace: grafana
Chunk Index: 0
Text: logger=infra.usagestats t=2024-08-22T18:32:44.078575466Z level=info msg="Usage stats are ready to report"

ID: 451951540723338796, Distance: 0.0
Type: log
Namespace: grafana
Chunk Index: 0
Text: logger=infra.usagestats t=2024-08-22T18:32:44.078575466Z level=info msg="Usage stats are ready to report"

ID: 451951540723338764, Distance: 0.0
Type: log
Namespace: grafana
Chunk Index: 0
Text: logger=infra.usagestats t=2024-08-22T18:32:44.078575466Z level=info msg="Usage stats are ready to report"

ID: 451951540723338828, Distance: 4.6146933527779765e-06
Type: log
Namespace: grafana
Chunk Index: 0
Text: logger=infra.usagestats t=2024-08-22T18:32:44.078575466Z level=info msg="Usage stats are ready to report"

ID: 451951540723338860, Distance: 4.6146933527779765e-06
Type: log
Namespace: grafana
Chunk Index: 0
Text: logger=infra.usagestats t=2024-0

In [28]:
# Example search (using the first event as a query)
search_text = """
determine health status of cluster
"""
search_embedding = generate_embedding(search_text)

search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[search_embedding],
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["text", "type", "namespace", "chunk_index"]
)

print("\nSearch Results:")
for hit in results[0]:
    print(f"ID: {hit.id}, Distance: {hit.distance}")
    print(f"Type: {hit.entity.get('type')}")
    print(f"Namespace: {hit.entity.get('namespace')}")
    print(f"Chunk Index: {hit.entity.get('chunk_index')}")
    print(f"Text: {hit.entity.get('text')}")
    print()


Search Results:
ID: 451951540723338754, Distance: 1.161525845527649
Type: log
Namespace: pepr-system
Chunk Index: 0
Text: {"level":30,"time":1724351519073,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/healthz","status":200,"duration":"1 ms"}
{"level":30,"time":1724351519073,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/healthz","status":200,"duration":"0 ms"}
{"level":30,"time":1724351529070,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/healthz","status":200,"duration":"0 ms"}
{"level":30,"time":1724351529071,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/healthz","status":200,"duration":"0 ms"}
{"level":30,"time":1724351532400,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/metrics","status":200,"duration":"2 ms"}
{"level":30,"time":1724351539073,"pid":16,"hostname":"pepr-uds-core-65ff788585-578cp","method":"GET","url":"/healthz","status":