<a href="https://colab.research.google.com/github/WizardML7/Cuckoo-Watchtower/blob/main/CuckooWatchTowerRAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install langchain openai weaviate-client
!pip install tiktoken
!pip install pymongo

Collecting langchain
  Downloading langchain-0.1.17-py3-none-any.whl (867 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m867.6/867.6 kB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting openai
  Downloading openai-1.25.1-py3-none-any.whl (312 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m312.9/312.9 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting weaviate-client
  Downloading weaviate_client-4.5.7-py3-none-any.whl (307 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain)
  Downloading dataclasses_json-0.6.5-py3-none-any.whl (28 kB)
Collecting jsonpatch<2.0,>=1.33 (from langchain)
  Downloading jsonpatch-1.33-py2.py3-none-any.whl (12 kB)
Collecting langchain-community<0.1,>=0.0.36 (from langchain)
  Downloading langchain_community-0.0.36-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━

In [3]:
import glob
import os
from getpass import getpass

# Never store the API key in the notebook. Keep a key that is already present in
# the environment; otherwise ask for it interactively without echoing the input.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

In [54]:
import json

def load_report(report_path):
    """Load a JSON report from disk.

    Args:
        report_path: Path of the JSON file to read.

    Returns:
        The parsed JSON document, as produced by ``json.load``.
    """
    # Decode explicitly as UTF-8 rather than with the locale's default encoding,
    # so the same report parses identically on every platform.
    with open(report_path, 'r', encoding='utf-8') as file:
        return json.load(file)

def get_process_info(cuckoo_report):
    """Return the PID of the first recorded process and the PIDs of its children.

    Args:
        cuckoo_report: Parsed report dict. Processes are read from
            ``cuckoo_report["behavior"]["processes"]``.

    Returns:
        A ``(malware_pid, child_pids)`` tuple, where ``child_pids`` lists the
        ``pid`` of every process whose ``ppid`` equals ``malware_pid``.
        ``(None, [])`` is returned when the report has no behaviour section or
        no recorded processes.
    """
    processes_info = (cuckoo_report.get("behavior") or {}).get("processes") or []
    if not processes_info:
        return None, []

    # NOTE(review): the first process entry is treated as the analysed sample -
    # confirm this holds for the reports being processed.
    malware_pid = processes_info[0]["pid"]
    # .get() so a process entry without a recorded parent is simply not a child.
    child_pids = [proc["pid"] for proc in processes_info if proc.get("ppid") == malware_pid]
    return malware_pid, child_pids

def get_virustotal_names(cuckoo_report, top_n=30):
    """Collect the detection names reported by the VirusTotal scanners.

    Args:
        cuckoo_report: Parsed report dict. Scanner results are read from
            ``cuckoo_report["virustotal"]["scans"]``; either level may be absent.
        top_n: Maximum number of names to return (default 30).

    Returns:
        A list with the ``result`` value of each scanner entry that has a
        non-null result, in report order, truncated to ``top_n`` items.
    """
    vt_results = (cuckoo_report.get("virustotal") or {}).get("scans") or {}
    malware_names = [
        details["result"]
        for details in vt_results.values()
        # Skip scanners with no "result" key or a null result instead of failing.
        if isinstance(details, dict) and details.get("result") is not None
    ]
    return malware_names[:top_n]

def get_top_entries(data_dict, top_n=30):
    """Truncate every value of ``data_dict`` to its first ``top_n`` items.

    Args:
        data_dict: Mapping whose values support slicing (e.g. lists).
        top_n: Number of leading items to keep per key (default 30).

    Returns:
        A new dict with the same keys and each value sliced to ``[:top_n]``.
    """
    trimmed = {}
    for name, entries in data_dict.items():
        trimmed[name] = entries[:top_n]
    return trimmed

def get_dlls_loaded_by_process(processes_info):
    """Map each process ID to the leading entries of its ``loaded_dlls`` list.

    Args:
        processes_info: Iterable of process dicts, each with a ``pid`` key and
            an optional ``loaded_dlls`` list.

    Returns:
        Dict of ``pid -> loaded_dlls`` (empty list when the key is absent),
        truncated per process by ``get_top_entries`` with its default limit.
    """
    dlls_by_pid = {}
    for proc in processes_info:
        pid = proc["pid"]
        dlls_by_pid[pid] = proc.get("loaded_dlls", [])
    return get_top_entries(dlls_by_pid)

def get_api_calls_by_process(processes_info):
    """Map each process ID to the leading API names from its ``calls`` list.

    Args:
        processes_info: Iterable of process dicts, each with a ``pid`` key and
            an optional ``calls`` list whose items carry an ``api`` key.

    Returns:
        Dict of ``pid -> [api name, ...]`` in call order, truncated per process
        by ``get_top_entries`` with its default limit.
    """
    api_names_by_pid = {}
    for proc in processes_info:
        pid = proc["pid"]
        api_names_by_pid[pid] = [call["api"] for call in proc.get("calls", [])]
    return get_top_entries(api_names_by_pid)

def get_registry_operations(cuckoo_report, top_n=30):
    """Return the leading entries of the behaviour summary's ``regkey_written`` list.

    Args:
        cuckoo_report: Parsed report dict. The list is read from
            ``cuckoo_report["behavior"]["summary"]["regkey_written"]``.
        top_n: Maximum number of entries to return (default 30).

    Returns:
        Up to ``top_n`` entries, or an empty list when any level of the path is
        missing or null (e.g. a report without a behaviour summary).
    """
    summary = (cuckoo_report.get("behavior") or {}).get("summary") or {}
    return (summary.get("regkey_written") or [])[:top_n]

def get_filesystem_operations(cuckoo_report, top_n=30):
    """Return the leading entries of the behaviour summary's ``file_written`` list.

    Args:
        cuckoo_report: Parsed report dict. The list is read from
            ``cuckoo_report["behavior"]["summary"]["file_written"]``.
        top_n: Maximum number of entries to return (default 30).

    Returns:
        Up to ``top_n`` entries, or an empty list when any level of the path is
        missing or null (e.g. a report without a behaviour summary).
    """
    summary = (cuckoo_report.get("behavior") or {}).get("summary") or {}
    return (summary.get("file_written") or [])[:top_n]

def write_report_to_file(filename, data):
    """Dump a mapping of section titles to values into a plain-text file.

    Each item is written as the key followed by a colon on one line, the
    value's string form on the next, and then a blank line.

    Args:
        filename: Path of the text file to create or overwrite.
        data: Mapping of section title to the value to record.
    """
    with open(filename, 'w') as out:
        out.writelines(f"{key}:\n{value}\n\n" for key, value in data.items())

In [71]:
# Build the cleaned-up summary file from the raw report stored on Google Drive

from google.colab import drive
drive.mount('/content/drive')

report_path = "/content/drive/MyDrive/CSEC MalwareForensics/FinalData/15/reports/report.json"
cuckoo_report = load_report(report_path)

# Pull out each section of interest
malware_pid, child_pids = get_process_info(cuckoo_report)
malware_names = get_virustotal_names(cuckoo_report)
processes_info = cuckoo_report["behavior"]["processes"]  # shared by both per-process extractors
dlls_by_process = get_dlls_loaded_by_process(processes_info)
api_calls_by_process = get_api_calls_by_process(processes_info)
registry_operations = get_registry_operations(cuckoo_report)
filesystem_operations = get_filesystem_operations(cuckoo_report)

# Assemble the sections in the order they should appear in the summary file
data_to_write = {}
data_to_write["PID and Child PIDs"] = f"PID: {malware_pid}, Child PIDs: {child_pids}"
data_to_write["Names from VirusTotal"] = malware_names
data_to_write["Top DLLs Loaded by Process"] = dlls_by_process
data_to_write["Top API Calls by Process"] = api_calls_by_process
data_to_write["Top Registry Operations"] = registry_operations
data_to_write["Top Filesystem Operations"] = filesystem_operations

# The summary is written relative to the current working directory
output_filename = 'report_summary.txt'
write_report_to_file(output_filename, data_to_write)


def extract_signatures_to_file(file_path, output_file):
    """Write the ``signatures`` section of a JSON report to a text file.

    For every signature the name, description, severity, mark count, families
    and references are written on separate lines, followed by a 40-dash rule.

    Args:
        file_path: Path of the JSON report to read.
        output_file: Path of the text file to create or overwrite. It is left
            empty when the report has no signatures.
    """
    # Decode explicitly as UTF-8 rather than with the locale's default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # "or []" also covers a key that is present but null.
    signatures = data.get('signatures') or []

    with open(output_file, 'w') as outfile:
        for signature in signatures:
            # str() each item so a null list or a non-string entry cannot make
            # ', '.join raise TypeError.
            families = ', '.join(map(str, signature.get('families') or []))
            references = ', '.join(map(str, signature.get('references') or []))

            outfile.write(f"Signature Name: {signature.get('name')}\n")
            outfile.write(f"Description: {signature.get('description')}\n")
            outfile.write(f"Severity: {signature.get('severity')}\n")
            outfile.write(f"Mark Count: {signature.get('markcount')}\n")
            outfile.write(f"Family: {families}\n")
            outfile.write(f"References: {references}\n")
            outfile.write("-" * 40 + "\n")


# Reuse report_path (set earlier in this cell) instead of repeating the literal,
# so switching to another sample only needs one edit.
extract_signatures_to_file(report_path, 'signatures_output.txt')

def extract_network_to_file(file_path, output_file):
    """Write the ``network`` section of a JSON report to a text file.

    The output has a general overview (TLS, DNS servers, HTTP, SMTP, ICMP,
    domains, dead hosts), followed by per-entry UDP details (only when the
    report has a ``udp`` key), DNS activities, and per-entry HTTP details (only
    when the report has an ``http`` key). Sections are separated by a 40-dash
    rule.

    Args:
        file_path: Path of the JSON report to read.
        output_file: Path of the text file to create or overwrite. It is left
            empty when the report has no (or an empty) network section.
    """
    rule = "-" * 40 + "\n"

    # Decode explicitly as UTF-8 rather than with the locale's default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    network = data.get('network') or {}

    with open(output_file, 'w') as outfile:
        if not network:
            return

        # General network information. str() each DNS server so a non-string
        # entry cannot make ', '.join raise TypeError.
        dns_servers = ', '.join(map(str, network.get('dns_servers') or []))
        outfile.write(f"TLS Info: {network.get('tls', 'Not available')}\n")
        outfile.write(f"DNS Servers: {dns_servers}\n")
        outfile.write(f"HTTP Traffic: {network.get('http', 'Not available')}\n")
        outfile.write(f"SMTP Traffic: {network.get('smtp', 'Not available')}\n")
        outfile.write(f"ICMP Traffic: {network.get('icmp', 'Not available')}\n")
        outfile.write(f"Domains: {network.get('domains', 'Not available')}\n")
        outfile.write(f"Dead Hosts: {network.get('dead_hosts', 'Not available')}\n")
        outfile.write(rule)

        # Detailed UDP traffic; "or []" tolerates a key that is present but null.
        if 'udp' in network:
            outfile.write("UDP Traffic Details:\n")
            for udp in network['udp'] or []:
                outfile.write(
                    f"Source: {udp.get('src', 'Unknown')}, "
                    f"Destination: {udp.get('dst', 'Unknown')}, "
                    f"Source Port: {udp.get('sport', 'Unknown')}, "
                    f"Destination Port: {udp.get('dport', 'Unknown')}\n"
                )
            outfile.write(rule)

        # DNS activity (this section header is always written).
        outfile.write("DNS Activities:\n")
        for dns in network.get('dns') or []:
            answers = ', '.join(
                str(ans.get('data', 'No data')) for ans in dns.get('answers') or []
            )
            outfile.write(f"Request: {dns.get('request', 'Unknown')}, Answers: {answers}\n")
        outfile.write(rule)

        # Detailed HTTP traffic.
        if 'http' in network:
            outfile.write("HTTP Traffic Details:\n")
            for http in network['http'] or []:
                outfile.write(
                    f"Method: {http.get('method', 'Unknown')}, "
                    f"URL: {http.get('uri', 'Unknown')}, "
                    f"Status Code: {http.get('status_code', 'Unknown')}\n"
                )
            outfile.write(rule)


# Reuse report_path (set earlier in this cell) instead of repeating the literal,
# so switching to another sample only needs one edit.
extract_network_to_file(report_path, 'network_output.txt')


def extract_static_to_file(file_path, output_file):
    """Write the ``static`` section of a JSON report to a text file.

    The output lists the PDB path, PE timestamp, imported DLL count and PE
    imphash, then the PE imports (per DLL, with each import's name and
    address), the static signatures (organization and common name) and the PE
    sections (name, data size, virtual address). The three list sections each
    end with a 40-dash rule.

    Args:
        file_path: Path of the JSON report to read.
        output_file: Path of the text file to create or overwrite. It is left
            empty when the report has no (or an empty) static section.
    """
    rule = "-" * 40 + "\n"

    # Decode explicitly as UTF-8 rather than with the locale's default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    static = data.get('static') or {}

    with open(output_file, 'w') as outfile:
        if not static:
            return

        # PE timestamp and other header-level information
        outfile.write(f"PDB Path: {static.get('pdb_path', 'Not available')}\n")
        outfile.write(f"PE Timestamp: {static.get('pe_timestamp', 'Not available')}\n")
        outfile.write(f"Imported DLL Count: {static.get('imported_dll_count', 'Not available')}\n")
        outfile.write(f"PE ImpHash: {static.get('pe_imphash', 'Not available')}\n")

        # PE imports; "or []" tolerates list keys that are present but null.
        outfile.write("PE Imports:\n")
        for item in static.get('pe_imports') or []:
            outfile.write(f"DLL: {item.get('dll', 'Unknown')}\n")
            for imp in item.get('imports') or []:
                outfile.write(
                    f"  Import Name: {imp.get('name', 'Unknown')}, "
                    f"Address: {imp.get('address', 'Unknown')}\n"
                )
        outfile.write(rule)

        # Static signatures
        outfile.write("Signatures:\n")
        for sig in static.get('signature') or []:
            outfile.write(
                f"  Organization: {sig.get('organization', 'Unknown')}, "
                f"Common Name: {sig.get('common_name', 'Unknown')}\n"
            )
        outfile.write(rule)

        # PE sections
        outfile.write("PE Sections:\n")
        for section in static.get('pe_sections') or []:
            outfile.write(
                f"  Section Name: {section.get('name', 'Unknown')}, "
                f"Size of Data: {section.get('size_of_data', 'Unknown')}, "
                f"Virtual Address: {section.get('virtual_address', 'Unknown')}\n"
            )
        outfile.write(rule)

# Reuse report_path (set earlier in this cell) instead of repeating the literal,
# so switching to another sample only needs one edit.
extract_static_to_file(report_path, 'static_output.txt')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [72]:
from langchain.document_loaders import TextLoader
from google.colab import drive


# Load every text file in /content as LangChain documents.
# glob returns matches in arbitrary order, so sort them: documents (and the
# chunks derived from them) are then built in the same order on every run.
# NOTE(review): this picks up the generated *_output.txt / report_summary.txt
# files only if the working directory they were written to is /content - confirm.
document_files = sorted(glob.glob('/content/*.txt'))
documents = []
for file_path in document_files:
    loader = TextLoader(file_path)
    documents.extend(loader.load())

In [73]:
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter

# CharacterTextSplitter only cuts on its separator (a blank line by default) and
# emits any longer piece as a single oversized chunk. The generated report files
# mostly separate entries with single newlines, and the summary stores whole
# lists on one line, so chunk_size=500 was not actually being enforced.
# RecursiveCharacterTextSplitter falls back to finer separators until the pieces
# fit, which keeps each chunk within the requested size.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = text_splitter.split_documents(documents)



In [74]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Weaviate
import weaviate
from weaviate.embedded import EmbeddedOptions

# Connect to a Weaviate instance embedded in this runtime
client = weaviate.Client(embedded_options=EmbeddedOptions())

# Embed every chunk with OpenAI embeddings and index the result in Weaviate.
# NOTE(review): by_text=False presumably makes lookups use the embedding vectors
# rather than Weaviate's text search - confirm against the LangChain docs.
embedding_model = OpenAIEmbeddings()
vectorstore = Weaviate.from_documents(
    client=client,
    documents=chunks,
    embedding=embedding_model,
    by_text=False,
)

embedded weaviate is already listening on port 8079


In [75]:
# Wrap the vector store in a retriever object with its default settings; the
# RAG chain built later uses it to fetch context for each question.
retriever = vectorstore.as_retriever()

In [82]:
from langchain.prompts import ChatPromptTemplate

# Prompt for the RAG chain. It has two placeholders: {question} (the user's
# query) and {context} (the retrieved report excerpts). The instructions tell
# the model to refuse questions unrelated to malware event analysis and to
# admit when it does not know the answer.
template = """You are an assistant for malware event analysis. If the user asks questions that are completely unrelated to malware event analysis refuse to answer their question and state that your purpose is to answer questions related to malware and analyze the malware event data.
If you don't know the answer, just say that you don't know.
Question: {question}
Context: {context}
Answer:
"""
prompt = ChatPromptTemplate.from_template(template)

# Show the parsed template to confirm both input variables were detected.
print(prompt)

input_variables=['context', 'question'] messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template="You are an assistant for malware event analysis. If the user asks questions that are completely unrelated to malware event analysis refuse to answer their question and state that your purpose is to answer questions related to malware and analyze the malware event data. \nIf you don't know the answer, just say that you don't know.\nQuestion: {question}\nContext: {context}\nAnswer:\n"))]


In [83]:
from langchain.chat_models import ChatOpenAI
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

# temperature=0 keeps the answers as repeatable as the model allows
llm = ChatOpenAI(model_name="gpt-4-turbo-preview", temperature=0)


def format_docs(docs):
    """Join the text of the retrieved documents into a single context string."""
    return "\n\n".join(doc.page_content for doc in docs)


# The retriever returns Document objects; feeding them to the prompt directly
# interpolates their repr (metadata, escaped newlines) into {context}. Piping
# them through format_docs gives the model the plain excerpt text instead.
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)


query = input("Please enter your question: ")

# Invoke the chain with the user query and print the response
response = rag_chain.invoke(query)
print(response)

Please enter your question: Provide a comprehensive report on the program analyzed. 
Based on the provided documents, the analyzed program exhibits several characteristics and behaviors indicative of potentially malicious activity. Here's a comprehensive report summarizing the findings:

**Process Information:**
- **PID:** 516 with no child processes identified.

**Signatures Detected:**
1. **Console Output:** Command line console output was observed, indicating potential command-line interface manipulation or execution.
2. **Authenticode:** The executable is signed, suggesting an attempt to appear legitimate.
3. **Anti-VM Techniques:** The program checks the amount of memory available, possibly to detect virtual machines.
4. **Unknown PE Resource Name:** Contains an unknown PE resource name, which could indicate packing or obfuscation techniques.
5. **Allocates RWX Memory:** Allocates read-write-execute memory, typically for unpacking or executing in-memory payloads.
6. **Anti-Sandbox