⚙️ Config-Driven Legal RAG Indexer with LanceDB + Legal-BERT

In [2]:
import os
import re
import fitz
import lancedb
import numpy as np
from typing import List, Dict
from lancedb.pydantic import Vector, LanceModel
from legalbert_embedder import LegalBERTEmbedder
import csv

In [7]:
# 🔧 CONFIGURATION
config = {
    "model_name": "nlpaueb/legal-bert-base-uncased",
    "comp_pdf": "CompaniesAct.pdf",
    "bank_pdf": "BankruptcyAct.pdf",
    "db_path": "./Data",

    # Table names for LanceDB
    "comp_table": "CompaniesAct",
    "bank_table": "BankruptcyAct",
    "constitution_table": "IndianConstitution",

    # CSV path for Indian Constitution
    "constitution_csv": "Indian_Constitution.csv"
}


In [8]:
def load_pdf_text(path: str) -> List[str]:
    doc = fitz.open(path)
    return [page.get_text() for page in doc]

In [9]:
def extract_sections_bankruptcy_act(full_text: str, config: Dict) -> List[Dict]:
    section_pattern = re.compile(r"(?i)(SECTION\s*\d+[A-Z]?(?:\.\d+)?(?:[A-Z]*)?)")
    part_pattern = re.compile(r"(?i)^\s*(PART\s+[A-Z]+.*?)$", re.MULTILINE)
    chapter_pattern = re.compile(r"(?i)^\s*(CHAPTER\s+[IVXLC]+.*?)$", re.MULTILINE)

    # Find headings
    section_matches = list(section_pattern.finditer(full_text))
    part_matches = list(part_pattern.finditer(full_text))
    chapter_matches = list(chapter_pattern.finditer(full_text))

    # Mapping start locations
    part_map = {m.start(): m.group(1).strip() for m in part_matches}
    chapter_map = {m.start(): m.group(1).strip() for m in chapter_matches}
    part_starts = sorted(part_map.keys())
    chapter_starts = sorted(chapter_map.keys())

    chunks = []
    for i, match in enumerate(section_matches):
        start = match.start()
        end = section_matches[i+1].start() if i+1 < len(section_matches) else len(full_text)
        chunk_text = full_text[start:end].strip()
        section_title = match.group(1).strip()

        # Find closest PART
        part_title = None
        for p_start in reversed(part_starts):
            if p_start <= start:
                part_title = part_map[p_start]
                break

        # Find closest CHAPTER
        chapter_title = None
        for ch_start in reversed(chapter_starts):
            if ch_start <= start:
                chapter_title = chapter_map[ch_start]
                break

        chunks.append({
            "id": f"bankruptcy_section_{i}",
            "chunk": chunk_text,
            "section_title": section_title,
            "chapter_title": chapter_title,
            "part_title": part_title,
            "page": None,
            "source": "Bankruptcy Act",
        })

    return chunks


In [10]:
def extract_sections_with_meta_comp(full_text: str, config: Dict) -> List[Dict]:
    section_pattern = re.compile(r"(?i)(SECTION\s*\d+[A-Z]?(?:\.\d+)?[A-Z]*)")
    chapter_pattern = re.compile(r"(?i)^\s*(CHAPTER\s+[IVXLC]+.*?)$", re.MULTILINE)

    matches = list(section_pattern.finditer(full_text))
    chapter_matches = list(chapter_pattern.finditer(full_text))

    # Map chapter start positions to titles
    chapter_map = {m.start(): m.group(1).strip() for m in chapter_matches}
    chapter_starts = sorted(chapter_map.keys())

    chunks = []
    for i, match in enumerate(matches):
        start = match.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(full_text)
        chunk_text = full_text[start:end].strip()
        section_title = match.group(1).strip()

        # Get closest preceding chapter
        chapter_title = None
        for ch_start in reversed(chapter_starts):
            if ch_start <= start:
                chapter_title = chapter_map[ch_start]
                break

        chunks.append({
            "id": f"section_{i}",
            "chunk": chunk_text,
            "section_title": section_title,
            "chapter_title": chapter_title,
            "page": None,
            "source": "Companies Act",
        })

    return chunks

In [11]:
def create_lancedb_index_constitution(chunks: List[Dict], embeddings: np.ndarray, db_path: str, table_name: str):
    class ConstitutionArticle(LanceModel):
        id: str
        chunk: str
        embedding: Vector(768)
        section_title: str
        chapter_title: str
        page: int = None
        source: str

    try:
        if not os.path.exists(db_path):
            os.makedirs(db_path)

        db = lancedb.connect(db_path)

        for i in range(len(chunks)):
            chunks[i]["embedding"] = embeddings[i].tolist()

        table = db.create_table(table_name, data=chunks, schema=ConstitutionArticle, mode="overwrite")
        return table

    except Exception as e:
        print(f"❌ Error creating LanceDB index for Constitution: {e}")
        return None

In [12]:
def create_lancedb_index_bankruptcy(chunks: List[Dict], embeddings: np.ndarray, db_path: str, table_name: str):
    class Document1(LanceModel):
        id: str
        chunk: str
        embedding: Vector(768)
        part_title: str = None
        chapter_title: str = None
        section_title: str = None
        page: int
        source: str
    if not os.path.exists(db_path): os.makedirs(db_path)
    db = lancedb.connect(db_path)
    for i in range(len(chunks)):
        chunks[i]["embedding"] = embeddings[i].tolist()
    table = db.create_table(table_name, data=chunks, schema=Document1, mode="overwrite")
    return table

In [13]:
def create_lancedb_index_company(chunks: List[Dict], embeddings: np.ndarray, db_path: str, table_name: str):
    class Document2(LanceModel):
        id: str
        chunk: str
        embedding: Vector(768)
        section_title: str = None
        chapter_title: str = None
        page: int = None
        source: str
    if not os.path.exists(db_path): os.makedirs(db_path)
    db = lancedb.connect(db_path)
    for i in range(len(chunks)):
        chunks[i]["embedding"] = embeddings[i].tolist()
    table = db.create_table(table_name, data=chunks, schema=Document2, mode="overwrite")
    return table

In [14]:
def load_constitution_articles(file_path: str) -> List[Dict]:
    articles = []
    try:
        with open(file_path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for i, row in enumerate(reader):
                articles.append({
                    "id": f"constitution_article_{i}",
                    "chunk": row["article_desc"].strip(),
                    "section_title": row["article_id"].strip(),
                    "chapter_title": "Indian Constitution",
                    "page": None,
                    "source": "Indian Constitution"
                })
        return articles
    except FileNotFoundError:
        print(f"❌ CSV file not found: {file_path}")
    except KeyError as ke:
        print(f"❌ Missing column in CSV: {ke}")
    except Exception as e:
        print(f"❌ Error loading Constitution articles: {e}")
    return []

In [None]:
def index_constitution_articles(config: Dict) -> None:
    try:
        model = LegalBERTEmbedder()
        chunks = load_constitution_articles(config["constitution_csv"])

        if not chunks:
            print("⚠️ No articles loaded. Skipping indexing.")
            return
        print(f"Extracted {len(chunks)} articles in the Indian Constitution CSV.")
        embeddings = model.encode([c["chunk"] for c in chunks])
        result = create_lancedb_index_constitution(chunks, embeddings, config["db_path"], config["constitution_table"])

        if result:
            print("✅ LanceDB index created for Indian Constitution.")
    except Exception as e:
        print(f"❌ Error indexing Constitution articles: {e}")

In [20]:
def index_bankruptcy_act(config: Dict) -> None:
    model = LegalBERTEmbedder()
    pages = load_pdf_text(config["bank_pdf"])
    full_text = "\n".join(pages)
    chunks = extract_sections_bankruptcy_act(full_text, config)
    print(f"Extracted {len(chunks)} semantic sections.")
    embeddings = model.encode([c["chunk"] for c in chunks])
    create_lancedb_index_bankruptcy(chunks, embeddings, config["db_path"], config["bank_table"])
    print("✅ LanceDB index created for Bankruptcy Act.")

In [21]:
# RUN PIPELINE
def index_company_act(config: Dict) -> None:
    model = LegalBERTEmbedder()
    pages = load_pdf_text(config["comp_pdf"])
    full_text = "\n".join(pages)
    chunks = extract_sections_with_meta_comp(full_text, config)
    print(f"Extracted {len(chunks)} semantic sections.")
    embeddings = model.encode([c["chunk"] for c in chunks])
    create_lancedb_index_company(chunks, embeddings, config["db_path"], config["comp_table"])
    print("✅ LanceDB index created for Company Act.")

In [22]:
index_bankruptcy_act(config)
index_company_act(config)
index_constitution_articles(config)

Extracted 507 semantic sections.
✅ LanceDB index created for Bankruptcy Act.
Extracted 566 semantic sections.
✅ LanceDB index created for Company Act.
📄 Found 454 articles in the Indian Constitution CSV.
✅ LanceDB index created for Indian Constitution.


In [4]:
import lancedb
from legalbert_embedder import LegalBERTEmbedder
import pandas as pd

In [44]:
class LegalRetriever:
    def __init__(self, top_k: int = 5):
        self.model = LegalBERTEmbedder()
        self.top_k = top_k
        self.dbs = {}

    def _get_table(self, db_path: str, table_name: str):
        if db_path not in self.dbs:
            try:
                self.dbs[db_path] = lancedb.connect(db_path)
            except Exception as e:
                raise RuntimeError(f"Failed to connect to DB at {db_path}: {str(e)}")
        try:
            return self.dbs[db_path].open_table(table_name)
        except Exception as e:
            raise RuntimeError(f"Failed to open table '{table_name}' in DB '{db_path}': {str(e)}")

    def query_multiple(self, query_text: str, tables: list[dict]) -> list:
        try:
            query_vec = self.model.encode([query_text])[0].tolist()
        except Exception as e:
            raise RuntimeError(f"Failed to embed query: {str(e)}")

        all_results = []

        for tbl in tables:
            try:
                table = self._get_table(tbl["db_path"], tbl["table_name"])
                df = table.search(query_vec).limit(self.top_k).to_df()
                all_results.append(df)
            except Exception as e:
                print(f"Warning: Failed to query table '{tbl['table_name']}' in DB '{tbl['db_path']}': {str(e)}")
        if not all_results:
            return []

        try:
            merged_df = pd.concat(all_results, ignore_index=True)
            if "_distance" in merged_df.columns:
                merged_df = merged_df.sort_values(by="_distance", ascending=True)
        except Exception as e:
            raise RuntimeError(f"Failed to process merged results: {str(e)}")
        print(merged_df)
        return [
            {
                "chunk": row.get("chunk"),
                "part_title": row.get("part_title"),
                "chapter_title": row.get("chapter_title"),
                "section_title": row.get("section_title"),
                "page": row.get("page"),
                "source": row.get("source"),
                "score": row.get("_distance")
            }
            for _, row in merged_df.iterrows()
        ][:self.top_k]

In [None]:
retriever = LegalRetriever(top_k=5)
def query_legal_documents(query: str) -> List[Dict]:
    tables = [
        {"db_path": config["db_path"], "table_name": config["comp_table"]},
        {"db_path": config["db_path"], "table_name": config["bank_table"]},
        {"db_path": config["db_path"], "table_name": config["constitution_table"]}
    ]
    return retriever.query_multiple(query, tables)

In [46]:
query_legal_documents(query = "What are the requirements for corporate insolvency resolution under the Companies Act and the Insolvency and Bankruptcy Code?")

                       id                                              chunk  \
5  bankruptcy_section_389  section 196; \n(zv) the intervals in which the...   
6   bankruptcy_section_74  section 62 or such an \nappeal is not time bar...   
7  bankruptcy_section_317  section 130; \n\n \n \n \n143 \n \n \n \n \n(w...   
8   bankruptcy_section_82  section 30, it shall by order approve the reso...   
9   bankruptcy_section_80  section 29A as amended by the Insolvency and \...   
0             section_204  section 3 of the \nChartered Accountants Act, ...   
1             section_561  section 197. \n3[(b) where the company— \n(i) ...   
2             section_480  section 10E of the \nCompanies Act, 1956 (1 of...   
3             section_144  section 179 in the ordinary course of its busi...   
4             section_258  section 134 shall disclose the composition of ...   

  section_title                                      chapter_title  page  \
5   section 196                            

  df = table.search(query_vec).limit(self.top_k).to_df()


[{'chunk': 'section 196; \n(zv) the intervals in which the periodic study, research and audit of the functioning \nand performance of the insolvency professional agencies, insolvency professionals and \ninformation utilities under clause (r), and mechanism for disposal of assets under clause \n(t), of sub-section (1) of',
  'part_title': 'PART V',
  'chapter_title': 'CHAPTER VII',
  'section_title': 'section 196',
  'page': nan,
  'source': 'Bankruptcy Act',
  'score': 19.47675895690918},
 {'chunk': 'section 62 or such an \nappeal is not time barred under any provision of law for the time being in force; or \n                (iii) where a legal proceeding has been initiated in any court against the decision of \nthe Adjudicating Authority in respect of a resolution plan;]  \n(c) provides for the management of the affairs of the Corporate debtor after \napproval of the resolution plan;  \n(d) The implementation and supervision of the resolution plan;  \n(e) does not contravene any of th

In [47]:
query_legal_documents(query = "Provided that a company in respect of which such appeal or reference or inquiry stands abated under this clause may make reference to the National Company Law Tribunal under the Insolvency and Bankruptcy Code, 2016 within one hundred and eighty days from the commencement of the Insolvency and Bankruptcy Code, 2016 in accordance with the provisions of the Insolvency and Bankruptcy Code, 2016:")

                       id                                              chunk  \
5  bankruptcy_section_449  section 4, for sub-clause (b), the following s...   
6  bankruptcy_section_502  section 10E of the Companies Act, 1956 (1 of 1...   
7   bankruptcy_section_74  section 62 or such an \nappeal is not time bar...   
0             section_561  section 197. \n3[(b) where the company— \n(i) ...   
8   bankruptcy_section_35  section \n16. \n(2) The public announcement re...   
1             section_480  section 10E of the \nCompanies Act, 1956 (1 of...   
9   bankruptcy_section_82  section 30, it shall by order approve the reso...   
2              section_20  section 2 of the Company Secretaries Act, 1980...   
3             section_406  section 232 or otherwise, the Tribunal may at ...   
4             section_204  section 3 of the \nChartered Accountants Act, ...   

  section_title                                      chapter_title  page  \
5     section 4                            

  df = table.search(query_vec).limit(self.top_k).to_df()


[{'chunk': 'section 4, for sub-clause (b), the following sub-clause shall be substituted, namely—   \n     \n" (b) On such date as may be notified by the Central Government in \nthis behalf, any appeal preferred to the Appellate Authority or any reference made or \ninquiry pending to or before the Board or any proceeding of whatever nature pending \nbefore the Appellate Authority or the Board under the Sick Industrial Companies \n(Special Provisions) Act,1985 (1 of 1986) shall stand abated:  \n \nProvided that a company in respect of which such appeal or reference or inquiry \nstands abated under this clause may make reference to the National Company Law \nTribunal under the Insolvency and Bankruptcy Code, 2016 within one hundred and \neighty days from the commencement of the Insolvency and Bankruptcy Code, 2016 in \naccordance with the provisions of the Insolvency and Bankruptcy Code, 2016:  \n \nProvided further that no fees shall be payable for making such reference under \nInsolven