# Security Knowledge Base - Sigma, MITRE ATT&CK, OWASP

Notebook n√†y load v√† query c√°c documents t·ª´:
1. **Sigma Rules** - Detection rules t·ª´ YAML files
2. **MITRE ATT&CK Techniques** - T·ª´ trang enterprise
3. **OWASP Cheatsheets** - T·ª´ th∆∞ m·ª•c local

## üìã Th·ª© t·ª± ch·∫°y c√°c cells:

### B∆∞·ªõc 1: Setup
- **Cell 1**: Imports
- **Cell 2**: Helper Functions

### B∆∞·ªõc 2: Load Documents
- **Cell 3-5**: Load v√† Parse Sigma Rules
- **Cell 6-7**: Load MITRE ATT&CK Techniques (t·ª´ trang enterprise)
- **Cell 8**: Load OWASP Cheatsheets (t·ª´ th∆∞ m·ª•c local)

### B∆∞·ªõc 3: Text Splitting
- **Cell 9**: Text Splitting v√† Combine All Documents
- **Cell 10**: Xem Chunks v√† Metadata

### B∆∞·ªõc 4: Embedding & ChromaDB
- **Cell 11**: Embedding Model Setup
- **Cell 12**: ChromaDB Setup (embed t·∫•t c·∫£ documents)

### B∆∞·ªõc 5: Query & Test
- **Cell 13**: Balanced Query (c√¢n b·∫±ng k·∫øt qu·∫£ t·ª´ m·ªói lo·∫°i)
- **Cell 14**: Balanced Query Test

### B∆∞·ªõc 6: Hybrid Retrieval & Reranking (N√¢ng cao)
- **Cell 15**: Setup Reranker v√† BM25
- **Cell 16**: Hybrid Retrieval Function (Semantic + Keyword)
- **Cell 17**: Reranking Function
- **Cell 18**: Complete Pipeline (Hybrid + Rerank)
- **Cell 19**: Test Hybrid Retrieval + Reranking

**L∆∞u √Ω:** C·∫ßn c√†i ƒë·∫∑t:
- `pip install rank-bm25 flashrank` cho hybrid retrieval v√† reranking


In [None]:
# Imports
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.document_loaders import WebBaseLoader, AsyncChromiumLoader
from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain_text_splitters import RecursiveCharacterTextSplitter
try:
    from langchain_huggingface import HuggingFaceEmbeddings
except ImportError:
    from langchain_community.embeddings import HuggingFaceEmbeddings

# Reranker v√† Hybrid Retrieval
try:
    from rank_bm25 import BM25Okapi
except ImportError:
    print("‚ö†Ô∏è rank_bm25 ch∆∞a ƒë∆∞·ª£c c√†i ƒë·∫∑t. Ch·∫°y: pip install rank-bm25")
    BM25Okapi = None

# Reranker: S·ª≠ d·ª•ng Cross-Encoder t·ª´ sentence-transformers (t∆∞∆°ng th√≠ch v·ªõi embedding model)
try:
    from sentence_transformers import CrossEncoder
    CROSS_ENCODER_AVAILABLE = True
except ImportError:
    CROSS_ENCODER_AVAILABLE = False
    print("‚ö†Ô∏è sentence-transformers ch∆∞a c√≥ CrossEncoder. S·∫Ω d√πng simple reranking.")
    CrossEncoder = None

# Fallback: FlashRank ho·∫∑c Cohere
try:
    from flashrank import Ranker, RerankRequest
    FLASHRANK_AVAILABLE = True
except ImportError:
    FLASHRANK_AVAILABLE = False

try:
    from langchain_cohere import CohereRerank
    COHERE_AVAILABLE = True
except ImportError:
    COHERE_AVAILABLE = False

import os
import yaml
import json
import requests
import glob
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
import numpy as np
from typing import List, Tuple, Dict
from collections import Counter


In [73]:
# Helper Functions

def parse_chroma_metadata(doc: Document) -> dict:
    """
    Parse metadata t·ª´ ChromaDB, convert JSON strings v·ªÅ l·∫°i list/dict
    """
    metadata = doc.metadata.copy()
    json_fields = ['tags', 'references', 'detection', 'logsource', 'detection_keywords', 'falsepositives']
    
    for field in json_fields:
        if field in metadata and isinstance(metadata[field], str):
            try:
                metadata[field] = json.loads(metadata[field])
            except (json.JSONDecodeError, TypeError):
                pass  # Gi·ªØ nguy√™n n·∫øu kh√¥ng parse ƒë∆∞·ª£c
    
    return metadata

def format_doc_for_llm(doc: Document, include_full_rule: bool = False) -> str:
    """
    Format document v·ªõi metadata ƒë·ªÉ LLM d·ªÖ ƒë·ªçc
    """
    metadata = parse_chroma_metadata(doc)
    
    output = []
    output.append(f"Title: {metadata.get('title', 'N/A')}")
    output.append(f"ID: {metadata.get('id', 'N/A')}")
    output.append(f"Status: {metadata.get('status', 'N/A')}")
    output.append(f"Level: {metadata.get('level', 'N/A')}")
    output.append(f"Description: {metadata.get('description', 'N/A')}")
    
    if metadata.get('author'):
        output.append(f"Author: {metadata.get('author')}")
    
    if metadata.get('tags'):
        tags = metadata['tags'] if isinstance(metadata['tags'], list) else []
        output.append(f"Tags: {', '.join(str(t) for t in tags)}")
    
    if metadata.get('logsource'):
        logsource = metadata['logsource'] if isinstance(metadata['logsource'], dict) else {}
        output.append(f"Log Source: {json.dumps(logsource, ensure_ascii=False)}")
    
    if metadata.get('detection'):
        detection = metadata['detection'] if isinstance(metadata['detection'], dict) else {}
        if 'keywords' in detection and detection['keywords']:
            keywords = detection['keywords']
            keywords_preview = keywords[:5] if len(keywords) > 5 else keywords
            output.append(f"Detection Keywords: {', '.join(str(k) for k in keywords_preview)}")
            if len(keywords) > 5:
                output.append(f"  (+ {len(keywords)-5} more keywords)")
        if 'condition' in detection:
            output.append(f"Detection Condition: {detection['condition']}")
    
    if metadata.get('references'):
        refs = metadata['references'] if isinstance(metadata['references'], list) else []
        if refs:
            output.append(f"References: {len(refs)} reference(s)")
            for ref in refs[:3]:
                output.append(f"  - {ref}")
            if len(refs) > 3:
                output.append(f"  ... v√† {len(refs)-3} reference(s) kh√°c")
    
    output.append(f"\nContent:\n{doc.page_content}")
    
    if include_full_rule and metadata.get('full_rule'):
        output.append(f"\nFull Rule YAML:\n{metadata['full_rule']}")
    
    return "\n".join(output)


## 1. Load v√† Parse Sigma Rules


In [74]:
# ƒê∆∞·ªùng d·∫´n t∆∞∆°ng ƒë·ªëi t·ª´ notebook
sigma_path = os.path.join(os.path.dirname(os.getcwd()), "test", "sigma", "rules", "web", "webserver_generic")
# Ho·∫∑c ƒë∆∞·ªùng d·∫´n tuy·ªát ƒë·ªëi
if not os.path.exists(sigma_path):
    sigma_path = r"D:\MCPLLM\test\sigma\rules\web\webserver_generic"

# Ki·ªÉm tra th∆∞ m·ª•c c√≥ t·ªìn t·∫°i kh√¥ng
if not os.path.exists(sigma_path):
    print(f"Error: Th∆∞ m·ª•c kh√¥ng t·ªìn t·∫°i: {sigma_path}")
    print(f"ƒê∆∞·ªùng d·∫´n tuy·ªát ƒë·ªëi: {os.path.abspath(sigma_path)}")
    sigma_raw_docs = []
else:
    try:
        sigma_loader = DirectoryLoader(
            path=sigma_path,
            glob="**/*.yml",
            loader_cls=TextLoader,
            loader_kwargs={'encoding': 'utf-8'}
        )
        sigma_raw_docs = sigma_loader.load()
        print(f"‚úÖ ƒê√£ load {len(sigma_raw_docs)} file(s)")
        # In th√¥ng tin m·ªôt v√†i file ƒë·∫ßu ti√™n
        for i, doc in enumerate(sigma_raw_docs[:3]):
            print(f"\nFile {i+1}: {doc.metadata.get('source', 'N/A')}")
            print(f"ƒê·ªô d√†i n·ªôi dung: {len(doc.page_content)} k√Ω t·ª±")
    except Exception as e:
        print(f"‚ùå L·ªói khi load: {e}")
        sigma_raw_docs = []


‚úÖ ƒê√£ load 13 file(s)

File 1: d:\MCPLLM\test\sigma\rules\web\webserver_generic\web_f5_tm_utility_bash_api_request.yml
ƒê·ªô d√†i n·ªôi dung: 1091 k√Ω t·ª±

File 2: d:\MCPLLM\test\sigma\rules\web\webserver_generic\web_iis_tilt_shortname_scan.yml
ƒê·ªô d√†i n·ªôi dung: 902 k√Ω t·ª±

File 3: d:\MCPLLM\test\sigma\rules\web\webserver_generic\web_java_payload_in_access_logs.yml
ƒê·ªô d√†i n·ªôi dung: 1191 k√Ω t·ª±


In [75]:
# X·ª≠ l√Ω YAML v√† t·∫°o processed docs
sigma_docs_processed = []

if sigma_raw_docs:
    print("="*50)
    print("X·ª≠ l√Ω v√† parse YAML...")
    print("="*50)
    
    for doc in sigma_raw_docs:
        try:
            parsed_yaml = yaml.safe_load(doc.page_content)
            
            if parsed_yaml:
                # T·∫°o summary content chi ti·∫øt h∆°n
                title = parsed_yaml.get('title', 'N/A')
                description = parsed_yaml.get('description', 'N/A')
                level = parsed_yaml.get('level', 'N/A')
                status = parsed_yaml.get('status', 'N/A')
                
                summary_content = f"Sigma Rule: {title}\nStatus: {status} | Level: {level}\nDescription: {description}"
                
                # Extract detection keywords n·∫øu c√≥
                detection = parsed_yaml.get('detection', {})
                keywords = detection.get('keywords', []) if isinstance(detection, dict) else []
                if keywords:
                    keywords_preview = keywords[:3] if len(keywords) > 3 else keywords
                    summary_content += f"\nKeywords: {', '.join(str(k) for k in keywords_preview)}"
                    if len(keywords) > 3:
                        summary_content += f" (+{len(keywords)-3} more)"
                
                new_doc = Document(
                    page_content=summary_content,
                    metadata={
                        "source": doc.metadata.get('source'),
                        "full_rule": doc.page_content,
                        # Th√¥ng tin c∆° b·∫£n
                        "title": parsed_yaml.get('title'),
                        "id": parsed_yaml.get('id'),
                        "status": parsed_yaml.get('status'),
                        "level": parsed_yaml.get('level'),
                        "description": parsed_yaml.get('description'),
                        # Th√¥ng tin t√°c gi·∫£ v√† ng√†y th√°ng
                        "author": parsed_yaml.get('author'),
                        "date": str(parsed_yaml.get('date', '')),
                        "modified": str(parsed_yaml.get('modified', '')),
                        # Tags v√† categories
                        "tags": parsed_yaml.get('tags', []),
                        # Logsource
                        "logsource": parsed_yaml.get('logsource', {}),
                        # Detection rules
                        "detection": parsed_yaml.get('detection', {}),
                        "detection_keywords": keywords,
                        "detection_keywords_count": len(keywords),
                        # References v√† false positives
                        "references": parsed_yaml.get('references', []),
                        "falsepositives": parsed_yaml.get('falsepositives', []),
                    }
                )
                sigma_docs_processed.append(new_doc)
            else:
                print(f"Warning: Kh√¥ng parse ƒë∆∞·ª£c YAML t·ª´ {doc.metadata.get('source')}")
        except Exception as e:
            print(f"L·ªói khi parse YAML t·ª´ {doc.metadata.get('source')}: {e}")
    
    print(f"\n‚úÖ ƒê√£ x·ª≠ l√Ω {len(sigma_docs_processed)} document(s)")
else:
    print("\n‚ö†Ô∏è Kh√¥ng c√≥ documents n√†o ƒë·ªÉ x·ª≠ l√Ω.")


X·ª≠ l√Ω v√† parse YAML...

‚úÖ ƒê√£ x·ª≠ l√Ω 13 document(s)


## 2. Hi·ªÉn th·ªã Metadata (Optional)


In [76]:
# Xem m·ªôt s·ªë docs ƒë√£ processed (optional)
if sigma_docs_processed:
    for i, doc in enumerate(sigma_docs_processed[:2]):
        print(f"\n{'='*60}")
        print(f"--- Doc {i+1} ---")
        print(f"{'='*60}")
        print(f"\nüìÑ CONTENT:\n{doc.page_content}")
        print(f"\nüìã METADATA:")
        metadata_clean = {k: v for k, v in doc.metadata.items() if k != 'full_rule'}
        print(json.dumps(metadata_clean, indent=2, ensure_ascii=False))



--- Doc 1 ---

üìÑ CONTENT:
Sigma Rule: F5 BIG-IP iControl Rest API Command Execution - Webserver
Status: test | Level: medium
Description: Detects POST requests to the F5 BIG-IP iControl Rest API "bash" endpoint, which allows the execution of commands on the BIG-IP

üìã METADATA:
{
  "source": "d:\\MCPLLM\\test\\sigma\\rules\\web\\webserver_generic\\web_f5_tm_utility_bash_api_request.yml",
  "title": "F5 BIG-IP iControl Rest API Command Execution - Webserver",
  "id": "85254a62-22be-4239-b79c-2ec17e566c37",
  "status": "test",
  "level": "medium",
  "description": "Detects POST requests to the F5 BIG-IP iControl Rest API \"bash\" endpoint, which allows the execution of commands on the BIG-IP",
  "author": "Nasreddine Bencherchali (Nextron Systems), Thurein Oo",
  "date": "2023-11-08",
  "modified": "",
  "tags": [
    "attack.execution",
    "attack.t1190",
    "attack.initial-access"
  ],
  "logsource": {
    "category": "webserver"
  },
  "detection": {
    "selection": {
      "

## 3. Embedding v√† ChromaDB Setup


In [92]:
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go

In [77]:
# Kh·ªüi t·∫°o embedding model (d√πng HuggingFace local)
print("üì• ƒêang t·∫£i embedding model...")
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={'device': 'cpu'},
    encode_kwargs={'normalize_embeddings': True}
)
print("‚úÖ Embedding model ƒë√£ s·∫µn s√†ng")


üì• ƒêang t·∫£i embedding model...
‚úÖ Embedding model ƒë√£ s·∫µn s√†ng


In [78]:
# T·∫°o ƒë∆∞·ªùng d·∫´n l∆∞u ChromaDB
persist_directory = r"D:\MCPLLM\test\chroma_db"
os.makedirs(persist_directory, exist_ok=True)

# Chu·∫©n b·ªã documents v·ªõi metadata t∆∞∆°ng th√≠ch ChromaDB
print(f"üíæ ƒêang t·∫°o/load ChromaDB t·∫°i: {persist_directory}")

# Ki·ªÉm tra all_docs ƒë√£ ƒë∆∞·ª£c t·∫°o ch∆∞a
if 'all_docs' not in locals() or not all_docs:
    print("‚ö†Ô∏è  Ch∆∞a c√≥ all_docs. H√£y ch·∫°y cell 'Text Splitting v√† Combine All Documents' tr∆∞·ªõc.")
    print("    ƒêang d√πng sigma_docs_processed t·∫°m th·ªùi...")
    docs_to_process = sigma_docs_processed if 'sigma_docs_processed' in locals() and sigma_docs_processed else []
else:
    docs_to_process = all_docs

if not docs_to_process:
    print("‚ùå Kh√¥ng c√≥ documents n√†o ƒë·ªÉ embed!")
    vectorstore = None
else:
    print(f"üìù ƒêang chu·∫©n b·ªã {len(docs_to_process)} documents ƒë·ªÉ embed...")
    
    # Convert list/dict trong metadata th√†nh string cho ChromaDB
    docs_for_chroma = []
    for doc in docs_to_process:
        new_metadata = {}
        for key, value in doc.metadata.items():
            if isinstance(value, (list, dict)):
                # Convert list/dict th√†nh JSON string
                new_metadata[key] = json.dumps(value, ensure_ascii=False)
            elif value is None:
                continue  # Skip None values
            else:
                new_metadata[key] = value
        
        # T·∫°o document m·ªõi v·ªõi metadata ƒë√£ x·ª≠ l√Ω
        new_doc = Document(
            page_content=doc.page_content,
            metadata=new_metadata
        )
        docs_for_chroma.append(new_doc)
    
    # Filter complex metadata m·ªôt l·∫ßn n·ªØa ƒë·ªÉ ch·∫Øc ch·∫Øn
    docs_for_chroma = filter_complex_metadata(docs_for_chroma)
    
    print(f"üìä Th·ªëng k√™ documents:")
    if 'all_docs' in locals() and all_docs:
        sigma_count = sum(1 for d in docs_for_chroma if d.metadata.get('source_type') == 'sigma_rule')
        mitre_count = sum(1 for d in docs_for_chroma if d.metadata.get('source_type') == 'mitre_attack')
        owasp_count = sum(1 for d in docs_for_chroma if d.metadata.get('source_type') == 'owasp_cheatsheet')
        print(f"   - Sigma rules: {sigma_count}")
        print(f"   - MITRE ATT&CK: {mitre_count}")
        print(f"   - OWASP cheatsheets: {owasp_count}")
    
    # T·∫°o ChromaDB vector store
    print(f"\nüîÑ ƒêang embed v√† l∆∞u v√†o ChromaDB...")
    vectorstore = Chroma.from_documents(
        documents=docs_for_chroma,
        embedding=embeddings,
        persist_directory=persist_directory,
        collection_name="security_knowledge_base"  # T√™n collection m·ªõi cho t·∫•t c·∫£
    )
    print(f"‚úÖ ƒê√£ l∆∞u {len(docs_for_chroma)} documents v√†o ChromaDB")


üíæ ƒêang t·∫°o/load ChromaDB t·∫°i: D:\MCPLLM\test\chroma_db
üìù ƒêang chu·∫©n b·ªã 3358 documents ƒë·ªÉ embed...
üìä Th·ªëng k√™ documents:
   - Sigma rules: 13
   - MITRE ATT&CK: 13
   - OWASP cheatsheets: 3332

üîÑ ƒêang embed v√† l∆∞u v√†o ChromaDB...
‚úÖ ƒê√£ l∆∞u 3358 documents v√†o ChromaDB


In [89]:
print(f"Vectorstore created with {vectorstore._collection.count()} documents")


Vectorstore created with 3358 documents


In [90]:
# L·∫•y ra b·ªô s∆∞u t·∫≠p vector t·ª´ vectorstore
collection = vectorstore._collection

# L·∫•y 1 embedding t·ª´ database
sample_embedding = collection.get(limit=1, include=["embeddings"])["embeddings"][0]

# Ki·ªÉm tra s·ªë chi·ªÅu (s·ªë ph·∫ßn t·ª≠ trong vector)
dimensions = len(sample_embedding)
print(f"The vectors have {dimensions:,} dimensions")

The vectors have 384 dimensions


In [91]:
sample_embedding

array([ 5.86870778e-03, -1.05241379e-02, -7.53515400e-04, -4.48323525e-02,
        1.37020694e-02, -5.52320853e-02, -4.34750579e-02, -2.44987961e-02,
       -1.93930585e-02,  6.64397404e-02, -3.66570130e-02, -4.55308333e-03,
       -2.87122764e-02, -5.15020937e-02,  4.28327583e-02,  4.38962458e-03,
        6.31853193e-02, -7.86684081e-02, -5.37194610e-02, -6.05001934e-02,
        3.53835449e-02, -1.62381884e-02, -1.12417797e-02, -1.24211917e-02,
       -1.33748949e-01,  1.00116376e-02, -5.86728416e-02, -3.09609696e-02,
       -2.88542069e-04, -9.58073884e-02, -3.07648070e-02, -7.13889953e-03,
       -8.03151447e-03,  5.32922857e-02,  1.12462575e-02,  6.91950396e-02,
        1.06334150e-01, -9.58079621e-02, -3.97749059e-03,  1.85524039e-02,
        2.87265740e-02, -7.92047828e-02, -2.40437035e-02, -2.27804184e-02,
        5.24348617e-02, -4.26563658e-02, -5.10982834e-02, -2.56277006e-02,
       -3.58699970e-02, -8.40453082e-04, -1.38057023e-02, -4.35068384e-02,
       -2.04492863e-02, -

In [95]:
# L·∫•y to√†n b·ªô vector, t√†i li·ªáu v√† metadata t·ª´ collection
result = collection.get(include=['embeddings', 'documents', 'metadatas'])

# ƒê∆∞a embedding v√†o m·∫£ng numpy
vectors = np.array(result['embeddings'])

# L∆∞u l·∫°i vƒÉn b·∫£n
documents = result['documents']

# Tr√≠ch lo·∫°i t√†i li·ªáu t·ª´ metadata (gi·∫£ s·ª≠ c√≥ 'doc_type')
doc_types = [metadata['source_type'] for metadata in result['metadatas']]

# G√°n m√†u s·∫Øc t√πy theo lo·∫°i t√†i li·ªáu
colors = [['blue', 'green', 'red'][['sigma_rule', 'mitre_attack', 'owasp_cheatsheet'].index(t)] for t in doc_types]

In [96]:
# Con ng∆∞·ªùi ch√∫ng ta d·ªÖ h√¨nh dung m·ªçi th·ª© trong kh√¥ng gian 2D h∆°n!
# Gi·∫£m s·ªë chi·ªÅu c·ªßa vector xu·ªëng 2D b·∫±ng t-SNE
# (T-distributed Stochastic Neighbor Embedding)

tsne = TSNE(n_components=2, random_state=42)
reduced_vectors = tsne.fit_transform(vectors)

# T·∫°o bi·ªÉu ƒë·ªì scatter 2D
fig = go.Figure(data=[go.Scatter(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Lo·∫°i: {t}<br>VƒÉn b·∫£n: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='Bi·ªÉu ƒë·ªì 2D Chroma Vector Store',
    scene=dict(xaxis_title='x', yaxis_title='y'),
    width=800,
    height=600,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show(renderer="browser")

In [84]:
## 9. Query v√† Test ChromaDB

# Test queries v·ªõi c√°c lo·∫°i documents kh√°c nhau
if 'vectorstore' in locals() and vectorstore:
    print("="*100)
    print("üîç QUERY TEST - T√¨m ki·∫øm trong t·∫•t c·∫£ documents (Sigma, MITRE, OWASP)")
    print("="*100)
    
    # H√†m balanced query ƒë·ªÉ c√¢n b·∫±ng k·∫øt qu·∫£ t·ª´ m·ªói lo·∫°i
    def balanced_query(vectorstore, query, k_per_source=3):
        """Query c√¢n b·∫±ng: l·∫•y k·∫øt qu·∫£ t·ª´ m·ªói source type ri√™ng bi·ªát"""
        all_results = []
        source_types = ['sigma_rule', 'mitre_attack', 'owasp_cheatsheet']
        
        for source_type in source_types:
            try:
                results = vectorstore.similarity_search_with_score(
                    query=query,
                    k=k_per_source,
                    filter={"source_type": source_type}
                )
                all_results.extend(results)
            except Exception as e:
                print(f"  ‚ö†Ô∏è L·ªói khi query {source_type}: {e}")
        
        # S·∫Øp x·∫øp l·∫°i theo score (score c√†ng th·∫•p c√†ng t·ªët)
        all_results.sort(key=lambda x: x[1])
        return all_results
    
    # Test Query 1: SQL Injection
    print("\n\n[Query 1] SQL Injection")
    print("-"*100)
    query1 = "SQL injection attack detection"
    results1 = balanced_query(vectorstore, query1, k_per_source=3)
    print(f"üîé Query: '{query1}'")
    print(f"üìä T√¨m th·∫•y {len(results1)} k·∫øt qu·∫£\n")
    
    for i, (doc, score) in enumerate(results1, 1):
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"[{i}] Score: {score:.4f} | Source: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"    Title: {doc.metadata.get('title', 'N/A')}")
            print(f"    Level: {doc.metadata.get('level', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"    Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"    Cheatsheet: {doc.metadata.get('doc_type', 'N/A')}")
        
        # Preview content
        content_preview = doc.page_content if len(doc.page_content) > 200 else doc.page_content
        print(f"    Preview: {content_preview}...")
        print()
    
    # Test Query 2: Authentication
    print("\n\n[Query 2] Authentication Security")
    print("-"*100)
    query2 = "authentication security best practices"
    results2 = vectorstore.similarity_search_with_score(query2, k=5)
    print(f"üîé Query: '{query2}'")
    print(f"üìä T√¨m th·∫•y {len(results2)} k·∫øt qu·∫£\n")
    
    for i, (doc, score) in enumerate(results2, 1):
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"[{i}] Score: {score:.4f} | Source: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"    Title: {doc.metadata.get('title', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"    Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"    Cheatsheet: {doc.metadata.get('doc_type', 'N/A')}")
        
        content_preview = doc.page_content[:200] if len(doc.page_content) > 200 else doc.page_content
        print(f"    Preview: {content_preview}...")
        print()
    
    # Test Query 3: Credential Dumping
    print("\n\n[Query 3] Credential Dumping")
    print("-"*100)
    query3 = "credential dumping technique"
    results3 = vectorstore.similarity_search_with_score(query3, k=5)
    print(f"üîé Query: '{query3}'")
    print(f"üìä T√¨m th·∫•y {len(results3)} k·∫øt qu·∫£\n")
    
    for i, (doc, score) in enumerate(results3, 1):
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"[{i}] Score: {score:.4f} | Source: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"    Title: {doc.metadata.get('title', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"    Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"    Cheatsheet: {doc.metadata.get('doc_type', 'N/A')}")
        
        content_preview = doc.page_content if len(doc.page_content) > 200 else doc.page_content
        print(f"    Preview: {content_preview}...")
        print()
    
    # Test Query v·ªõi Metadata Filtering
    print("\n\n[Query 4] Filter by Source Type - MITRE ATT&CK only")
    print("-"*100)
    query4 = "data obfuscation"
    results4 = vectorstore.similarity_search(
        query=query4,
        k=3,
        filter={"source_type": "mitre_attack"}
    )
    print(f"üîé Query: '{query4}' (ch·ªâ MITRE ATT&CK)")
    print(f"üìä T√¨m th·∫•y {len(results4)} k·∫øt qu·∫£\n")
    
    for i, doc in enumerate(results4, 1):
        print(f"[{i}] Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        content_preview = doc.page_content[:300] if len(doc.page_content) > 300 else doc.page_content
        print(f"    Content: {content_preview}...")
        print()
    
    print("="*100)
    print("‚úÖ HO√ÄN T·∫§T QUERY TEST")
    print("="*100)
    
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ vectorstore. H√£y ch·∫°y cell ChromaDB Setup tr∆∞·ªõc.")


üîç QUERY TEST - T√¨m ki·∫øm trong t·∫•t c·∫£ documents (Sigma, MITRE, OWASP)


[Query 1] SQL Injection
----------------------------------------------------------------------------------------------------
üîé Query: 'SQL injection attack detection'
üìä T√¨m th·∫•y 9 k·∫øt qu·∫£

[1] Score: 0.4950 | Source: owasp_cheatsheet
    Cheatsheet: Injection_Prevention_Cheat_Sheet.md
    Preview: An SQL injection attack consists of insertion or "injection" of either a partial or complete SQL query via the data input or transmitted from the client (browser) to the web application.

A successful SQL injection attack can read sensitive data from the database, modify database data (insert/update/delete), execute administration operations on the database (such as shutdown the DBMS), recover the content of a given file existing on the DBMS file system or write files into the file system, and, in some cases, issue commands to the operating system. SQL injection attacks are a type of injection attack

In [85]:
## 9.1. H√†m Balanced Query (C√¢n b·∫±ng k·∫øt qu·∫£ t·ª´ m·ªói lo·∫°i)

def balanced_query(vectorstore, query, k_per_source=3):
    """
    Query c√¢n b·∫±ng: l·∫•y k·∫øt qu·∫£ t·ª´ m·ªói source type ri√™ng bi·ªát
    Tr√°nh tr∆∞·ªùng h·ª£p OWASP chi·∫øm h·∫øt k·∫øt qu·∫£ do c√≥ qu√° nhi·ªÅu chunks
    
    Args:
        vectorstore: ChromaDB vector store
        query: Query string
        k_per_source: S·ªë k·∫øt qu·∫£ l·∫•y t·ª´ m·ªói lo·∫°i (default: 3)
    
    Returns:
        List of (doc, score) tuples, ƒë√£ s·∫Øp x·∫øp theo score
    """
    all_results = []
    source_types = ['sigma_rule', 'mitre_attack', 'owasp_cheatsheet']
    
    for source_type in source_types:
        try:
            results = vectorstore.similarity_search_with_score(
                query=query,
                k=k_per_source,
                filter={"source_type": source_type}
            )
            all_results.extend(results)
        except Exception as e:
            print(f"  ‚ö†Ô∏è L·ªói khi query {source_type}: {e}")
    
    # S·∫Øp x·∫øp l·∫°i theo score (score c√†ng th·∫•p c√†ng t·ªët trong ChromaDB)
    all_results.sort(key=lambda x: x[1])
    
    return all_results

print("‚úÖ H√†m balanced_query ƒë√£ ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a")
print("\nüí° S·ª≠ d·ª•ng:")
print("   results = balanced_query(vectorstore, 'SQL injection', k_per_source=3)")
print("   # S·∫Ω l·∫•y 3 k·∫øt qu·∫£ t·ª´ Sigma, 3 t·ª´ MITRE, 3 t·ª´ OWASP")


‚úÖ H√†m balanced_query ƒë√£ ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a

üí° S·ª≠ d·ª•ng:
   results = balanced_query(vectorstore, 'SQL injection', k_per_source=3)
   # S·∫Ω l·∫•y 3 k·∫øt qu·∫£ t·ª´ Sigma, 3 t·ª´ MITRE, 3 t·ª´ OWASP


In [86]:
## 9.2. Query Test v·ªõi Balanced Query

# ƒê·∫£m b·∫£o h√†m balanced_query ƒë√£ ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a (ch·∫°y cell tr∆∞·ªõc)
if 'vectorstore' in locals() and vectorstore:
    print("="*100)
    print("üîç QUERY TEST - C√¢n b·∫±ng k·∫øt qu·∫£ t·ª´ Sigma, MITRE, OWASP")
    print("="*100)
    
    # Test Query 1: SQL Injection
    print("\n\n[Query 1] SQL Injection")
    print("-"*100)
    query1 = "SQL injection attack detection"
    results1 = balanced_query(vectorstore, query1, k_per_source=3)
    print(f"üîé Query: '{query1}'")
    print(f"üìä T√¨m th·∫•y {len(results1)} k·∫øt qu·∫£ (3 t·ª´ m·ªói lo·∫°i)\n")
    
    # Th·ªëng k√™ theo source type
    sigma_count = sum(1 for d, _ in results1 if d.metadata.get('source_type') == 'sigma_rule')
    mitre_count = sum(1 for d, _ in results1 if d.metadata.get('source_type') == 'mitre_attack')
    owasp_count = sum(1 for d, _ in results1 if d.metadata.get('source_type') == 'owasp_cheatsheet')
    print(f"üìä Ph√¢n b·ªë: Sigma={sigma_count}, MITRE={mitre_count}, OWASP={owasp_count}\n")
    
    for i, (doc, score) in enumerate(results1[:9], 1):  # Hi·ªÉn th·ªã t·ªëi ƒëa 9 (3 t·ª´ m·ªói lo·∫°i)
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"[{i}] Score: {score:.4f} | Source: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"    Title: {doc.metadata.get('title', 'N/A')}")
            print(f"    Level: {doc.metadata.get('level', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"    Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"    Cheatsheet: {doc.metadata.get('doc_type', 'N/A')}")
        
        # Preview content
        content_preview = doc.page_content[:150] if len(doc.page_content) > 150 else doc.page_content
        print(f"    Preview: {content_preview}...")
        print()
    
    # Test Query 2: XSS
    print("\n\n[Query 2] XSS Attack")
    print("-"*100)
    query2 = "cross-site scripting XSS prevention"
    results2 = balanced_query(vectorstore, query2, k_per_source=3)
    print(f"üîé Query: '{query2}'")
    print(f"üìä T√¨m th·∫•y {len(results2)} k·∫øt qu·∫£ (3 t·ª´ m·ªói lo·∫°i)\n")
    
    # Th·ªëng k√™
    sigma_count2 = sum(1 for d, _ in results2 if d.metadata.get('source_type') == 'sigma_rule')
    mitre_count2 = sum(1 for d, _ in results2 if d.metadata.get('source_type') == 'mitre_attack')
    owasp_count2 = sum(1 for d, _ in results2 if d.metadata.get('source_type') == 'owasp_cheatsheet')
    print(f"üìä Ph√¢n b·ªë: Sigma={sigma_count2}, MITRE={mitre_count2}, OWASP={owasp_count2}\n")
    
    for i, (doc, score) in enumerate(results2[:9], 1):
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"[{i}] Score: {score:.4f} | Source: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"    Title: {doc.metadata.get('title', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"    Technique: {doc.metadata.get('technique_id', 'N/A')} - {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"    Cheatsheet: {doc.metadata.get('doc_type', 'N/A')}")
        print()
    
    print("="*100)
    print("‚úÖ HO√ÄN T·∫§T QUERY TEST")
    print("="*100)
    print("\nüí° L∆∞u √Ω: Query c√¢n b·∫±ng ƒë·∫£m b·∫£o c√≥ k·∫øt qu·∫£ t·ª´ c·∫£ 3 lo·∫°i (Sigma, MITRE, OWASP)")
    print("   Tr√°nh tr∆∞·ªùng h·ª£p OWASP chi·∫øm h·∫øt do c√≥ qu√° nhi·ªÅu chunks.")
    
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ vectorstore. H√£y ch·∫°y cell ChromaDB Setup tr∆∞·ªõc.")
    print("‚ö†Ô∏è Ho·∫∑c ch∆∞a ƒë·ªãnh nghƒ©a balanced_query. H√£y ch·∫°y cell 'H√†m Balanced Query' tr∆∞·ªõc.")


üîç QUERY TEST - C√¢n b·∫±ng k·∫øt qu·∫£ t·ª´ Sigma, MITRE, OWASP


[Query 1] SQL Injection
----------------------------------------------------------------------------------------------------
üîé Query: 'SQL injection attack detection'
üìä T√¨m th·∫•y 9 k·∫øt qu·∫£ (3 t·ª´ m·ªói lo·∫°i)

üìä Ph√¢n b·ªë: Sigma=3, MITRE=3, OWASP=3

[1] Score: 0.4950 | Source: owasp_cheatsheet
    Cheatsheet: Injection_Prevention_Cheat_Sheet.md
    Preview: An SQL injection attack consists of insertion or "injection" of either a partial or complete SQL query via the data input or transmitted from the clie...

[2] Score: 0.5301 | Source: owasp_cheatsheet
    Cheatsheet: Injection_Prevention_Cheat_Sheet.md
    Preview: SQL Injection attacks can be divided into the following three classes:

- **Inband:**√Ç¬†data is extracted using the same channel that is used to inject...

[3] Score: 0.5837 | Source: owasp_cheatsheet
    Cheatsheet: SQL_Injection_Prevention_Cheat_Sheet.md
    Preview: ## What Is a SQL 

In [10]:
# Format document cho LLM
sample_query = "Java payload attack"
sample_results = vectorstore.similarity_search(sample_query, k=1)

if sample_results:
    doc = sample_results[0]
    print("üìù Document formatted for LLM:")
    print("="*60)
    formatted = format_doc_for_llm(doc, include_full_rule=False)
    print(formatted)


üìù Document formatted for LLM:
Title: Java Payload Strings
ID: 583aa0a2-30b1-4d62-8bf3-ab73689efe6c
Status: test
Level: high
Description: Detects possible Java payloads in web access logs
Author: frack113, Harjot Singh, "@cyb3rjy0t" (update)
Tags: cve.2022-26134, cve.2021-26084, attack.initial-access, attack.t1190
Log Source: {"category": "webserver"}
Detection Keywords: %24%7B%28%23a%3D%40, ${(#a=@, %24%7B%40java, ${@java, u0022java
  (+ 5 more keywords)
Detection Condition: keywords
References: 5 reference(s)
  - https://www.rapid7.com/blog/post/2022/06/02/active-exploitation-of-confluence-cve-2022-26134/
  - https://www.rapid7.com/blog/post/2021/09/02/active-exploitation-of-confluence-server-cve-2021-26084/
  - https://github.com/httpvoid/writeups/blob/62d3751945289d088ccfdf4d0ffbf61598a2cd7d/Confluence-RCE.md
  ... v√† 2 reference(s) kh√°c

Content:
Sigma Rule: Java Payload Strings
Status: test | Level: high
Description: Detects possible Java payloads in web access logs
Keywords:

In [11]:
# Query v·ªõi metadata filtering
print("üîç T√¨m rules c√≥ level='high':")
high_level_docs = vectorstore.similarity_search(
    query="security detection",
    k=5,
    filter={"level": "high"}
)
print(f"T√¨m th·∫•y {len(high_level_docs)} rules v·ªõi level=high")
for doc in high_level_docs[:3]:
    print(f"  - {doc.metadata.get('title', 'N/A')} (Level: {doc.metadata.get('level', 'N/A')})")


üîç T√¨m rules c√≥ level='high':
T√¨m th·∫•y 5 rules v·ªõi level=high
  - Suspicious Windows Strings In URI (Level: high)
  - Suspicious Windows Strings In URI (Level: high)
  - Suspicious Windows Strings In URI (Level: high)


## 5. Custom Query

Th·ª≠ query c·ªßa b·∫°n ·ªü ƒë√¢y:


## 6. Load MITRE ATT&CK Techniques


In [12]:
# Load MITRE ATT&CK Techniques t·ª´ web
def get_mitre_technique_urls(base_url="https://attack.mitre.org/techniques/enterprise/"):
    """L·∫•y danh s√°ch URL c·ªßa c√°c techniques t·ª´ MITRE ATT&CK"""
    try:
        response = requests.get(base_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        
        urls = set()
        # T√¨m t·∫•t c·∫£ links ƒë·∫øn techniques (c√≥ pattern /techniques/Txxxx)
        for link in soup.find_all('a', href=True):
            href = link['href']
            if '/techniques/T' in href:
                full_url = urljoin(base_url, href)
                # Ch·ªâ l·∫•y techniques, kh√¥ng l·∫•y sub-techniques (c√≥ .xxx)
                if '/techniques/T' in full_url and '.001' not in full_url:
                    urls.add(full_url)
        
        return sorted(list(urls))
    except Exception as e:
        print(f"‚ùå L·ªói khi l·∫•y danh s√°ch URLs: {e}")
        return []

print("üì• ƒêang l·∫•y danh s√°ch MITRE ATT&CK technique URLs...")
mitre_urls = get_mitre_technique_urls()
print(f"‚úÖ T√¨m th·∫•y {len(mitre_urls)} techniques")

# Ch·ªâ load m·ªôt s·ªë techniques ƒë·∫ßu ti√™n ƒë·ªÉ test (c√≥ th·ªÉ b·ªè limit sau)
mitre_urls_sample = mitre_urls[:10]  # Test v·ªõi 10 techniques ƒë·∫ßu ti√™n
print(f"üìù S·∫Ω load {len(mitre_urls_sample)} techniques (sample)")


üì• ƒêang l·∫•y danh s√°ch MITRE ATT&CK technique URLs...
‚úÖ T√¨m th·∫•y 691 techniques
üìù S·∫Ω load 10 techniques (sample)


In [19]:
print(mitre_urls_sample)

['https://attack.mitre.org/techniques/T1001', 'https://attack.mitre.org/techniques/T1001/001', 'https://attack.mitre.org/techniques/T1001/002', 'https://attack.mitre.org/techniques/T1001/003', 'https://attack.mitre.org/techniques/T1003', 'https://attack.mitre.org/techniques/T1003/001', 'https://attack.mitre.org/techniques/T1003/002', 'https://attack.mitre.org/techniques/T1003/003', 'https://attack.mitre.org/techniques/T1003/004', 'https://attack.mitre.org/techniques/T1003/005']


In [48]:
import requests
from bs4 import BeautifulSoup
from langchain_core.documents import Document
import time
import re

def scrape_mitre_technique(url):
    """
    Scrape th√¥ng tin chi ti·∫øt t·ª´ng Technique t·ª´ MITRE ATT&CK
    Format t·ªëi ∆∞u cho text splitting:
    T1548
    Abuse Elevation Control Mechanism
    [Description text...]
    """
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Extract Technique ID t·ª´ URL
        url_parts = url.rstrip('/').split('/')
        if 'techniques' in url_parts:
            tech_idx = url_parts.index('techniques')
            if tech_idx + 1 < len(url_parts):
                tech_id_raw = url_parts[tech_idx + 1]
                if len(url_parts) > tech_idx + 2:
                    sub_id = url_parts[tech_idx + 2]
                    technique_id = f"{tech_id_raw}.{sub_id}"
                else:
                    technique_id = tech_id_raw
            else:
                technique_id = None
        else:
            technique_id = None
        
        # Extract Technique Name t·ª´ h1
        h1 = soup.find('h1')
        if h1:
            title_full = h1.get_text(strip=True)
            title_full = re.sub(r'\s*[-|]\s*MITRE ATT&CK.*$', '', title_full, flags=re.IGNORECASE)
            title_full = re.sub(r'Sub-technique\s+\w+\.\d+\s*[:]\s*', '', title_full, flags=re.IGNORECASE)
            title_full = re.sub(r'Technique\s+\w+\s*[:]\s*', '', title_full, flags=re.IGNORECASE)
            technique_name = title_full.strip()
        else:
            technique_name = "Unknown"
        
        # Extract Description
        description = ""
        desc_div = soup.find('div', class_='description-body')
        if desc_div:
            description = desc_div.get_text(separator=' ', strip=True)
            # Clean up: lo·∫°i b·ªè [1], [2] references nh∆∞ng gi·ªØ n·ªôi dung
            description = re.sub(r'\[\d+\]', '', description)
            description = ' '.join(description.split())  # Normalize whitespace
        else:
            desc_divs = soup.find_all('div', class_='card-data')
            for div in desc_divs:
                if div.get_text(strip=True):
                    description = div.get_text(separator=' ', strip=True)
                    description = re.sub(r'\[\d+\]', '', description)
                    description = ' '.join(description.split())
                    break
        
        if not description and h1:
            next_elem = h1.find_next_sibling()
            while next_elem:
                if next_elem.name in ['p', 'div']:
                    text = next_elem.get_text(strip=True)
                    if text and len(text) > 50:
                        description = ' '.join(text.split())
                        break
                next_elem = next_elem.find_next_sibling()
        
        # Format theo c·∫•u tr√∫c file m·∫´u: ID\nName\nDescription
        # T·ªëi ∆∞u cho text splitting - m·ªói technique l√† m·ªôt document ri√™ng
        if technique_id and technique_name and description:
            # Format: T1548\nName\nDescription (gi·ªëng file m·∫´u)
            content = f"{description}"
        else:
            content = f"{description}" if description else ""
        
        return {
            'technique_id': technique_id,
            'technique_name': technique_name,
            'description': description,
            'url': url,
            'content': content
        }
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        import traceback
        traceback.print_exc()
        return None

# Load MITRE ATT&CK techniques v·ªõi format t·ªëi ∆∞u
print("üîÑ ƒêang load MITRE ATT&CK techniques (format t·ªëi ∆∞u cho text splitting)...")
mitre_docs = []

for i, url in enumerate(mitre_urls_sample, 1):
    url_id = url.rstrip('/').split('/')[-1]
    print(f"[{i}/{len(mitre_urls_sample)}] {url_id}", end=" ")
    
    result = scrape_mitre_technique(url)
    
    if result and result['content']:
        doc = Document(
            page_content=result['content'],
            metadata={
                'source_type': 'mitre_attack',
                'source_url': result['url'],
                'technique_id': result['technique_id'],
                'technique_name': result['technique_name'],
                'description_length': len(result['description']) if result['description'] else 0
            }
        )
        mitre_docs.append(doc)
        desc_len = len(result['description']) if result['description'] else 0
        print(f"‚úÖ {desc_len} chars")
    else:
        print("‚ùå")
    
    time.sleep(0.5)

print(f"\n‚úÖ Loaded: {len(mitre_docs)} MITRE ATT&CK documents")
if mitre_docs:
    print(f"\nüìÑ Sample document (format t·ªëi ∆∞u):")
    sample = mitre_docs[0]
    print(f"  Technique ID: {sample.metadata.get('technique_id', 'N/A')}")
    print(f"  Technique Name: {sample.metadata.get('technique_name', 'N/A')}")
    print(f"\n  Content (format gi·ªëng file m·∫´u):")
    print(f"  {'-'*60}")
    print(sample.page_content)
    print(f"  {'-'*60}")

üîÑ ƒêang load MITRE ATT&CK techniques (format t·ªëi ∆∞u cho text splitting)...
[1/10] T1001 ‚úÖ 465 chars
[2/10] 001 ‚úÖ 419 chars
[3/10] 002 ‚úÖ 452 chars
[4/10] 003 ‚úÖ 855 chars
[5/10] T1003 ‚úÖ 481 chars
[6/10] 001 ‚úÖ 2174 chars
[7/10] 002 ‚úÖ 847 chars
[8/10] 003 ‚úÖ 710 chars
[9/10] 004 ‚úÖ 437 chars
[10/10] 005 ‚úÖ 1226 chars

‚úÖ Loaded: 10 MITRE ATT&CK documents

üìÑ Sample document (format t·ªëi ∆∞u):
  Technique ID: T1001
  Technique Name: Data Obfuscation

  Content (format gi·ªëng file m·∫´u):
  ------------------------------------------------------------
Adversaries may obfuscate command and control traffic to make it more difficult to detect. Command and control (C2) communications are hidden (but not necessarily encrypted) in an attempt to make the content more difficult to discover or decipher and to make the communication less conspicuous and hide commands from being seen. This encompasses many methods, such as adding junk data to protocol traffic, using steganogr

In [33]:
# Xem n·ªôi dung document chi ti·∫øt
def view_document(doc, doc_index=None):
    """Hi·ªÉn th·ªã n·ªôi dung document m·ªôt c√°ch chi ti·∫øt"""
    print("=" * 80)
    if doc_index is not None:
        print(f"üìÑ DOCUMENT #{doc_index}")
    else:
        print(f"üìÑ DOCUMENT")
    print("=" * 80)
    
    print("\nüìã METADATA:")
    print("-" * 80)
    for key, value in doc.metadata.items():
        if isinstance(value, str) and len(value) > 500:
            print(f"  {key}: {value[:500]}...")
        else:
            print(f"  {key}: {value}")
    
    print("\nüìù CONTENT:")
    print("-" * 80)
    print(doc.page_content)
    print("=" * 80)
    print(f"\nüìä Th·ªëng k√™:")
    print(f"  - Content length: {len(doc.page_content)} chars")
    print(f"  - Content lines: {len(doc.page_content.splitlines())} lines")
    print(f"  - Metadata fields: {len(doc.metadata)}")

# Xem document ƒë·∫ßu ti√™n
if 'mitre_docs' in locals() and mitre_docs:
    print("üîç Xem document ƒë·∫ßu ti√™n:\n")
    view_document(mitre_docs[0], doc_index=0)
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ mitre_docs. H√£y ch·∫°y cell load MITRE ATT&CK techniques tr∆∞·ªõc.")

üîç Xem document ƒë·∫ßu ti√™n:

üìÑ DOCUMENT #0

üìã METADATA:
--------------------------------------------------------------------------------
  source_type: mitre_attack
  source_url: https://attack.mitre.org/techniques/T1001
  technique_id: T1001
  technique_name: Data Obfuscation
  description_length: 465

üìù CONTENT:
--------------------------------------------------------------------------------
T1001
Data Obfuscation
Adversaries may obfuscate command and control traffic to make it more difficult to detect. Command and control (C2) communications are hidden (but not necessarily encrypted) in an attempt to make the content more difficult to discover or decipher and to make the communication less conspicuous and hide commands from being seen. This encompasses many methods, such as adding junk data to protocol traffic, using steganography, or impersonating legitimate protocols.

üìä Th·ªëng k√™:
  - Content length: 488 chars
  - Content lines: 3 lines
  - Metadata fields: 5


In [34]:
# Xem nhi·ªÅu documents c√πng l√∫c
if 'mitre_docs' in locals() and mitre_docs:
    print(f"üìö T·ªïng s·ªë documents: {len(mitre_docs)}\n")
    
    # Xem t·∫•t c·∫£ documents (t√≥m t·∫Øt)
    print("üìã DANH S√ÅCH T·∫§T C·∫¢ DOCUMENTS:")
    print("=" * 80)
    for i, doc in enumerate(mitre_docs):
        tech_id = doc.metadata.get('technique_id', 'N/A')
        tech_name = doc.metadata.get('technique_name', doc.metadata.get('title', 'N/A'))
        content_len = len(doc.page_content)
        print(f"  [{i}] {tech_id:15} | {tech_name[:50]:50} | {content_len:6} chars")
    
    print("\n" + "=" * 80)
    print("\nüí° ƒê·ªÉ xem chi ti·∫øt m·ªôt document c·ª• th·ªÉ:")
    print("   view_document(mitre_docs[0])  # Xem document ƒë·∫ßu ti√™n")
    print("   view_document(mitre_docs[1])  # Xem document th·ª© hai")
    print("   print(mitre_docs[0].page_content)  # Ch·ªâ xem content")
    print("   print(mitre_docs[0].metadata)  # Ch·ªâ xem metadata")
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ mitre_docs. H√£y ch·∫°y cell load MITRE ATT&CK techniques tr∆∞·ªõc.")


üìö T·ªïng s·ªë documents: 10

üìã DANH S√ÅCH T·∫§T C·∫¢ DOCUMENTS:
  [0] T1001           | Data Obfuscation                                   |    488 chars
  [1] T1001.001       | Data Obfuscation:Junk Data                         |    456 chars
  [2] T1001.002       | Data Obfuscation:Steganography                     |    493 chars
  [3] T1001.003       | Data Obfuscation:Protocol or Service Impersonation |    916 chars
  [4] T1003           | OS Credential Dumping                              |    509 chars
  [5] T1003.001       | OS Credential Dumping:LSASS Memory                 |   2219 chars
  [6] T1003.002       | OS Credential Dumping:Security Account Manager     |    904 chars
  [7] T1003.003       | OS Credential Dumping:NTDS                         |    747 chars
  [8] T1003.004       | OS Credential Dumping:LSA Secrets                  |    481 chars
  [9] T1003.005       | OS Credential Dumping:Cached Domain Credentials    |   1284 chars


üí° ƒê·ªÉ xem chi ti·∫øt m·

In [35]:
# Xem document v·ªõi format ƒë√£ t·ªëi ∆∞u
if 'mitre_docs' in locals() and mitre_docs:
    print(f"üìö T·ªïng s·ªë documents: {len(mitre_docs)}\n")
    print("üìã FORMAT M·∫™U (t·ªëi ∆∞u cho text splitting):")
    print("=" * 80)
    
    # Hi·ªÉn th·ªã m·ªôt v√†i documents ƒë·ªÉ xem format
    for i, doc in enumerate(mitre_docs[:3]):
        print(f"\n[Document {i+1}]")
        print("-" * 80)
        print(doc.page_content)
        print(f"\nMetadata:")
        print(f"  - technique_id: {doc.metadata.get('technique_id')}")
        print(f"  - technique_name: {doc.metadata.get('technique_name')}")
        print("-" * 80)
    
    print("\nüí° Format n√†y t·ªëi ∆∞u cho text splitting v√¨:")
    print("   - M·ªói technique l√† m·ªôt document ri√™ng")
    print("   - Format: ID\\nName\\nDescription (d·ªÖ parse)")
    print("   - Text splitter s·∫Ω t√°ch theo t·ª´ng technique t·ª± nhi√™n")
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ mitre_docs. H√£y ch·∫°y cell load MITRE ATT&CK techniques tr∆∞·ªõc.")


üìö T·ªïng s·ªë documents: 10

üìã FORMAT M·∫™U (t·ªëi ∆∞u cho text splitting):

[Document 1]
--------------------------------------------------------------------------------
T1001
Data Obfuscation
Adversaries may obfuscate command and control traffic to make it more difficult to detect. Command and control (C2) communications are hidden (but not necessarily encrypted) in an attempt to make the content more difficult to discover or decipher and to make the communication less conspicuous and hide commands from being seen. This encompasses many methods, such as adding junk data to protocol traffic, using steganography, or impersonating legitimate protocols.

Metadata:
  - technique_id: T1001
  - technique_name: Data Obfuscation
--------------------------------------------------------------------------------

[Document 2]
--------------------------------------------------------------------------------
T1001.001
Data Obfuscation:Junk Data
Adversaries may add junk data to protocols used f

In [63]:
# In ra T·∫§T C·∫¢ documents ƒë·ªÉ xem
print("=" * 100)
print("üìö XEM T·∫§T C·∫¢ DOCUMENTS")
print("=" * 100)

# 1. Sigma Rules
if 'sigma_docs_processed' in locals() and sigma_docs_processed:
    print(f"\n\n{'='*100}")
    print(f"üìã SIGMA RULES ({len(sigma_docs_processed)} documents)")
    print("=" * 100)
    for i, doc in enumerate(sigma_docs_processed):
        print(f"\n[Sigma Doc {i+1}/{len(sigma_docs_processed)}]")
        print("-" * 100)
        print(f"Title: {doc.metadata.get('title', 'N/A')}")
        print(f"Level: {doc.metadata.get('level', 'N/A')}")
        print(f"Status: {doc.metadata.get('status', 'N/A')}")
        print(f"\nContent:")
        print(doc.page_content)
        print("-" * 100)
else:
    print("\n‚ö†Ô∏è Ch∆∞a c√≥ sigma_docs_processed")

# 2. MITRE ATT&CK Techniques
if 'mitre_docs' in locals() and mitre_docs:
    print(f"\n\n{'='*100}")
    print(f"üìã MITRE ATT&CK TECHNIQUES ({len(mitre_docs)} documents)")
    print("=" * 100)
    for i, doc in enumerate(mitre_docs):
        print(f"\n[MITRE Doc {i+1}/{len(mitre_docs)}]")
        print("-" * 100)
        print(f"Technique ID: {doc.metadata.get('technique_id', 'N/A')}")
        print(f"Technique Name: {doc.metadata.get('technique_name', 'N/A')}")
        print(f"Description Length: {doc.metadata.get('description_length', 0)} chars")
        print(f"\nContent:")
        print(doc.page_content)
        print("-" * 100)
else:
    print("\n‚ö†Ô∏è Ch∆∞a c√≥ mitre_docs")

# 3. OWASP Cheatsheets
if 'owasp_docs' in locals() and owasp_docs:
    print(f"\n\n{'='*100}")
    print(f"üìã OWASP CHEATSHEETS ({len(owasp_docs)} documents)")
    print("=" * 100)
    for i, doc in enumerate(owasp_docs):
        print(f"\n[OWASP Doc {i+1}/{len(owasp_docs)}]")
        print("-" * 100)
        print(f"Doc Type: {doc.metadata.get('doc_type', 'N/A')}")
        print(f"Source: {doc.metadata.get('source', 'N/A')}")
        print(f"Content Length: {len(doc.page_content)} chars")
        print(f"\nContent:")
        print(doc.page_content[:1000] if len(doc.page_content) > 1000 else doc.page_content)
        if len(doc.page_content) > 1000:
            print(f"\n... (truncated, total {len(doc.page_content)} chars)")
        print("-" * 100)
else:
    print("\n‚ö†Ô∏è Ch∆∞a c√≥ owasp_docs")

# 4. All Documents (sau khi combine)
if 'all_docs' in locals() and all_docs:
    print(f"\n\n{'='*100}")
    print(f"üìã ALL DOCUMENTS (Sau khi combine) ({len(all_docs)} documents)")
    print("=" * 100)
    for i, doc in enumerate(all_docs):
        print(f"\n[All Doc {i+1}/{len(all_docs)}]")
        print("-" * 100)
        print(f"Source Type: {doc.metadata.get('source_type', 'N/A')}")
        
        if doc.metadata.get('source_type') == 'mitre_attack':
            print(f"Technique ID: {doc.metadata.get('technique_id', 'N/A')}")
            print(f"Technique Name: {doc.metadata.get('technique_name', 'N/A')}")
        elif doc.metadata.get('source_type') == 'owasp_cheatsheet':
            print(f"Doc Type: {doc.metadata.get('doc_type', 'N/A')}")
        else:
            print(f"Title: {doc.metadata.get('title', 'N/A')}")
            print(f"Level: {doc.metadata.get('level', 'N/A')}")
        
        print(f"Content Length: {len(doc.page_content)} chars")
        print(f"\nContent:")
        print(doc.page_content[:1000] if len(doc.page_content) > 1000 else doc.page_content)
        if len(doc.page_content) > 1000:
            print(f"\n... (truncated, total {len(doc.page_content)} chars)")
        print("-" * 100)
else:
    print("\n‚ö†Ô∏è Ch∆∞a c√≥ all_docs (ch·∫°y cell Text Splitting v√† Combine ƒë·ªÉ t·∫°o all_docs)")

print(f"\n\n{'='*100}")
print("‚úÖ HO√ÄN T·∫§T XEM T·∫§T C·∫¢ DOCUMENTS")
print("=" * 100)


üìö XEM T·∫§T C·∫¢ DOCUMENTS


üìã SIGMA RULES (13 documents)

[Sigma Doc 1/13]
----------------------------------------------------------------------------------------------------
Title: F5 BIG-IP iControl Rest API Command Execution - Webserver
Level: medium
Status: test

Content:
Sigma Rule: F5 BIG-IP iControl Rest API Command Execution - Webserver
Status: test | Level: medium
Description: Detects POST requests to the F5 BIG-IP iControl Rest API "bash" endpoint, which allows the execution of commands on the BIG-IP
----------------------------------------------------------------------------------------------------

[Sigma Doc 2/13]
----------------------------------------------------------------------------------------------------
Title: Successful IIS Shortname Fuzzing Scan
Level: medium
Status: test

Content:
Sigma Rule: Successful IIS Shortname Fuzzing Scan
Status: test | Level: medium
Description: When IIS uses an old .Net Framework it's possible to enumerate folders with the sy

## 7. Load OWASP Cheatsheets


In [65]:
# Load OWASP Cheatsheets t·ª´ th∆∞ m·ª•c local
# Read in documents using LangChain's loaders
# Take everything in all the sub-folders of our knowledgebase

import os, glob
from langchain_community.document_loaders import DirectoryLoader, TextLoader

# ƒê∆∞·ªùng d·∫´n t·ªõi th∆∞ m·ª•c cheatsheets
cheatsheets_path = r"D:\MCPLLM\test\cheatsheets"

# Ki·ªÉm tra th∆∞ m·ª•c c√≥ t·ªìn t·∫°i kh√¥ng
if not os.path.exists(cheatsheets_path):
    print(f"‚ö†Ô∏è  Th∆∞ m·ª•c kh√¥ng t·ªìn t·∫°i: {cheatsheets_path}")
    print(f"   T·∫°o th∆∞ m·ª•c ho·∫∑c ki·ªÉm tra ƒë∆∞·ªùng d·∫´n")
    owasp_docs = []
else:
    # D√πng raw string ƒë·ªÉ tr√°nh l·ªói escape
    folders = glob.glob(os.path.join(cheatsheets_path, "*"))
    
    # Thi·∫øt l·∫≠p text loader kwargs
    # Th·ª≠ d√πng autodetect n·∫øu c√≥ chardet, n·∫øu kh√¥ng th√¨ d√πng utf-8
    try:
        import chardet
        text_loader_kwargs = {'autodetect_encoding': True}
    except ImportError:
        print("  ‚ö†Ô∏è  Module 'chardet' ch∆∞a ƒë∆∞·ª£c c√†i ƒë·∫∑t. D√πng encoding='utf-8' thay th·∫ø.")
        print("  üí° ƒê·ªÉ c√†i: pip install chardet")
        text_loader_kwargs = {'encoding': 'utf-8'}
    
    owasp_docs = []
    print(f"üìÇ ƒêang load OWASP cheatsheets t·ª´: {cheatsheets_path}")
    
    for folder in folders:
        doc_type = os.path.basename(folder)
        
        try:
            if os.path.isdir(folder):
                loader = DirectoryLoader(
                    folder,
                    glob="**/*.md",  # ‚úÖ lu√¥n d√πng d·∫•u /
                    loader_cls=TextLoader,
                    loader_kwargs=text_loader_kwargs
                )
                folder_docs = loader.load()
                
            elif os.path.isfile(folder) and folder.endswith('.md'):
                loader = TextLoader(folder, **text_loader_kwargs)
                folder_docs = loader.load()
            else:
                continue  # B·ªè qua n·∫øu kh√¥ng ph·∫£i file .md
            
            for doc in folder_docs:
                doc.metadata["doc_type"] = doc_type
                doc.metadata["source_type"] = "owasp_cheatsheet"
                # Th√™m source path v√†o metadata
                if 'source' not in doc.metadata:
                    doc.metadata["source"] = folder
                owasp_docs.append(doc)
            
            if folder_docs:
                print(f"  ‚úÖ Loaded {len(folder_docs)} docs t·ª´: {doc_type}")
        
        except Exception as e:
            print(f"  ‚ö†Ô∏è  L·ªói khi load {folder}: {e}")
            continue
    
    print(f"\n‚úÖ Total OWASP cheatsheet documents loaded: {len(owasp_docs)}")
    
    # Hi·ªÉn th·ªã m·ªôt v√†i documents m·∫´u
    if owasp_docs:
        print(f"\nüìã M·ªôt v√†i documents m·∫´u:")
        for i, doc in enumerate(owasp_docs[:3]):
            doc_type = doc.metadata.get('doc_type', 'N/A')
            source = doc.metadata.get('source', 'N/A')
            if isinstance(source, str):
                source_name = os.path.basename(source)
            else:
                source_name = str(source)
            print(f"  [{i+1}] {doc_type} | {source_name} | {len(doc.page_content)} chars")


üìÇ ƒêang load OWASP cheatsheets t·ª´: D:\MCPLLM\test\cheatsheets
  ‚úÖ Loaded 1 docs t·ª´: Abuse_Case_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Access_Control_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: AJAX_Security_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Attack_Surface_Analysis_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Authentication_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Authorization_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Authorization_Testing_Automation_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Automotive_Security.md
  ‚úÖ Loaded 1 docs t·ª´: Bean_Validation_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Browser_Extension_Vulnerabilities_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: C-Based_Toolchain_Hardening_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Choosing_and_Using_Security_Questions_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: CI_CD_Security_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Clickjacking_Defense_Cheat_Sheet.md
  ‚úÖ Loaded 1 docs t·ª´: Content_Security_Policy_Cheat_Sheet.md
  ‚úÖ Lo

In [69]:
# In ra T·∫§T C·∫¢ OWASP documents
if 'owasp_docs' in locals() and owasp_docs:
    print("=" * 100)
    print(f"üìã OWASP CHEATSHEETS - T·∫§T C·∫¢ DOCUMENTS ({len(owasp_docs)} documents)")
    print("=" * 100)
    
    for i, doc in enumerate(owasp_docs):
        print(f"\n\n{'='*100}")
        print(f"[OWASP Doc {i+1}/{len(owasp_docs)}]")
        print("=" * 100)
        
        # Metadata
        print("\nüìã METADATA:")
        print("-" * 100)
        for key, value in doc.metadata.items():
            if isinstance(value, str) and len(value) > 200:
                print(f"  {key}: {value}...")
            else:
                print(f"  {key}: {value}")
        
        # Content
        print(f"\nüìù CONTENT ({len(doc.page_content)} chars):")
        print("-" * 100)
        print(doc.page_content)
        print("-" * 100)
        
        print(f"\nüìä Th·ªëng k√™:")
        print(f"  - Content length: {len(doc.page_content)} chars")
        print(f"  - Content lines: {len(doc.page_content.splitlines())} lines")
        print(f"  - Metadata fields: {len(doc.metadata)}")
    
    print(f"\n\n{'='*100}")
    print(f"‚úÖ ƒê√£ in {len(owasp_docs)} OWASP documents")
    print("=" * 100)
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ owasp_docs. H√£y ch·∫°y cell Load OWASP Cheatsheets tr∆∞·ªõc.")

üìã OWASP CHEATSHEETS - T·∫§T C·∫¢ DOCUMENTS (107 documents)


[OWASP Doc 1/107]

üìã METADATA:
----------------------------------------------------------------------------------------------------
  source: D:\MCPLLM\test\cheatsheets\Abuse_Case_Cheat_Sheet.md
  doc_type: Abuse_Case_Cheat_Sheet.md
  source_type: owasp_cheatsheet

üìù CONTENT (28002 chars):
----------------------------------------------------------------------------------------------------
# Abuse Case Cheat Sheet (Historical)

## Archive Statement

Reviewers have identified that abuse cases are rarely used in practice. Additionally, the material is presented as a "getting started tutorial" which isn't appropriate for the cheat sheet series.

## Introduction

Often when the security level of an application is mentioned in requirements, the following _expressions_ are met:

- _The application must be secure_.
- _The application must defend against all attacks targeting this category of application_.
- _The application 

## 8. Text Splitting v√† Combine All Documents


In [87]:
## 9. Xem Chunks v√† Metadata (Sau khi Split)

# In ra c√°c chunks v·ªõi metadata ƒë·ªÉ xem th·ª≠
print("="*100)
print("üìã XEM C√ÅC CHUNKS SAU KHI SPLIT")
print("="*100)

if 'all_docs' in locals() and all_docs:
    print(f"\nüìä T·ªïng s·ªë chunks: {len(all_docs)}\n")
    
    # Th·ªëng k√™ theo source type
    sigma_chunks = [d for d in all_docs if d.metadata.get('source_type') == 'sigma_rule']
    mitre_chunks = [d for d in all_docs if d.metadata.get('source_type') == 'mitre_attack']
    owasp_chunks = [d for d in all_docs if d.metadata.get('source_type') == 'owasp_cheatsheet']
    
    print(f"üìà Th·ªëng k√™ chunks:")
    print(f"   - Sigma rules: {len(sigma_chunks)} chunks")
    print(f"   - MITRE ATT&CK: {len(mitre_chunks)} chunks")
    print(f"   - OWASP cheatsheets: {len(owasp_chunks)} chunks")
    print()
    
    # Xem m·ªôt v√†i chunks t·ª´ m·ªói lo·∫°i
    print("="*100)
    print("üìÑ SAMPLE CHUNKS V·ªöI METADATA")
    print("="*100)
    
    # 1. Sigma chunks
    if sigma_chunks:
        print(f"\n\n[1] SIGMA RULE CHUNKS (Sample {min(2, len(sigma_chunks))} chunks)")
        print("-"*100)
        for i, chunk in enumerate(sigma_chunks[:2]):
            print(f"\n[Sigma Chunk {i+1}]")
            print(f"Content Length: {len(chunk.page_content)} chars")
            print(f"Metadata: {json.dumps(chunk.metadata, indent=2, ensure_ascii=False, default=str)}")
            print(f"\nContent:")
            print(chunk.page_content[:300] if len(chunk.page_content) > 300 else chunk.page_content)
            if len(chunk.page_content) > 300:
                print(f"... (truncated, total {len(chunk.page_content)} chars)")
            print("-"*100)
    
    # 2. MITRE chunks
    if mitre_chunks:
        print(f"\n\n[2] MITRE ATT&CK CHUNKS (Sample {min(2, len(mitre_chunks))} chunks)")
        print("-"*100)
        for i, chunk in enumerate(mitre_chunks[:2]):
            print(f"\n[MITRE Chunk {i+1}]")
            print(f"Technique ID: {chunk.metadata.get('technique_id', 'N/A')}")
            print(f"Technique Name: {chunk.metadata.get('technique_name', 'N/A')}")
            print(f"Content Length: {len(chunk.page_content)} chars")
            print(f"\nMetadata: {json.dumps(chunk.metadata, indent=2, ensure_ascii=False, default=str)}")
            print(f"\nContent:")
            print(chunk.page_content)
            print("-"*100)
    
    # 3. OWASP chunks
    if owasp_chunks:
        print(f"\n\n[3] OWASP CHEATSHEET CHUNKS (Sample {min(3, len(owasp_chunks))} chunks)")
        print("-"*100)
        for i, chunk in enumerate(owasp_chunks[:3]):
            print(f"\n[OWASP Chunk {i+1}]")
            print(f"Doc Type: {chunk.metadata.get('doc_type', 'N/A')}")
            print(f"Source: {chunk.metadata.get('source', 'N/A')}")
            print(f"Content Length: {len(chunk.page_content)} chars")
            print(f"\nMetadata: {json.dumps(chunk.metadata, indent=2, ensure_ascii=False, default=str)}")
            print(f"\nContent Preview (500 chars):")
            print(chunk.page_content[:500] if len(chunk.page_content) > 500 else chunk.page_content)
            if len(chunk.page_content) > 500:
                print(f"... (truncated, total {len(chunk.page_content)} chars)")
            print("-"*100)
    
    # T√≥m t·∫Øt
    print("\n\n" + "="*100)
    print("üìä T·ªîNG K·∫æT CHUNKS")
    print("="*100)
    print(f"T·ªïng s·ªë chunks: {len(all_docs)}")
    print(f"  - Sigma: {len(sigma_chunks)} chunks")
    print(f"  - MITRE: {len(mitre_chunks)} chunks")
    print(f"  - OWASP: {len(owasp_chunks)} chunks")
    print(f"  - Unknown: {len(all_docs) - len(sigma_chunks) - len(mitre_chunks) - len(owasp_chunks)} chunks")
    
    # Th·ªëng k√™ k√≠ch th∆∞·ªõc
    avg_len = sum(len(d.page_content) for d in all_docs) / len(all_docs) if all_docs else 0
    min_len = min(len(d.page_content) for d in all_docs) if all_docs else 0
    max_len = max(len(d.page_content) for d in all_docs) if all_docs else 0
    print(f"\nK√≠ch th∆∞·ªõc chunks:")
    print(f"  - Trung b√¨nh: {avg_len:.0f} chars")
    print(f"  - Nh·ªè nh·∫•t: {min_len} chars")
    print(f"  - L·ªõn nh·∫•t: {max_len} chars")
    
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ all_docs. H√£y ch·∫°y cell 'Text Splitting v√† Combine All Documents' tr∆∞·ªõc.")


üìã XEM C√ÅC CHUNKS SAU KHI SPLIT

üìä T·ªïng s·ªë chunks: 3358

üìà Th·ªëng k√™ chunks:
   - Sigma rules: 13 chunks
   - MITRE ATT&CK: 13 chunks
   - OWASP cheatsheets: 3332 chunks

üìÑ SAMPLE CHUNKS V·ªöI METADATA


[1] SIGMA RULE CHUNKS (Sample 2 chunks)
----------------------------------------------------------------------------------------------------

[Sigma Chunk 1]
Content Length: 238 chars
Metadata: {
  "source": "d:\\MCPLLM\\test\\sigma\\rules\\web\\webserver_generic\\web_f5_tm_utility_bash_api_request.yml",
  "full_rule": "title: F5 BIG-IP iControl Rest API Command Execution - Webserver\nid: 85254a62-22be-4239-b79c-2ec17e566c37\nrelated:\n    - id: b59c98c6-95e8-4d65-93ee-f594dfb96b17\n      type: similar\nstatus: test\ndescription: Detects POST requests to the F5 BIG-IP iControl Rest API \"bash\" endpoint, which allows the execution of commands on the BIG-IP\nreferences:\n    - https://f5-sdk.readthedocs.io/en/latest/apidoc/f5.bigip.tm.util.html#module-f5.bigip.tm.util.

In [70]:
# Text Splitting v√† Combine All Documents
# T·ªëi ∆∞u cho t·ª´ng lo·∫°i document: Sigma, MITRE, OWASP

# 1. Text splitter cho OWASP (Markdown files - c√≥ th·ªÉ r·∫•t d√†i)
# OWASP cheatsheets th∆∞·ªùng l√† markdown, c·∫ßn split ƒë·ªÉ qu·∫£n l√Ω
owasp_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # Ph√π h·ª£p cho markdown content
    chunk_overlap=200,
    length_function=len,
    separators=["\n\n## ", "\n\n### ", "\n\n", "\n", ". ", " ", ""]  # ∆Øu ti√™n split theo markdown headers
)

# 2. Text splitter cho MITRE (Format: T1548\nName\nDescription)
# Gi·ªØ nguy√™n technique n·∫øu c√≥ th·ªÉ, ch·ªâ split n·∫øu qu√° d√†i
mitre_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,  # ƒê·ªß l·ªõn cho h·∫ßu h·∫øt techniques
    chunk_overlap=200,
    length_function=len,
    separators=["\n\n", "\n", ". "]  # ∆Øu ti√™n gi·ªØ nguy√™n format ID\nName\nDescription
)

# Combine t·∫•t c·∫£ documents
all_docs = []

# =================================================================
# 1. SIGMA RULES
# =================================================================
# Sigma rules ƒë√£ ƒë∆∞·ª£c processed, format ng·∫Øn g·ªçn, kh√¥ng c·∫ßn split
if 'sigma_docs_processed' in locals() and sigma_docs_processed:
    for doc in sigma_docs_processed:
        # ƒê·∫£m b·∫£o metadata c√≥ source_type
        if 'source_type' not in doc.metadata:
            doc.metadata['source_type'] = 'sigma_rule'
        all_docs.append(doc)
    print(f"‚úÖ Added {len(sigma_docs_processed)} Sigma rule documents (kh√¥ng split)")

# =================================================================
# 2. MITRE ATT&CK TECHNIQUES
# =================================================================
# Format: T1548\nName\nDescription
# M·ªói technique l√† m·ªôt document ri√™ng, ch·ªâ split n·∫øu description qu√° d√†i
if 'mitre_docs' in locals() and mitre_docs:
    mitre_split = []
    for doc in mitre_docs:
        # ƒê·∫£m b·∫£o metadata c√≥ source_type
        if 'source_type' not in doc.metadata:
            doc.metadata['source_type'] = 'mitre_attack'
        
        # N·∫øu description qu√° d√†i (>2000 chars), split theo paragraph
        if len(doc.page_content) > 2000:
            chunks = mitre_splitter.split_documents([doc])
            # Gi·ªØ metadata cho m·ªói chunk
            for chunk in chunks:
                chunk.metadata['source_type'] = 'mitre_attack'
            mitre_split.extend(chunks)
        else:
            # Gi·ªØ nguy√™n document - m·ªói technique l√† m·ªôt chunk
            mitre_split.append(doc)
    
    all_docs.extend(mitre_split)
    long_docs = sum(1 for doc in mitre_docs if len(doc.page_content) > 2000)
    print(f"‚úÖ Added {len(mitre_split)} MITRE ATT&CK documents (from {len(mitre_docs)} techniques)")
    if long_docs > 0:
        print(f"   - {long_docs} techniques ƒë√£ ƒë∆∞·ª£c split do description qu√° d√†i")

# =================================================================
# 3. OWASP CHEATSHEETS
# =================================================================
# OWASP cheatsheets l√† markdown files, c√≥ th·ªÉ r·∫•t d√†i, c·∫ßn split
if 'owasp_docs' in locals() and owasp_docs:
    owasp_split = []
    for doc in owasp_docs:
        # ƒê·∫£m b·∫£o metadata c√≥ source_type
        if 'source_type' not in doc.metadata:
            doc.metadata['source_type'] = 'owasp_cheatsheet'
        
        # Split OWASP documents (markdown c√≥ th·ªÉ r·∫•t d√†i)
        chunks = owasp_splitter.split_documents([doc])
        # Gi·ªØ metadata cho m·ªói chunk
        for chunk in chunks:
            chunk.metadata['source_type'] = 'owasp_cheatsheet'
            # Gi·ªØ nguy√™n doc_type v√† source t·ª´ document g·ªëc
            if 'doc_type' in doc.metadata:
                chunk.metadata['doc_type'] = doc.metadata['doc_type']
            if 'source' in doc.metadata:
                chunk.metadata['source'] = doc.metadata['source']
        owasp_split.extend(chunks)
    
    all_docs.extend(owasp_split)
    print(f"‚úÖ Added {len(owasp_split)} OWASP cheatsheet document chunks (from {len(owasp_docs)} docs)")

# =================================================================
# T·ªîNG K·∫æT
# =================================================================
print(f"\nüìä T·ªïng c·ªông: {len(all_docs)} documents ƒë·ªÉ embed v√† l∆∞u v√†o ChromaDB")
print(f"\nüí° Chi·∫øn l∆∞·ª£c splitting:")
print(f"   - Sigma rules: Gi·ªØ nguy√™n (kh√¥ng split)")
print(f"   - MITRE techniques: Gi·ªØ nguy√™n, ch·ªâ split n·∫øu >2000 chars")
print(f"   - OWASP cheatsheets: Split theo markdown headers v√† paragraphs")


‚úÖ Added 13 Sigma rule documents (kh√¥ng split)
‚úÖ Added 13 MITRE ATT&CK documents (from 10 techniques)
   - 1 techniques ƒë√£ ƒë∆∞·ª£c split do description qu√° d√†i
‚úÖ Added 3332 OWASP cheatsheet document chunks (from 107 docs)

üìä T·ªïng c·ªông: 3358 documents ƒë·ªÉ embed v√† l∆∞u v√†o ChromaDB

üí° Chi·∫øn l∆∞·ª£c splitting:
   - Sigma rules: Gi·ªØ nguy√™n (kh√¥ng split)
   - MITRE techniques: Gi·ªØ nguy√™n, ch·ªâ split n·∫øu >2000 chars
   - OWASP cheatsheets: Split theo markdown headers v√† paragraphs


In [71]:
print(owasp_split[0])

page_content='# Abuse Case Cheat Sheet (Historical)

## Archive Statement

Reviewers have identified that abuse cases are rarely used in practice. Additionally, the material is presented as a "getting started tutorial" which isn't appropriate for the cheat sheet series.' metadata={'source': 'D:\\MCPLLM\\test\\cheatsheets\\Abuse_Case_Cheat_Sheet.md', 'doc_type': 'Abuse_Case_Cheat_Sheet.md', 'source_type': 'owasp_cheatsheet'}


In [88]:
## 10. Custom Query - Th·ª≠ query c·ªßa b·∫°n

# Thay ƒë·ªïi query ·ªü ƒë√¢y ƒë·ªÉ t√¨m ki·∫øm
if 'vectorstore' in locals() and vectorstore:
    my_query = "XSS cross-site scripting prevention"
    k_results = 5
    
    print(f"üîé Custom Query: '{my_query}'")
    print(f"üìä S·ªë k·∫øt qu·∫£: {k_results}")
    print("="*100)
    
    results = vectorstore.similarity_search_with_score(my_query, k=k_results)
    
    for i, (doc, score) in enumerate(results, 1):
        print(f"\n[{i}] Score: {score:.4f}")
        print("-"*100)
        
        source_type = doc.metadata.get('source_type', 'unknown')
        print(f"Source Type: {source_type}")
        
        if source_type == 'sigma_rule':
            print(f"Title: {doc.metadata.get('title', 'N/A')}")
            print(f"Level: {doc.metadata.get('level', 'N/A')}")
            print(f"Status: {doc.metadata.get('status', 'N/A')}")
        elif source_type == 'mitre_attack':
            print(f"Technique ID: {doc.metadata.get('technique_id', 'N/A')}")
            print(f"Technique Name: {doc.metadata.get('technique_name', 'N/A')}")
        elif source_type == 'owasp_cheatsheet':
            print(f"Doc Type: {doc.metadata.get('doc_type', 'N/A')}")
            print(f"Source: {doc.metadata.get('source', 'N/A')}")
        
        print(f"\nContent:")
        print(doc.page_content[:500] if len(doc.page_content) > 500 else doc.page_content)
        if len(doc.page_content) > 500:
            print(f"... (truncated, total {len(doc.page_content)} chars)")
        print("-"*100)
else:
    print("‚ö†Ô∏è Ch∆∞a c√≥ vectorstore. H√£y ch·∫°y cell ChromaDB Setup tr∆∞·ªõc.")


üîé Custom Query: 'XSS cross-site scripting prevention'
üìä S·ªë k·∫øt qu·∫£: 5

[1] Score: 0.3043
----------------------------------------------------------------------------------------------------
Source Type: owasp_cheatsheet
Doc Type: Cross_Site_Scripting_Prevention_Cheat_Sheet.md
Source: D:\MCPLLM\test\cheatsheets\Cross_Site_Scripting_Prevention_Cheat_Sheet.md

Content:
# Cross Site Scripting Prevention Cheat Sheet

## Introduction

This cheat sheet helps developers prevent XSS vulnerabilities.

Cross-Site Scripting (XSS) is a misnomer. Originally this term was derived from early versions of the attack that were primarily focused on stealing data cross-site. Since then, the term has widened to include injection of basically any content. XSS attacks are serious and can lead to account impersonation, observing user behaviour, loading external content, stealing se
... (truncated, total 731 chars)
-------------------------------------------------------------------------------------

In [67]:
# Thay ƒë·ªïi query ·ªü ƒë√¢y
my_query = "SQL injection"
k_results = 3

results = vectorstore.similarity_search_with_score(my_query, k=k_results)

print(f"üîé Query: '{my_query}'")
print(f"K·∫øt qu·∫£: {len(results)} documents")
print("="*60)

for i, (doc, score) in enumerate(results, 1):
    print(f"\n[{i}] Score: {score:.4f}")
    formatted = format_doc_for_llm(doc)
    print(formatted)
    print("-"*60)


üîé Query: 'SQL injection'
K·∫øt qu·∫£: 3 documents

[1] Score: 0.9444
Title: SQL Injection Strings In URI
ID: 5513deaf-f49a-46c2-a6c8-3f111b5cb453
Status: test
Level: high
Description: Detects potential SQL injection attempts via GET requests in access logs.
Author: Saw Win Naung, Nasreddine Bencherchali (Nextron Systems), Thurein Oo (Yoma Bank)
Tags: attack.initial-access, attack.t1190
Log Source: {"category": "webserver"}
Detection Keywords: @@version, %271%27%3D%271, =select , =select(, =select%20
  (+ 25 more keywords)
Detection Condition: selection and keywords and not 1 of filter_main_*
References: 5 reference(s)
  - https://www.acunetix.com/blog/articles/exploiting-sql-injection-example/
  - https://www.acunetix.com/blog/articles/using-logs-to-investigate-a-web-application-attack/
  - https://brightsec.com/blog/sql-injection-payloads/
  ... v√† 2 reference(s) kh√°c

Content:
Sigma Rule: SQL Injection Strings In URI
Status: test | Level: high
Description: Detects potential SQL 