<a href="https://colab.research.google.com/github/ben854719/Alarm-System_Cybersecurity/blob/main/Malware_Detection_in_Agentic_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade langchain-google-genai google-generativeai langgraph

Collecting langchain-google-genai
  Using cached langchain_google_genai-2.1.5-py3-none-any.whl.metadata (5.2 kB)
Collecting google-ai-generativelanguage<0.7.0,>=0.6.18 (from langchain-google-genai)
  Using cached google_ai_generativelanguage-0.6.18-py3-none-any.whl.metadata (9.8 kB)
INFO: pip is looking at multiple versions of google-generativeai to determine which version is compatible with other requirements. This could take a while.
Collecting google-generativeai
  Using cached google_generativeai-0.8.5-py3-none-any.whl.metadata (3.9 kB)
  Using cached google_generativeai-0.8.4-py3-none-any.whl.metadata (4.2 kB)
  Using cached google_generativeai-0.8.3-py3-none-any.whl.metadata (3.9 kB)
  Using cached google_generativeai-0.8.2-py3-none-any.whl.metadata (3.9 kB)
INFO: pip is still looking at multiple versions of google-generativeai to determine which version is compatible with other requirements. This could take a while.
  Using cached google_generativeai-0.8.1-py3-none-any.whl.metad
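
The log above shows pip backtracking through several google-generativeai releases, so it can help to confirm which versions ended up installed before running the next cell. The following is a minimal sketch using only the standard library; `PACKAGES` and `installed_versions` are illustrative names and are not part of the original notebook.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names taken from the pip command above.
PACKAGES = ["langchain-google-genai", "google-generativeai", "langgraph"]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in this environment.
            found[name] = None
    return found


for name, ver in installed_versions(PACKAGES).items():
    print(f"{name}: {ver or 'not installed'}")
```

A `None` entry means the package is missing from the current runtime, which in Colab usually means the install cell has not been run since the last restart.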

In [None]:
from typing import List, TypedDict

from google.colab import userdata
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph

# Name of the Colab secret that stores the Gemini API key.
COLAB_SECRET_NAME = "Ben85"

api_key = userdata.get(COLAB_SECRET_NAME)
if not api_key:
    raise ValueError(
        f"Secret '{COLAB_SECRET_NAME}' not found. "
        "Please set your API key in Colab Secrets under that name."
    )

# Define the state schema using TypedDict
class LogAnalysisState(TypedDict):
    logs: List[str]
    analysis: str

# Initialize Gemini model
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-pro", api_key=api_key)

# Define the node function that analyzes the logs.

def analyze_logs(state: LogAnalysisState) -> dict:
    """
    Analyzes a list of security logs to detect anomalies using the Gemini model.

    Args:
        state: The current state of the LangGraph workflow, containing the logs.

    Returns:
        A dictionary containing the analysis to update the state. The key 'analysis'
        will hold a string with the detailed analysis, potentially including identified
        anomalies and their descriptions.
    """
    logs = state['logs']

    # Build the analysis prompt from the log entries.
    prompt_text = (
        "Analyze the following security logs carefully. Your task is to identify "
        "any anomalies, suspicious patterns, or potential security threats. "
        "For each identified anomaly, provide a brief description and indicate "
        "which log entries are related.\n\n"
        "Security Logs:\n" + "\n".join(logs) + "\n\n"
        "Please provide your analysis in a clear and concise manner."
    )

    # Invoke the Gemini model with the prompt wrapped in a HumanMessage.
    try:
        response = gemini_model.invoke([HumanMessage(content=prompt_text)])
        analysis_result = response.content
    except Exception as e:
        # Record the failure in the state instead of aborting the workflow.
        analysis_result = f"Error during analysis: {e}"
        print(analysis_result)  # Print the error for debugging

    # Return a dictionary with the analysis content to update the state.
    return {"analysis": analysis_result}

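# Create the LangGraph workflow: a single node that is both entry and finish point.
workflow = StateGraph(state_schema=LogAnalysisState)
workflow.add_node("log_analysis", analyze_logs)
workflow.set_entry_point("log_analysis")
# Mark the node as terminal explicitly; some langgraph versions reject dead-end nodes.
workflow.set_finish_point("log_analysis")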

# Compile the workflow
app = workflow.compile()

# Sample security log entries to analyze.
security_logs = [
    "User login successful",
    "Suspicious activity detected in network traffic",
    "File accessed by unauthorized user",
    "System reboot initiated",
    "Failed login attempt from unknown IP address 192.168.1.100",
    "Regular system health check passed",
]

# Run the workflow
print("Running log analysis workflow...")
result = app.invoke({"logs": security_logs})
print("\nAnalysis Result:")
print(result['analysis'])
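
The prompt in `analyze_logs` asks the model to indicate which log entries relate to each anomaly, but the entries are joined without identifiers. One possible refinement, sketched here as an assumption rather than as the notebook's method, is to number each entry so the model can cite it by index. `build_prompt` is a hypothetical helper; the instruction wording is copied from the cell above.

```python
from typing import List


def build_prompt(logs: List[str]) -> str:
    """Build the analysis prompt with each log entry numbered from 1."""
    # Prefix every entry with an index such as "[1]" so it can be cited.
    numbered = "\n".join(
        f"[{i}] {entry}" for i, entry in enumerate(logs, start=1)
    )
    return (
        "Analyze the following security logs carefully. Your task is to identify "
        "any anomalies, suspicious patterns, or potential security threats. "
        "For each identified anomaly, provide a brief description and indicate "
        "which log entries are related.\n\n"
        "Security Logs:\n" + numbered + "\n\n"
        "Please provide your analysis in a clear and concise manner."
    )
```

Inside `analyze_logs`, `prompt_text = build_prompt(state['logs'])` would replace the inline string construction. The output shown below comes from the original, unnumbered prompt.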

Running log analysis workflow...

Analysis Result:
* **Unauthorized File Access:** The log entry "File accessed by unauthorized user" indicates a critical security breach.  Further investigation is needed to determine the accessed file, the unauthorized user involved, and the extent of data compromise.

* **Suspicious Network Traffic & Failed Login from Unknown IP:** The combination of "Suspicious activity detected in network traffic" and "Failed login attempt from unknown IP address 192.168.1.100" strongly suggests a potential intrusion attempt. The suspicious network traffic may be reconnaissance related to the failed login, or it might be a separate attack vector.  The IP address 192.168.1.100 should be investigated further.

* **Unexpected Reboot:** While not necessarily malicious, the "System reboot initiated" log entry warrants scrutiny.  If no scheduled maintenance or updates were planned, this could indicate an attempt to cover up malicious activity or the result of a successfu