# Simple RAG System with Table Extraction and Query Enhancement

A streamlined RAG system with essential features:
- Table detection and description
- Query rewriting for better retrieval
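- Multi-query ("hybrid") retrieval over German business reports: the original question plus LLM-generated variants are searched and the results merged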
- Simple setup and execution

In [1]:
# Install packages
!pip install langchain-community langchain-google-genai faiss-cpu sentence-transformers python-dotenv




[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
import re
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.schema import Document
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

load_dotenv()
print("✅ Libraries loaded")

✅ Libraries loaded


In [3]:
# Simple configuration
# API key for the Gemini model, read from the environment; os.getenv returns
# None when GOOGLE_API_KEY is not set.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Folder that process_all_documents() scans for *.md files.
MARKDOWN_FOLDER = "./Extrahierter_Text_Markdown"
# Directory the FAISS index is saved to and reloaded from.
VECTOR_STORE_PATH = "./faiss_index"
# Chunking parameters passed to RecursiveCharacterTextSplitter in
# process_document().
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Initialize models
# NOTE(review): the saved cell output echoes the next line, which looks like a
# warning raised by it (possibly a deprecation notice) - confirm.
# NOTE(review): presumably all-MiniLM-L6-v2 is English-centric while the corpus
# is German - verify; switching the model requires rebuilding the saved index.
embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
# Shared chat model used for table descriptions, query rewriting and answers.
# NOTE(review): GOOGLE_API_KEY may be None here - confirm how the client
# behaves in that case.
llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.1, google_api_key=GOOGLE_API_KEY)

print("✅ Configuration set")

  embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")


✅ Configuration set


## Table Detection and Description

In [4]:
def detect_tables(text: str) -> List[str]:
    """Find table-like blocks in a markdown document.

    Two kinds of blocks are collected, in this order:

    1. Markdown pipe tables: a header row, a separator row made of
       ``-``, ``:``, ``|`` and whitespace, then any number of body rows.
    2. Financial data blocks: text starting at a unit marker such as
       "in Mio. €" or "in %", running to the end of that line, plus the
       following consecutive lines that contain at least one digit.

    A financial block that overlaps a markdown table already found is
    skipped, and a block whose text was already collected is not repeated,
    so every distinct table is returned (and later described) only once.

    Args:
        text: Full document text to scan.

    Returns:
        The matched blocks as strings; an empty list if nothing matches.
    """
    # Neither pattern uses ^ or $, so no MULTILINE flag is needed.
    markdown_pattern = r'(\|[^\n]*\|\s*\n\|[-\s\|:]+\|\s*\n(?:\|[^\n]*\|\s*\n?)*)'
    financial_pattern = r'((?:in Mio\. €|in %|in Tsd\.|€ Mio\.|Mio\. EUR)[^\n]*\n(?:[^\n]*\d+[^\n]*\n?)+)'

    tables: List[str] = []
    collected = set()
    markdown_spans = []

    # Markdown tables
    for match in re.finditer(markdown_pattern, text):
        markdown_spans.append(match.span())
        block = match.group(1)
        if block not in collected:
            collected.add(block)
            tables.append(block)

    # Financial data blocks
    for match in re.finditer(financial_pattern, text):
        start, end = match.span()
        # A unit marker inside a markdown table would otherwise yield a
        # second, partial copy of that same table.
        if any(start < span_end and span_start < end
               for span_start, span_end in markdown_spans):
            continue
        block = match.group(1)
        if block not in collected:
            collected.add(block)
            tables.append(block)

    return tables

def describe_table(table_text: str) -> str:
    """Generate a short German description of a table using the LLM.

    Args:
        table_text: Raw text of the table or data block to describe.

    Returns:
        The stripped model response, or a generic fallback sentence when the
        LLM call fails.
    """
    prompt = ChatPromptTemplate.from_template(
        "Beschreibe diese Tabelle/Daten in 1-2 Sätzen auf Deutsch:\n{table}\n\nBeschreibung:"
    )

    try:
        chain = prompt | llm | StrOutputParser()
        description = chain.invoke({"table": table_text})
        return description.strip()
    except Exception as e:
        # Best effort: a failed description must not abort document processing.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit through, and the reason for the fallback is reported.
        print(f"  ⚠️ Table description failed ({type(e).__name__}: {e}); using fallback")
        return "Tabelle mit Finanz- oder Geschäftsdaten."

print("✅ Table functions ready")

✅ Table functions ready


## Query Enhancement

In [5]:
def enhance_query(query: str) -> List[str]:
    """Rewrite a question into several search variants for better retrieval.

    The LLM is asked for three alternative search phrases, one per line.
    Leading list markers ("1.", "2)", "-", "*", "•") are removed from each
    line, and variants identical to one already collected are skipped.

    Args:
        query: The user's original question.

    Returns:
        A list that starts with the original query, followed by up to three
        distinct alternatives. Only ``[query]`` if the LLM call fails.
    """
    prompt = ChatPromptTemplate.from_template(
        "Erstelle 3 alternative Suchbegriffe für diese Frage in deutschen Geschäftsberichten:\n"
        "'{query}'\n\n"
        "Gib nur die 3 Alternativen zurück, eine pro Zeile:"
    )

    try:
        chain = prompt | llm | StrOutputParser()
        response = chain.invoke({"query": query})
    except Exception as e:
        # Best effort: fall back to the original query, but report why.
        # Catching Exception (not a bare except) keeps Ctrl-C working.
        print(f"Query enhancement failed ({type(e).__name__}: {e}); using original query only")
        return [query]

    variants = [query]
    for line in response.split('\n'):
        # Models often number or bullet their answers; the marker would only
        # add noise to the search string.
        candidate = re.sub(r'^\s*(?:[-*•]|\d{1,2}[.)])\s+', '', line).strip()
        if candidate and candidate not in variants:
            variants.append(candidate)

    return variants[:4]  # Original + 3 alternatives

print("✅ Query enhancement ready")

✅ Query enhancement ready


## Document Processing

In [6]:
def process_document(file_path: str) -> List[Document]:
    """Process a single markdown file into chunked documents.

    The file is read as UTF-8, table-like blocks are detected and described
    by the LLM, the descriptions are appended to the end of the text, and the
    result is split into overlapping chunks.

    Args:
        file_path: Path to the markdown file.

    Returns:
        One Document per chunk, each with ``company`` and ``source`` metadata
        derived from the file name.
    """
    content = Path(file_path).read_text(encoding='utf-8')

    # Get filename info: "BMW_2023" -> source "BMW_2023", company "BMW".
    # Any four-digit year suffix (19xx/20xx) is removed, not only "_2023".
    filename = Path(file_path).stem
    company = re.sub(r'_(?:19|20)\d{2}(?!\d)', '', filename).replace('_', ' ')

    # Detect tables; drop exact duplicates so each distinct table costs only
    # one LLM call (dict.fromkeys keeps first-seen order).
    tables = list(dict.fromkeys(detect_tables(content)))

    # Add table descriptions to content
    description_blocks = [
        f"\n\n[Tabellenbeschreibung]: {describe_table(table)}\n"
        for table in tables
    ]
    enhanced_content = content + "".join(description_blocks)

    # Split into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    chunks = text_splitter.split_text(enhanced_content)

    # Create documents
    return [
        Document(
            page_content=chunk,
            metadata={'company': company, 'source': filename}
        )
        for chunk in chunks
    ]

def process_all_documents() -> List[Document]:
    """Process every ``*.md`` file in MARKDOWN_FOLDER.

    Files are handled in sorted name order so the resulting document list
    (and any index built from it) is the same on every run. A file that
    fails to process is reported and skipped.

    Returns:
        All chunk documents from all files; an empty list if the folder is
        missing or contains no markdown files.
    """
    folder = Path(MARKDOWN_FOLDER)
    if not folder.is_dir():
        print(f"❌ Folder not found: {folder}")
        return []

    # glob() order depends on the filesystem; sort for reproducibility.
    markdown_files = sorted(folder.glob('*.md'))
    print(f"Found {len(markdown_files)} files")

    all_documents = []

    for file_path in markdown_files:
        print(f"Processing {file_path.name}...")
        try:
            documents = process_document(str(file_path))
            all_documents.extend(documents)
            print(f"  → {len(documents)} chunks")
        except Exception as e:
            # Keep going: one unreadable file should not stop the whole run.
            print(f"  ❌ Error: {e}")

    print(f"Total: {len(all_documents)} documents")
    return all_documents

print("✅ Document processing ready")

✅ Document processing ready


## Vector Store Creation

In [None]:
# Create or load vector store
def load_or_create_vectorstore():
    """Load the saved FAISS index, or build and save a new one.

    If ``index.faiss`` exists under VECTOR_STORE_PATH the index is loaded
    from disk. If loading fails, or no index exists, all documents are
    processed and a new index is built and saved.

    NOTE: an existing index is reused as-is; it is not rebuilt when the
    markdown files change. Delete VECTOR_STORE_PATH to force a rebuild.

    Returns:
        The FAISS vector store, or None when no documents were found.
    """
    index_file = os.path.join(VECTOR_STORE_PATH, "index.faiss")

    # Check if vector store exists
    if os.path.exists(index_file):
        print("Loading existing vector store...")
        try:
            # SECURITY: load_local unpickles data from disk, which can run
            # arbitrary code. Only load an index this notebook created itself.
            vectorstore = FAISS.load_local(
                VECTOR_STORE_PATH,
                embeddings,
                allow_dangerous_deserialization=True
            )
            print("✅ Vector store loaded")
            return vectorstore
        except Exception as e:
            # Fall through to a rebuild, but show why loading failed.
            # Catching Exception (not a bare except) keeps Ctrl-C working.
            print(f"Failed to load ({type(e).__name__}: {e}), creating new...")

    # Create new vector store
    print("Creating new vector store...")
    documents = process_all_documents()

    if not documents:
        print("❌ No documents found")
        return None

    vectorstore = FAISS.from_documents(documents, embeddings)
    vectorstore.save_local(VECTOR_STORE_PATH)
    print("✅ Vector store created")

    return vectorstore

# Initialize vector store
vectorstore = load_or_create_vectorstore()

Creating new vector store...
Found 1 files
Processing BMW_2023.md...
  → 1650 chunks
Total: 1650 documents


## Hybrid Retrieval

In [None]:
def hybrid_retrieve(query: str, k: int = 5) -> List[Document]:
    """Retrieve documents using the original query plus LLM-made variants.

    Each variant is run as a similarity search with ``k`` results. Results
    are merged in query order (the original query's hits come first) and
    chunks with identical text are kept only once.

    Args:
        query: The user's question.
        k: Number of results requested per query variant.

    Returns:
        Up to ``2 * k`` unique documents; an empty list if no vector store is
        available.
    """
    if vectorstore is None:
        return []

    # Get enhanced queries
    enhanced_queries = enhance_query(query)
    print(f"Searching with {len(enhanced_queries)} query variants...")

    all_results = []
    seen_content = set()

    # Search with each query variant
    for enhanced_query in enhanced_queries:
        try:
            results = vectorstore.similarity_search(enhanced_query, k=k)
        except Exception as e:
            print(f"Search error: {e}")
            continue

        for doc in results:
            # De-duplicate on the full chunk text. Comparing only a short
            # prefix would drop distinct chunks that start the same way.
            if doc.page_content not in seen_content:
                seen_content.add(doc.page_content)
                all_results.append(doc)

    return all_results[:k*2]  # Return more for better context

print("✅ Hybrid retrieval ready")

## RAG Question Answering

In [None]:
def answer_question(question: str) -> Dict:
    """Answer a question using retrieval-augmented generation.

    Relevant chunks are retrieved, the first eight are joined into a context
    block, and the LLM answers from that context.

    Args:
        question: The user's question.

    Returns:
        A dict with ``'answer'`` (the answer text, a "nothing found" message,
        or an error message) and ``'sources'`` (company names of the chunks
        given to the LLM, unique, in retrieval order).
    """
    # Retrieve relevant documents
    docs = hybrid_retrieve(question)

    if not docs:
        return {
            'answer': 'Keine relevanten Informationen gefunden.',
            'sources': []
        }

    # Limit context. The same slice feeds the prompt and the source list, so
    # the reported sources match what the model actually saw.
    context_docs = docs[:8]
    companies = [doc.metadata.get('company', 'Unbekannt') for doc in context_docs]

    # Prepare context
    context = "\n\n".join(
        f"[{company}]: {doc.page_content}"
        for company, doc in zip(companies, context_docs)
    )

    # Create RAG prompt
    rag_prompt = ChatPromptTemplate.from_template(
        "Beantworte die Frage basierend auf dem Kontext aus deutschen Geschäftsberichten.\n\n"
        "Kontext:\n{context}\n\n"
        "Frage: {question}\n\n"
        "Antwort:"
    )

    # Generate answer
    try:
        chain = rag_prompt | llm | StrOutputParser()
        answer = chain.invoke({
            'context': context,
            'question': question
        })
    except Exception as e:
        return {
            'answer': f'Fehler bei der Antwortgenerierung: {e}',
            'sources': []
        }

    return {
        'answer': answer.strip(),
        # dict.fromkeys removes duplicates and keeps a stable order.
        'sources': list(dict.fromkeys(companies))
    }

print("✅ RAG system ready")

## Test the System

In [None]:
# Smoke test: run a few sample questions through the full pipeline and
# print each answer with the companies it was drawn from.
test_questions = [
    "Wie hat sich der Umsatz von BMW entwickelt?",
    "Was sind die wichtigsten Kennzahlen von Volkswagen?",
    "Welche Informationen gibt es über Elektromobilität?"
]

for question in test_questions:
    # Banner: blank line, rule, the question, rule.
    print(f"\n{'='*60}\nFrage: {question}\n{'='*60}")

    result = answer_question(question)

    # Label and answer on separate lines.
    print("\nAntwort:", result['answer'], sep="\n")

    print(f"\nQuellen: {', '.join(result['sources'])}")

## Interactive Query Interface

In [None]:
# Simple interactive interface
def interactive_rag():
    """Run a simple question-answer loop on standard input.

    Reads questions until the user enters 'quit', 'exit' or 'q' (case
    insensitive). Empty input is ignored. End of input (EOF) or an interrupt
    while waiting for a question ends the session cleanly instead of raising.

    Returns:
        None.
    """
    print("\n🤖 RAG System für deutsche Geschäftsberichte")
    print("Geben Sie 'quit' ein zum Beenden.\n")

    while True:
        try:
            question = input("❓ Ihre Frage: ").strip()
        except (EOFError, KeyboardInterrupt):
            # No more input, or the user interrupted the prompt: treat it
            # like 'quit' rather than ending the cell with a traceback.
            print("\nAuf Wiedersehen!")
            break

        if question.lower() in ['quit', 'exit', 'q']:
            print("Auf Wiedersehen!")
            break

        if not question:
            continue

        print("\n🔍 Suche...")
        result = answer_question(question)

        print(f"\n💡 Antwort:")
        print(result['answer'])

        if result['sources']:
            print(f"\n📚 Quellen: {', '.join(result['sources'])}")

        print("\n" + "-"*50 + "\n")

# Start interactive mode
# NOTE: interactive_rag() loops on input() until 'quit', 'exit' or 'q' is
# entered, so this cell waits for user input when it is executed.
interactive_rag()

## System Summary

This simplified RAG system includes:
- ✅ Table detection and description
- ✅ Query enhancement for better retrieval
- ✅ Hybrid retrieval with multiple query variants
- ✅ Simple document processing
- ✅ Streamlined configuration

No complex error handling, minimal configuration, straightforward execution!