In [4]:
from git import Repo
import os

wiki_repo_path = "https://github.com/malware-dev/MDK-SE.wiki.git"
local_repo_path = "documentation"

if not os.path.exists(local_repo_path):
    print(f"Cloning {wiki_repo_path} into {local_repo_path}...")
    Repo.clone_from(wiki_repo_path, local_repo_path)
else:
    print(f"Updating {local_repo_path}...")
    repo = Repo(local_repo_path)
    repo.remotes.origin.pull()

Cloning https://github.com/malware-dev/MDK-SE.wiki.git into documentation...


In [11]:
import markdown
from bs4 import BeautifulSoup
import nltk 

nltk.download('punkt')
nltk.download('punkt_tab')

#we are now going to convert the markdown files to html and then extract the text from them. Chunking the text with meta data whether it is text or code.
def load_and_chunk_wiki_mdkse_code_aware(wiki_folder, chunk_size=1000, chunk_overlap=50):
    chunks = []

    # Process files in the root wiki folder
    for filename in os.listdir(wiki_folder):
        if filename.endswith(".md"):
            filepath = os.path.join(wiki_folder, filename)
            page_chunks = process_markdown_file(filepath, filename, chunk_size, chunk_overlap)
            chunks.extend(page_chunks)

    # Process files in the "api" subfolder (if it exists)
    api_folder_path = os.path.join(wiki_folder, "api")
    if os.path.exists(api_folder_path) and os.path.isdir(api_folder_path):
        for filename in os.listdir(api_folder_path):
            if filename.endswith(".md"): # Or other file types you want to process in "api"
                filepath = os.path.join(api_folder_path, filename)
                page_chunks = process_markdown_file(filepath, os.path.join("api", filename), chunk_size, chunk_overlap) # Source includes "api/" prefix
                chunks.extend(page_chunks)

    return chunks


def process_markdown_file(filepath, source_filename, chunk_size, chunk_overlap):
    """Processes a single Markdown file to extract text and code chunks."""
    chunks = []
    chunk_count = 0
    current_chunk = ""

    with open(filepath, "r", encoding="utf-8") as f:
        markdown_content = f.read()
        html_content = markdown.markdown(markdown_content)
        soup = BeautifulSoup(html_content, "html.parser")

        for element in soup.descendants:
            if element.name == 'code':
                code_text = element.get_text(separator='\n', strip=True)
                if code_text.strip():
                    code_chunk = {
                        "content": code_text,
                        "source": source_filename, # Use the provided source_filename
                        "chunk_id": f"{source_filename}_chunk_{chunk_count}_code", # Source in chunk ID too
                        "block_type": "code"
                    }
                    chunks.append(code_chunk)
                    chunk_count += 1
            elif element.name not in ['pre', 'code']:
                text = element.get_text(separator='\n', strip=True)
                if text.strip():
                    sentences = nltk.sent_tokenize(text)
                    for sentence in sentences:
                        if len(current_chunk) + len(sentence) + 1 <= chunk_size:
                            current_chunk += sentence + " "
                        else:
                            if current_chunk.strip():
                                text_chunk = {
                                    "content": current_chunk.strip(),
                                    "source": source_filename, # Use the provided source_filename
                                    "chunk_id": f"{source_filename}_chunk_{chunk_count}_text", # Source in chunk ID too
                                    "block_type": "text"
                                }
                                chunks.append(text_chunk)
                                chunk_count += 1
                            current_chunk = sentence + " "

        if current_chunk.strip():
            text_chunk = {
                "content": current_chunk.strip(),
                "source": source_filename, # Use the provided source_filename
                "chunk_id": f"{source_filename}_chunk_{chunk_count}_text", # Source in chunk ID too
                "block_type": "text"
            }
            chunks.append(text_chunk)

    return chunks


mdk_se_wiki_folder = "documentation" # Your cloned wiki folder
mdk_se_wiki_chunks = load_and_chunk_wiki_mdkse_code_aware(mdk_se_wiki_folder)
print(f"Generated {len(mdk_se_wiki_chunks)} chunks for MDK-SE Wiki (including 'api' folder).")

[nltk_data] Downloading package punkt to C:\Users\James
[nltk_data]     Labadorf\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to C:\Users\James
[nltk_data]     Labadorf\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


Generated 16448 chunks for MDK-SE Wiki (including 'api' folder).


In [12]:
from sentence_transformers import SentenceTransformer

def embed_chunks(chunks, embedding_model):
    """Embeds text chunks using a Sentence Transformer model."""
    embeddings_data = []
    for chunk in chunks:
        embedding_vector = embedding_model.encode(chunk["content"])
        embeddings_data.append({
            "chunk_id": chunk["chunk_id"],
            "embedding": embedding_vector.tolist(),  # Store as list for JSON serialization if needed
            "content": chunk["content"],
            "source": chunk["source"],
            "block_type": chunk["block_type"] # Keep block_type metadata
        })
    return embeddings_data
# --- Use the GPU if available ---
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- Initialize Embedding Model ---
embedding_model_name = "all-mpnet-base-v2"  # Or "all-MiniLM-L6-v2" for faster, smaller embeddings
embedding_model = SentenceTransformer(embedding_model_name)

# --- Embed the Chunks ---
embedded_chunks = embed_chunks(mdk_se_wiki_chunks, embedding_model) # Assuming 'mdk_se_wiki_chunks' is from Step 2
print(f"Generated embeddings for {len(embedded_chunks)} chunks.")


Generated embeddings for 16448 chunks.


In [13]:
import faiss
import numpy as np
import json  # To save metadata
from tqdm.notebook import tqdm
import os  # To handle file paths

# --- Configuration ---
index_file_path = "faiss_index.bin"
metadata_file_path = "metadata.json"

def index_embeddings_faiss(embedded_chunks):
    """Indexes embeddings in FAISS and saves the index and metadata to files."""
    num_chunks = len(embedded_chunks)
    if num_chunks == 0:
        print("No chunks to index.")
        return

    # Extract embeddings and metadata
    embeddings_list = [chunk_data["embedding"] for chunk_data in embedded_chunks]
    # Convert embeddings to numpy array of float32 (FAISS requirement)
    embeddings_np = np.array(embeddings_list).astype('float32')
    ids = [chunk_data["chunk_id"] for chunk_data in embedded_chunks]
    metadatas = [{"source": chunk_data["source"], "block_type": chunk_data["block_type"]} for chunk_data in embedded_chunks]
    documents = [chunk_data["content"] for chunk_data in embedded_chunks]

    # --- Build FAISS Index ---
    dimension = embeddings_np.shape[1]  # Get embedding dimension
    index = faiss.IndexFlatL2(dimension)  # Using L2 distance (Euclidean), you can choose other index types
    index.add(embeddings_np)  # Add embeddings to the index

    # --- Save FAISS Index to File ---
    faiss.write_index(index, index_file_path)
    print(f"FAISS index saved to: {index_file_path}")

    # --- Save Metadata to JSON File ---
    metadata_to_save = []
    for i in range(num_chunks):
        metadata_to_save.append({
            "id": ids[i],
            "metadata": metadatas[i],
            "document": documents[i]
        })

    with open(metadata_file_path, 'w') as f:
        json.dump(metadata_to_save, f, indent=4) # indent for pretty printing
    print(f"Metadata saved to: {metadata_file_path}")
    print(f"Finished indexing {num_chunks} documents in FAISS.")


index_embeddings_faiss(embedded_chunks)


FAISS index saved to: faiss_index.bin
Metadata saved to: metadata.json
Finished indexing 16448 documents in FAISS.


In [7]:
import faiss
import numpy as np
import json  # To load metadata
from sentence_transformers import SentenceTransformer # Import SentenceTransformer

index_file_path = "faiss_index.bin" # Make sure this matches where you saved your index
metadata_file_path = "metadata.json" # Make sure this matches where you saved your metadata

# --- Load FAISS Index ---
loaded_index = faiss.read_index(index_file_path)

# --- Load Metadata ---
with open(metadata_file_path, 'r') as f:
    loaded_metadata = json.load(f)

def retrieve_relevant_chunks_faiss(query, embedding_model, loaded_index, loaded_metadata, top_k=15):
    """
    Retrieves relevant chunks from FAISS index based on a user query.

    Args:
        query (str): The user query.
        embedding_model: The Sentence Transformer embedding model.
        loaded_index: The loaded FAISS index.
        loaded_metadata: The loaded metadata from JSON.
        top_k (int, optional): The number of top chunks to retrieve. Defaults to 15.

    Returns:
        list: A list of dictionaries, where each dictionary represents a retrieved chunk
              and contains 'content' and 'source' keys.
    """
    query_embedding = embedding_model.encode(query).astype('float32').reshape(1, -1) # Encode query and reshape for FAISS
    distances, indices = loaded_index.search(query_embedding, top_k)

    retrieved_chunks = []
    if indices.any() and indices[0][0] != -1: # Check if results are found
        for i in range(len(indices[0])):
            index_result = indices[0][i]
            if index_result != -1: # Double check for valid index, although should be valid in this setup
                distance = distances[0][i]
                metadata_result = loaded_metadata[index_result] # Access metadata using the FAISS index
                retrieved_chunks.append({
                    "content": metadata_result['document'], # Get document content from metadata
                    "source": metadata_result['metadata']['source'], # Get source from metadata
                    "block_type": metadata_result['metadata']['block_type'], # Get block_type from metadata
                    "distance": distance
                })
    else:
        print("No relevant chunks found.")

    return retrieved_chunks


def test_retrieval_faiss(query, embedding_model, loaded_index, loaded_metadata):
    """
    Tests the retrieval function and prints the retrieved chunks using FAISS.
    """
    print(f"\n--- FAISS Retrieval Test ---")
    print(f"Query: '{query}'")

    retrieved_chunks = retrieve_relevant_chunks_faiss(query, embedding_model, loaded_index, loaded_metadata)

    if retrieved_chunks:
        print(f"\nRetrieved Chunks (Top {len(retrieved_chunks)}):")
        for i, chunk in enumerate(retrieved_chunks):
            print(f"\nChunk {i+1}:")
            print(f"  Source: {chunk['source']}")
            print(f"  Block Type: {chunk['block_type']}") # Print block_type metadata
            print(f"  Distance: {chunk['distance']:.4f}") # Print distance score
            print(f"  Content:\n{chunk['content']}")
    else:
        print("No relevant chunks retrieved for this query.")


# --- Load Sentence Transformer Embedding Model ---
# Choose a pre-trained model from Sentence Transformers.
# 'all-mpnet-base-v2' is a good general-purpose model that balances performance and speed.
embedding_model_name = 'all-mpnet-base-v2' # You can change this to other models
embedding_model = SentenceTransformer(embedding_model_name)

# --- Example Query - Change this to a question relevant to MDK-SE Wiki
test_query = "program a script that will open a door every 5 seconds in space engineers"

test_retrieval_faiss(test_query, embedding_model, loaded_index, loaded_metadata) # Run the FAISS retrieval test


--- FAISS Retrieval Test ---
Query: 'program a script that will open a door every 5 seconds in space engineers'

Retrieved Chunks (Top 15):

Chunk 1:
  Source: _Sidebar.md
  Block Type: text
  Distance: 0.9012
  Content:
Quick Introduction to Space Engineers Ingame Scripts Quick Introduction to Space Engineers Ingame Scripts The Anatomy of a Script
Your First Script
Continuous Running No Timers Needed
The Grid Terminal System
Block Groups
Handling Script Arguments
The Runtime
The Storage String
Get The Running Programmable Block
Do's and Don'ts The Anatomy of a Script The Anatomy of a Script The Anatomy of a Script Your First Script Your First Script Your First Script Continuous Running No Timers Needed

Chunk 2:
  Source: _Sidebar.md
  Block Type: text
  Distance: 0.9281
  Content:
List of Available Properties and Actions List of Available Properties and Actions List of Available Properties and Actions List of Available Properties and Actions Quick Introduction to Space Engineers Ing

In [34]:
import importlib 
import gemini_helper
importlib.reload(gemini_helper)
from gemini_helper import generate_answer_gemini

test_query = "program a script that will open a door every 5 seconds in space engineers"


# 1. Retrieve relevant chunks (using your existing function)
retrieved_context = retrieve_relevant_chunks(test_query, embedding_model, collection)

# 2. Generate answer using Gemini API
if retrieved_context:
    gemini_answer = generate_answer_gemini(test_query, retrieved_context)

    print(f"\n--- Gemini API Answer ---")
    print(f"Question: {test_query}")
    print(f"Answer: {gemini_answer}")
    print("\nRetrieved Context Sources:")
    for chunk in retrieved_context:
        print(f"- {chunk['source']}")
else:
    print("No relevant context retrieved, cannot generate answer.")


--- Gemini API Answer ---
Question: program a script that will open a door every 5 seconds in space engineers
Answer: Answer:
```csharp
public void Main(string argument, UpdateType updateSource)
{
    TimeSpan interval = TimeSpan.FromSeconds(5); // Set the interval to 5 seconds
    DateTime lastRunTime;

    if (Storage != null && Storage != "") // Load last run time from storage if available
    {
        if (DateTime.TryParse(Storage, out lastRunTime))
        {
            // Successfully parsed last run time.
        }
        else
        {
            lastRunTime = DateTime.Now; // If parsing fails, initialize to now
        }
    }
    else
    {
        lastRunTime = DateTime.Now; // Initialize last run time if no storage
    }

    if (DateTime.Now - lastRunTime >= interval)
    {
        List<IMyDoor> doorList = new List<IMyDoor>();
        GridTerminalSystem.GetBlocksOfType<IMyDoor>(doorList);

        if (doorList.Count > 0)
        {
            IMyDoor door = doorList[0]