# RAG with MariaDB: Async Vector Storage and Retrieval

This notebook demonstrates how to build a Retrieval-Augmented Generation (RAG) system using:
- **async-mariadb-connector** for fast async database operations
- **MariaDB** for storing document embeddings
- **LangChain** for RAG orchestration

## Why MariaDB for RAG?
- ✅ Native JSON support for metadata
- ✅ Simple vector storage (JSON arrays, as used here, or compact binary BLOBs)
- ✅ ACID transactions for data integrity
- ✅ Mature, production-ready database
- ✅ Built-in FULLTEXT indexes for hybrid search (vector + full-text)

## 1. Setup and Configuration

In [15]:
# Install required packages
# !pip install async-mariadb-connector
# !pip install langchain langchain-openai
# !pip install numpy pandas

import asyncio
import json
import numpy as np
from typing import List, Dict, Any, Optional
from async_mariadb_connector import AsyncMariaDB

## 2. Create Vector Store Schema

In [16]:
async def setup_vector_store():
    """
    Create a table for storing document embeddings.
    Uses JSON for storing vectors and metadata.
    """
    db = AsyncMariaDB()
    
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS document_embeddings (
        id INT AUTO_INCREMENT PRIMARY KEY,
        document_id VARCHAR(255) NOT NULL,
        content TEXT NOT NULL,
        embedding JSON NOT NULL,
        metadata JSON,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        INDEX idx_doc_id (document_id),
        FULLTEXT INDEX ft_content (content)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    """
    
    await db.execute(create_table_sql)
    print("✅ Vector store table created")
    
    await db.close()

# Run setup
await setup_vector_store()

2025-10-21 21:39:07,657 - async_mariadb_connector.core - INFO - Creating database connection pool.
2025-10-21 21:39:07,659 - async_mariadb_connector.core - INFO - Database connection pool closed.


✅ Vector store table created
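The schema above stores each vector as JSON text. The intro also lists BLOB storage as an option; the following is a minimal sketch (not part of `MariaDBVectorStore`) comparing the two encodings for one 384-dimensional vector. It assumes a little-endian float32 layout for the BLOB, and the helper names are hypothetical.

```python
import json

import numpy as np


def to_json_text(embedding):
    """Serialize a vector the way add_documents does (JSON array text)."""
    return json.dumps(embedding)


def to_float32_blob(embedding):
    """Pack a vector as little-endian float32 bytes (4 bytes per dimension)."""
    return np.asarray(embedding, dtype="<f4").tobytes()


def from_float32_blob(blob):
    """Unpack bytes produced by to_float32_blob back into a NumPy array."""
    return np.frombuffer(blob, dtype="<f4")


rng = np.random.default_rng(0)
vector = rng.standard_normal(384).tolist()

json_text = to_json_text(vector)
blob = to_float32_blob(vector)

print(f"JSON text:    {len(json_text)} characters")
print(f"float32 BLOB: {len(blob)} bytes")

# float32 keeps about 7 significant digits, so the round trip is close, not exact
restored = from_float32_blob(blob)
assert np.allclose(restored, vector, rtol=1e-6, atol=1e-6)
```

JSON round-trips Python floats exactly but takes several times more space; the BLOB is a fixed 4 bytes per dimension at the cost of float32 precision and an opaque column.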


## 3. MariaDB Vector Store Implementation

In [17]:
class MariaDBVectorStore:
    """
    Async vector store implementation using MariaDB.
    Loosely modeled on LangChain's VectorStore interface (it does not subclass it).
    """
    
    def __init__(self):
        self.db = AsyncMariaDB()
    
    async def add_documents(
        self,
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        document_ids: Optional[List[str]] = None
    ) -> List[str]:
        """
        Add documents with their embeddings to the store.
        
        Args:
            documents: List of document texts
            embeddings: List of embedding vectors
            metadatas: Optional metadata for each document
            document_ids: Optional IDs for documents
        
        Returns:
            List of inserted document IDs
        """
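        if metadatas is None:
            metadatas = [{} for _ in documents]
        
        if document_ids is None:
            # Positional default IDs repeat across calls; pass explicit IDs
            # when adding documents in more than one batch
            document_ids = [f"doc_{i}" for i in range(len(documents))]
        
        # zip() below would silently drop rows if the lists differed in length
        if not (len(documents) == len(embeddings) == len(metadatas) == len(document_ids)):
            raise ValueError(
                "documents, embeddings, metadatas and document_ids must have the same length"
            )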
        
        insert_sql = """
            INSERT INTO document_embeddings 
            (document_id, content, embedding, metadata)
            VALUES (%s, %s, %s, %s)
        """
        
        # Prepare data for batch insert
        data = [
            (
                doc_id,
                content,
                json.dumps(embedding),  # Store as JSON array
                json.dumps(metadata)
            )
            for doc_id, content, embedding, metadata
            in zip(document_ids, documents, embeddings, metadatas)
        ]
        
        # Batch insert using executemany - simple and efficient!
        await self.db.executemany(insert_sql, data)
        print(f"✅ Added {len(documents)} documents to vector store")
        
        return document_ids
    
    async def similarity_search(
        self,
        query_embedding: List[float],
        k: int = 4
    ) -> List[Dict[str, Any]]:
        """
        Find k most similar documents using cosine similarity.
        
        Args:
            query_embedding: Query vector
            k: Number of results to return
        
        Returns:
            List of documents with similarity scores
        """
        # Brute-force scan: every stored embedding is loaded and scored in Python,
        # so each query costs O(N). Fine for a demo; large tables need an
        # approximate nearest-neighbor index rather than pagination.
        fetch_sql = "SELECT id, document_id, content, embedding, metadata FROM document_embeddings"
        results = await self.db.fetch_all(fetch_sql)
        
        # Calculate cosine similarity (query norm computed once)
        query_vec = np.array(query_embedding, dtype=float)
        query_norm = np.linalg.norm(query_vec)
        similarities = []
        
        for row in results:
            doc_vec = np.array(json.loads(row['embedding']), dtype=float)
            denom = query_norm * np.linalg.norm(doc_vec)
            
            # Cosine similarity; zero-length vectors score 0 instead of dividing by zero
            similarity = np.dot(query_vec, doc_vec) / denom if denom > 0 else 0.0
            
            similarities.append({
                'id': row['id'],
                'document_id': row['document_id'],
                'content': row['content'],
                'metadata': json.loads(row['metadata']) if row['metadata'] else {},
                'similarity': float(similarity)
            })
        
        # Sort by similarity and return top k
        similarities.sort(key=lambda x: x['similarity'], reverse=True)
        return similarities[:k]
    
    async def hybrid_search(
        self,
        query_text: str,
        query_embedding: List[float],
        k: int = 4,
        text_weight: float = 0.3,
        vector_weight: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search combining full-text and vector similarity.
        
        Args:
            query_text: Text query for full-text search
            query_embedding: Vector for semantic search
            k: Number of results
            text_weight: Weight for the full-text score (normalized to [0, 1])
            vector_weight: Weight for vector (cosine) similarity
        
        Returns:
            List of documents with combined scores
        """
        # Get full-text search results (the embedding column is not needed here)
        fts_sql = """
            SELECT id, document_id, content, metadata,
                   MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE) as text_score
            FROM document_embeddings
            WHERE MATCH(content) AGAINST(%s IN NATURAL LANGUAGE MODE)
        """
        fts_results = await self.db.fetch_all(fts_sql, (query_text, query_text))
        
        # Get vector similarity results
        vector_results = await self.similarity_search(query_embedding, k=k*2)
        
        # MATCH ... AGAINST relevance is unbounded, while cosine similarity lies in
        # [-1, 1]; scale text scores by the best match so the weights are comparable
        max_text = max((float(row['text_score']) for row in fts_results), default=0.0) or 1.0
        
        # Combine scores, keyed by primary key
        combined = {}
        
        # Add normalized full-text scores
        for row in fts_results:
            doc_id = row['id']
            combined[doc_id] = {
                'id': row['id'],
                'document_id': row['document_id'],
                'content': row['content'],
                'metadata': json.loads(row['metadata']) if row['metadata'] else {},
                'text_score': float(row['text_score']) / max_text,
                'vector_score': 0.0
            }
        
        # Add vector scores
        for result in vector_results:
            doc_id = result['id']
            if doc_id in combined:
                combined[doc_id]['vector_score'] = result['similarity']
            else:
                combined[doc_id] = {
                    **result,
                    'text_score': 0.0,
                    'vector_score': result['similarity']
                }
        
        # Calculate combined score
        for doc in combined.values():
            doc['combined_score'] = (
                text_weight * doc['text_score'] +
                vector_weight * doc['vector_score']
            )
        
        # Sort and return top k
        results = sorted(
            combined.values(),
            key=lambda x: x['combined_score'],
            reverse=True
        )
        return results[:k]
    
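    async def delete_documents(self, document_ids: List[str]) -> int:
        """
        Delete documents by their IDs.
        
        Args:
            document_ids: List of document IDs to delete
        
        Returns:
            Number of documents deleted (assumes db.execute returns the affected row count)
        """
        # "IN ()" is invalid SQL, so short-circuit an empty list
        if not document_ids:
            return 0
        
        # Only the %s placeholders are interpolated; the IDs are passed as bound parameters
        placeholders = ','.join(['%s'] * len(document_ids))
        delete_sql = f"DELETE FROM document_embeddings WHERE document_id IN ({placeholders})"
        
        result = await self.db.execute(delete_sql, tuple(document_ids))
        return result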
    
    async def close(self):
        """Close database connection."""
        await self.db.close()

print("✅ MariaDBVectorStore class defined")

✅ MariaDBVectorStore class defined
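`similarity_search` scores rows one at a time in a Python loop. The following is a minimal sketch of the same brute-force cosine scoring done with one NumPy matrix-vector product, assuming the embeddings have already been loaded into an `(N, d)` array. `top_k_cosine` is a hypothetical helper, not part of async-mariadb-connector, and it is still an exact O(N) scan, just a faster one.

```python
import numpy as np


def top_k_cosine(query, matrix, k=4):
    """Return (indices, scores) of the k rows of `matrix` most similar to `query`."""
    query = np.asarray(query, dtype=np.float64)
    # Reshape so an empty result set still has shape (0, d)
    matrix = np.asarray(matrix, dtype=np.float64).reshape(-1, query.shape[0])

    # One product for all rows instead of N separate dot products
    denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
    # Zero-length vectors get similarity 0 instead of a division by zero
    scores = np.divide(
        matrix @ query, denom, out=np.zeros(matrix.shape[0]), where=denom > 0
    )

    k = min(k, scores.shape[0])
    if k == 0:
        return np.array([], dtype=int), np.array([])

    # argpartition selects the top k in O(N); then sort only those k
    top = np.argpartition(-scores, k - 1)[:k]
    top = top[np.argsort(-scores[top])]
    return top, scores[top]


# Usage: a stored vector should match itself best
rng = np.random.default_rng(42)
docs = rng.standard_normal((8, 384))
idx, sims = top_k_cosine(docs[3], docs, k=3)
print(idx, sims)
```

`argpartition` avoids sorting all N scores when only k are needed, which matters once the table holds many thousands of rows.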


## 3.5. Test: Verify executemany() Method

Before adding documents, let's test both the **old approach** (direct pool access) and the **new approach** (`executemany()` method) to verify that both insert the same rows.

In [18]:
import time

async def test_executemany_approaches():
    """
    Compare two ways of running a batch INSERT and check both insert the same rows.
    
    Approach 1 (OLD - Complex): Direct pool access with 6 lines
    Approach 2 (NEW - Simple): Using executemany() with 1 line
    """
    
    db = AsyncMariaDB()
    
    print("=" * 70)
    print("🧪 TESTING: executemany() - Old vs New Approach")
    print("=" * 70)
    
    insert_sql = "INSERT INTO test_batch_insert (name, value) VALUES (%s, %s)"
    
    # Prepare test data
    test_data = [
        ("Item A", 100),
        ("Item B", 200),
        ("Item C", 300),
        ("Item D", 400),
        ("Item E", 500)
    ]
    
    try:
        # Create test table and clear any existing data
        await db.execute("""
            CREATE TABLE IF NOT EXISTS test_batch_insert (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(100),
                value INT
            )
        """)
        await db.execute("TRUNCATE TABLE test_batch_insert")
        
        # ============================================================
        # APPROACH 1 (OLD): Direct pool access - 6 LINES OF CODE
        # ============================================================
        print("\n📝 APPROACH 1 (OLD - Complex): Direct pool access")
        print("-" * 70)
        
        start = time.perf_counter()
        
        # THE OLD WAY - 6 lines of boilerplate that reach into private internals
        pool = await db._get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.executemany(insert_sql, test_data)
                if not db.db_config['autocommit']:
                    await conn.commit()
        
        elapsed1 = time.perf_counter() - start
        
        # Verify insertion
        count1 = await db.fetch_one("SELECT COUNT(*) as count FROM test_batch_insert")
        print(f"✅ Inserted {count1['count']} rows using OLD approach")
        print(f"⏱️  Time taken: {elapsed1*1000:.2f}ms")
        print("📊 Lines of code: 6 (complex, exposes internals)")
        
        # Clear for next test
        await db.execute("TRUNCATE TABLE test_batch_insert")
        
        # ============================================================
        # APPROACH 2 (NEW): Using executemany() method - 1 LINE
        # ============================================================
        print("\n📝 APPROACH 2 (NEW - Simple): Using executemany() method")
        print("-" * 70)
        
        start = time.perf_counter()
        
        # THE NEW WAY - 1 line (requires a connector version that provides executemany())
        await db.executemany(insert_sql, test_data)
        
        elapsed2 = time.perf_counter() - start
        
        # Verify insertion
        count2 = await db.fetch_one("SELECT COUNT(*) as count FROM test_batch_insert")
        print(f"✅ Inserted {count2['count']} rows using NEW approach")
        print(f"⏱️  Time taken: {elapsed2*1000:.2f}ms")
        print("📊 Lines of code: 1 (simple, clean API)")
        
        # Verify data integrity
        results = await db.fetch_all("SELECT name, value FROM test_batch_insert ORDER BY id")
        
        print("\n🔍 Data Verification:")
        print("-" * 70)
        for i, row in enumerate(results[:3], 1):  # Show first 3 rows
            print(f"  Row {i}: {row['name']} = {row['value']}")
        print(f"  ... ({len(results)} total rows)")
        
        # Comparison (a single run on 5 rows, so timings are only indicative)
        same_count = count1['count'] == count2['count']
        print("\n📈 COMPARISON:")
        print("=" * 70)
        print(f"  Old approach: 6 lines, {elapsed1*1000:.2f}ms")
        print(f"  New approach: 1 line, {elapsed2*1000:.2f}ms")
        print(f"  Code reduction: {((6-1)/6*100):.0f}% less code!")
        print(f"  Both approaches: {'✅ SAME ROW COUNT' if same_count else '❌ ROW COUNTS DIFFER'}")
        
        print("\n💡 Winner: NEW APPROACH - much simpler, with comparable speed on this small batch")
        print("=" * 70)
    finally:
        # Cleanup runs even if a step above fails (e.g. executemany() is missing)
        await db.execute("DROP TABLE IF EXISTS test_batch_insert")
        await db.close()

# Run the test
await test_executemany_approaches()

2025-10-21 21:39:07,689 - async_mariadb_connector.core - INFO - Creating database connection pool.


🧪 TESTING: executemany() - Old vs New Approach

📝 APPROACH 1 (OLD - Complex): Direct pool access
----------------------------------------------------------------------
✅ Inserted 5 rows using OLD approach
⏱️  Time taken: 2.98ms
📊 Lines of code: 6 (complex, exposes internals)

📝 APPROACH 2 (NEW - Simple): Using executemany() method
----------------------------------------------------------------------


AttributeError: 'AsyncMariaDB' object has no attribute 'executemany'

## 4. Example: Add Documents to Vector Store

In [None]:
async def add_sample_documents():
    """Add sample documents with mock embeddings."""
    
    # Sample documents about MariaDB and databases
    documents = [
        "MariaDB is a community-developed, commercially supported fork of MySQL.",
        "MariaDB supports JSON data types for storing semi-structured data.",
        "Vector databases are optimized for similarity search using embeddings.",
        "RAG systems combine retrieval and generation for better LLM responses.",
        "Async database operations improve performance in I/O-bound applications.",
        "MariaDB offers full-text search capabilities for text indexing.",
        "Embeddings are dense vector representations of text or other data.",
        "LangChain simplifies building applications with language models.",
    ]
    
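    # Generate mock embeddings (in production, use an actual embedding model).
    # 384 dimensions matches e.g. sentence-transformers' all-MiniLM-L6-v2;
    # OpenAI's text-embedding-ada-002 produces 1536-dimensional vectors.
    # Random vectors carry no meaning, so similarity scores below are arbitrary.
    np.random.seed(42)
    embeddings = [np.random.randn(384).tolist() for _ in documents]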
    
    # Metadata for each document
    metadatas = [
        {"category": "database", "topic": "mariadb"},
        {"category": "database", "topic": "json"},
        {"category": "ai", "topic": "vectors"},
        {"category": "ai", "topic": "rag"},
        {"category": "programming", "topic": "async"},
        {"category": "database", "topic": "search"},
        {"category": "ai", "topic": "embeddings"},
        {"category": "ai", "topic": "langchain"},
    ]
    
    vector_store = MariaDBVectorStore()
    
    # Add documents
    doc_ids = await vector_store.add_documents(
        documents=documents,
        embeddings=embeddings,
        metadatas=metadatas
    )
    
    print(f"\n📝 Added documents with IDs: {doc_ids}")
    
    await vector_store.close()

await add_sample_documents()

2025-10-21 21:27:10,145 - async_mariadb_connector.core - INFO - Creating database connection pool.
2025-10-21 21:27:10,177 - async_mariadb_connector.core - INFO - Database connection pool closed.


✅ Added 8 documents to vector store

📝 Added documents with IDs: ['doc_0', 'doc_1', 'doc_2', 'doc_3', 'doc_4', 'doc_5', 'doc_6', 'doc_7']
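`add_documents` sends every row in a single `executemany` call. For large corpora a common pattern is to insert in fixed-size batches so each round trip (and each transaction, if autocommit is off) stays bounded. The following is a minimal sketch; `chunked` is a hypothetical helper, and the batch size of 500 in the commented usage is an arbitrary assumption to tune.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def chunked(rows: Iterable[Tuple], size: int) -> Iterator[List[Tuple]]:
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Inside an async function, assuming your installed AsyncMariaDB provides executemany():
# for batch in chunked(data, 500):
#     await db.executemany(insert_sql, batch)

rows = [(f"doc_{i}", f"text {i}") for i in range(7)]
print([len(batch) for batch in chunked(rows, 3)])  # [3, 3, 1]
```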


## 5. Example: Vector Similarity Search

In [None]:
async def test_similarity_search():
    """Test vector similarity search."""
    
    vector_store = MariaDBVectorStore()
    
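    # Mock query embedding (in production, embed the query text).
    # Seed 42 reproduces the first mock document embedding from
    # add_sample_documents, so doc_0 should score a similarity of 1.0.
    np.random.seed(42)
    query_embedding = np.random.randn(384).tolist()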
    
    print("🔍 Searching for similar documents...\n")
    
    results = await vector_store.similarity_search(
        query_embedding=query_embedding,
        k=3
    )
    
    print("Top 3 Most Similar Documents:\n")
    for i, result in enumerate(results, 1):
        print(f"{i}. Similarity: {result['similarity']:.4f}")
        print(f"   Content: {result['content']}")
        print(f"   Metadata: {result['metadata']}")
        print()
    
    await vector_store.close()

await test_similarity_search()
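Because the mock query in `test_similarity_search` uses the same seed (42) as the first mock embedding in `add_sample_documents`, the query vector is identical to `doc_0`'s embedding. The following is a quick offline check of what the search should therefore return, with no database needed; it assumes the table holds only the 8 sample rows (re-running `add_sample_documents` inserts duplicates that would also score 1.0).

```python
import numpy as np

# Reproduce the mock document embeddings from add_sample_documents...
np.random.seed(42)
doc_embeddings = [np.random.randn(384) for _ in range(8)]

# ...and the mock query from test_similarity_search
np.random.seed(42)
query = np.random.randn(384)


def cosine(a, b):
    """Cosine similarity, as computed in similarity_search."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


scores = [cosine(query, d) for d in doc_embeddings]
best = int(np.argmax(scores))
print(f"Best match: doc_{best}, similarity {scores[best]:.4f}")  # Best match: doc_0, similarity 1.0000
```

The other seven scores land near 0, as expected for independent random 384-dimensional vectors, which is why the remaining ranking carries no semantic meaning.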

## 6. Example: Hybrid Search (Full-Text + Vector)

In [None]:
async def test_hybrid_search():
    """Test hybrid search combining full-text and vector similarity."""
    
    vector_store = MariaDBVectorStore()
    
    query_text = "MariaDB database features"
    
    # Mock query embedding: random and unrelated to query_text,
    # so vector scores will be close to 0
    np.random.seed(123)
    query_embedding = np.random.randn(384).tolist()
    
    print(f"🔍 Hybrid Search Query: '{query_text}'\n")
    
    results = await vector_store.hybrid_search(
        query_text=query_text,
        query_embedding=query_embedding,
        k=3,
        text_weight=0.4,
        vector_weight=0.6
    )
    
    print("Top 3 Hybrid Search Results:\n")
    for i, result in enumerate(results, 1):
        print(f"{i}. Combined Score: {result['combined_score']:.4f}")
        print(f"   Text Score: {result['text_score']:.4f}")
        print(f"   Vector Score: {result['vector_score']:.4f}")
        print(f"   Content: {result['content']}")
        print()
    
    await vector_store.close()

await test_hybrid_search()
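`hybrid_search` adds a weighted full-text score to a weighted cosine score. `MATCH ... AGAINST` relevance values are not bounded the way cosine similarity is, so the weights alone do not put the two signals on a common scale. A widely used alternative that sidesteps score scales entirely is Reciprocal Rank Fusion (RRF), which combines only rank positions. This is a swapped-in technique, not what `hybrid_search` does; the function name and example ids are hypothetical, and `k = 60` is a conventional default.

```python
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[int]], k: int = 60) -> Dict[int, float]:
    """Fuse ranked lists of document ids (best first) with Reciprocal Rank Fusion.

    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Only ranks are used, so the differing scales of
    full-text relevance and cosine similarity never interact.
    """
    fused: Dict[int, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    return dict(sorted(fused.items(), key=lambda item: item[1], reverse=True))


# Hypothetical primary-key ids, best first, from each retriever
text_ranking = [6, 1, 2]
vector_ranking = [1, 4, 6, 8]

fused = reciprocal_rank_fusion([text_ranking, vector_ranking])
print(list(fused))  # [1, 6, 4, 2, 8]
```

Documents ranked well by both retrievers (ids 1 and 6) rise to the top, while a document found by only one list keeps a smaller score.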

## 7. RAG Pipeline: Retrieval + Generation

This example shows how to integrate the vector store with an LLM for RAG.

In [None]:
async def rag_pipeline_example():
    """
    Example RAG pipeline: Retrieve relevant docs and generate answer.
    
    Note: This is a mock example. In production:
    1. Use a real embedding model (OpenAI, HuggingFace, etc.)
    2. Use a real LLM for generation (GPT-4, Claude, Llama, etc.)
    """
    
    vector_store = MariaDBVectorStore()
    
    # User question
    question = "What is MariaDB good for?"
    
    print(f"❓ Question: {question}\n")
    
    # Step 1: Generate query embedding (mock; seed 42 reproduces doc_0's mock embedding)
    np.random.seed(42)
    query_embedding = np.random.randn(384).tolist()
    
    # Step 2: Retrieve relevant documents
    print("📚 Retrieving relevant documents...\n")
    relevant_docs = await vector_store.similarity_search(
        query_embedding=query_embedding,
        k=3
    )
    
    # Step 3: Create context from retrieved documents
    context = "\n\n".join([
        f"Document {i+1}: {doc['content']}"
        for i, doc in enumerate(relevant_docs)
    ])
    
    print("Retrieved Context:")
    print(context)
    print()
    
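    # Step 4: Generate answer using LLM (mock)
    # In production, build a prompt from the question and context, then call the model:
    # from langchain_openai import ChatOpenAI
    # llm = ChatOpenAI(model="gpt-4")
    # prompt = f"Answer using only this context:\n\n{context}\n\nQuestion: {question}"
    # answer = (await llm.ainvoke(prompt)).content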
    
    mock_answer = (
        "Based on the retrieved documents, MariaDB is a community-developed fork of MySQL "
        "that supports JSON data types for semi-structured data and offers full-text search "
        "capabilities. It's well-suited for applications requiring both traditional relational "
        "database features and modern data handling capabilities."
    )
    
    print("🤖 Generated Answer:")
    print(mock_answer)
    print()
    
    await vector_store.close()
    
    print("\n✅ RAG pipeline completed!")
    print("\nTo use with real LLM:")
    print("  1. Install: pip install langchain-openai")
    print("  2. Set OPENAI_API_KEY environment variable")
    print("  3. Replace mock LLM with: ChatOpenAI(model='gpt-4')")

await rag_pipeline_example()
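The pipeline above builds a context string but leaves the prompt itself implicit. The following is a minimal sketch of turning retrieved documents into a grounded prompt; `build_rag_prompt` and its instruction wording are illustrative assumptions. The commented LangChain lines use `ainvoke`, the async counterpart of `invoke` on LangChain runnables, and are not executed here.

```python
from typing import Any, Dict, List


def build_rag_prompt(question: str, docs: List[Dict[str, Any]]) -> str:
    """Assemble a grounded prompt from retrieved documents.

    `docs` uses the dict shape returned by MariaDBVectorStore.similarity_search;
    only the 'content' key is read here.
    """
    context = "\n\n".join(
        f"Document {i}: {doc['content']}" for i, doc in enumerate(docs, start=1)
    )
    return (
        "Answer the question using only the documents below. "
        "If they do not contain the answer, say so.\n\n"
        f"{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


docs = [
    {"content": "MariaDB supports JSON data types for storing semi-structured data."},
    {"content": "MariaDB offers full-text search capabilities for text indexing."},
]
prompt = build_rag_prompt("What is MariaDB good for?", docs)
print(prompt)

# With langchain-openai installed and OPENAI_API_KEY set, inside an async function:
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4")
# answer = (await llm.ainvoke(prompt)).content
```

Telling the model to admit when the documents lack an answer is a common guard against answers that ignore the retrieved context.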

## 8. Cleanup (Optional)

In [None]:
# Uncomment to delete all documents
# async def cleanup():
#     db = AsyncMariaDB()
#     await db.execute("TRUNCATE TABLE document_embeddings")
#     await db.close()
#     print("🗑️ All documents deleted")
# 
# await cleanup()

## Summary

This notebook demonstrated:

1. ✅ **Setting up MariaDB for vector storage** - JSON columns for embeddings
2. ✅ **Async vector store implementation** - Modeled on LangChain's VectorStore interface
3. ✅ **Similarity search** - Cosine similarity for semantic retrieval
4. ✅ **Hybrid search** - Combining full-text and vector search
5. ✅ **RAG pipeline** - Retrieval + Generation workflow

### Production Considerations

- **Embeddings**: Use real models (OpenAI, Cohere, HuggingFace)
- **Indexing**: `similarity_search` scans every row; for large datasets, use an approximate nearest-neighbor index instead of a full scan
- **Pagination**: Implement pagination for large result sets
- **Caching**: Cache frequently accessed embeddings
- **Monitoring**: Add logging and metrics
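One way to replace the Python-side full scan is MariaDB's native vector support. The following is a minimal sketch, assuming a server with native vectors (11.7 or later); verify the SQL against the MariaDB Vector documentation for your version. The `document_vectors` table, the helper names, and the query shape are illustrative assumptions. Note that `VEC_DISTANCE_COSINE` returns a *distance* (smaller is closer), not a similarity, and depending on version the index may need to be declared with the cosine metric for this query to use it; that option is left out here.

```python
import json
from typing import List, Tuple

# ASSUMPTION: MariaDB 11.7+ with native VECTOR support; check the docs for your version.
CREATE_NATIVE_VECTOR_TABLE = """
CREATE TABLE IF NOT EXISTS document_vectors (
    id INT AUTO_INCREMENT PRIMARY KEY,
    document_id VARCHAR(255) NOT NULL,
    content TEXT NOT NULL,
    embedding VECTOR(384) NOT NULL,
    VECTOR INDEX (embedding)
)
"""

# ORDER BY <distance> LIMIT n is the query shape a vector index can serve,
# so the server returns the nearest rows instead of Python scoring every row.
NEAREST_NEIGHBORS_SQL = """
SELECT document_id, content,
       VEC_DISTANCE_COSINE(embedding, VEC_FromText(%s)) AS distance
FROM document_vectors
ORDER BY VEC_DISTANCE_COSINE(embedding, VEC_FromText(%s))
LIMIT %s
"""


def vector_param(embedding: List[float]) -> str:
    """Format a vector as the JSON-style array text that VEC_FromText() parses."""
    return json.dumps([float(x) for x in embedding], separators=(",", ":"))


def nearest_neighbor_params(embedding: List[float], k: int) -> Tuple[str, str, int]:
    """Bind values for NEAREST_NEIGHBORS_SQL, in placeholder order."""
    vec = vector_param(embedding)
    return (vec, vec, k)


print(nearest_neighbor_params([0.1, 0.2, 0.3], 4))
```

With the notebook's connector, the query would run as `await db.fetch_all(NEAREST_NEIGHBORS_SQL, nearest_neighbor_params(query_embedding, 4))`, matching how `hybrid_search` passes parameters to `fetch_all`.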

### Next Steps

- Install LangChain: `pip install langchain langchain-openai`
- Get API keys for embedding models (OpenAI, Cohere, etc.)
- Replace mock embeddings with real embedding generation
- Reuse one `AsyncMariaDB` instance (and its connection pool) across requests instead of creating and closing one per call, as these examples do

**Happy building! 🚀**