In [None]:
pip install numpy opencv-python Pillow matplotlib python-dotenv face-recognition ipykernel transformers scikit-learn torch  torchvision  torchaudio sentence-transformers textblob python-docx pymupdf pytesseract

In [1]:
"""
Universal Search Engine for PC - Functional Programming Version.

This module provides functionality to search for images, faces, text in images, and files
on your local computer. It uses various techniques like object detection, face recognition,
OCR, and text embedding to power the search capabilities.

The code has been refactored to use functional programming principles:
- Pure functions where possible
- Immutability of data
- Higher-order functions
- Function composition
- Parallel processing for I/O operations
"""

import os
import pickle
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import face_recognition
import torch
from transformers import BertTokenizer, BertModel
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from pathlib import Path
import fitz
import docx
import concurrent.futures
import multiprocessing as mp
from functools import partial
from textblob import TextBlob
from tqdm import tqdm
import dotenv
from typing import Any, Dict, List, Optional, Set, Tuple
import re
import pytesseract
import time
dotenv.load_dotenv()

True

In [2]:


# Cache paths
CACHE_DIR = 'C:\\Users\\Rishu\\Downloads'

# Cache file names
CACHE_FILES = {
    'text': 'text_features_cache.pkl',
    'image': 'image_features_cache.pkl',
    'face': 'face_features_cache.pkl',
    'image_text': 'image_text_cache.pkl'
}

# Exclusion lists
DEFAULT_EXCLUDE_KEYWORDS = [
    "venv", "env", "node_modules", "__pycache__",
    "dist-info", "macosx", "_vendor", "thirdpartynotices"
]

DEFAULT_EXCLUDE_FILENAMES = [
    "lgpl.txt", "vendor.txt", "thirdpartysoftwarenotice.txt", "entry_points.txt"
]

IMAGE_EXTENSIONS = [".png", ".jpg", ".jpeg"]
YOLO_CONFIG = {
    'weights': os.environ.get('YOLO_WEIGHTS', 'yolov4.weights'),
    'config': os.environ.get('YOLO_CONFIG', 'yolov4.cfg'),
    'classes': os.environ.get('YOLO_CLASSES', 'coco.names')
}
FACE_MATCH_TOLERANCE = float(os.environ.get('FACE_MATCH_TOLERANCE', '0.7'))
OBJECT_CONFIDENCE_THRESHOLD = float(os.environ.get('OBJECT_CONFIDENCE_THRESHOLD', '0.8'))

# Supported file extensions
FILE_EXTENSIONS = [".pdf", ".txt", ".docx"]

# OCR Configuration
TESSERACT_CONFIG = os.environ.get('TESSERACT_CONFIG', '')
OCR_DPI = int(os.environ.get('OCR_DPI', '300'))

# Models configuration
DEFAULT_MODEL = "all-MiniLM-L6-v2"
MODEL_ALIASES = {
    "bge": "BAAI/bge-m3",
    "jina": "jinaai/jina-embeddings-v2-base-en",
    "default": DEFAULT_MODEL
}

# Processing configuration
DEFAULT_TIMEOUT = 600  # seconds
DEFAULT_BATCH_SIZE = 100

# Initialize configuration
os.makedirs(CACHE_DIR, exist_ok=True)

In [None]:
# ================ Face Recognition Functions ================

def extract_face_features(image_path):
    """
    Extract face features from an image using face_recognition library.
    
    """
    try:
        image = face_recognition.load_image_file(image_path)
        face_encodings = face_recognition.face_encodings(image)
        return face_encodings
    except Exception as e:
        print(f"Error extracting face features for {image_path}: {e}")
        return []

In [6]:
# ================ Image Recognition Functions ================

def load_yolo_model(model_weights='yolov4.weights', config_file='yolov4.cfg', classes_file='coco.names'):
    """
    Load YOLO model and class names.
    
    Args:
        model_weights (str): Path to YOLOv4 weights file
        config_file (str): Path to YOLOv4 configuration file
        classes_file (str): Path to file containing class names
        
    Returns:
        tuple: (net, output_layers, classes)
    """
    net = cv2.dnn.readNet(model_weights, config_file)
    layer_names = net.getLayerNames()
    unconnected_out_layers = net.getUnconnectedOutLayers().flatten()
    output_layers = [layer_names[i - 1] for i in unconnected_out_layers]
    
    with open(classes_file, "r") as f:
        classes = [line.strip() for line in f.readlines()]
    
    return net, output_layers, classes

def load_and_preprocess_image(image_path):
    """
    Load and preprocess an image for YOLO detection.
    
    Args:
        image_path (str): Path to the image file
        
    Returns:
        tuple: (blob, img, width, height)
    """
    img = cv2.imread(image_path)
    if img is None:
        return None, None, 0, 0
        
    height, width, _ = img.shape
    blob = cv2.dnn.blobFromImage(img, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    return blob, img, width, height

def extract_objects(net, output_layers, blob, confidence_threshold, image_width, image_height):
    """
    Extract objects from an image using YOLO model.
    
    Args:
        net: YOLO neural network
        output_layers: Output layers of the network
        blob: Preprocessed image blob
        confidence_threshold: Minimum confidence for object detection
        image_width: Width of the original image
        image_height: Height of the original image
        
    Returns:
        tuple: (bounding_boxes, class_ids, confidences)
    """
    net.setInput(blob)
    outputs = net.forward(output_layers)
    
    bounding_boxes = []
    class_ids = []
    confidences = []
    
    for output in outputs:
        for detection in output:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            
            if confidence > confidence_threshold:
                x_center = int(detection[0] * image_width)
                y_center = int(detection[1] * image_height)
                width = int(detection[2] * image_width)
                height = int(detection[3] * image_height)
                
                x = int(x_center - width / 2)
                y = int(y_center - height / 2)
                
                bounding_boxes.append([x, y, width, height])
                class_ids.append(class_id)
                confidences.append(confidence)
                
    return bounding_boxes, class_ids, confidences

def detect_objects(img_path, net, output_layers, confidence_threshold):
    """
    Detect objects in an image using YOLO.
    
    Args:
        img_path (str): Path to the image file
        net: YOLO neural network
        output_layers: Output layers of the network
        confidence_threshold: Minimum confidence for object detection
        
    Returns:
        list or None: List of detected object features or None if no objects detected
    """
    blob, img, width, height = load_and_preprocess_image(img_path)
    
    if img is None:
        return None
        
    bounding_boxes, class_ids, confidences = extract_objects(
        net, output_layers, blob, confidence_threshold, width, height)
    
    if bounding_boxes and class_ids and confidences:
        features = []
        for bbox, cls_id, conf in zip(bounding_boxes, class_ids, confidences):
            features.append({
                'bounding_box': bbox,
                'class_id': cls_id,
                'confidence': conf
            })
        return features
    return None


In [None]:
# ================ Image Search Functions ================

def load_or_extract_objects(folder_path, net, output_layers, confidence_threshold):
    """
    Load cached object features if they exist, otherwise extract and cache them.
    
    """
    image_features = load_cache(IMAGE_CACHE)
    cached_images = set(image_features.keys())
    
    # Find all images in the folder
    all_images = [img for img in os.listdir(folder_path) if img.lower().endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(all_images)} images in the folder.")
    
    # Extract features for new images
    new_images = [img for img in all_images if img not in cached_images]
    
    # Use ThreadPoolExecutor for I/O bound operations
    if new_images:
        updated_features = {}
        print(f"Processing {len(new_images)} new images...")
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Create a partial function with fixed parameters
            detect_func = partial(detect_objects, net=net, output_layers=output_layers, 
                                 confidence_threshold=confidence_threshold)
            
            # Process images in parallel and track progress
            futures = {executor.submit(detect_func, os.path.join(folder_path, img)): img for img in new_images}
            
            for future in tqdm(concurrent.futures.as_completed(futures), total=len(new_images), desc="Processing images"):
                img = futures[future]
                try:
                    features = future.result()
                    if features:
                        updated_features[img] = features
                except Exception as e:
                    print(f"Error processing {img}: {e}")
        
        # Update and save cache
        image_features.update(updated_features)
        save_cache(image_features, IMAGE_CACHE)
    
    return image_features

def search_images_by_query(query, folder_image_features, classes):
    """
    Search for images containing a specific object.
    
    """
    # Convert query to lowercase
    query = query.lower()
    
    # Try to find the class ID for the query
    try:
        target_id = classes.index(query)
    except ValueError:
        print(f"Query '{query}' not found in available classes. Try another term.\n")
        return []
    
    # Find images that contain the queried object class
    matching_images = []
    for filename, features in folder_image_features.items():
        if features:  # Check if features exist
            for feature in features:
                if feature['class_id'] == target_id:
                    matching_images.append(filename)
                    break  # No need to check other features of this image
    
    return matching_images

In [None]:
# ================ Face Search Functions ================

def load_or_extract_face_features(folder_path):
    """
    Load cached face features if they exist, otherwise extract and cache them.
    
    """
    face_features = load_cache(FACE_CACHE)
    cached_images = set(face_features.keys())
    
    # Find all images in the folder
    all_images = [img for img in os.listdir(folder_path) if img.lower().endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(all_images)} images in the folder.")
    
    # Extract features for new images
    new_images = [img for img in all_images if img not in cached_images]
    
    # Use ThreadPoolExecutor for I/O bound operations
    if new_images:
        updated_features = {}
        print(f"Processing {len(new_images)} new images for faces...")
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Process images in parallel and track progress
            futures = {executor.submit(extract_face_features, os.path.join(folder_path, img)): img for img in new_images}
            
            for future in tqdm(concurrent.futures.as_completed(futures), total=len(new_images), desc="Processing faces"):
                img = futures[future]
                try:
                    encodings = future.result()
                    updated_features[img] = encodings
                except Exception as e:
                    print(f"Error processing face in {img}: {e}")
        
        # Update and save cache
        face_features.update(updated_features)
        save_cache(face_features, FACE_CACHE)
    
    return face_features

def search_images_by_face(input_image_path, face_features_cache):
    """
    Search for images containing faces from the input image.
    
    Args:
        input_image_path (str): Path to the query image containing faces
        face_features_cache (dict): Dictionary mapping image filenames to their face encodings
        
    Returns:
        list: List of image filenames containing matching faces
    """
    input_encodings = extract_face_features(input_image_path)
    
    # Check if any encodings were found in the input image
    if not input_encodings:
        print("No faces found in the input image.")
        return []

    matching_images = []
    
    # Use ThreadPoolExecutor for parallel face comparison
    with concurrent.futures.ThreadPoolExecutor() as executor:
        def check_image_for_face_match(item):
            img, encodings = item
            
            # Skip images with no face encodings
            if not encodings:
                return None
                
            # Compare each encoding in the cached encodings with the input encodings
            for encoding in encodings:
                matches = face_recognition.compare_faces(input_encodings, encoding, tolerance=0.7)
                if any(matches):  # If there's a match with any of the faces
                    return img
            return None
        
        # Process in parallel and track progress
        futures = {executor.submit(check_image_for_face_match, item): item[0] 
                  for item in face_features_cache.items()}
        
        for future in tqdm(concurrent.futures.as_completed(futures), 
                          total=len(face_features_cache), desc="Searching faces"):
            try:
                match = future.result()
                if match:
                    matching_images.append(match)
            except Exception as e:
                print(f"Error in face comparison: {e}")
    
    return matching_images

In [9]:
# ================ Image Text Search Functions ================

def extract_text_from_image(img_path):
    """
    Extract text from an image using OCR.
    
    Args:
        img_path (str): Path to the image file
        
    Returns:
        str: Extracted text
    """
    try:
        image = Image.open(img_path)
        text = pytesseract.image_to_string(image).strip()
        return text
    except Exception as e:
        print(f"Error extracting text from image {img_path}: {e}")
        return ""

def load_or_extract_image_text(folder_path, embedding_model):
    """
    Load cached OCR results if they exist, otherwise extract and cache them.
    
    Args:
        folder_path (str): Path to the folder containing images
        embedding_model: Text embedding model
        
    Returns:
        dict: Dictionary mapping image filenames to text content and embeddings
    """
    image_text_cache = load_cache(IMAGE_TEXT_CACHE)
    cached_images = set(image_text_cache.keys())
    
    # Find all images in the folder
    all_images = [img for img in os.listdir(folder_path) if img.lower().endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(all_images)} images in the folder.")
    
    # Extract text for new images
    new_images = [img for img in all_images if img not in cached_images]
    
    # Use ThreadPoolExecutor for I/O bound operations
    if new_images:
        updated_cache = {}
        print(f"Processing {len(new_images)} new images for text extraction...")
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Process images in parallel for OCR
            futures = {executor.submit(extract_text_from_image, 
                                      os.path.join(folder_path, img)): img for img in new_images}
            
            for future in tqdm(concurrent.futures.as_completed(futures), 
                              total=len(new_images), desc="Extracting text from images"):
                img = futures[future]
                try:
                    text = future.result()
                    
                    # If text found, create embedding
                    if text:
                        embedding = encode_text(text, embedding_model)
                        updated_cache[img] = {
                            'text': text,
                            'embedding': embedding
                        }
                    else:
                        updated_cache[img] = {
                            'text': '',
                            'embedding': None
                        }
                except Exception as e:
                    print(f"Error in text extraction for {img}: {e}")
                    updated_cache[img] = {
                        'text': '',
                        'embedding': None
                    }
        
        # Update and save cache
        image_text_cache.update(updated_cache)
        save_cache(image_text_cache, IMAGE_TEXT_CACHE)
    
    return image_text_cache

def search_images_by_text(query, image_text_cache, embedding_model):
    """
    Search for images containing text similar to the query.
    
    Args:
        query (str): Text to search for in images
        image_text_cache (dict): Dictionary of image text data
        embedding_model: Text embedding model
        
    Returns:
        list: List of tuples (image_filename, similarity_score) sorted by relevance
    """
    query_embedding = encode_text(query, embedding_model)
    
    results = []
    for img, data in image_text_cache.items():
        if data['embedding'] is not None:
            similarity = cosine_similarity([query_embedding], [data['embedding']])[0][0]
            if similarity > 0.3:  # Threshold for text similarity
                results.append((img, similarity))
    
    # Sort by similarity score
    results.sort(key=lambda x: x[1], reverse=True)
    return results

In [10]:
# =============================================================================
# VISUALIZATION FUNCTIONS
# =============================================================================

def plot_image_with_boxes(filename, features, folder_path, classes):
    """
    Plot an image with bounding boxes around detected objects.
    
    Args:
        filename (str): Name of the image file
        features (list): List of object features with bounding boxes
        folder_path (str): Path to the folder containing images
        classes (list): List of object class names
    """
    img_path = os.path.join(folder_path, filename)
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(10, 10))
    plt.imshow(image)
    plt.axis('off')
    
    for feature in features:
        bbox = feature['bounding_box']
        class_id = feature['class_id']
        confidence = feature['confidence']
        x, y, w, h = bbox
        
        # Draw rectangle and label
        plt.gca().add_patch(plt.Rectangle((x, y), w, h, edgecolor='red', linewidth=2, fill=False))
        plt.text(x, y-10, f"{classes[class_id]}: {confidence:.2f}", 
                 bbox=dict(facecolor='red', alpha=0.5), color='white')
    
    plt.title(filename)
    plt.show()

def plot_matching_images(matching_images, intent, folder_path, input_image_path=None):
    """
    Plot matching images found during search.
    
    Args:
        matching_images (list): List of matching image filenames or (filename, score) tuples
        intent (str): Type of search ('face', 'image', or 'text_image')
        folder_path (str): Path to the folder containing images
        input_image_path (str, optional): Path to the input image (for face search)
    """
    if not matching_images:
        print("No matching images to display.")
        return
        
    num_images = len(matching_images)
    if intent == 'text_image':
        # Extract just the filenames for text_image intent
        matching_images = [img for img, _ in matching_images]
        num_images = len(matching_images)
    
    plt.figure(figsize=(15, 5 * (1 + (num_images // 3))))
    
    # For face intent, display the input image as well
    plot_offset = 0
    if intent == 'face' and input_image_path:
        plt.subplot(1 + (num_images // 3), 3, 1)
        input_image = Image.open(input_image_path)
        plt.imshow(np.array(input_image))
        plt.axis('off')
        plt.title("Input Image")
        plot_offset = 1
    
    # Plot matching images
    for i, result in enumerate(matching_images):
        plt.subplot(1 + (num_images // 3), 3, i + 1 + plot_offset)
        
        if intent == 'text_image':
            img_path = os.path.join(folder_path, result)
        else:
            img_path = os.path.join(folder_path, result)
            
        img = Image.open(img_path)
        plt.imshow(np.array(img))
        plt.axis('off')
        
        if intent == 'text_image':
            plt.title(f"Match {i + 1}")
        else:
            plt.title(f"Match {i + 1}")
            
    plt.tight_layout()
    plt.show()

In [None]:
# =============================================================================
# MAIN
# =============================================================================

if __name__ == "__main__":    
    """
    The program offers four search options:
    - image: Search for images based on content description
    - face: Search for images containing a specific face
    - text_image: Search for images containing specific text
    - file: Search for text files based on content similarity
    """
    # Define path to cache files
    IMAGE_CACHE = os.path.join(CACHE_DIR, CACHE_FILES['image'])
    FACE_CACHE = os.path.join(CACHE_DIR, CACHE_FILES['face'])
    IMAGE_TEXT_CACHE = os.path.join(CACHE_DIR, CACHE_FILES['image_text'])
    TEXT_CACHE = os.path.join(CACHE_DIR, CACHE_FILES['text'])
    
    print("🔍 What would you like to search for?")
    print("Please choose one of the following options:")
    print("  🧑‍🦰 face        - Search for images by a face")
    print("  🖼️ image       - Search for images using a sample object")
    print("  📝 text_image  - Search for images using keywords or text")
    print("  📄 file        - Search across all your text files")
    
    intent = input("\nEnter your choice (face/image/text_image/file): ").strip().lower()

    if intent == "image":
        """
        Image search functionality based on visual content recognition.
        
        Uses an image recognition model to find images matching a query term.
        """
        folder_path = input("Enter Folder to search in: ")
        print(f"Using folder path: {folder_path}")
        
        # Load the YOLO model
        print("Loading YOLO model...")
        net, output_layers, classes = load_yolo_model(
            model_weights=YOLO_CONFIG['weights'],
            config_file=YOLO_CONFIG['config'],
            classes_file=YOLO_CONFIG['classes']
        )
        
        # Load or extract object features
        print("Loading or extracting image features...")
        folder_image_features = load_or_extract_objects(
            folder_path, 
            net, 
            output_layers, 
            OBJECT_CONFIDENCE_THRESHOLD
        )
        
        # Perform search
        query = input("Enter your search query (one word): ")
        matching_images = search_images_by_query(query, folder_image_features, classes)
        
        print(f"Found {len(matching_images)} images matching '{query}'")
        if matching_images:
            print("Matching Images:", matching_images)
            plot_matching_images(matching_images, intent, folder_path)
        else:
            print("No search results found.")
            
    elif intent == "face":
        """
        Face search functionality to find images containing a specific face.
        
        Takes a reference image and finds other images containing the same face.
        """
        folder_path = input("Enter Folder to search in: ")
        print(f"Using folder path: {folder_path}")
        
        # Load or extract face features
        print("Loading or extracting face features...")
        face_features_cache = load_or_extract_face_features(folder_path)
        
        # Perform search
        input_image_path = input("Enter the path to the input image: ")
        matching_images = search_images_by_face(input_image_path, face_features_cache)
        
        print(f"Found {len(matching_images)} images with matching faces")
        if matching_images:
            print("Matching Images:", matching_images)
            plot_matching_images(matching_images, intent, folder_path, input_image_path)
        else:
            print("No matching images found.")
            
    elif intent == "text_image":
        """
        Text-in-image search functionality to find images containing specific text.
        
        Uses OCR to identify images that contain the requested text.
        """
        folder_path = input("Enter Folder to search in: ")
        print(f"Using folder path: {folder_path}")
        
        # Load embedding model
        print("Loading text embedding model...")
        embedding_model = load_text_embedding_model(DEFAULT_MODEL)
        
        # Load or extract text from images
        print("Loading or extracting text from images...")
        image_text_cache = load_or_extract_image_text(folder_path, embedding_model)
        
        # Perform search
        query = input("Enter text to search for in images: ")
        results = search_images_by_text(query, image_text_cache, embedding_model)
        
        print(f"Found {len(results)} images with text similar to '{query}'")
        if results:
            print("Search Results:")
            for img, similarity in results:
                print(f"Image: {img}, Similarity: {similarity:.4f}")
            plot_matching_images(results, "text_image", folder_path)
        else:
            print("No matching images found.")
            
    elif intent == 'file':
        """
        File content search functionality to find text files based on content similarity.
        
        Uses text embeddings to identify files with content similar to the query.
        Supports multiple embedding models for comparison.
        """
        folder_path = 'C:\\Users\\Rishu\\Downloads'
        print(f"Using folder path: {folder_path}")
        
        # Check if folder exists
        if not os.path.exists(folder_path):
            print(f"Error: Folder '{folder_path}' does not exist.")
            exit(1)
          
        # Select embedding model
        model_choice = input("Choose model (bge/jina/default): ").strip().lower()
        model_name = MODEL_ALIASES.get(model_choice, DEFAULT_MODEL)
        print(f"Using model: {model_name}")
        
        # Load embedding model
        print("Loading text embedding model...")
        embedding_model = load_text_embedding_model(model_name)
        
        # Load text embeddings cache
        text_cache = load_cache(TEXT_CACHE)
        
        # Get list of files to process
        print(f"Scanning for text files in {folder_path}...")
        file_paths = get_files_in_directory(
            folder_path, 
            FILE_EXTENSIONS,
            exclude_keywords=DEFAULT_EXCLUDE_KEYWORDS, 
            exclude_filenames=DEFAULT_EXCLUDE_FILENAMES
        )[:2]
        print(f"Found {len(file_paths)} text files to process")
        
        # Show some of the files to verify
        if file_paths:
            print("Sample files:")
            for i, path in enumerate(file_paths[:5]):
                print(f"  {i+1}. {path}")
            if len(file_paths) > 5:
                print(f"  ... and {len(file_paths) - 5} more")
        
        # Choose number of workers
        max_workers_input = input("Maximum number of workers (default: 4): ")
        if max_workers_input.strip() and max_workers_input.isdigit():
            max_workers = int(max_workers_input)
        else:
            max_workers = 4
        
        if file_paths:
            start_time = time.time()
            
            updated_cache, processed_count = index_files(
                file_paths=file_paths, 
                embedding_model=embedding_model, 
                embeddings_cache=text_cache,
                cache_path=TEXT_CACHE,
                max_workers=max_workers, 
                batch_size=DEFAULT_BATCH_SIZE,
                timeout=DEFAULT_TIMEOUT,
            )
            
            # Cache is already saved within index_files function
            # Just update the local reference
            if processed_count > 0:
                text_cache = updated_cache
                
            end_time = time.time()
            print(f"Indexing completed in {end_time - start_time:.2f} seconds")
        else:
            print("No files to index.")
        
        # Search the files
        query = input("Enter your text query: ")
        print(f"Searching for: '{query}'")
        
        results = search_files(query, text_cache, embedding_model)
        
        print("\nSearch Results:")
        if results:
            for i, (file_path, similarity) in enumerate(results):
                print(f"{i+1}. File: {file_path}")
                print(f"   Similarity: {similarity:.4f}")
                print()
        else:
            print("No matching files found.")
    else:
        print("Invalid option. Please choose from: face, image, text_image, or file.")

In [13]:
import concurrent.futures

def square(n):
    return n * n

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(square, numbers))

    print(results)  # Output: [1, 4, 9, 16, 25]


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.