# Text File Ingestion with LangChain and PGVector

This notebook demonstrates how to ingest text files (.txt) into a PostgreSQL vector database using LangChain and pgvector for RAG applications.

## 1. Import Required Libraries

In [1]:
# Import required libraries
import glob
import os
from pathlib import Path
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import psycopg2
from langchain.docstore.document import Document
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import PGVector
from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter

print("✅ Libraries imported successfully!")

✅ Libraries imported successfully!


## 2. Database Connection Setup

In [2]:
# Database configuration.
# Every value can be overridden with the standard libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD) so real credentials do not
# have to be committed with the notebook. The fallbacks are the local demo
# database settings this notebook has always used.
DB_CONFIG = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
    'database': os.environ.get('PGDATABASE', 'rag_db'),
    'user': os.environ.get('PGUSER', 'rag_user'),
    'password': os.environ.get('PGPASSWORD', 'rag_password')
}


def build_connection_string(config, mask_password=False):
    """Build the SQLAlchemy URL that LangChain uses to reach PostgreSQL.

    Args:
        config: Mapping with 'host', 'port', 'database', 'user' and
            'password' keys (the shape of DB_CONFIG).
        mask_password: When True the password is replaced by '***' so the
            result is safe to print.

    Returns:
        A 'postgresql+psycopg2://user:password@host:port/database' URL.
    """
    # Percent-encode the credentials: characters such as '@', ':' or '/' in a
    # user name or password would otherwise be parsed as URL delimiters.
    user = quote_plus(config['user'])
    password = '***' if mask_password else quote_plus(config['password'])
    return (
        f"postgresql+psycopg2://{user}:{password}"
        f"@{config['host']}:{config['port']}/{config['database']}"
    )


# Create connection string for LangChain
CONNECTION_STRING = build_connection_string(DB_CONFIG)

print("✅ Database connection string configured")
# Build the masked form directly instead of str.replace()-ing the password out
# of the real URL: replace() leaks an empty password and can mangle other parts
# of the URL when the password happens to be a substring of them.
print(f"Connection: {build_connection_string(DB_CONFIG, mask_password=True)}")

# Test database connection. This is a diagnostic only, so any failure is
# reported rather than raised.
try:
    # connect_timeout (seconds) keeps an unreachable host from hanging the cell.
    conn = psycopg2.connect(connect_timeout=5, **DB_CONFIG)
except Exception as e:
    print(f"❌ Database connection failed: {e}")
else:
    conn.close()
    print("✅ Database connection test successful!")

✅ Database connection string configured
Connection: postgresql+psycopg2://rag_user:***@localhost:5432/rag_db
✅ Database connection test successful!


## 3. Initialize Embeddings

In [3]:
# Embedding backend selection.
#
# Hosted alternative (requires an API key; OpenAIEmbeddings is not imported by
# the visible import cell, so that import must be added before enabling this):
#   os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here"
#   embeddings = OpenAIEmbeddings()
#
# Default: local sentence-transformers model (free, no API key needed).
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
embeddings = SentenceTransformerEmbeddings(model_name=EMBEDDING_MODEL_NAME)

print("✅ Embeddings initialized", f"Model: {embeddings.model_name}", sep="\n")

# Smoke test: embed a single sentence and report the length of the vector that
# comes back.
test_text = "This is a test document for embeddings."
test_embedding = embeddings.embed_query(test_text)
embedding_length = len(test_embedding)
print(f"✅ Test embedding generated with dimension: {embedding_length}")

  embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")


✅ Embeddings initialized
Model: all-MiniLM-L6-v2
✅ Test embedding generated with dimension: 384


## 4. Create PGVector Store

In [None]:
# Create the PGVector store that the ingestion cells below write into.
COLLECTION_NAME = "text_ingestion_docs"

try:
    vectorstore = PGVector(
        connection_string=CONNECTION_STRING,
        embedding_function=embeddings,
        collection_name=COLLECTION_NAME,
        # WARNING: destructive. Any existing collection with this name is
        # deleted, so re-running this cell discards previously ingested chunks.
        pre_delete_collection=True
    )
except Exception as e:
    print(f"❌ Error creating vector store: {e}")
    # Re-raise after reporting: every later cell needs `vectorstore`, and
    # swallowing the error here would only resurface it further down as an
    # unrelated-looking NameError.
    raise
else:
    print("✅ PGVector store created successfully!")
    print(f"Collection name: {COLLECTION_NAME}")

## 5. Load Text Files

In [None]:
# Directory that is scanned (recursively) for .txt files to ingest.
TEXT_FILES_DIR = "./text_files"  # You can change this to your text files directory

# Read and write text with one explicit encoding so the result does not depend
# on the platform's default locale encoding.
TEXT_ENCODING = "utf-8"

# Create the directory if it doesn't exist. parents=True lets TEXT_FILES_DIR be
# a nested path whose intermediate directories are also missing.
Path(TEXT_FILES_DIR).mkdir(parents=True, exist_ok=True)

print(f"📁 Text files directory: {TEXT_FILES_DIR}")

# Option 1: Load all text files from directory
try:
    loader = DirectoryLoader(
        TEXT_FILES_DIR,
        glob="**/*.txt",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": TEXT_ENCODING},
        show_progress=True
    )
    documents = loader.load()
    print(f"✅ Loaded {len(documents)} text documents from directory")
except Exception as e:
    print(f"❌ Error loading documents from directory: {e}")
    documents = []

# Option 2: Nothing was loaded (empty directory or a load error reported above),
# so write a few sample files and load those instead for demonstration.
if not documents:
    sample_files = [
        ("sample_doc1.txt", "This is a sample document about artificial intelligence. AI refers to the simulation of human intelligence in machines that are programmed to think like humans and mimic their actions."),
        ("sample_doc2.txt", "Machine learning is a subset of artificial intelligence that enables computers to learn without being explicitly programmed. It uses algorithms to identify patterns in data."),
        ("sample_doc3.txt", "Natural language processing (NLP) is a field of AI that focuses on the interaction between computers and humans through natural language.")
    ]

    for filename, content in sample_files:
        filepath = os.path.join(TEXT_FILES_DIR, filename)
        with open(filepath, 'w', encoding=TEXT_ENCODING) as f:
            f.write(content)

        # Load the file back with the same encoding it was written in.
        docs = TextLoader(filepath, encoding=TEXT_ENCODING).load()
        for doc in docs:
            # Show the bare file name as the source and keep the full path
            # under a separate key.
            doc.metadata['source'] = filename
            doc.metadata['filepath'] = filepath
        documents.extend(docs)

    print(f"✅ Created and loaded {len(documents)} sample text documents")

# Display loaded documents info
for i, doc in enumerate(documents, 1):
    print(f"{i}. {doc.metadata.get('source', 'Unknown')}: {len(doc.page_content)} characters")

## 6. Split Documents

In [None]:
# Split documents into chunks for better retrieval
CHUNK_SIZE = 1000  # Characters per chunk
CHUNK_OVERLAP = 200  # Overlap between chunks

print(f"📏 Chunk size: {CHUNK_SIZE} characters")
print(f"🔗 Chunk overlap: {CHUNK_OVERLAP} characters")

# Option 1: Use RecursiveCharacterTextSplitter (recommended for text)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    separators=["\n\n", "\n", " ", ""],  # Try splitting by paragraphs, lines, words, characters
    length_function=len
)

# Option 2: Use CharacterTextSplitter (simpler)
# text_splitter = CharacterTextSplitter(
#     chunk_size=CHUNK_SIZE,
#     chunk_overlap=CHUNK_OVERLAP,
#     separator="\n"
# )

# Split the documents. Only the split itself is guarded: the statistics below
# run in the `else` branch so a problem there is never reported as a splitting
# error and never triggers the unsplit fallback.
try:
    split_documents = text_splitter.split_documents(documents)
except Exception as e:
    print(f"❌ Error splitting documents: {e}")
    split_documents = documents  # Fallback to original documents
else:
    print(f"✅ Split {len(documents)} documents into {len(split_documents)} chunks")

    # Display chunk statistics. With no chunks the average would divide by zero
    # and min()/max() would raise on an empty list, so report that case instead.
    chunk_lengths = [len(doc.page_content) for doc in split_documents]
    if chunk_lengths:
        print("📊 Chunk Statistics:")
        print(f"   Average chunk length: {sum(chunk_lengths) / len(chunk_lengths):.0f} characters")
        print(f"   Min chunk length: {min(chunk_lengths)} characters")
        print(f"   Max chunk length: {max(chunk_lengths)} characters")
    else:
        print("⚠️ No chunks were produced - nothing to summarize")

# Display sample chunks
print("\n📄 Sample chunks:")
for i, chunk in enumerate(split_documents[:3], 1):
    print(f"\nChunk {i}:")
    print(f"Source: {chunk.metadata.get('source', 'Unknown')}")
    print(f"Length: {len(chunk.page_content)} characters")
    print(f"Content preview: {chunk.page_content[:100]}...")

## 7. Add Documents to Vector Store

In [None]:
# Add split documents to vector store
print("🚀 Starting document ingestion...")
print(f"📊 Adding {len(split_documents)} document chunks to vector store")

# Add documents in batches to handle large datasets
batch_size = 100
# Defined before the try block so the number of chunks actually stored is still
# known if a batch fails part-way through.
total_added = 0

try:
    for i in range(0, len(split_documents), batch_size):
        batch = split_documents[i:i + batch_size]
        vectorstore.add_documents(batch)
        total_added += len(batch)
        print(f"✅ Added batch {i//batch_size + 1}: {len(batch)} documents (Total: {total_added})")

    print(f"\n🎉 Successfully ingested {total_added} document chunks!")
    print(f"📚 Collection: {COLLECTION_NAME}")
    print(f"🤖 Embedding model: {embeddings.model_name}")

except Exception as e:
    print(f"❌ Error adding documents to vector store: {e}")
    print(f"   {total_added} of {len(split_documents)} chunks were added before the failure")

# Optional: metadata describing this ingestion run.
ingestion_metadata = {
    # Timezone-aware UTC timestamp, so the value is unambiguous regardless of
    # where the notebook was run.
    "ingestion_timestamp": pd.Timestamp.now(tz="UTC").isoformat(),
    # Count of chunks actually written, not the number that was attempted.
    "total_documents": total_added,
    "chunk_size": CHUNK_SIZE,
    "chunk_overlap": CHUNK_OVERLAP,
    "embedding_model": embeddings.model_name,
    "collection_name": COLLECTION_NAME
}

# You could store this metadata in the database or a separate file
print("📋 Ingestion metadata:")
for key, value in ingestion_metadata.items():
    print(f"   {key}: {value}")

## 8. Verify Ingestion

In [None]:
# Verify ingestion by querying the vector store
print("🔍 Verifying document ingestion...")

# Test queries
test_queries = [
    "What is artificial intelligence?",
    "How does machine learning work?",
    "What is natural language processing?"
]

for query in test_queries:
    try:
        # similarity_search() does not put a score into the document metadata,
        # so ask for (document, score) pairs explicitly.
        scored_results = vectorstore.similarity_search_with_score(query, k=2)
    except Exception as e:
        print(f"❌ Error querying for '{query}': {e}")
        continue

    print(f"\n🔍 Query: '{query}'")
    print(f"📊 Found {len(scored_results)} similar documents:")

    for i, (doc, score) in enumerate(scored_results, 1):
        print(f"   {i}. Source: {doc.metadata.get('source', 'Unknown')}")
        print(f"      Content: {doc.page_content[:100]}...")
        # NOTE(review): PGVector is understood to return a distance here
        # (smaller means closer) - confirm against the installed version.
        print(f"      Distance (lower = more similar): {score:.4f}")

# Get collection statistics
print("\n📊 Collection Statistics:")
conn = None
try:
    conn = psycopg2.connect(**DB_CONFIG)
    with conn.cursor() as cursor:
        # Get document count. The collection name is passed as a bound
        # parameter instead of being formatted into the SQL text.
        cursor.execute(
            """
            SELECT COUNT(*)
            FROM langchain_pg_embedding
            WHERE collection_id = (
                SELECT uuid FROM langchain_pg_collection WHERE name = %s
            )
            """,
            (COLLECTION_NAME,),
        )
        doc_count = cursor.fetchone()[0]
        print(f"   Documents in collection: {doc_count}")
        print(f"   Collection name: {COLLECTION_NAME}")

        # Get embedding dimension. Let pgvector compute it with vector_dims():
        # taking len() of the fetched column value measures whatever Python
        # object the driver returns, which is not the dimension unless a
        # pgvector type adapter has been registered on the connection.
        cursor.execute(
            """
            SELECT vector_dims(embedding)
            FROM langchain_pg_embedding
            WHERE collection_id = (
                SELECT uuid FROM langchain_pg_collection WHERE name = %s
            )
            LIMIT 1
            """,
            (COLLECTION_NAME,),
        )
        dimension_row = cursor.fetchone()
        if dimension_row:
            print(f"   Embedding dimension: {dimension_row[0]}")

except Exception as e:
    print(f"❌ Error getting statistics: {e}")
finally:
    # Close the connection even when a query above failed.
    if conn is not None:
        conn.close()

print("\n✅ Ingestion verification completed!")
print("\n💡 Next steps:")
print("- Use this collection for RAG applications")
print("- Fine-tune chunk size and overlap for your use case")
print("- Add more text files to expand your knowledge base")
print("- Implement document filtering and metadata enrichment")