In [1]:
import sys
print(sys.executable)

/home/jovyan/IsoonAI/bin/python


In [6]:
import os
import zipfile
import subprocess
import json
import pandas as pd
from langchain_ollama.llms import OllamaLLM
import shutil


# Paths and model settings\ nZIP_PATH = 'I-Soon-data.zip'      # Path to your downloaded zip file
ZIP_PATH = '0.zip'    # Directory to extract contents
EXTRACT_DIR = 'I-Soon-data'        # Directory to extract contents
print("kernel is working")

kernel is working


In [7]:
if not os.path.isdir(EXTRACT_DIR):
    with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
        zip_ref.extractall(EXTRACT_DIR)
    print(f"Extracted archive to '{EXTRACT_DIR}'")
else:
    print(f"Extraction directory '{EXTRACT_DIR}' already exists")

BadZipFile: File is not a zip file

# **DATA TYPE CATEGORIZATION**

In [None]:
# Define the parent directory
parent_directory = "I-Soon-data"

# Full path to the __MACOSX folder
macosx_folder = os.path.join(parent_directory, "__MACOSX")

# Check if __MACOSX exists and remove it
if os.path.exists(macosx_folder) and os.path.isdir(macosx_folder):
    shutil.rmtree(macosx_folder)
    print(f"Deleted: {macosx_folder}")
else:
    print(f"Folder not found: {macosx_folder}")

# Organize files by extension into subfolders
for root, dirs, files in os.walk(parent_directory):
    for file in files:
        # Skip hidden files and __MACOSX if any reappear
        if file.startswith('.') or '__MACOSX' in root:
            continue

        # Get the file extension (in lowercase, without the dot)
        file_extension = os.path.splitext(file)[1].lower().lstrip('.')
        if not file_extension:
            file_extension = "no_extension"

        # Define the new subfolder path
        subfolder_path = os.path.join(parent_directory, file_extension)

        # Create the subfolder if it doesn't exist
        os.makedirs(subfolder_path, exist_ok=True)

        # Define source and destination paths
        source_path = os.path.join(root, file)
        destination_path = os.path.join(subfolder_path, file)

        # Move the file if source and destination are not the same
        if os.path.abspath(source_path) != os.path.abspath(destination_path):
            shutil.move(source_path, destination_path)

# Remove any empty folders within the parent directory
for dirpath, dirnames, filenames in os.walk(parent_directory, topdown=False):
    if not dirnames and not filenames:
        try:
            os.rmdir(dirpath)
            print(f"Removed empty folder: {dirpath}")
        except OSError:
            pass  # Ignore errors (e.g., if directory is not empty due to permissions)

# **Markdown File Classification Using Local LLM (Ollama + LangChain)**

In [None]:
import os
import glob
import shutil
import concurrent.futures
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from tqdm import tqdm

# === Step 1: Set up LLM and Prompt ===

llm = Ollama(model="llama3.1:8b")

prompt_template = PromptTemplate(
    input_variables=["content"],
    template="""
You are analyzing the content of a Markdown (.md) file.

Markdown content:
\"\"\"
{content}
\"\"\"

1. Classify the content into one of the following categories ONLY: chats, images, other.
2. State your confidence in the classification as one of: high, medium, or low.

Respond in the following format:
Category: <chats|images|other>
Confidence: <high|medium|low>
"""
)

chain = LLMChain(llm=llm, prompt=prompt_template)

# === Step 2: Preprocessing Function ===

def preprocess_first_20_lines(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = [line.strip() for _, line in zip(range(20), f)]
            content = " ".join(lines)
        return os.path.basename(file_path), content
    except Exception:
        return os.path.basename(file_path), ""

# === Step 3: Load .md Files ===

md_dir = "I-Soon-data/md"
md_files = glob.glob(os.path.join(md_dir, "*.md"))

with concurrent.futures.ThreadPoolExecutor() as executor:
    file_data = list(executor.map(preprocess_first_20_lines, md_files))

# === Step 4: Classify Each File and Move ===

valid_categories = {"chats", "images", "other"}
results_log = []

for file_name, content in tqdm(file_data, desc="Classifying files"):
    if not content:
        continue

    try:
        response = chain.run(content=content).strip().lower()
        lines = response.splitlines()

        category = next((line.replace("category:", "").strip() for line in lines if line.startswith("category:")), "")
        confidence = next((line.replace("confidence:", "").strip() for line in lines if line.startswith("confidence:")), "")

        if category not in valid_categories:
            category = "other"

        # Destination folder *within* the md_dir
        category_path = os.path.join(md_dir, category)
        os.makedirs(category_path, exist_ok=True)

        # Move file into category folder
        src_path = os.path.join(md_dir, file_name)
        dst_path = os.path.join(category_path, file_name)

        if os.path.exists(src_path):
            shutil.move(src_path, dst_path)

        results_log.append({
            "file": file_name,
            "category": category,
            "confidence": confidence
        })

    except Exception as e:
        print(f"Failed to process {file_name}: {e}")

# === Optional: Save results to file ===

# import json
# with open("classification_results.json", "w", encoding="utf-8") as f:
#     json.dump(results_log, f, indent=2)

# import csv
# with open("classification_results.csv", "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=["file", "category", "confidence"])
#     writer.writeheader()
#     writer.writerows(results_log)

# **Stage 2: LLM-Based Categorization and File Organization by Description**

In [None]:
# import os
# import shutil
# from langchain.prompts import PromptTemplate
# from langchain.chains import LLMChain
# from langchain.llms import Ollama
# from tqdm import tqdm

# # Initialize second LLM
# llm2 = Ollama(model="taozhiyuai/llama-3-refueled:q4_k_m")

# # Prompt for classification based on description
# prompt_template_stage2 = PromptTemplate(
#     input_variables=["description"],
#     template="""
# You are a strict content classifier.

# Given the following short description of a Markdown (.md) file:

# \"\"\"
# {description}
# \"\"\"

# Classify the content into one of these categories only:
# - chats
# - images
# - other

# Return only one of those three exact words (in lowercase). Do not use synonyms or explanations. Do not make up new categories.
# """
# )

# # Set up LangChain chain
# chain2 = LLMChain(llm=llm2, prompt=prompt_template_stage2)

# # Directories
# original_dir = "I-Soon-data/md"
# filtered_dir = "Filtered-markdowns"
# os.makedirs(filtered_dir, exist_ok=True)

# # Allowed categories
# valid_categories = {"chats", "images", "other"}

# # Reclassify and copy files
# for file_name, description in tqdm(results.items()):
#     try:
#         category = chain2.run(description=description).strip().lower()
#         if category not in valid_categories:
#             category = "other"  # fallback to default

#         # Create destination folder
#         category_path = os.path.join(filtered_dir, category)
#         os.makedirs(category_path, exist_ok=True)

#         # Copy the file
#         src_path = os.path.join(original_dir, file_name)
#         dst_path = os.path.join(category_path, file_name)

#         if os.path.exists(src_path):
#             shutil.copy2(src_path, dst_path)

#     except Exception as e:
#         print(f"Failed to classify or copy {file_name}: {e}")

# **Finding connetions between the MD files - reduced size due to performance issues**

Idea: Have the LLM search the markdown files and look for any files linked outside the chats.

# *RegEx based identification of linked files within the chat*

In [None]:
import os
import re
import ast
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

# Step 1: Initialize LLM
llm2 = Ollama(model="llama3.1:8b")

# Step 2: Define the prompt template
prompt_template_file_types = PromptTemplate(
    input_variables=["content"],
    template="""
List the top 50 file extensions most commonly found in cybersecurity leaked data, including data from breaches, ransomware leaks, and dark web dumps. THINK LIKE A CYBERSECURITY EXPERT.
Focus on file types that typically contain credentials, configurations, databases, personal data, internal documentation, archives, or images (e.g., screenshots of sensitive material). 

OUTPUT INSTRUCTIONS:
ONLY OUTPUT the extensions as a clean Python list format, like [<'file_extension'>, <'file_extension'>, etc.] 
Don't include "." and ALWAYS use "'" in the list. 
Do not include any explanations, comments, or extra text. 
JUST GIVE THE LIST.
"""
)

# Step 3: Run the chain to get extensions
chain = LLMChain(llm=llm2, prompt=prompt_template_file_types)
llm_response = chain.run(content="")

# Step 4: Print the LLM output
print("🔍 LLM-generated file extensions list:")
print(llm_response)

# Step 5: Parse LLM response into a Python list
try:
    # Extract only the list portion using regex
    match = re.search(r"\[(.*?)\]", llm_response, re.DOTALL)
    if match:
        list_str = "[" + match.group(1) + "]"
        common_extensions = ast.literal_eval(list_str)
    else:
        raise ValueError("❌ No list found in LLM response.")
except Exception as e:
    raise ValueError("❌ Failed to parse LLM response into a list.") from e

# Step 6: Build dynamic regex
ext_pattern = '|'.join(common_extensions)
file_pattern = re.compile(r'[\w\-/\.]{8,}\.(?:' + ext_pattern + r')', re.IGNORECASE)

# Step 7: Define chats path and extract files
chats_path = 'I-Soon-data/md/chats'
extracted_files_from_chats = {}

for filename in os.listdir(chats_path):
    if filename.endswith('.md'):
        full_path = os.path.join(chats_path, filename)
        with open(full_path, 'r', encoding='utf-8') as file:
            content = file.read()
            matches = file_pattern.findall(content)
            if matches:
                extracted_files_from_chats[filename] = matches

# Step 8: Display results
print("\n📄 Extracted filenames from chats:")
for chat, files in extracted_files_from_chats.items():
    print(f"{chat}:")
    for f in files:
        print(f"  - {f}")

# *Search which files are in the leaked data*

In [None]:
import os
import re
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Step 1: Load markdown file
markdown_file_path = 'I-Soon-data/md/chats/2.md'
with open(markdown_file_path, 'r', encoding='utf-8') as file:
    md_content = file.read()

# Step 2: Split into overlapping chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
chunks = splitter.split_text(md_content)

# Step 3: Define the prompt
prompt_template = PromptTemplate(
    input_variables=["content"],
    template="""
You are a cybersecurity analyst. Think of the top 50 most common file types.
Analyze the following markdown content and extract all the file names mentioned. 
Think based on the structure of the document how could a file be referenced. 
Look for syntax that indicates a file name (source, reference, "." and extension name) or how it can be refenced to another location from the document.
ONLY OUTPUT a list of file names with extensions, separated by commas. Do not explain. No additional commentary.

Content:
{content}
"""
)

# Step 4: Set up the LLM chain
llm = Ollama(model="llama3.1:8b")
chain = LLMChain(llm=llm, prompt=prompt_template)

# Step 5: Collect only valid filenames using regex
found_files = []

# Basic file name pattern: alphanumerics, dashes, underscores, ending in .ext (2–6 chars)
file_regex = re.compile(r'\b[\w\-]+(?:\.[\w\-]+)*\.\w{2,6}\b')

for i, chunk in enumerate(chunks):
    print(f"\n🔍 Processing chunk {i+1}/{len(chunks)}...")
    try:
        response = chain.run(content=chunk)
        print("LLM Response:", response)

        # Use regex to extract file names only
        matches = file_regex.findall(response)
        found_files.extend([m.strip().lower() for m in matches])
    except Exception as e:
        print(f"⚠️ Error processing chunk {i+1}: {e}")

# Step 6: Final clean result
unique_files = sorted(set(found_files))

print("\n🎯 Unique filenames found across all chunks:")
for f in unique_files:
    print(f)