**Setup and Imports**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Install necessary packages
!pip install langchain langchain-community tiktoken faiss-cpu huggingface_hub sentence-transformers spacy

# Download spaCy model
!python -m spacy download en_core_web_lg

# Create required directories
import os
os.makedirs("data/cleaned_texts", exist_ok=True)
os.makedirs("chunks", exist_ok=True)
os.makedirs("faiss_index", exist_ok=True)

Collecting langchain-community
  Downloading langchain_community-0.3.21-py3-none-any.whl.metadata (2.4 kB)
Collecting tiktoken
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.8.1-py3-none-any.whl.metadata (3.5 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain-commun

In [3]:
import os
os.makedirs("data/cleaned_texts", exist_ok=True)
os.makedirs("chunks", exist_ok=True)
os.makedirs("faiss_index", exist_ok=True)

In [4]:
import os
import shutil
from pathlib import Path

# Create the target directory if it doesn't exist
os.makedirs("data/cleaned_texts", exist_ok=True)

# Source directory in Google Drive
source_dir = "/content/drive/MyDrive/cleaned_texts"

# Check if source directory exists
if not os.path.exists(source_dir):
    print(f"Source directory {source_dir} does not exist. Please check the path.")
else:
    # Get list of files in source directory
    files = os.listdir(source_dir)
    print(f"Found {len(files)} files in {source_dir}")

    # Copy each file to the target directory
    copied_count = 0
    for filename in files:
        source_path = os.path.join(source_dir, filename)
        target_path = os.path.join("data/cleaned_texts", filename)

        # Only copy files, not directories
        if os.path.isfile(source_path):
            shutil.copy2(source_path, target_path)
            copied_count += 1
            print(f"Copied: {filename}")

    print(f"\nSuccessfully copied {copied_count} files to data/cleaned_texts")

    # Verify files in target directory
    target_files = os.listdir("data/cleaned_texts")
    print(f"Target directory now has {len(target_files)} files")

Found 1428 files in /content/drive/MyDrive/cleaned_texts
Copied: Smartshare.txt
Copied: Spendcoin.txt
Copied: SONM.txt
Copied: SHIELD.txt
Copied: STEX.txt
Copied: Robotina.txt
Copied: Sharder.txt
Copied: SalPay.txt
Copied: SakuraCoin.txt
Copied: SpiderVPS.txt
Copied: Switcheo.txt
Copied: Synthetix Network.txt
Copied: SyncFab.txt
Copied: Oasis City.txt
Copied: Patron.txt
Copied: Primalbase.txt
Copied: Poseidon Network.txt
Copied: Privatix.txt
Copied: Power Ledger.txt
Copied: Odyssey.txt
Copied: osadc en.txt
Copied: Polis.txt
Copied: Probit.txt
Copied: OWNDATA.txt
Copied: Pillar.txt
Copied: Pluton.txt
Copied: Oyster.txt
Copied: Pirate Chain.txt
Copied: OTCBTC Token.txt
Copied: PotCoin.txt
Copied: Pesetacoin.txt
Copied: PayPie.txt
Copied: Project Coin.txt
Copied: Pibble.txt
Copied: ORS Group.txt
Copied: Peerplays.txt
Copied: Polymath.txt
Copied: PKG Token.txt
Copied: PlusCoin.txt
Copied: Piction.txt
Copied: OneGame.txt
Copied: Opiria PDATA.txt
Copied: Playgroundz.txt
Copied: Provoco.txt
C

**Finance Utility Class**

In [5]:
class FinanceUtility:
    """Utility functions for financial and cryptocurrency text processing"""

    FINANCE_TERMS = {
        "cryptocurrency": [
            "crypto", "bitcoin", "ethereum", "blockchain", "token", "coin", "altcoin",
            "defi", "mining", "wallet", "exchange", "nft", "smart contract", "node", "hash"
        ],
        "compliance": [
            "kyc", "aml", "know your customer", "anti-money laundering", "cft",
            "regulations", "regtech", "audit", "sanctions", "pep", "fincen", "ofac"
        ],
        "due_diligence": [
            "dd", "edd", "cdd", "risk assessment", "screening", "onboarding", "identity verification"
        ],
        "risk_analysis": [
            "risk", "fraud", "vulnerability", "score", "exposure", "mitigation", "sar", "suspicious activity"
        ],
        "transactions": [
            "payment", "transfer", "transaction", "wire", "p2p", "volume", "liquidity", "custody"
        ],
        "financial_crime": [
            "money laundering", "terrorism financing", "phishing", "darknet", "tumbler", "illicit", "sanction evasion"
        ]
    }

    @classmethod
    def get_domain_specific_terms(cls):
        """Flatten terms across all categories"""
        terms = []
        for group in cls.FINANCE_TERMS.values():
            terms.extend(group)
        return terms

    @classmethod
    def is_relevant_chunk(cls, chunk_text, min_terms=2):
        """Determine if a text chunk contains at least `min_terms` finance-domain words"""
        terms = cls.get_domain_specific_terms()
        return sum(1 for term in terms if term.lower() in chunk_text.lower()) >= min_terms

    @classmethod
    def enhance_query(cls, query):
        """Append additional related terms based on original query"""
        query = query.lower()
        extras = []

        for category, terms in cls.FINANCE_TERMS.items():
            for term in terms:
                if term in query:
                    related = [t for t in terms if t != term][:3]
                    extras.extend(related)

        if extras:
            return f"{query} {' '.join(extras[:5])}"
        return query

**Entity Extraction Module**

In [6]:
import spacy
from spacy.matcher import PhraseMatcher
import re
from typing import List, Dict, Tuple, Set
from collections import defaultdict

class EntityExtractor:
    """Extract named entities and other features from financial texts"""

    # Load NLP model
    nlp = spacy.load("en_core_web_lg")
    nlp.add_pipe("sentencizer")

    # Define entity configurations
    KNOWN_ENTITIES = {
        "crypto_projects": [
            "Bitcoin", "Ethereum", "Solana", "Ripple", "Cardano", "Binance",
            "Uniswap", "Compound", "Aave", "MakerDAO", "Chainlink", "Polygon",
            "Avalanche", "Tezos", "Polkadot", "Algorand", "Stellar", "Cosmos"
        ],
        "crypto_terms": [
            "blockchain", "cryptocurrency", "token", "protocol", "mining", "staking",
            "defi", "dao", "consensus", "wallet", "exchange", "yield", "liquidity"
        ],
        "organizations": [
            "Circle", "Coinbase", "Kraken", "FTX", "Binance", "Chainalysis", "Fireblocks",
            "BlockFi", "BitGo", "Gemini", "ConsenSys", "Tether", "CoinMarketCap"
        ],
        "locations": [
            "United States", "China", "Russia", "European Union", "Singapore", "Switzerland",
            "Malta", "Dubai", "Hong Kong", "Japan", "South Korea", "Cayman Islands"
        ],
        "risk_terms": {
            "regulatory": [
                "compliance violation", "unregistered", "non-compliant", "illegal", "banned",
                "unlicensed", "regulatory", "oversight", "fine", "penalty", "investigation"
            ],
            "technical": [
                "hack", "breach", "exploit", "vulnerability", "backdoor", "exposure",
                "bug", "malware", "phishing", "stolen keys", "51% attack"
            ],
            "financial": [
                "bankruptcy", "insolvency", "liquidity crisis", "bank run", "rugpull",
                "collapse", "fraud", "ponzi", "scam", "unsustainable", "hyperinflation"
            ],
            "operational": [
                "downtime", "outage", "suspended", "locked", "frozen assets",
                "withdrawal issues", "technical difficulties", "service disruption"
            ]
        }
    }

    BLACKLISTS = {
        "person_blacklist": ["administrator", "user", "customer", "client", "founder", "ceo"],
        "person_suffixes": ["corp", "inc", "llc", "ltd", "foundation", "group", "team"],
        "org_blacklist": ["the", "and", "that", "this", "these", "those"],
        "org_suffixes": ["ing", "ed", "ly", "day", "time", "year"],
        "org_prefixes": ["mr", "mrs", "ms", "dr", "prof"],
        "crypto_blacklist": ["the", "blockchain", "a", "an", "crypto", "mining"],
        "crypto_suffixes": ["ing", "ed", "s", "ly"]
    }

    # Patterns
    EMAIL_PATTERN = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
    CRYPTO_ADDRESS_PATTERN = r"\b(0x)?[0-9a-fA-F]{40}\b"
    PHONE_PATTERN = r"\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}"

    def __init__(self):
        """Initialize entity extraction with phrase matchers"""
        self.matchers = self._setup_matchers()

    def _setup_matchers(self) -> Dict[str, PhraseMatcher]:
        """Create phrase matchers for known entities"""
        matchers = {}

        # Crypto project matcher
        crypto_matcher = PhraseMatcher(self.nlp.vocab)
        patterns = [self.nlp(text) for text in self.KNOWN_ENTITIES["crypto_projects"]]
        crypto_matcher.add("CRYPTO_PROJECT", patterns)
        matchers["crypto"] = crypto_matcher

        # Organization matcher
        org_matcher = PhraseMatcher(self.nlp.vocab)
        patterns = [self.nlp(text) for text in self.KNOWN_ENTITIES["organizations"]]
        org_matcher.add("ORG", patterns)
        matchers["org"] = org_matcher

        return matchers

    def is_valid_person(self, text: str) -> bool:
        """Validate person names with strict rules"""
        text_lower = text.lower()

        # Must have at least 2 words, proper capitalization, no numbers
        conditions = [
            len(text.split()) >= 2,
            text.istitle(),
            not any(c.isdigit() for c in text),
            not any(term in text_lower for term in self.BLACKLISTS["person_blacklist"]),
            not any(text_lower.endswith(suffix) for suffix in self.BLACKLISTS["person_suffixes"])
        ]

        return all(conditions)

    def is_valid_org(self, text: str) -> bool:
        """Validate organization names"""
        text_lower = text.lower()

        conditions = [
            3 <= len(text) <= 50,
            text[0].isupper(),
            not any(b in text_lower for b in self.BLACKLISTS["org_blacklist"]),
            not any(text_lower.endswith(suffix) for suffix in self.BLACKLISTS["org_suffixes"]),
            not any(text_lower.startswith(prefix) for prefix in self.BLACKLISTS["org_prefixes"])
        ]

        return all(conditions)

    def is_valid_crypto_project(self, text: str) -> bool:
        """Validate cryptocurrency projects"""
        text_lower = text.lower()

        # Check against known projects first
        if any(proj.lower() == text_lower for proj in self.KNOWN_ENTITIES["crypto_projects"]):
            return True

        # Generic project validation
        conditions = [
            2 <= len(text.split()) <= 4,
            not any(b in text_lower for b in self.BLACKLISTS["crypto_blacklist"]),
            not any(text_lower.endswith(suffix) for suffix in self.BLACKLISTS["crypto_suffixes"])
        ]

        # Must contain at least one known crypto term
        conditions.append(
            any(term in text_lower for term in self.KNOWN_ENTITIES["crypto_terms"])
        )

        return all(conditions)

    def is_valid_location(self, text: str) -> bool:
        """Validate locations against known list"""
        return text.lower() in {loc.lower() for loc in self.KNOWN_ENTITIES["locations"]}

    def normalize_entity(self, entity_type: str, text: str) -> str:
        """Standardize entity formatting"""
        # Apply type-specific normalization
        if entity_type == "person":
            # Standardize name formatting (Title Case)
            return " ".join(word.capitalize() for word in text.split())

        elif entity_type in ["crypto_project", "organization"]:
            # Remove common suffixes and standardize casing
            text = re.sub(r'\b(LLC|Inc|Ltd|Foundation|Labs|DAO|DeFi|Network|Protocol)\b', '', text, flags=re.IGNORECASE)
            return text.strip()

        return text

    def post_process_entities(self, entities: Dict[str, Set[str]]) -> Dict[str, List[str]]:
        """Clean and deduplicate extracted entities"""
        processed = {}

        for entity_type, entity_set in entities.items():
            # Normalize each entity
            normalized = {self.normalize_entity(entity_type, e) for e in entity_set}

            # Remove subsumed entities (shorter versions of longer entities)
            final_entities = set()
            for entity in sorted(normalized, key=len, reverse=True):
                if not any(e != entity and entity.lower() in e.lower() for e in final_entities):
                    if entity:  # Skip empty strings
                        final_entities.add(entity)

            processed[entity_type] = sorted(final_entities)

        return processed

    def extract_named_entities(self, text: str) -> Dict[str, List[str]]:
        """Comprehensive entity extraction pipeline"""
        doc = self.nlp(text)
        entities = defaultdict(set)

        # Stage 1: spaCy NER extraction with strict validation
        for ent in doc.ents:
            clean_text = ' '.join(ent.text.strip().split())

            if ent.label_ == "PERSON" and self.is_valid_person(clean_text):
                entities["person"].add(clean_text)

            elif ent.label_ == "ORG":
                if self.is_valid_crypto_project(clean_text):
                    entities["crypto_project"].add(clean_text)
                elif self.is_valid_org(clean_text):
                    entities["organization"].add(clean_text)

            elif ent.label_ == "GPE" and self.is_valid_location(clean_text):
                entities["location"].add(clean_text)

        # Stage 2: Phrase matching for known entities
        for match_id, start, end in self.matchers["crypto"](doc):
            entities["crypto_project"].add(doc[start:end].text)

        for match_id, start, end in self.matchers["org"](doc):
            entities["organization"].add(doc[start:end].text)

        # Stage 3: Pattern-based extraction
        entities["email"] = set(re.findall(self.EMAIL_PATTERN, text))
        entities["crypto_address"] = set(re.findall(self.CRYPTO_ADDRESS_PATTERN, text))

        # Stage 4: Post-processing
        processed_entities = self.post_process_entities(entities)

        return processed_entities

    def extract_risk_features(self, text: str) -> Tuple[Dict[str, List[str]], float]:
        """Enhanced risk term extraction"""
        risk_categories = {
            "regulatory": self.KNOWN_ENTITIES["risk_terms"]["regulatory"],
            "technical": self.KNOWN_ENTITIES["risk_terms"]["technical"],
            "financial": self.KNOWN_ENTITIES["risk_terms"]["financial"],
            "operational": self.KNOWN_ENTITIES["risk_terms"]["operational"]
        }

        found = {k: set() for k in risk_categories}
        doc = self.nlp(text.lower())

        for category, terms in risk_categories.items():
            for term in terms:
                if term.lower() in doc.text:
                    found[category].add(term)

        # Calculate weighted risk score
        weights = {"regulatory": 1.2, "technical": 1.1, "financial": 1.0, "operational": 0.9}
        score = min(sum(len(v) * 10 * weights[k] for k, v in found.items()), 100)

        return {k: sorted(v) for k, v in found.items()}, round(score, 2)

**Smart Chunker Class**

In [7]:
import os
import glob
import re
import hashlib
import json
from pathlib import Path
from typing import List, Dict, Union, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter

class SmartChunker:
    """Advanced document processing with domain-specific chunking and search capabilities"""

    def __init__(self, chunk_size=1000, chunk_overlap=200, data_dir="faiss_index"):
        """Initialize chunker with optional configs"""
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        self.data_dir = data_dir
        self.entity_extractor = EntityExtractor()
        self.utility = FinanceUtility()
        self.vectordb = None
        self.embeddings = None

    def preprocess_text(self, text: str) -> str:
        """Cleans up raw text before chunking."""
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\b\d+\s*\|\s*[pP]age\b', '', text)
        text = re.sub(r'\[\d+\]', '', text)
        return text.replace('•', '* ').strip()

    def chunk_text(self, text: str, metadata: Dict[str, str] = None) -> List[Dict[str, Union[str, Dict]]]:
        """Splits and filters text into meaningful domain-specific chunks with entity extraction."""
        chunks = self.splitter.split_text(text)
        relevant_chunks = []

        for i, chunk in enumerate(chunks):
            if self.utility.is_relevant_chunk(chunk):
                # Extract entities and risk features
                extracted_entities = self.entity_extractor.extract_named_entities(chunk)
                risk_features, risk_score = self.entity_extractor.extract_risk_features(chunk)

                # Combine metadata
                chunk_metadata = {
                    "chunk_id": hashlib.md5(chunk.encode()).hexdigest(),
                    "entities": extracted_entities,
                    "risk_features": risk_features,
                    "risk_score": risk_score,
                    **(metadata or {})
                }

                relevant_chunks.append({
                    "text": f"[Section {i+1}] {chunk}",
                    "metadata": chunk_metadata
                })

        print(f"✅ Kept {len(relevant_chunks)} relevant chunks from {len(chunks)} total.")
        return relevant_chunks

    def save_chunks_to_json(self, chunks, output_path, drive_output_folder="/content/drive/MyDrive/chunks"):
        """Saves chunks to both local and drive directories."""
        # Ensure the local directory exists
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        # Save to local path
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(chunks, f, indent=2, ensure_ascii=False)
        print(f"📦 Saved chunks to {output_path}")

        # Ensure the drive directory exists
        drive_folder = Path(drive_output_folder)
        drive_folder.mkdir(parents=True, exist_ok=True)  # Make sure the folder exists

        # Save to Google Drive path
        drive_path = os.path.join(drive_output_folder, os.path.basename(output_path))
        with open(drive_path, "w", encoding="utf-8") as f:
            json.dump(chunks, f, indent=2, ensure_ascii=False)
        print(f"📦 Saved chunks to {drive_path}")

    def load_chunks_from_json(self, input_path):
        """Loads previously saved chunks."""
        with open(input_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def load_cleaned_texts(self, folder="data/cleaned_texts") -> Dict[str, str]:
        """Loads all .txt files from a folder."""
        texts = {}
        for file_path in glob.glob(os.path.join(folder, "*.txt")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    texts[os.path.basename(file_path)] = f.read()
            except UnicodeDecodeError:
                # Try with different encoding if UTF-8 fails
                with open(file_path, "r", encoding="latin-1") as f:
                    texts[os.path.basename(file_path)] = f.read()
        return texts

    def process_all_texts(self, input_folder="data/cleaned_texts", output_folder="chunks"):
        """Process all texts in a folder and save chunks"""
        print(f"Looking for text files in: {input_folder}")
        texts = self.load_cleaned_texts(input_folder)

        if not texts:
            print(f"⚠️ No text files found in {input_folder}. Please check the path and file extensions.")
            return 0

        print(f"Found {len(texts)} text files to process")
        Path(output_folder).mkdir(parents=True, exist_ok=True)

        for filename, content in texts.items():
            print(f"\n📄 Processing {filename} (content length: {len(content)})")
            cleaned = self.preprocess_text(content)
            chunks = self.chunk_text(cleaned, metadata={"source": filename})
            self.save_chunks_to_json(chunks, f"{output_folder}/{filename.replace('.txt', '_chunks.json')}")

        print(f"\n✅ Completed processing {len(texts)} files")
        return len(texts)


**Vector Store and Embeddings**

In [8]:
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document

def load_chunks_from_folder(folder: str) -> List[Document]:
    """Load all chunks from JSON files in a folder and convert to LangChain Documents"""
    documents = []
    folder_path = Path(folder)

    for json_file in folder_path.glob("*_chunks.json"):
        with open(json_file, "r", encoding="utf-8") as f:
            chunks = json.load(f)
            for chunk in chunks:
                documents.append(
                    Document(
                        page_content=chunk["text"],
                        metadata=chunk["metadata"]
                    )
                )
        print(f"✅ Loaded {len(chunks)} chunks from {json_file.name}")
    return documents

# Add vector store methods to SmartChunker class
def add_vector_store_methods(SmartChunker):
    def _init_embeddings(self):
        """Initialize embeddings model if not already done"""
        if self.embeddings is None:
            print("🔍 Initializing embeddings model...")
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-mpnet-base-v2",
                model_kwargs={'device': 'cpu'}  # Use GPU if available by changing to 'cuda'
            )

    def create_vectorstore(self, chunks_folder="chunks", save_path=None):
        """Create FAISS vectorstore from processed chunks"""
        save_path = save_path or self.data_dir
        documents = load_chunks_from_folder(chunks_folder)

        if not documents:
            raise ValueError(f"No documents found in {chunks_folder}. Please process texts first.")

        # Initialize embeddings
        self._init_embeddings()

        print(f"🔍 Creating vectorstore with {len(documents)} documents...")
        self.vectordb = FAISS.from_documents(documents, self.embeddings)

        # Save FAISS index
        os.makedirs(save_path, exist_ok=True)
        self.vectordb.save_local(save_path)
        print(f"✅ FAISS index saved to '{save_path}/'")

        # Save metadata as JSON
        metadata_path = os.path.join(save_path, "index_metadata.json")
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump([doc.metadata for doc in documents], f, indent=2, ensure_ascii=False)
        print(f"✅ Metadata saved to '{metadata_path}' (JSON)")

        return len(documents)

    def load_index(self, path=None):
        """Load existing FAISS index"""
        path = path or self.data_dir

        if not os.path.exists(path):
            raise FileNotFoundError(f"Index directory '{path}' not found.")

        # Initialize embeddings
        self._init_embeddings()

        print(f"📂 Loading FAISS index from {path}...")
        self.vectordb = FAISS.load_local(path, self.embeddings)

        # Load metadata
        metadata_path = os.path.join(path, "index_metadata.json")
        if os.path.exists(metadata_path):
            with open(metadata_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
                print(f"✅ Loaded metadata for {len(metadata)} documents")
        else:
            print("⚠️ No metadata file found.")

        return self.vectordb is not None

    def search(self, query: str, k: int = 3, enhance_query: bool = True) -> List[Dict]:
        """Search the vectorstore with an enhanced query"""
        if not self.vectordb:
            raise ValueError("No vectorstore loaded. Call load_index() first.")

        # Enhance query with domain terms if enabled
        if enhance_query:
            enhanced = self.utility.enhance_query(query)
            print(f"Enhanced query: '{enhanced}'")
            query = enhanced

        # Perform search
        results = self.vectordb.similarity_search_with_score(query, k=k)

        # Format results
        formatted_results = []
        for doc, score in results:
            result = {
                "text": doc.page_content,
                "score": 1.0 - score,  # Convert distance to similarity score
                "file_name": doc.metadata.get("source", "Unknown"),
                "entities": doc.metadata.get("entities", {}),
                "risk_score": doc.metadata.get("risk_score", 0)
            }
            formatted_results.append(result)

        return formatted_results

    # Add methods to class
    SmartChunker._init_embeddings = _init_embeddings
    SmartChunker.create_vectorstore = create_vectorstore
    SmartChunker.load_index = load_index
    SmartChunker.search = search

    return SmartChunker

# Extend SmartChunker with vector store methods
SmartChunker = add_vector_store_methods(SmartChunker)

**Processing Pipeline**

**Cell 1: Text Processing and Chunking**

In [None]:
def process_texts_to_chunks(input_folder="data/cleaned_texts",
                           chunks_folder="chunks"):
    """Process text files into chunks and save to JSON"""
    print("🚀 Starting document chunking pipeline...")

    # Create directories
    for folder in [input_folder, chunks_folder]:
        os.makedirs(folder, exist_ok=True)

    # Initialize chunker
    chunker = SmartChunker()

    # Process texts
    print(f"\n📑 Processing texts from {input_folder}...")
    num_processed = chunker.process_all_texts(input_folder, chunks_folder)
    print(f"✅ Processed {num_processed} document(s) into chunks")

    return chunker

# Test with a small text file
with open("data/cleaned_texts/sample.txt", "w") as f:
    f.write("""
    Cryptocurrency compliance has become a major concern for blockchain projects. Bitcoin and Ethereum transactions require proper KYC and AML procedures.
    Financial institutions partnering with crypto exchanges need to implement risk assessment protocols to prevent money laundering.
    Blockchain technology enables transparency but also presents unique regulatory challenges.
    Many DeFi protocols operate in regulatory gray areas which increases their risk profile.
    """)

# Run the chunking pipeline
chunker = process_texts_to_chunks()

This cell imports tge chunks from the directory chunks save in the drive ("use thsi if you did not run the cell Text Processing and Chunking")

In [9]:
import os
import shutil
from pathlib import Path

def sync_chunks_from_drive():
    """
    Copy all files from the 'chunks' folder in Google Drive to the local 'chunks' folder.
    Creates required directories if they don't exist.
    """
    # Define source and target directories
    source_dir = "/content/drive/MyDrive/chunks"
    target_dir = "chunks"

    # Create the target directory if it doesn't exist
    os.makedirs(target_dir, exist_ok=True)

    # Check if source directory exists
    if not os.path.exists(source_dir):
        print(f"Source directory {source_dir} does not exist. Please check the path.")
        return False

    # Get list of files in source directory
    try:
        files = os.listdir(source_dir)
        print(f"Found {len(files)} files in {source_dir}")
    except Exception as e:
        print(f"Error accessing source directory: {e}")
        return False

    # Copy each file to the target directory
    copied_count = 0
    for filename in files:
        source_path = os.path.join(source_dir, filename)
        target_path = os.path.join(target_dir, filename)

        # Only copy files, not directories
        if os.path.isfile(source_path):
            try:
                shutil.copy2(source_path, target_path)
                copied_count += 1
                print(f"Copied: {filename}")
            except Exception as e:
                print(f"Error copying {filename}: {e}")

    print(f"\nSuccessfully copied {copied_count} files to {target_dir}")

    # Verify files in target directory
    target_files = os.listdir(target_dir)
    print(f"Target directory now has {len(target_files)} files")

    return True

# Run the synchronization
sync_result = sync_chunks_from_drive()
print(f"Sync completed: {'✅ Success' if sync_result else '❌ Failed'}")

Found 1428 files in /content/drive/MyDrive/chunks
Copied: Ether Zero_chunks.json
Copied: CoinMetro_chunks.json
Copied: DeusCoin_chunks.json
Copied: Curriculum Vitae_chunks.json
Copied: ArtByte_chunks.json
Copied: aave-v2-whitepaper_extracted_chunks.json
Copied: BOScoin_chunks.json
Copied: MinexCoin_chunks.json
Copied: VNT Chain_chunks.json
Copied: Beetle Coin_chunks.json
Copied: Safein_chunks.json
Copied: DCTO_chunks.json
Copied: ZeusCrowdfunding_chunks.json
Copied: Opennity_chunks.json
Copied: ValueChain_chunks.json
Copied: Restart Energy MWAT_chunks.json
Copied: Ultra_chunks.json
Copied: FootballCoin_chunks.json
Copied: FTX Token_chunks.json
Copied: REDFOXLABS_chunks.json
Copied: Bitblocks_chunks.json
Copied: Bitmax Token_chunks.json
Copied: Ripple_chunks.json
Copied: HelloGold_chunks.json
Copied: HireMatch_chunks.json
Copied: patientory_chunks.json
Copied: Blockport_chunks.json
Copied: INS Ecosystem_chunks.json
Copied: Dero_chunks.json
Copied: Stratis_chunks.json
Copied: BlockMesh_c

**Cell 2: Vectorization and Embeddings**

In [11]:
def create_vector_index_batched(chunks_folder="chunks",
                               index_folder="faiss_index",
                               batch_size=5000):
    """Create vector embeddings and index from chunks using batching"""
    print("\n🔍 Starting vectorization pipeline with batching...")

    # Create index directory
    os.makedirs(index_folder, exist_ok=True)

    # Initialize chunker with data directory
    chunker = SmartChunker(data_dir=index_folder)
    chunker._init_embeddings()

    # Load all documents
    documents = load_chunks_from_folder(chunks_folder)
    total_docs = len(documents)
    print(f"Loaded {total_docs} documents for vectorization")

    # Create empty FAISS index first time
    vectordb = None
    processed = 0

    # Process in batches
    for i in range(0, total_docs, batch_size):
        batch = documents[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{(total_docs-1)//batch_size + 1} with {len(batch)} documents")

        # Create new index or add to existing
        if vectordb is None:
            vectordb = FAISS.from_documents(batch, chunker.embeddings)
        else:
            batch_db = FAISS.from_documents(batch, chunker.embeddings)
            vectordb.merge_from(batch_db)

        processed += len(batch)
        print(f"Progress: {processed}/{total_docs} documents ({processed/total_docs*100:.1f}%)")

        # Save checkpoint after each batch
        checkpoint_path = f"{index_folder}_checkpoint_{processed}"
        os.makedirs(checkpoint_path, exist_ok=True)
        vectordb.save_local(checkpoint_path)
        print(f"Saved checkpoint at {processed}/{total_docs} documents")

    # Save final index
    chunker.vectordb = vectordb
    chunker.vectordb.save_local(index_folder)
    print(f"✅ Indexed {total_docs} chunks into vector database")

    # Save metadata
    metadata_path = os.path.join(index_folder, "index_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump([doc.metadata for doc in documents], f, indent=2, ensure_ascii=False)
    print(f"✅ Metadata saved to '{metadata_path}' (JSON)")

    return chunker

# TPU Optimization for embeddings
def optimize_for_tpu(SmartChunker):
    """Replace the _init_embeddings method with TPU-optimized version"""

    original_init_embeddings = SmartChunker._init_embeddings

    def _init_embeddings_tpu(self):
        """Initialize embeddings model optimized for TPU if available"""
        if self.embeddings is None:
            print("🔍 Initializing embeddings model with hardware acceleration...")

            # Check if TPU is available
            device = 'cpu'
            try:
                import torch_xla
                import torch_xla.core.xla_model as xm
                device = xm.xla_device()
                print("✅ Using XLA TPU device")
            except ImportError:
                try:
                    import torch
                    if torch.cuda.is_available():
                        device = 'cuda'
                        print("✅ Using CUDA GPU device")
                except:
                    print("⚠️ No GPU/TPU detected, using CPU")

            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-mpnet-base-v2",
                model_kwargs={'device': device}
            )

    # Replace the method
    SmartChunker._init_embeddings = _init_embeddings_tpu
    return SmartChunker

# Apply TPU optimization to the SmartChunker class
SmartChunker = optimize_for_tpu(SmartChunker)

# Function to run the full pipeline
def run_vector_indexing():
    # Run the batched vectorization pipeline
    chunker = create_vector_index_batched(
        chunks_folder="chunks",
        index_folder="faiss_index",
        batch_size=2500  # Adjust based on your memory constraints
    )

    # Test a search query to verify functionality
    print("\n🔍 Testing search functionality...")
    test_search(chunker, "What are KYC requirements for crypto exchanges?", k=3)

    return chunker

# Execute the pipeline
vector_db = run_vector_indexing()


🔍 Starting vectorization pipeline with batching...
🔍 Initializing embeddings model with hardware acceleration...
✅ Using CUDA GPU device
✅ Loaded 9 chunks from MicroBitcoin_chunks.json
✅ Loaded 51 chunks from Netrum_chunks.json
✅ Loaded 39 chunks from IZIChain_chunks.json
✅ Loaded 36 chunks from BitDegree_chunks.json
✅ Loaded 20 chunks from Galactrum_chunks.json
✅ Loaded 20 chunks from gratzio wp_chunks.json
✅ Loaded 15 chunks from suqa_chunks.json
✅ Loaded 16 chunks from Herbalist Token_chunks.json
✅ Loaded 34 chunks from Monaco_chunks.json
✅ Loaded 28 chunks from IoT Chain_chunks.json
✅ Loaded 29 chunks from Bread_chunks.json
✅ Loaded 55 chunks from Gatcoin_chunks.json
✅ Loaded 20 chunks from Electrify.Asia_chunks.json
✅ Loaded 27 chunks from Experty_chunks.json
✅ Loaded 50 chunks from Ladder Network_chunks.json
✅ Loaded 22 chunks from Galaxy eSolutions_chunks.json
✅ Loaded 44 chunks from Sessia_chunks.json
✅ Loaded 8 chunks from Titan Coin_chunks.json
✅ Loaded 49 chunks from TimeBo

NameError: name 'test_search' is not defined

In [13]:
def load_index(self, path=None):
    """Load existing FAISS index"""
    path = path or self.data_dir

    if not os.path.exists(path):
        raise FileNotFoundError(f"Index directory '{path}' not found.")

    # Initialize embeddings
    self._init_embeddings()

    print(f"📂 Loading FAISS index from {path}...")
    # Add the allow_dangerous_deserialization parameter
    self.vectordb = FAISS.load_local(path, self.embeddings, allow_dangerous_deserialization=True)

    # Load metadata
    metadata_path = os.path.join(path, "index_metadata.json")
    if os.path.exists(metadata_path):
        with open(metadata_path, "r", encoding="utf-8") as f:
            metadata = json.load(f)
            print(f"✅ Loaded metadata for {len(metadata)} documents")
    else:
        print("⚠️ No metadata file found.")

    return self.vectordb is not None

In [16]:
pip install -U langchain-community



**Testing the Vector Store**

In [17]:
from langchain_community.vectorstores import FAISS

In [22]:
import os
import json
import faiss
import numpy as np
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from typing import List, Dict, Any, Optional

class SmartChunker:
    def __init__(self, data_dir="faiss_index", embedding_model=None):
        """
        Initialize the SmartChunker with the specified data directory and embedding model.
        If embedding_model is None, it will be determined from index inspection.
        """
        self.data_dir = data_dir
        self.embedding_model_name = embedding_model  # Can be None initially
        self.embeddings = None
        self.vectordb = None
        self.dimension = None

    def _inspect_index_dimension(self):
        """Inspect the FAISS index file to determine its dimension"""
        index_path = os.path.join(self.data_dir, "index.faiss")
        if not os.path.exists(index_path):
            raise FileNotFoundError(f"FAISS index file not found at {index_path}")

        # Read the index to get dimension
        index = faiss.read_index(index_path)
        dimension = index.d
        print(f"📏 FAISS index dimension: {dimension}")
        return dimension

    def _select_appropriate_model(self, dimension):
        """Select embedding model based on dimension"""
        dimension_to_model = {
            384: "sentence-transformers/all-MiniLM-L6-v2",
            768: "sentence-transformers/all-mpnet-base-v2",
            1024: "sentence-transformers/paraphrase-MiniLM-L12-v2",
            512: "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
        }

        if dimension in dimension_to_model:
            model = dimension_to_model[dimension]
            print(f"✅ Auto-selected model '{model}' based on dimension {dimension}")
            return model
        else:
            print(f"⚠️ Unusual dimension {dimension}. Using default model, but this may fail.")
            return "sentence-transformers/all-MiniLM-L6-v2"  # Default fallback

    def _init_embeddings(self):
        """Initialize the embeddings model with hardware acceleration if available."""
        # If we don't have an embedding model specified, determine it from the index dimension
        if self.embedding_model_name is None:
            try:
                dimension = self._inspect_index_dimension()
                self.embedding_model_name = self._select_appropriate_model(dimension)
            except Exception as e:
                print(f"⚠️ Error inspecting index: {e}")
                print("🔄 Using default embedding model: sentence-transformers/all-MiniLM-L6-v2")
                self.embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

        print(f"🔍 Initializing embeddings model: {self.embedding_model_name}")

        # Check for CUDA availability
        device = "cuda" if faiss.get_num_gpus() > 0 else "cpu"

        if device == "cuda":
            print("✅ Using CUDA GPU device")
        else:
            print("ℹ️ Using CPU (no GPU acceleration available)")

        # Initialize the embeddings model
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.embedding_model_name,
            model_kwargs={"device": device}
        )

        # Test the embeddings to get the dimension
        test_embedding = self.embeddings.embed_query("test")
        self.dimension = len(test_embedding)
        print(f"📏 Embedding dimension: {self.dimension}")

        return self.embeddings

    def load_index(self, path=None):
        """Load existing FAISS index"""
        path = path or self.data_dir

        if not os.path.exists(path):
            raise FileNotFoundError(f"Index directory '{path}' not found.")

        # First, inspect the index to get dimension if no model specified
        if self.embedding_model_name is None:
            try:
                dimension = self._inspect_index_dimension()
                self.embedding_model_name = self._select_appropriate_model(dimension)
            except Exception as e:
                print(f"⚠️ Error inspecting index: {e}")

        # Initialize embeddings with the appropriate model
        self._init_embeddings()

        # Get index dimension directly as a double-check
        try:
            index_dimension = self._inspect_index_dimension()
            if index_dimension != self.dimension:
                print(f"⚠️ Dimension mismatch: Embeddings ({self.dimension}) vs Index ({index_dimension})")
                print("🔄 Reinitializing with corrected embedding model...")

                # Re-select model based on actual index dimension
                self.embedding_model_name = self._select_appropriate_model(index_dimension)
                self._init_embeddings()

                # Verify dimensions match now
                if self.dimension != index_dimension:
                    print(f"❌ Still have dimension mismatch after correction attempt. Search may fail.")
        except Exception as e:
            print(f"⚠️ Error during dimension check: {e}")

        print(f"📂 Loading FAISS index from {path}...")

        try:
            # Using allow_dangerous_deserialization parameter
            self.vectordb = FAISS.load_local(path, self.embeddings, allow_dangerous_deserialization=True)
            print("✅ Successfully loaded FAISS index")
        except Exception as e:
            print(f"❌ Error loading index: {e}")
            return False

        # Load metadata
        metadata_path = os.path.join(path, "index_metadata.json")
        if os.path.exists(metadata_path):
            with open(metadata_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
                print(f"✅ Loaded metadata for {len(metadata)} documents")
        else:
            print("⚠️ No metadata file found.")

        return self.vectordb is not None

    def search(self, query: str, k: int = 3) -> List[Dict[str, Any]]:
        """
        Search the vector database for similar documents.

        Args:
            query: The search query
            k: Number of results to return

        Returns:
            List of document dictionaries with text and metadata
        """
        if self.vectordb is None:
            if not self.load_index():
                raise ValueError("Vector database is not initialized. Please load or create an index first.")

        try:
            # Perform similarity search
            raw_results = self.vectordb.similarity_search_with_score(query, k=k)

            # Format the results
            results = []
            for doc, score in raw_results:
                # Normalize score (FAISS returns distance, lower is better)
                # Convert to similarity score (higher is better)
                similarity = 1.0 / (1.0 + score)

                result = {
                    "text": doc.page_content,
                    "score": similarity,
                    "file_name": doc.metadata.get("source", "Unknown"),
                }

                # Add other metadata if available
                if "entities" in doc.metadata:
                    result["entities"] = doc.metadata["entities"]

                if "risk_score" in doc.metadata:
                    result["risk_score"] = doc.metadata["risk_score"]

                results.append(result)

            return results
        except Exception as e:
            print(f"⚠️ Search error: {e}")
            print("🔄 The embedding model used to search is different from the one used to create the index.")
            print("   You need to use the same embedding model that was used to create the index.")
            print("   Current model dimension:", self.dimension)
            try:
                index_dimension = self._inspect_index_dimension()
                print("   Index dimension:", index_dimension)
            except:
                pass
            return []


def test_search_functionality():
    """
    Load the completed FAISS index and test search functionality,
    automatically selecting the correct embedding model based on index dimensions.
    """
    print("🔍 Loading FAISS index from faiss_index directory...")

    # Initialize chunker with automatic model detection
    chunker = SmartChunker(data_dir="faiss_index", embedding_model=None)

    # Load the existing index
    if chunker.load_index():
        print("✅ Successfully loaded FAISS index")

        # Run the test queries
        test_queries = [
            "What is cryptocurrency?",
            "How do cryptocurrency wallets work?",
            "What are the regulatory requirements for crypto?",
            "Explain blockchain technology",
            "What are the risks of cryptocurrency investment?",
            "Identify potential fraud in crypto transactions",
            "What are KYC requirements for exchanges?"
        ]

        # Run test searches
        for query in test_queries:
            test_search(chunker, query, k=3)

        return chunker
    else:
        print("❌ Failed to load FAISS index - please check the path and embedding model")
        return None

def test_search(chunker: SmartChunker, query: str, k: int = 3) -> None:
    """
    Test search functionality and print results.

    Args:
        chunker: Initialized SmartChunker instance
        query: Search query
        k: Number of results to return
    """
    print(f"\nSearching for: {query}")
    print("=" * 50)

    results = chunker.search(query, k=k)

    if not results:
        print("\n❌ No results found or search error occurred.")
        return

    print(f"\nFound {len(results)} results:")
    for i, result in enumerate(results, 1):
        print(f"\n{i}. File: {result['file_name']}")
        print(f"   Score: {result['score']:.4f}")
        print(f"   Risk Score: {result.get('risk_score', 'N/A')}")

        # Print entities if available
        if result.get('entities'):
            print("   Entities:")
            for etype, elist in result['entities'].items():
                if elist:
                    print(f"     - {etype.capitalize()}: {', '.join(elist[:3])}")

        # Print text preview
        print(f"   Text: {result['text'][:200]}...")
        print("-" * 50)

# Execute the test searches
if __name__ == "__main__":
    chunker = test_search_functionality()

    # Optional: Try some additional custom queries
    if chunker:
        print("\n--- Custom Queries ---\n")

        custom_queries = [
            "How does blockchain prevent fraud?",
            "What are the key risks in DeFi platforms?",
            "Explain money laundering concerns in cryptocurrency",
            "How do stablecoins maintain their value?"
        ]

        for query in custom_queries:
            test_search(chunker, query, k=3)

🔍 Loading FAISS index from faiss_index directory...
📏 FAISS index dimension: 768
✅ Auto-selected model 'sentence-transformers/all-mpnet-base-v2' based on dimension 768
🔍 Initializing embeddings model: sentence-transformers/all-mpnet-base-v2
ℹ️ Using CPU (no GPU acceleration available)
📏 Embedding dimension: 768
📏 FAISS index dimension: 768
📂 Loading FAISS index from faiss_index...
✅ Successfully loaded FAISS index
✅ Loaded metadata for 53034 documents
✅ Successfully loaded FAISS index

Searching for: What is cryptocurrency?

Found 3 results:

1. File: Experience points.txt
   Score: 0.5702
   Risk Score: 10.0
   Entities:
     - Location: united states
   Text: [Section 8] online brief hypercondensed explanation exactly cryptocurrency generated follows cryptocurrency bitcoin developed person persons known satoshi nakamoto cryptocurrency stroke revolutionized...
--------------------------------------------------

2. File: Alibabacoin.txt
   Score: 0.5563
   Risk Score: 0.0
   Entities:
