In [1]:
import os
import re
import json
import time # For simulating real-time delay
import hashlib 
import pandas as pd # For saving templates and offset index
from tqdm.notebook import tqdm

# --- Configuration ---
# Paths for raw log input
RAW_LOGS_DIR = "../data/raw_logs/hdfs_v2/HDFS_v2/node_logs/"
# Specific file to stream for this test
SINGLE_RAW_LOG_FILE_PATH = os.path.join(RAW_LOGS_DIR, "hadoop-hdfs-datanode-mesos-01.log") # <--- Your specified file

# Output paths for generated/live-updated files
# These will be written in APPEND mode to simulate continuous updates
OUTPUT_TEMPLATES_CSV = "../data/templates/hdfs_v2_templates_stream_test.csv"
OUTPUT_PARSED_JSONL = "../data/parsed_logs/hdfs_v2_parsed_stream_test.jsonl"
OUTPUT_OFFSET_INDEX_FILE = "../data/parsed_logs/hdfs_v2_offset_index_stream_test.csv"

# Simulate stream delay (seconds per raw log line)
STREAM_DELAY_SECONDS = 0.001 # 1 millisecond delay per line (adjust as needed for speed)

# Ensure output directories exist
os.makedirs(os.path.dirname(OUTPUT_TEMPLATES_CSV), exist_ok=True)
os.makedirs(os.path.dirname(OUTPUT_PARSED_JSONL), exist_ok=True)
os.makedirs(os.path.dirname(OUTPUT_OFFSET_INDEX_FILE), exist_ok=True)


# --- Confirm input raw log file exists ---
if not os.path.exists(SINGLE_RAW_LOG_FILE_PATH):
    print(f"ERROR: Specified raw log file NOT FOUND at {SINGLE_RAW_LOG_FILE_PATH}.")
    print("Please ensure your raw HDFS log files are correctly placed in that directory.")
    log_file_to_stream = None # Set to None to prevent errors
else:
    log_file_to_stream = SINGLE_RAW_LOG_FILE_PATH
    print(f"Configured to stream from: {log_file_to_stream}")

print("\nSetup complete. Paths configured for Raw Log Streaming Test.")

Configured to stream from: ../data/raw_logs/hdfs_v2/HDFS_v2/node_logs/hadoop-hdfs-datanode-mesos-01.log

Setup complete. Paths configured for Raw Log Streaming Test.


In [2]:
# --- Define Core HDFS Log Header Regex ---
HDFS_HEADER_REGEX = re.compile(
    r'^(?P<Date>\d{4}-\d{2}-\d{2})\s'  
    r'(?P<Time>\d{2}:\d{2}:\d{2},\d{3})\s' 
    r'(?P<Level>[A-Z]+)\s'                
    r'(?P<Component>[\w\._-]+(?:\[[\w\s\.-]+\])?):?\s+' 
    r'(?P<Content>.*)'                    
)

# --- Define NORMALIZATION_RULES for Log Content ---
NORMALIZATION_RULES = [
    (re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(:\d+)?\b'), '<IP_ADDRESS>'), 
    (re.compile(r'\b(?:blk_|BP-|DS-)[-_a-zA-Z0-9]+\b'), '<HDFS_ID>'), 
    (re.compile(r'\b(?:mesos-master-\d+|nodename \d+@mesos-master-\d+|localhost)\b'), '<HOSTNAME_OR_NODE>'), 
    (re.compile(r'\b([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12})\b'), '<UUID>'), 
    (re.compile(r'\b(?:/usr/local/hadoop|/opt/hdfs/data|/[a-zA-Z0-9/\._-]+/\.jar)[\w\/\.-]*\b'), '<HADOOP_PATH>'), 
    (re.compile(r'[-_a-zA-Z0-9]+\.jar'), '<JAR_FILE>'), 
    (re.compile(r'\b(?:version|build)\s*=\s*[\w\.\-]+(?:(?:-|:)?\s*[\w\.:-]+)?(?:; compiled by [\w\s\']+\son\s[\d\-T:]+Z)?\b'), '<VERSION_INFO>'), 
    (re.compile(r'java = \d+\.\d+\.\d+_?\d*'), '<JAVA_VERSION>'), 
    (re.compile(r'took\s+\d+ms\s+\(threshold=\d+ms\)'), 'took <NUM>ms (threshold=<NUM>ms)'), 
    (re.compile(r'Scheduled snapshot period at \d+ second\(s\)\.'), 'Scheduled snapshot period at <NUM> second(s).'), 
    (re.compile(r'Balanced bandwith is \d+ bytes/s'), 'Balancing bandwith is <NUM> bytes/s'), 
    (re.compile(r'Number threads for balancing is \d+'), 'Number threads for balancing is <NUM>'), 
    (re.compile(r'registered UNIX signal handlers for \[[\w,\s]+\]'), 'registered UNIX signal handlers for [<SIGNAL_LIST>]'), 
    (re.compile(r'Listening HTTP traffic on /<IP_ADDRESS>:<NUM>'), 'Listening HTTP traffic on <IP_ADDRESS>:<NUM>'), 
    (re.compile(r'IPC server at /<IP_ADDRESS>:<NUM>'), 'IPC server at <IP_ADDRESS>:<NUM>'), 
    (re.compile(r'Jetty bound to port \d+'), 'Jetty bound to port <NUM>'), 
    (re.compile(r'jetty-[\d\.]+'), 'jetty-<VERSION>'), 
    (re.compile(r'block report from <IP_ADDRESS>\. Number of blocks: \d+'), 'block report from <IP_ADDRESS>. Number of blocks: <NUM>'), 
    (re.compile(r'Logging to org\.slf4j\.impl\.Log4jLoggerAdapter\(org\.mortbay\.log\) via org\.mortbay\.log\.Slf4jLog'), 'Logging to <LOGGER_IMPL> via <LOGGER_BRIDGE>'), 
    (re.compile(r'Unable to initialize FileSignerSecretProvider, falling back to use random secrets\.'), 'Unable to initialize FileSignerSecretProvider, falling back to use random secrets.'), 
    (re.compile(r'\d+'), '<NUM>'), 
]

def normalize_log_content(content):
    """Applies normalization rules to the log message content."""
    normalized_content = content
    for regex, placeholder in NORMALIZATION_RULES:
        normalized_content = regex.sub(placeholder, normalized_content)
    # Further cleanup: remove extra spaces created by replacements
    normalized_content = re.sub(r'\s+', ' ', normalized_content).strip()
    return normalized_content

def parse_log_line_hybrid_single(raw_line):
    """
    Parses a single HDFS log line for its header components.
    Returns parsed dictionary or None if header does not match.
    """
    match = HDFS_HEADER_REGEX.match(raw_line)
    if not match:
        return None 
    groups = match.groupdict()
    return {
        "timestamp": f"{groups.get('Date', '')} {groups.get('Time', '')}",
        "level": groups.get('Level', ''),
        "component": groups.get('Component', ''),
        "content_raw": groups.get('Content', '').strip(),
    }

print("Log parsing and normalization functions defined.")

Log parsing and normalization functions defined.


In [3]:
print(f"--- Simulating Raw Log Stream & On-the-Fly Parsing/Indexing ---")

# --- Global state for multi-line log assembly and offset map ---
current_multi_line_log_buffer = [] # Buffer for lines of a single logical log entry
last_parsed_multi_line_header_info = None # Stores header info for the current logical log entry
parsed_line_counter = 0 # Counter for unique line IDs in the parsed JSONL file
unique_templates_map = {} # To store unique templates discovered

# --- Clear previous test files (if any) ---
if os.path.exists(OUTPUT_PARSED_JSONL):
    os.remove(OUTPUT_PARSED_JSONL)
    print(f"Cleared existing {OUTPUT_PARSED_JSONL}")
if os.path.exists(OUTPUT_OFFSET_INDEX_FILE):
    os.remove(OUTPUT_OFFSET_INDEX_FILE)
    print(f"Cleared existing {OUTPUT_OFFSET_INDEX_FILE}")
if os.path.exists(OUTPUT_TEMPLATES_CSV):
    os.remove(OUTPUT_TEMPLATES_CSV)
    print(f"Cleared existing {OUTPUT_TEMPLATES_CSV}")

# Open output files in APPEND mode for continuous writing
try:
    with open(OUTPUT_PARSED_JSONL, 'a', encoding='utf-8') as f_parsed_jsonl, \
         open(OUTPUT_OFFSET_INDEX_FILE, 'a', encoding='utf-8') as f_offset_index:
        
        print(f"Streaming from: {log_file_to_stream}")
        with open(log_file_to_stream, 'r', encoding='utf-8', errors='ignore') as f_raw:
            # Use tqdm to show progress for reading the single log file
            for i, raw_line in tqdm(enumerate(f_raw), desc=f"Streaming {os.path.basename(log_file_to_stream)}"):
                time.sleep(STREAM_DELAY_SECONDS) # Simulate real-time delay
                
                # Step 1: Attempt to parse the current raw line as a new header
                header_info = parse_log_line_hybrid_single(raw_line)

                if header_info: # This line starts a new logical log entry
                    # Step 2: Process the previous multi-line log entry (if buffer has content)
                    if current_multi_line_log_buffer and last_parsed_multi_line_header_info:
                        full_original_message = "".join(current_multi_line_log_buffer).strip()
                        
                        # Get content for normalization and templating
                        content_to_normalize = last_parsed_multi_line_header_info['content_raw'] 
                        normalized_template_key = normalize_log_content(content_to_normalize)
                        
                        # Discover unique template
                        if normalized_template_key not in unique_templates_map:
                            event_id_hash = hashlib.md5(normalized_template_key.encode('utf-8')).hexdigest()[:8].upper()
                            unique_templates_map[normalized_template_key] = {
                                'EventId': f'HDFS_{event_id_hash}', 
                                'EventTemplate': normalized_template_key, 
                                'Description': 'Auto-generated template (Needs human review)', 
                                'SampleOriginalMessage': full_original_message, 
                                'SampleLevel': last_parsed_multi_line_header_info['level'], 
                                'SampleComponent': last_parsed_multi_line_header_info['component'], 
                            }
                        
                        # Prepare the final parsed entry JSON
                        final_parsed_entry = {
                            "line_id_in_file_header": last_parsed_multi_line_header_info['line_id_in_file_header'], 
                            "source_file": last_parsed_multi_line_header_info['source_file'],
                            "original_log_full": full_original_message, 
                            "timestamp": last_parsed_multi_line_header_info['timestamp'],
                            "level": last_parsed_multi_line_header_info['level'],
                            "component": last_parsed_multi_line_header_info['component'],
                            "event_id": unique_templates_map[normalized_template_key]['EventId'], 
                            "event_template": unique_templates_map[normalized_template_key]['EventTemplate'],
                            "parameters": "" 
                        }
                        
                        # Step 3: Append to Parsed JSONL and Byte-Offset Index
                        json_line = json.dumps(final_parsed_entry) + '\n'
                        
                        # Store current byte offset before writing this line
                        current_byte_offset = f_parsed_jsonl.tell()
                        
                        # Write parsed JSONL entry
                        f_parsed_jsonl.write(json_line)
                        
                        # Write byte-offset index entry
                        offset_index_entry = {
                            'source_file': final_parsed_entry['source_file'],
                            'line_id_in_file_header': final_parsed_entry['line_id_in_file_header'],
                            'byte_offset': current_byte_offset
                        }
                        f_offset_index.write(json.dumps(offset_index_entry) + '\n')
                        
                        parsed_line_counter += 1 # Increment global counter for parsed lines

                    # Step 4: Start new logical log entry
                    current_multi_line_log_buffer = [raw_line]
                    last_parsed_multi_line_header_info = header_info
                    # Add current line_id and source_file for the new header
                    last_parsed_multi_line_header_info['line_id_in_file_header'] = i + 1 
                    last_parsed_multi_line_header_info['source_file'] = os.path.basename(log_file_to_stream)

                else: # This line is a continuation
                    current_multi_line_log_buffer.append(raw_line)
                    # Handle cases where buffer might be empty at start (e.g., file doesn't start with header)
                    if not last_parsed_multi_line_header_info and raw_line.strip():
                        # These are lines that cannot be associated with a preceding header. Drop them.
                        current_multi_line_log_buffer = [] 

        # --- FINAL FLUSH: Process the very last log entry after file finishes ---
        if current_multi_line_log_buffer and last_parsed_multi_line_header_info:
            full_original_message = "".join(current_multi_line_log_buffer).strip()
            content_to_normalize = last_parsed_multi_line_header_info['content_raw']
            normalized_template_key = normalize_log_content(content_to_normalize)
            
            if normalized_template_key not in unique_templates_map:
                event_id_hash = hashlib.md5(normalized_template_key.encode('utf-8')).hexdigest()[:8].upper()
                unique_templates_map[normalized_template_key] = {
                    'EventId': f'HDFS_{event_id_hash}',
                    'EventTemplate': normalized_template_key,
                    'Description': 'Auto-generated template (Needs human review)',
                    'SampleOriginalMessage': full_original_message,
                    'SampleLevel': last_parsed_multi_line_header_info['level'],
                    'SampleComponent': last_parsed_multi_line_header_info['component'],
                }
            final_parsed_entry = {
                "line_id_in_file_header": last_parsed_multi_line_header_info['line_id_in_file_header'], 
                "source_file": last_parsed_multi_line_header_info['source_file'],
                "original_log_full": full_original_message, 
                "timestamp": last_parsed_multi_line_header_info['timestamp'],
                "level": last_parsed_multi_line_header_info['level'],
                "component": last_parsed_multi_line_header_info['component'],
                "event_id": unique_templates_map[normalized_template_key]['EventId'], 
                "event_template": normalized_template_key,
                "parameters": "" 
            }
            json_line = json.dumps(final_parsed_entry) + '\n'
            f_parsed_jsonl.write(json_line)
            
            offset_index_entry = {
                'source_file': final_parsed_entry['source_file'],
                'line_id_in_file_header': final_parsed_entry['line_id_in_file_header'],
                'byte_offset': current_byte_offset # This is byte offset of the LAST entry
            }
            f_offset_index.write(json.dumps(offset_index_entry) + '\n')
            parsed_line_counter += 1

    print(f"\nStreaming & parsing complete for {os.path.basename(log_file_to_stream)}.")
    print(f"Total logical log entries parsed: {parsed_line_counter}")

except Exception as e:
    print(f"An error occurred during streaming or parsing: {e}")

--- Simulating Raw Log Stream & On-the-Fly Parsing/Indexing ---
Streaming from: ../data/raw_logs/hdfs_v2/HDFS_v2/node_logs/hadoop-hdfs-datanode-mesos-01.log


Streaming hadoop-hdfs-datanode-mesos-01.log: 0it [00:00, ?it/s]

KeyboardInterrupt: 

In [4]:
print("--- Saving Generated Templates to CSV ---")

templates_data_list = []
for template_key, template_info in unique_templates_map.items(): 
    templates_data_list.append(template_info)

if templates_data_list:
    templates_df = pd.DataFrame(templates_data_list)
    templates_df = templates_df[['EventId', 'EventTemplate', 'Description', 'SampleOriginalMessage', 'SampleLevel', 'SampleComponent']]
    templates_df.to_csv(OUTPUT_TEMPLATES_CSV, index=False)
    print(f"Successfully saved {len(templates_df)} unique templates to {OUTPUT_TEMPLATES_CSV}")
else:
    print("No templates were generated. Check parsing process.")

print(f"\nExample of generated templates (first 5):\n{templates_df.head()}")

--- Saving Generated Templates to CSV ---
Successfully saved 891 unique templates to ../data/templates/hdfs_v2_templates_stream_test.csv

Example of generated templates (first 5):
         EventId                                      EventTemplate  \
0  HDFS_B1BF6BD0                                       STARTUP_MSG:   
1  HDFS_274E4A29  registered UNIX signal handlers for [<SIGNAL_L...   
2  HDFS_A7699F29  loaded properties from hadoop-metrics<NUM>.pro...   
3  HDFS_5EA7140F      Scheduled snapshot period at <NUM> second(s).   
4  HDFS_EA28BB80                    DataNode metrics system started   

                                    Description  \
0  Auto-generated template (Needs human review)   
1  Auto-generated template (Needs human review)   
2  Auto-generated template (Needs human review)   
3  Auto-generated template (Needs human review)   
4  Auto-generated template (Needs human review)   

                               SampleOriginalMessage SampleLevel  \
0  2015-12-03 14:3

In [5]:
print("\n--- Testing Sequence Retrieval with Live-Generated Index ---")

# --- IMPORTANT: MANUALLY PROVIDE A SAMPLE PROBLEMATIC ENTRY'S METADATA ---
# You need to get this from the output of Cell 3's execution (console output).
# Find an entry that says "--- Analyzing Problem: ..." and copy its
# "source_file" and "line_id_in_file_header".
# Adjust these values to one you see in your Cell 3 output.
sample_problem_entry_metadata = {
  "source_file": "hadoop-hdfs-datanode-mesos-01.log", # Example, please adjust for your data
  "line_id_in_file_header": 80939 # Example, please adjust for your data
} 

# Number of preceding logs to retrieve
NUM_LOGS_BEFORE = 5 

if offset_map: # Ensure offset_map was populated
    print(f"Attempting to retrieve {NUM_LOGS_BEFORE} preceding log entries for:")
    print(f"  File: {sample_problem_entry_metadata['source_file']}")
    print(f"  Line: {sample_problem_entry_metadata['line_id_in_file_header']}")

    log_sequence = get_contextual_log_sequence_from_disk(
        all_parsed_jsonl_path=OUTPUT_PARSED_JSONL, # Path to the live-updated parsed JSONL
        target_entry_metadata=sample_problem_entry_metadata, 
        num_lines_before=NUM_LOGS_BEFORE,
        offset_map=offset_map # Use the in-memory offset_map populated in Cell 3
    )

    if log_sequence:
        print(f"\n--- Retrieved Sequence ({len(log_sequence)} entries) ---")
        for entry in log_sequence:
            print(f"  [{entry.get('line_id_in_file_header', 'N/A')}] {entry.get('timestamp', 'N/A')} {entry.get('level', 'N/A')} {entry.get('component', 'N/A')}: {entry.get('original_log_full', '')[:100]}...")
        print("-----------------------------------")
    else:
        print("No sequence retrieved. Check if target entry metadata is correct, or if it's too early in the log file.")
else:
    print("Offset map not loaded/populated. Cannot test sequence retrieval.")

print("\n--- Sequence Retrieval Test Complete ---")


--- Testing Sequence Retrieval with Live-Generated Index ---


NameError: name 'offset_map' is not defined

In [None]:
print("--- Building/Loading RAG Knowledge Base (with FAISS Persistence) ---")

# Check if FAISS index already exists on disk
faiss_index_exists = os.path.exists(FAISS_INDEX_PATH) and os.listdir(FAISS_INDEX_PATH)

if faiss_index_exists:
    print(f"Loading existing FAISS index from {FAISS_INDEX_PATH}...")
    try:
        vectorstore = FAISS.load_local(FAISS_INDEX_PATH, embedding_model, allow_dangerous_deserialization=True) 
        print("FAISS index loaded successfully and contains data.")
    except Exception as e:
        print(f"Error loading FAISS index: {e}. Rebuilding from scratch.")
        faiss_index_exists = False 

if not faiss_index_exists: 
    print("No existing valid FAISS index found or loading failed. Building from scratch...")
    
    # 1. Load Solution Documents
    solution_doc_files_to_load = []
    if not os.path.exists(SOLUTION_DOCS_DIR):
        print(f"ERROR: Solution documents directory not found at {SOLUTION_DOCS_DIR}. RAG will not have context.")
    else:
        for root, _, files in os.walk(SOLUTION_DOCS_DIR):
            for file in files:
                file_path = os.path.join(root, file)
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path) 
                elif file_path.endswith(('.txt', '.md', '.html', '.docx', '.xlsx')):
                    loader = UnstructuredFileLoader(file_path) 
                else:
                    print(f"Skipping unsupported file type: {file_path}")
                    continue
                
                try:
                    docs = loader.load()
                    solution_doc_files_to_load.extend(docs)
                    print(f"Loaded {len(docs)} pages/chunks from {os.path.basename(file_path)}")
                except Exception as e:
                    print(f"ERROR: Could not load {file_path}: {e}")

    print(f"\nTotal raw documents loaded for RAG: {len(solution_doc_files_to_load)}")
    if not solution_doc_files_to_load:
        print("WARNING: No solution documents were loaded. RAG will not have context.")

    # 2. Split Documents into Chunks
    print("Splitting documents into smaller chunks...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,     # Size of each chunk
        chunk_overlap=200,   # Overlap between chunks to maintain context
        length_function=len  # Use character length
    )
    rag_chunks = text_splitter.split_documents(solution_doc_files_to_load)
    print(f"Split into {len(rag_chunks)} chunks for RAG.")

    if not rag_chunks:
        print("WARNING: No chunks created for RAG. Check document loading or text splitter settings.")

    # 3. Create Embeddings and Store in FAISS
    print(f"Creating embeddings and storing in FAISS (in-memory and saving to disk at {FAISS_INDEX_PATH})...")
    vectorstore = FAISS.from_documents(
        documents=rag_chunks,
        embedding=embedding_model
    )
    vectorstore.save_local(FAISS_INDEX_PATH) 
    print("Embeddings created and stored in FAISS, and saved to disk.")

# 4. Create Retriever
retriever = vectorstore.as_retriever()
print("RAG retriever initialized.")

print("\n--- RAG Knowledge Base Setup Complete ---")

In [None]:
print("--- Defining LLM Prompt Template ---")

# Define the Prompt Template for the LLM
prompt_template = """
You are an expert HDFS (Hadoop Distributed File System) Site Reliability Engineer (SRE) and incident responder.
Your task is to concisely analyze a given problematic HDFS log entry, understand the specific underlying problem, and generate a brief, actionable incident response plan.

Here is the problematic HDFS log entry for analysis:
{log_entry_full}

Here is the sequence of preceding log events that led up to this problem:
{sequence_of_events_json}

Here is relevant troubleshooting and solution context from our knowledge base:
{context}

Based on the HDFS log entry, the sequence of events, and the provided context, please perform the following:

1.  **Problem Summary:** Provide a concise, clear summary of what specifically went wrong, referencing insights from the sequence if possible.
2.  **Severity Assessment:** Assign a severity level (Critical, High, Medium, Low) based on the log's impact.
3.  **Root Cause Hypothesis:** Suggest the most probable root cause(s), specifically mentioning any causal links identified in the sequence.
4.  **Affected Components:** List the HDFS components or services that are most likely affected.
5.  **Actionable Response Plan (Role-Specific):**
    * **DevOps/SRE Actions:** Detailed, step-by-step actions.
    * **Developer Actions:** Specific areas to check in code, potential data issues, or configurations.
    * **Security Actions (if applicable):** Steps to verify/address potential security implications.

Format your entire response as a single JSON object with the following keys:
"summary": "...",
"severity": "...",
"root_cause_hypothesis": "...",
"affected_components": ["...", "..."],
"response_plan": {{
    "devops_sre_actions": ["...", "..."],
    "developer_actions": ["...", "..."],
    "security_actions": ["...", "..."]
}}

**CRITICAL INSTRUCTION:** Ensure your response is ONLY the complete and valid JSON object. Do NOT include any text before or after the JSON. Do NOT truncate the JSON object. It must be perfectly parsable JSON.
"""

# Create the PromptTemplate object
PROMPT_TEMPLATE_LLM = PromptTemplate( # Renamed to avoid clash if 'PROMPT' is used by other LangChain components
    template=prompt_template,
    input_variables=["log_entry_full", "sequence_of_events_json", "context"]
)

print("LLM Prompt Template defined.")

print("\n--- Running LLM Analysis & Solution Generation on Sample Problematic Entries ---")

# We will need to define a sample problematic entry here with source_file and line_id_in_file_header
# as this notebook doesn't stream from the full 43GB file for all problematic entries.
# This test is just to confirm the LLM/RAG/Sequence integration logic works.

# IMPORTANT: You need to manually pick a sample entry from your 43GB JSONL file
# (e.g., from an ERROR/WARN/FATAL entry) and copy its metadata here.
# Example:
sample_target_entry_metadata = {
    "source_file": "hadoop-hdfs-datanode-mesos-01.log", 
    "line_id_in_file_header": 80939, # Example line ID from WARN: IOException in offerService
    "original_log_full": "2016-07-28 15:43:29,170 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in offerService\njava.io.EOFException: End of File Exception between local host is: \"mesos-master-1/10.10.34.11\"; destination host is: \"mesos-master-1\":9000; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFException\n..."
}

if offset_map: # Ensure offset_map is loaded from Cell 2
    print(f"\n--- Analyzing Sample Entry (ID: N/A, Level: {sample_target_entry_metadata.get('level', 'N/A')}) ---")
    print(f"Original Full Log Snippet:\n{sample_target_entry_metadata.get('original_log_full', '')[:500]}...") 

    # 1. Retrieve Contextual Sequence (using the function from Cell 3)
    log_sequence = get_contextual_log_sequence_from_disk(
        all_parsed_jsonl_path=INPUT_PARSED_JSONL_FULL, 
        target_entry_metadata=sample_target_entry_metadata, 
        num_lines_before=NUM_PRECEDING_LOGS_FOR_SEQUENCE,
        offset_map=offset_map 
    )
    sequence_json_str = json.dumps(log_sequence, indent=2)
    if not log_sequence:
        sequence_json_str = "No preceding log events retrieved or sequence unavailable."
        print("Sequence analysis result: No preceding events retrieved.")
    else:
        print(f"Retrieved {len(log_sequence)} preceding log entries for sequence analysis.")
    
    # 2. Prepare query for RAG retriever
    retriever_query = f"HDFS troubleshooting for {sample_target_entry_metadata.get('level', 'N/A')} log: {sample_target_entry_metadata.get('event_template', 'N/A')}. Original log snippet: {sample_target_entry_metadata.get('original_log_full', '')[:200]}"
    
    # 3. Retrieve relevant context from knowledge base
    print("Retrieving relevant context from knowledge base...")
    retrieved_docs = retriever.invoke(retriever_query)
    
    context_text = "\n\n".join([doc.page_content for doc in retrieved_docs])
    if not context_text:
        context_text = "No specific context found in the knowledge base. The LLM will generate a general solution."
        print("WARNING: No specific context retrieved from RAG. Solution may be general.")
    else:
        print(f"Retrieved {len(retrieved_docs)} document chunks.")

    # 4. Prepare the final prompt for the LLM
    final_prompt_for_llm = PROMPT_TEMPLATE_LLM.format(
        log_entry_full=sample_target_entry_metadata['original_log_full'], 
        sequence_of_events_json=sequence_json_str, 
        context=context_text
    )

    # 5. Invoke the LLM directly with the prepared prompt
    print("Sending final prompt to Gemini LLM for solution generation...")
    try:
        llm_response = llm.invoke(final_prompt_for_llm)
        solution_json_str = llm_response.content.strip()

        if solution_json_str.startswith("```json"):
            solution_json_str = solution_json_str.lstrip("```json").rstrip("```").strip()
        
        try:
            generated_solution = json.loads(solution_json_str)
            print("\n--- GENERATED INCIDENT RESPONSE PLAN (JSON) ---")
            print(json.dumps(generated_solution, indent=2))
            print("-----------------------------------------------")
        except json.JSONDecodeError as e:
            print(f"ERROR: LLM response was not valid JSON: {e}")
            print(f"Raw LLM response content (first 500 chars):\n {solution_json_str[:500]}") 

    except Exception as e:
        print(f"ERROR: Failed to invoke LLM: {e}")
        print("Please check your API key, internet connection, and Gemini API quotas.")

else:
    print("Offset map not loaded. Cannot proceed with LLM analysis.")

print("\n--- LLM Analysis & Solution Generation Complete ---")