os - Lets us work with files and folders

json - Helps handle JSON data (a common data format)

re - For working with text patterns (regular expressions)

typing - Helps specify what types of data our functions work with

Path - A better way to handle file paths

openai - The official OpenAI library

dotenv - Reads secret keys from a .env file

In [5]:
# os - Lets us work with files and folders

# json - Helps handle JSON data (a common data format)

# re - For working with text patterns (regular expressions)

# typing - Helps specify what types of data our functions work with

# Path - A better way to handle file paths

# openai - The official OpenAI library

# dotenv - Reads secret keys from a .env file

import os
import json
import re
from typing import List, Dict, Optional
from pathlib import Path
import openai
from dotenv import load_dotenv

In [6]:
# Loads environment variables from a .env file so that later code can read
# them with os.getenv (ContentProcessor reads OPENAI_API_KEY this way).
# Keeping the key in .env keeps it out of the notebook itself.
load_dotenv()

True

In [None]:
class ContentProcessor:
    """Parse CTF primer documents into sections, metadata and embeddings.

    Typical flow: ``process_ctf_primer_directory`` walks a folder, runs
    ``process_markdown_file`` on every ``.adoc`` file and attaches an
    embedding to each section; ``save_processed_content`` then writes the
    result to disk as JSON.
    """

    # A line starting with one of these opens a new section. "## " is a
    # Markdown level-2 heading; "== " is the AsciiDoc equivalent, needed
    # because the directory walker feeds .adoc files through this parser.
    SECTION_PREFIXES = ("## ", "== ")

    # Topic -> substrings that tag a document with that topic. Matching is a
    # plain case-insensitive substring test against the whole document.
    TOPIC_KEYWORDS = {
        "web_security": ["sql injection", "xss", "csrf", "web", "http", "cookie"],
        "cryptography": ["encryption", "cipher", "crypto", "hash", "rsa", "aes"],
        "reverse_engineering": ["assembly", "disassembly", "binary", "executable"],
        "forensics": ["steganography", "file analysis", "metadata", "recovery"],
        "pwn": ["buffer overflow", "stack", "heap", "memory", "exploit"],
        "networking": ["tcp", "udp", "packet", "wireshark", "network"],
    }

    MAX_EXAMPLES = 3              # code blocks kept per document
    MAX_EMBEDDING_CHARS = 8000    # input is truncated to this many characters
    EMBEDDING_MODEL = "text-embedding-ada-002"

    def __init__(self):
        """Create the OpenAI client from the OPENAI_API_KEY environment variable."""
        self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def process_markdown_file(self, file_path: str) -> Dict:
        """Read one document and return its title, sections and metadata.

        Args:
            file_path: Path of a UTF-8 text file (Markdown or AsciiDoc).

        Returns:
            A dict with the keys ``title``, ``file_path``, ``sections``,
            ``metadata`` and ``full_content``.

        Raises:
            OSError: If the file cannot be opened.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # The title is the first top-level heading ("# Title" in Markdown,
        # "= Title" in AsciiDoc). "[ \t]+" keeps the match on a single line;
        # "\s+" could cross a newline and pick up the following line instead.
        title_match = re.search(r'^[#=][ \t]+(.+)$', content, re.MULTILINE)
        if title_match:
            title = title_match.group(1).strip()
        else:
            # No heading found: fall back to the file name without extension.
            title = Path(file_path).stem

        return {
            "title": title,
            "file_path": file_path,
            "sections": self.split_into_sections(content),
            "metadata": self.extract_metadata(content, file_path),
            "full_content": content
        }

    @staticmethod
    def _build_section(heading: str, body_lines: List[str]) -> Optional[Dict]:
        """Return a section dict, or None when the heading or body is empty."""
        body = "\n".join(body_lines).strip()
        if not heading or not body:
            return None
        return {
            "heading": heading,
            "content": body,
            "word_count": len(body.split())
        }

    def split_into_sections(self, content: str) -> List[Dict]:
        """Split a document into sections at level-2 headings.

        Text that appears before the first heading is not part of any section
        and is dropped, as are headings whose body is empty or whitespace.

        Args:
            content: Full text of the document.

        Returns:
            One dict per section, in document order, with the keys
            ``heading``, ``content`` (stripped body text) and ``word_count``.
        """
        sections = []
        heading = ""
        body_lines = []

        for line in content.splitlines():
            if line.startswith(self.SECTION_PREFIXES):
                # A new heading closes the section collected so far.
                finished = self._build_section(heading, body_lines)
                if finished is not None:
                    sections.append(finished)
                # Both supported prefixes are three characters long.
                heading = line[3:].strip()
                body_lines = []
            else:
                body_lines.append(line)

        # The last section has no following heading to close it.
        finished = self._build_section(heading, body_lines)
        if finished is not None:
            sections.append(finished)
        return sections

    def extract_metadata(self, content: str, file_path: str) -> Dict:
        """Derive topic tags and example code blocks from a document.

        Args:
            content: Full text of the document.
            file_path: Path of the document. Currently unused; kept so the
                signature stays stable for callers.

        Returns:
            A dict with fixed ``platform`` and ``difficulty`` values, the
            list of matched ``topics`` and up to ``MAX_EXAMPLES`` fenced
            code blocks under ``examples``.
        """
        content_lower = content.lower()

        # A topic is added as soon as any one of its keywords occurs.
        topics = [
            topic
            for topic, keywords in self.TOPIC_KEYWORDS.items()
            if any(keyword in content_lower for keyword in keywords)
        ]

        # Fenced code blocks, including the ``` fences themselves.
        examples = re.findall(r'```[\s\S]*?```', content)

        return {
            "platform": "picoCTF",
            "difficulty": "beginner",
            "topics": topics,
            "examples": examples[:self.MAX_EXAMPLES]
        }

    def create_embedding(self, text: str) -> List[float]:
        """Return the OpenAI embedding vector for ``text``.

        This is best-effort: any failure is printed and an empty list is
        returned so that one bad section does not abort a whole run.

        Args:
            text: Text to embed; only the first ``MAX_EMBEDDING_CHARS``
                characters are sent.

        Returns:
            The embedding as a list of floats, or ``[]`` for blank input or
            when the request fails.
        """
        # Blank input has nothing to embed; skip the network round trip.
        if not text or not text.strip():
            return []

        try:
            response = self.openai_client.embeddings.create(
                model=self.EMBEDDING_MODEL,
                input=text[:self.MAX_EMBEDDING_CHARS]
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Error creating embedding: {e}")
            return []

    def process_ctf_primer_directory(self, primer_path: str) -> List[Dict]:
        """Process every ``.adoc`` file found under ``primer_path``.

        Each file is parsed with ``process_markdown_file`` and every section
        gets an ``embedding`` key. Files that raise are reported and skipped.

        Args:
            primer_path: Root directory to search recursively.

        Returns:
            The processed documents, one dict per successfully parsed file.
        """
        processed_content = []
        for root, dirs, files in os.walk(primer_path):
            # Sorting in place makes the traversal (and the output order)
            # the same on every platform.
            dirs.sort()
            for file_name in sorted(files):
                if not file_name.endswith('.adoc'):
                    continue

                file_path = os.path.join(root, file_name)
                print(f"Processing: {file_path}")

                try:
                    content_data = self.process_markdown_file(file_path)
                    for section in content_data["sections"]:
                        section["embedding"] = self.create_embedding(section["content"])
                    processed_content.append(content_data)
                except Exception as e:
                    # Keep going: one unreadable file should not stop the run.
                    print(f"Error processing {file_path}: {e}")
                    continue
        return processed_content

    def save_processed_content(self, processed_content: List[Dict], output_path: str) -> None:
        """Write the processed documents to ``output_path`` as indented JSON.

        Args:
            processed_content: Documents as returned by
                ``process_ctf_primer_directory``.
            output_path: Destination file; missing parent folders are created.
        """
        # A bare file name has an empty dirname, and os.makedirs("") raises,
        # so only create the folder when there is one to create.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(processed_content, f, indent=2, ensure_ascii=False)
        print(f"Processed content saved to: {output_path}")

In [8]:
if __name__ == "__main__":
    # Script entry point: process the cloned CTF primer repository when it is
    # present, otherwise explain how to fetch it.
    ctf_primer_path = "./content/raw/ctf-primer"
    output_json_path = "./content/processed/ctf_primer_processed.json"
    processor = ContentProcessor()

    if not os.path.exists(ctf_primer_path):
        # Nothing to process yet: tell the user how to get the content.
        for message in (
            f"Path not found: {ctf_primer_path}",
            "Please clone the repository first:",
            "git clone https://github.com/picoCTF/ctf-primer.git ./content/raw/ctf-primer",
        ):
            print(message)
    else:
        # Parse and embed every primer file, then store the result as JSON.
        processed_content = processor.process_ctf_primer_directory(ctf_primer_path)
        processor.save_processed_content(processed_content, output_json_path)
        print(f"Processed {len(processed_content)} files")

Processing: ./content/raw/ctf-primer\book.adoc
Processing: ./content/raw/ctf-primer\chapters\assembly.adoc
Processing: ./content/raw/ctf-primer\chapters\binary.adoc
Processing: ./content/raw/ctf-primer\chapters\c.adoc
Processing: ./content/raw/ctf-primer\chapters\careers.adoc
Processing: ./content/raw/ctf-primer\chapters\code.adoc
Processing: ./content/raw/ctf-primer\chapters\crypto.adoc
Processing: ./content/raw/ctf-primer\chapters\environment.adoc
Processing: ./content/raw/ctf-primer\chapters\forensics.adoc
Processing: ./content/raw/ctf-primer\chapters\git.adoc
Processing: ./content/raw/ctf-primer\chapters\intro.adoc
Processing: ./content/raw/ctf-primer\chapters\network.adoc
Processing: ./content/raw/ctf-primer\chapters\python.adoc
Processing: ./content/raw/ctf-primer\chapters\regex.adoc
Processing: ./content/raw/ctf-primer\chapters\reversing.adoc
Processing: ./content/raw/ctf-primer\chapters\shell.adoc
Processing: ./content/raw/ctf-primer\chapters\sql.adoc
Processing: ./content/raw/