In [None]:
%%writefile app.py
import os
os.environ["STREAMLIT_WATCHER_TYPE"] = "none"
import streamlit as st
import torch
from transformers import AutoTokenizer
import outlines
import matplotlib.pyplot as plt

# Imports
from enum import Enum
from typing import Literal, Optional
from pydantic import BaseModel, Field
from datetime import datetime
import pandas as pd
from collections import Counter
# For pretty printing
from rich import print
from rich.panel import Panel
from rich.table import Table
from rich.console import Console
from rich.text import Text

import torch
from transformers import AutoTokenizer
# Severity levels are used to classify the severity of a security event.
# High severity events are those that should be escalated to a human
# for further investigation.
#
# The `str` mixin makes every member a plain string as well as an enum
# member, so a member compares equal to its literal value.
class SeverityLevel(str, Enum):
    # Members are listed from most to least severe.
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"

# Attack types are used to classify security events. This is not an exhaustive
# list of attack vectors!
#
# Used as the element type of `WebSecurityEvent.possible_attack_patterns`.
# The `str` mixin makes every member a plain string as well as an enum member.
class AttackType(str, Enum):
    BRUTE_FORCE = "BRUTE_FORCE"
    SQL_INJECTION = "SQL_INJECTION"
    XSS = "XSS"
    FILE_INCLUSION = "FILE_INCLUSION"
    COMMAND_INJECTION = "COMMAND_INJECTION"
    PRIVILEGE_ESCALATION = "PRIVILEGE_ESCALATION"
    # Fallback member for anything the values above do not cover.
    UNKNOWN = "UNKNOWN"

# A WebTrafficPattern is a pattern of traffic to a web server --
# it highlights commonly accessed URLs, methods, and response codes.
#
# WebTrafficPatterns are low-priority summarizations used to help
# with understanding the overall traffic patterns to a web server.
class WebTrafficPattern(BaseModel):
    # The URL path this pattern summarizes.
    url_path: str
    # The HTTP method as free-form text (unlike WebSecurityEvent.http_method,
    # it is not restricted to a fixed set of literals).
    http_method: str
    # Presumably the number of requests seen for this path/method -- the
    # value is produced by the model, so treat it as an estimate.
    hits_count: int
    response_codes: dict[str, int]  # Maps status code to count
    # Presumably the number of distinct client IPs for this pattern.
    unique_ips: int
# A LogID is a unique identifier for a log entry. The code in this
# script injects a LOGID-<LETTERS> identifier at the beginning of
# each log entry, which we can use to identify the log entry.
# Language models are fuzzy and they often cannot completely
# copy the original log entry verbatim, so we use the LOGID
# to retrieve the original log entry.
class LogID(BaseModel):
    log_id: str = Field(
        description="""
        The ID of the log entry in the format of LOGID-<LETTERS> where
        <LETTERS> indicates the log identifier at the beginning of
        each log entry.
        """,

        # This is a regular expression that matches the LOGID-<LETTERS> format.
        # The model will fill in the <LETTERS> part.
        pattern=r"LOGID-([A-Z]+)",
    )

    # Find the log entry in a list of logs. Simple
    # convenience function.
    def find_in(self, logs: list[str]) -> Optional[str]:
        """Return the first entry in ``logs`` that carries this ID.

        The ID must appear as a whole token: it may not be followed by a
        further uppercase letter. A plain substring test would let a
        shorter ID such as ``LOGID-A`` match the entry tagged ``LOGID-AA``.

        Args:
            logs: Log entries that include their ``LOGID-<LETTERS>`` tag.

        Returns:
            The first matching entry, or ``None`` when no entry matches.
        """
        # Local import keeps this block self-contained.
        import re

        # escape() guards against regex metacharacters should the field ever
        # hold something other than the validated LOGID-<LETTERS> shape.
        whole_id = re.compile(re.escape(self.log_id) + r"(?![A-Z])")
        for log in logs:
            if whole_id.search(log):
                return log
        return None
# Class for an IP address.
#
# The pattern only checks the dotted-quad shape (four dot-separated groups
# of one to three digits). It does not range-check the octets, so a value
# such as "999.1.1.1" still matches.
class IPAddress(BaseModel):
    ip_address: str = Field(
        pattern=r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$",
    )

# Class for an HTTP response code.
#
# The code is kept as a string of exactly three digits (e.g. "404"),
# not as an integer.
class ResponseCode(BaseModel):
    response_code: str = Field(
        pattern=r"^\d{3}$",
    )
# A WebSecurityEvent is a security event that occurred on a web server.
#
# WebSecurityEvents are high-priority events that should be escalated
# to a human for further investigation.
class WebSecurityEvent(BaseModel):
    # The log entry IDs that are relevant to this event.
    relevant_log_entry_ids: list[LogID]

    # The reasoning for why this event is relevant.
    reasoning: str

    # The type of event. This is free-form text, not an AttackType member,
    # so the same kind of event may be spelled in different ways.
    event_type: str

    # The severity of the event.
    severity: SeverityLevel

    # Whether this event requires human review.
    requires_human_review: bool

    # The confidence score for this event. I'm not sure if this
    # is meaningful for language models, but it's here if we want it.
    confidence_score: float = Field(
        ge=0.0,
        le=1.0,
        description="Confidence score between 0 and 1"
    )

    # Web-specific fields
    url_pattern: str = Field(
        min_length=1,
        description="URL pattern that triggered the event"
    )

    # The HTTP method, restricted to the literals listed here.
    http_method: Literal["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "TRACE", "CONNECT"]
    # Source IP addresses associated with the event (dotted-quad strings).
    source_ips: list[IPAddress]
    # HTTP response codes associated with the event (three-digit strings).
    response_codes: list[ResponseCode]
    # User-agent strings associated with the event, as free-form text.
    user_agents: list[str]

    # Possible attack patterns for this event.
    possible_attack_patterns: list[AttackType]

    # Recommended actions for this event.
    recommended_actions: list[str]
# A LogAnalysis is a high-level analysis of a set of logs.
#
# This is the schema handed to the structured generator and embedded in
# the prompt, so field order and types shape what the model produces.
# NOTE(review): documented with comments rather than a class docstring --
# presumably a docstring would surface in the generated JSON schema and
# therefore change the prompt; confirm before adding one.
class LogAnalysis(BaseModel):
    # A summary of the analysis.
    summary: str

    # Observations about the logs.
    observations: list[str]

    # Planning for the analysis.
    planning: list[str]

    # Security events found in the logs.
    events: list[WebSecurityEvent]

    # Traffic patterns found in the logs.
    traffic_patterns: list[WebTrafficPattern]

    # The highest severity event found. Optional, so it may be None.
    highest_severity: Optional[SeverityLevel]
    # Whether the analysis as a whole needs immediate attention.
    requires_immediate_attention: bool
def format_log_analysis(analysis: LogAnalysis, logs: list[str]):
    """Print a LogAnalysis as a rich console report.

    The report has a timestamped header, a summary panel, the observations,
    the security events with their related log entries, and the traffic
    patterns.

    Text that comes from the model or from the raw logs is escaped before
    it is handed to rich. Otherwise bracketed text such as ``[error]``
    would be read as console markup and either vanish from the output or
    raise a markup error.

    Args:
        analysis: A LogAnalysis object (not a list)
        logs: List of original log entries with LOGID prefixes
    """
    # Local import: only this function needs the escaping helper.
    from rich.markup import escape

    console = Console()

    # Create header
    header = Panel(
        f"[bold yellow]Log Analysis Report[/]\n[blue]{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}[/]",
        border_style="yellow"
    )

    # Create observations section
    observations = Table(show_header=True, header_style="bold magenta", show_lines=True)
    observations.add_column("Key Observations", style="cyan")
    for obs in analysis.observations:
        observations.add_row(escape(obs))

    # Create security events section
    events_table = Table(show_header=True, header_style="bold red", show_lines=True)
    events_table.add_column("Security Events", style="red")
    events_table.add_column("Details", style="yellow")

    # Log entries referenced by the events; stays empty when no ID resolves.
    event_logs_table = Table(show_header=True, header_style="bold cyan", show_lines=True)
    event_logs_table.add_column("Related Log Entries", style="cyan", width=100)

    for event in analysis.events:
        source_ips = ", ".join(ip.ip_address for ip in event.source_ips)
        attacks = ", ".join(attack.value for attack in event.possible_attack_patterns)
        event_details = [
            f"Type: {event.event_type}",
            f"Severity: {event.severity.value}",
            # Fixed precision avoids float noise such as 7.000000000000001%.
            f"Confidence: {event.confidence_score * 100:.1f}%",
            f"Source IPs: {source_ips}",
            f"URL Pattern: {event.url_pattern}",
            f"Possible Attacks: {attacks}",
        ]
        events_table.add_row(
            # A Text object is rendered literally, so it needs no escaping.
            Text(event.event_type, style="bold red"),
            escape("\n".join(event_details))
        )

        # Add related logs to the table
        for log_id in event.relevant_log_entry_ids:
            log = log_id.find_in(logs)
            if log:
                event_logs_table.add_row(escape(log))

    # Create traffic patterns section
    traffic_table = Table(show_header=True, header_style="bold green", show_lines=True)
    traffic_table.add_column("URL Path", style="green")
    traffic_table.add_column("Method", style="cyan")
    traffic_table.add_column("Hits", style="yellow")
    traffic_table.add_column("Status Codes", style="magenta")

    for pattern in analysis.traffic_patterns:
        status_codes = ", ".join(f"{k}: {v}" for k, v in pattern.response_codes.items())
        traffic_table.add_row(
            escape(pattern.url_path),
            escape(pattern.http_method),
            str(pattern.hits_count),
            escape(status_codes),
        )

    # Create summary panel
    summary_text = f"[bold white]Summary:[/]\n[cyan]{escape(analysis.summary)}[/]\n\n"
    if analysis.highest_severity:
        summary_text += f"[bold red]Highest Severity: {analysis.highest_severity.value}[/]\n"
    attention_style = "red" if analysis.requires_immediate_attention else "green"
    summary_text += (
        f"[bold {attention_style}]"
        f"Requires Immediate Attention: {analysis.requires_immediate_attention}[/]"
    )

    summary = Panel(
        summary_text,
        border_style="blue"
    )

    # Print everything
    console.print(header)
    console.print("\n[bold blue]📝 Analysis Summary:[/]")
    console.print(summary)
    console.print(observations)
    console.print("\n[bold red]⚠️  Security Events:[/]")
    console.print(events_table)
    console.print(event_logs_table)
    console.print("\n[bold green]📊 Traffic Patterns:[/]")
    console.print(traffic_table)

# STRESSED turns raw log lines into structured LogAnalysis objects by
# prompting a language model through an outlines JSON generator.
class STRESSED:
    """Chunked log analyzer built on an outlines JSON generator.

    Args:
        model: The outlines model passed to ``outlines.generate.json``.
        tokenizer: Tokenizer whose ``apply_chat_template`` builds the prompt.
        log_type: Description of the logs, substituted into the template.
        prompt_template_path: Path to a ``str.format`` template that is
            filled with ``log_type``, ``logs``, ``model_schema`` and
            ``stress_prompt``.
        token_max: Maximum number of tokens to generate per chunk.
        stressed_out: When true, a "stressed intern" persona is added.

    Raises:
        ValueError: If ``token_max`` is not positive.
        FileNotFoundError: If ``prompt_template_path`` does not exist.
    """

    def __init__(
        self,
        model,
        tokenizer,
        log_type: str,
        prompt_template_path: str,
        token_max: int,
        stressed_out: bool = False
    ):
        if token_max <= 0:
            raise ValueError("token_max must be positive")
        if not os.path.exists(prompt_template_path):
            raise FileNotFoundError(f"Prompt template not found: {prompt_template_path}")

        self.model = model
        self.tokenizer = tokenizer
        self.log_type = log_type
        self.token_max = token_max
        self.stressed_out = stressed_out
        # Load prompt template. The encoding is explicit so the template is
        # decoded the same way regardless of the platform default.
        with open(prompt_template_path, "r", encoding="utf-8") as file:
            self.prompt_template = file.read()

        # Initialize generator
        self.logger = outlines.generate.json(
            self.model,
            LogAnalysis,
            sampler=outlines.samplers.greedy(),
        )

    def _to_prompt(self, text: str, pydantic_class: type[BaseModel]) -> str:
        """Build the chat-formatted prompt for one chunk of logs.

        Args:
            text: The log lines of the chunk, already tagged with LOGIDs.
            pydantic_class: Model class whose JSON schema goes into the prompt.

        Returns:
            The untokenized prompt produced by the tokenizer's chat template.
        """
        stress_prompt = ""
        messages = []
        if self.stressed_out:
            stress_prompt = """
            You are a computer security intern that's really stressed out.
            Your job is hard and you're not sure you're doing it well.

            Your observations and summaries should reflect your anxiety.
            Convey a sense of urgency and panic, be apologetic, and
            generally act like you're not sure you can do your job.

            In your summary, address your boss as "boss" and apologize for
            any mistakes you've made even if you haven't made any.

            Use "um" and "ah" a lot.
            """
            # The persona is sent as a system message and is also handed to
            # the template through its `stress_prompt` placeholder.
            messages.append({"role": "system", "content": stress_prompt})

        messages.append(
            {"role": "user", "content": self.prompt_template.format(
                log_type=self.log_type,
                logs=text,
                model_schema=pydantic_class.model_json_schema(),
                stress_prompt=stress_prompt,
            )}
        )

        return self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

    @staticmethod
    def _make_log_ids(count: int) -> list[str]:
        """Return ``count`` distinct IDs: LOGID-AA, LOGID-AB, ...

        The first 676 IDs are two letters long. Beyond that the ID grows by
        a letter instead of wrapping around, so IDs in a chunk never collide.
        """
        log_ids = []
        for index in range(count):
            # Always emit the two lowest base-26 digits...
            letters = chr(65 + (index // 26) % 26) + chr(65 + index % 26)
            # ...and prepend higher digits only when they are needed.
            remainder = index // 676
            while remainder:
                letters = chr(65 + remainder % 26) + letters
                remainder //= 26
            log_ids.append(f"LOGID-{letters}")
        return log_ids

    def analyze_logs(
        self,
        logs: list[str],
        chunk_size: int = 10,
        format_output: bool = True
    ) -> list[LogAnalysis]:
        """Analyze ``logs`` in chunks and return one LogAnalysis per chunk.

        Empty log lines are dropped. Every remaining line is prefixed with a
        per-chunk LOGID so events can refer back to it. A chunk whose
        generation fails is reported and skipped, so the result may hold
        fewer analyses than there were chunks.

        Args:
            logs: The raw log lines.
            chunk_size: Number of input lines per model call.
            format_output: Whether to print each analysis to the console.

        Returns:
            The analyses of the chunks that succeeded, in input order.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        # Local import: only the error report below needs it.
        from rich.markup import escape

        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")

        results = []
        for i in range(0, len(logs), chunk_size):
            chunked_logs = [log for log in logs[i:i + chunk_size] if log]
            if not chunked_logs:
                continue

            log_ids = self._make_log_ids(len(chunked_logs))
            logs_with_ids = [f"{log_id} {log}" for log_id, log in zip(log_ids, chunked_logs)]
            chunk = "\n".join(logs_with_ids)
            prompt = self._to_prompt(chunk, LogAnalysis)

            try:
                analysis = self.logger(prompt, max_tokens=self.token_max)
                if format_output:
                    format_log_analysis(analysis, logs_with_ids)
                results.append(analysis)
            except Exception as e:
                # Deliberately best-effort: one bad chunk must not abort the
                # run. The message is escaped because `print` is rich's
                # print, which would otherwise read bracketed text in the
                # error as console markup.
                print(f"[Error] Skipping chunk {i // chunk_size + 1} due to error: {escape(str(e))}")
                continue

        return results


# Streamlit UI for log analysis
_MODEL_NAME = "Qwen/Qwen2.5-Coder-7B-Instruct"
_LOG_TYPE = "web server"
_DEFAULT_PROMPT_TEMPLATE_PATH = "/content/security-prompt.txt"
_RESULTS_JSON_PATH = "analysis_results.json"
_RESULTS_STATE_KEY = "log_analysis_results"


def _load_model_and_tokenizer(model_name):
    """Load the vLLM-backed outlines model and the matching tokenizer."""
    model = outlines.models.vllm(
        # The model we're using
        model_name,

        # The dtype to use for the model. bfloat16 is faster
        # than the native float size.
        dtype=torch.bfloat16,

        # Enable prefix caching for faster inference
        enable_prefix_caching=True,

        # Disable sliding window -- this is required
        # for prefix caching to work.
        disable_sliding_window=True,

        # The maximum sequence length for the model.
        # Modify this if you have more memory available,
        # and/or if your logs are longer.
        max_model_len=32000,

        max_num_seqs=10,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    return model, tokenizer


def _analyze_uploaded_logs(log_content, prompt_content, token_max, stressed_out):
    """Run the analysis, save it as JSON, and return it as a list of dicts."""
    import json
    import tempfile

    # STRESSED takes a template *path*, so an uploaded template is written
    # to a temporary file for the duration of the constructor call.
    temp_prompt_path = None
    if prompt_content is not None:
        with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", delete=False, encoding="utf-8"
        ) as temp_file:
            temp_file.write(prompt_content)
        temp_prompt_path = temp_file.name

    try:
        # Cached so the model is loaded once, not on every button click.
        load_cached = st.cache_resource(_load_model_and_tokenizer)
        model, tokenizer = load_cached(_MODEL_NAME)

        # Initialize the STRESSED parser (it reads the template right away).
        parser = STRESSED(
            model=model,
            tokenizer=tokenizer,
            log_type=_LOG_TYPE,
            prompt_template_path=temp_prompt_path or _DEFAULT_PROMPT_TEMPLATE_PATH,
            token_max=token_max,
            stressed_out=stressed_out,
        )
    finally:
        if temp_prompt_path is not None:
            os.remove(temp_prompt_path)

    # Only the first 25 log lines are analyzed, in chunks of 5.
    results = parser.analyze_logs(
        log_content[:25],
        chunk_size=5,
        format_output=False,
    )

    # mode="json" yields plain JSON types (enum members become strings), so
    # the same data can be saved to disk and rendered without re-reading it.
    results_as_dicts = [result.model_dump(mode="json") for result in results]

    with open(_RESULTS_JSON_PATH, "w", encoding="utf-8") as f:
        json.dump(results_as_dicts, f, indent=2)

    return results_as_dicts


def _render_quick_summary(data):
    """Show summary, observations and events of the first five analyses."""
    for entry in data[:5]:
        st.subheader("Log Analysis Summary")
        st.write(entry["summary"])
        st.subheader("Observations")
        st.write("\n".join(entry["observations"]))
        st.subheader("Security Events")
        for event in entry["events"]:
            source_ips = ", ".join(ip["ip_address"] for ip in event["source_ips"])
            attacks = ", ".join(event["possible_attack_patterns"])
            actions = ", ".join(event["recommended_actions"])
            st.write(f"**Event Type:** {event['event_type']}")
            st.write(f"**Severity:** {event['severity']}")
            # Fixed precision avoids float noise such as 7.000000000000001%.
            st.write(f"**Confidence Score:** {event['confidence_score'] * 100:.1f}%")
            st.write(f"**Source IPs:** {source_ips}")
            st.write(f"**URL Pattern:** {event['url_pattern']}")
            st.write(f"**Possible Attacks:** {attacks}")
            st.write(f"**Recommended Actions:** {actions}")
            st.write("---")


def _render_viewer(data):
    """Show the detailed per-analysis viewer for the first five analyses."""
    st.title("📊 Log Analysis Viewer")

    for idx, entry in enumerate(data[:5]):
        st.header(f"🔍 Analysis {idx + 1}")
        st.subheader("📝 Summary")
        st.write(entry["summary"])

        st.subheader("📌 Observations")
        for obs in entry.get("observations", []):
            st.markdown(f"- {obs}")

        st.subheader("🧠 Planning")
        for plan in entry.get("planning", []):
            st.markdown(f"- {plan}")

        st.subheader("⚠️ Events")
        if entry.get("events"):
            for event in entry["events"]:
                source_ips = ", ".join(ip["ip_address"] for ip in event["source_ips"])
                st.markdown(f"**Event Type:** {event['event_type']}")
                st.markdown(f"**Severity:** {event['severity']}")
                st.markdown(f"**Requires Review:** {event['requires_human_review']}")
                st.markdown(f"**Confidence:** {event['confidence_score']}")
                st.markdown(f"**Source IPs:** {source_ips}")
                st.markdown("**Recommended Actions:**")
                for action in event["recommended_actions"]:
                    st.markdown(f"- {action}")
                st.markdown("---")
        else:
            st.info("No security events found in this analysis.")

        st.subheader("🚦 Highest Severity")
        # The key is always present but may be None, so `or` is needed.
        st.write(entry.get("highest_severity") or "N/A")

        st.subheader("⛑️ Immediate Attention Required")
        st.write("✅ Yes" if entry.get("requires_immediate_attention") else "❌ No")

        st.markdown("---")


def _show_bar_chart(labels, values, color, title, rotate_labels=False):
    """Draw one bar chart, hand it to Streamlit, and release the figure."""
    fig, ax = plt.subplots()
    ax.bar(labels, values, color=color)
    ax.set_title(title)
    if rotate_labels:
        ax.tick_params(axis="x", labelrotation=45)
    st.pyplot(fig)
    # Without this, every rerun would leave another open figure behind.
    plt.close(fig)


def _render_dashboard(data):
    """Chart event types, severities, confidence scores and source IPs."""
    st.title("📈 Log Analysis Visual Dashboard")

    events = [event for entry in data for event in entry.get("events", [])]
    if not events:
        st.warning("No events found in the file.")
        return

    df = pd.DataFrame([
        {
            "event_type": event["event_type"],
            "severity": event["severity"],
            "confidence_score": event["confidence_score"],
        }
        for event in events
    ])

    st.subheader("🚨 Event Type Distribution")
    # Normalize event types: the field is free-form text, so the model may
    # spell the same kind of event in several ways.
    df["event_type"] = df["event_type"].replace({
        "BRUTE_FORCE": "Brute Force Attack",
        "BRUTE FORCE": "Brute Force Attack",
        "Brute Force": "Brute Force Attack"
    })
    event_counts = df["event_type"].value_counts()
    _show_bar_chart(event_counts.index, event_counts.values, "skyblue", "Event Types")

    st.subheader("🔥 Severity Level Distribution")
    severity_counts = df["severity"].value_counts()
    _show_bar_chart(severity_counts.index, severity_counts.values, "salmon", "Severity Levels")

    st.subheader("🎯 Confidence Score Distribution")
    fig, ax = plt.subplots()
    ax.hist(df["confidence_score"], bins=10, color="lightgreen")
    ax.set_title("Confidence Scores")
    st.pyplot(fig)
    plt.close(fig)

    st.subheader("🌍 Source IP Address Frequency")
    # Count straight from the event lists. Joining and re-splitting a string
    # would count "" as an address for every event that has no source IP.
    ip_counts = Counter(
        ip["ip_address"] for event in events for ip in event["source_ips"]
    )
    if ip_counts:
        ip_df = pd.DataFrame(ip_counts.most_common(), columns=["IP", "Count"])
        _show_bar_chart(ip_df["IP"], ip_df["Count"], "orange", "Top Source IPs", rotate_labels=True)
    else:
        st.info("No source IP addresses were reported.")


def _build_pdf_report(data):
    """Render the analyses into a PDF and return its bytes."""
    from io import BytesIO
    from xml.sax.saxutils import escape

    from reportlab.lib import colors
    from reportlab.lib.enums import TA_LEFT
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak

    buffer = BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=A4)
    styles = getSampleStyleSheet()
    styles.add(ParagraphStyle(name="Heading", fontSize=14, leading=16, spaceAfter=12, textColor=colors.darkblue))
    styles.add(ParagraphStyle(name="SubHeading", fontSize=12, leading=14, spaceAfter=6, textColor=colors.darkred))
    styles.add(ParagraphStyle(name="Body", fontSize=10, leading=12, spaceAfter=4, alignment=TA_LEFT))

    elements = []

    def add(text, style_name):
        elements.append(Paragraph(text, styles[style_name]))

    def safe(value):
        # Paragraph parses its text as XML-like markup, so model text
        # containing "<" or "&" has to be escaped first.
        return escape(str(value))

    for idx, entry in enumerate(data):
        add(f"🔍 Analysis {idx + 1}", "Heading")
        add("📝 Summary", "SubHeading")
        add(safe(entry["summary"]), "Body")

        add("📌 Observations", "SubHeading")
        for obs in entry.get("observations", []):
            add(f"• {safe(obs)}", "Body")

        add("🧠 Planning", "SubHeading")
        for plan in entry.get("planning", []):
            add(f"• {safe(plan)}", "Body")

        add("⚠️ Events", "SubHeading")
        if entry.get("events"):
            for event in entry["events"]:
                source_ips = ", ".join(ip["ip_address"] for ip in event["source_ips"])
                add(f"Event Type: {safe(event['event_type'])}", "Body")
                add(f"Severity: {safe(event['severity'])}", "Body")
                add(f"Requires Review: {event['requires_human_review']}", "Body")
                add(f"Confidence: {event['confidence_score']}", "Body")
                add(f"Source IPs: {safe(source_ips)}", "Body")
                add("Recommended Actions:", "Body")
                for action in event["recommended_actions"]:
                    add(f"• {safe(action)}", "Body")
                elements.append(Spacer(1, 6))
        else:
            add("No security events found in this analysis.", "Body")

        highest_severity = entry.get("highest_severity") or "N/A"
        add(f"🚦 Highest Severity: {safe(highest_severity)}", "Body")
        attention = "✅ Yes" if entry.get("requires_immediate_attention") else "❌ No"
        add(f"⛑️ Immediate Attention Required: {attention}", "Body")
        elements.append(PageBreak())

    doc.build(elements)
    return buffer.getvalue()


def _render_pdf_export(data):
    """Build the PDF report and offer it for download."""
    st.title("📄 Export Log Analysis to PDF")

    pdf_bytes = _build_pdf_report(data)

    st.success("PDF report created successfully!")
    st.download_button(
        label="📥 Download PDF",
        data=pdf_bytes,
        file_name="log_analysis_report.pdf",
        mime="application/pdf"
    )


def run_log_analysis_app():
    """Render the Streamlit page: uploads, settings, analysis and reports.

    Clicking "Analyze Logs" runs the model over the first 25 uploaded log
    lines, saves the result to ``analysis_results.json`` and keeps it in
    ``st.session_state``. The reports are drawn from that stored result on
    every rerun, so they survive reruns triggered by other widgets such as
    the PDF download button.

    An uploaded prompt template is used when present; otherwise the default
    template path is used.
    """
    st.title("Log File Analysis with Security Prompt")

    # Upload log file
    log_content = []
    log_file = st.file_uploader("Upload Log File", type=["txt", "log"])
    if log_file:
        # getvalue() returns the whole upload regardless of the read
        # position; errors="replace" keeps stray non-UTF-8 bytes in a log
        # from aborting the page.
        log_content = log_file.getvalue().decode("utf-8", errors="replace").splitlines()
        st.write(f"Uploaded Log File: {log_file.name}")
        st.text_area("Log Content", "\n".join(log_content), height=200)

    # Upload the security prompt template file
    prompt_content = None
    prompt_file = st.file_uploader("Upload Security Prompt Template", type=["txt"])
    if prompt_file:
        prompt_content = prompt_file.getvalue().decode("utf-8")
        st.write(f"Uploaded Prompt Template File: {prompt_file.name}")

    # Choose whether to stress the analysis
    stressed_out = st.checkbox("Stressed Out Intern Mode", value=True)

    # Configuration
    token_max = st.slider("Max Tokens for Generation", min_value=500, max_value=32000, value=32000, step=500)
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    if st.button("Analyze Logs"):
        # Validate the inputs before paying for the model load.
        if not log_content:
            st.error("Upload a non-empty log file before running the analysis.")
        elif prompt_content is None and not os.path.exists(_DEFAULT_PROMPT_TEMPLATE_PATH):
            st.error(
                f"Prompt template not found: {_DEFAULT_PROMPT_TEMPLATE_PATH}. "
                "Upload a prompt template to continue."
            )
        else:
            st.session_state[_RESULTS_STATE_KEY] = _analyze_uploaded_logs(
                log_content, prompt_content, token_max, stressed_out
            )
            st.success("Saved partial results to 'analysis_results.json'")

    data = st.session_state.get(_RESULTS_STATE_KEY)
    if data is None:
        # Nothing has been analyzed in this session yet.
        return
    if not data:
        st.warning("The analysis produced no results.")
        return

    # Display results
    _render_quick_summary(data)
    _render_viewer(data)
    _render_dashboard(data)
    _render_pdf_export(data)

# Build the UI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_log_analysis_app()

Overwriting app.py


In [None]:
# `import urllib` alone does not load the `request` submodule, so import
# it explicitly rather than relying on another module having done so.
import urllib.request

# Look up this machine's public IPv4 address and print it. The timeout
# keeps the cell from hanging, and the `with` block closes the response.
with urllib.request.urlopen('https://ipv4.icanhazip.com', timeout=10) as response:
    public_ip = response.read().decode('utf8').strip("\n")
print("Password/Enpoint IP for localtunnel is:", public_ip)

Password/Enpoint IP for localtunnel is: 34.143.144.190


In [None]:
# Start the Streamlit app in the background (`&`), then run localtunnel
# against port 8501.
!streamlit run /content/app.py --server.runOnSave false & npx localtunnel --port 8501


[1G[0K⠙
Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[1G[0K⠹[1G[0K⠸[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.143.144.190:8501[0m
[0m
[1G[0K⠼[1G[0Kyour url is: https://yellow-worms-grab.loca.lt
2025-04-29 14:22:04.986 Examining the path of torch.classes raised:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/streamlit/web/bootstrap.py", line 347, in run
    if asyncio.get_running_loop().is_running():
       ^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: no running event loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/streamlit/watcher/local_sources_watcher.py", line 217, in get_module_paths
    potential_paths = extr