In [4]:
from PIL import Image
from transformers  import BlipProcessor, BlipForConditionalGeneration
from pathlib import Path
import mimetypes
import hashlib
from pathlib import Path
import faiss
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import CLIPProcessor, CLIPModel
from transformers import BlipProcessor, BlipForImageTextRetrieval, BlipForConditionalGeneration
import numpy as np
import torch
import os
import shutil


In [5]:
IMG_TXT_PATH = "../models/img_to_text/"
SUMMARIZER_PATH = "../models/summarizer/"
# Paths where you previously saved the BLIP models & processors:
LOCAL_CAPTION_DIR = "../models/img_caption"
LOCAL_RETRIEVAL_DIR    = "../models/embedding"

FILE_STORING_PATH ='../files_DB'
FILE_SYSTEM_PATH  = '../File_System_Simulation'
VECTOR_DB_PATH = '../vectorDB'
# Choose device (GPU if available):
device = "cuda" if torch.cuda.is_available() else "cpu"


In [3]:

def load_models():

    # summarizer:
    summary_tokenizer = AutoTokenizer.from_pretrained(SUMMARIZER_PATH)
    summary_model = AutoModelForSeq2SeqLM.from_pretrained(SUMMARIZER_PATH)
    summarizer = pipeline("summarization", model=summary_model, tokenizer=summary_tokenizer)

    # image captioning
    caption_processor = BlipProcessor.from_pretrained(LOCAL_CAPTION_DIR)
    caption_model     = BlipForConditionalGeneration.from_pretrained(LOCAL_CAPTION_DIR).to(device)

    # embedding model:
    emb_processor = CLIPProcessor.from_pretrained(LOCAL_RETRIEVAL_DIR)
    emb_model     = CLIPModel.from_pretrained(LOCAL_RETRIEVAL_DIR).to(device)
    
    return summarizer, caption_processor, caption_model, emb_processor, emb_model

def text_to_vector(text: str, emb_model, emb_processor) -> np.ndarray:
    inputs = emb_processor(text=text, return_tensors="pt", padding=True).to(device)
    with torch.no_grad():
        text_features = emb_model.get_text_features(**inputs)
    # Normalize to unit vector (L2 norm)
    text_features /= text_features.norm(dim=-1, keepdim=True)
    return text_features.cpu().numpy().squeeze()

# 2. Image to normalized vector (using CLIP)
def image_to_vector(image: Image.Image, emb_model, emb_processor) -> np.ndarray:
    """
    Convert image to normalized L2 vector using CLIP
    
    Args:
        image: PIL Image object
        
    Returns:
        Normalized embedding vector (numpy array)
    """
    inputs = emb_processor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        image_features = emb_model.get_image_features(**inputs)
    # Normalize to unit vector (L2 norm)
    image_features /= image_features.norm(dim=-1, keepdim=True)
    return image_features.cpu().numpy().squeeze()

# 3. Image to caption (using BLIP)
def image_to_caption(image: Image.Image,caption_processor, caption_model , max_length: int = 30) -> str:
    """
    Generate caption from image using BLIP
    
    Args:
        image: PIL Image object
        max_length: Maximum caption length (default 30)
        
    Returns:
        Generated caption string
    """
    inputs = caption_processor(image, return_tensors="pt").to(device)
    with torch.no_grad():
        output = caption_model.generate(**inputs, max_length=max_length)
    caption = caption_processor.decode(output[0], skip_special_tokens=True)
    return caption


def summarize_text_file(summarizer, text):
    return summarizer(text, max_length=130, min_length=30, do_sample=False)[0]['summary_text']


def check_file_type(file_path) -> str:
    file_path = Path(file_path)
    mime_type, _ = mimetypes.guess_type(file_path)
    
    if mime_type:
        if mime_type.startswith("image/"):
            return "image"
        elif mime_type.startswith("text/"):
            return "text"
    return "unknown"

def generate_file_id(file_path) -> str:
    file_path = Path(file_path)
    path_hash = hashlib.md5(str(file_path.resolve()).encode()).hexdigest()[:8]
    return f"{file_path.stem}_{path_hash}"



In [6]:
file_path = 'sea.png'
loading_file_path = os.path.join(FILE_SYSTEM_PATH, file_path)


In [8]:
# 0. Load models: 
summarizer, caption_processor, caption_model, emb_processor, emb_model = load_models()

# 1. check file type: image, text, unknown
file_type = check_file_type(file_path)
file_name = Path(file_path).name
file_id = generate_file_id(file_path)

display(
    file_type , 
    file_name,
    file_id
)

Device set to use cpu


'image'

'sea.png'

'sea_54380609'

In [13]:
# Process based on file type
if file_type == 'image':
    try:
        with Image.open(loading_file_path) as img:
            img = img.convert('RGB')
            content = image_to_caption(img, caption_processor, caption_model)
            emb_vec = image_to_vector(img, emb_model, emb_processor)
    except Exception as e:
        print(f"Error processing image {loading_file_path}: {e}")
        
elif file_type == 'text':
    try:
        with open(loading_file_path, 'r', encoding='utf-8') as f:
            full_text = f.read()
            content = summarize_text_file(summarizer, full_text)
            emb_vec = text_to_vector(full_text, emb_model, emb_processor)
    except Exception as e:
        print(f"Error processing text file {file_path}: {e}")
        
else:
    print(f"Skipping unsupported file type: {file_path}")

display(
    content,
    emb_vec
)

'a beach with waves and clouds'

array([ 1.26300165e-02,  1.49946837e-02, -1.45996800e-02, -6.55191718e-03,
        7.53544318e-03, -1.76167302e-02,  1.60724148e-02,  4.06984650e-02,
        4.85199206e-02,  2.16006376e-02,  2.94376276e-02, -2.13964339e-02,
        6.21252926e-03, -2.52609863e-03,  2.10647658e-02, -2.10154112e-02,
       -2.90923771e-02,  4.47862335e-02,  6.62008002e-02, -2.54076961e-02,
       -5.94157726e-02,  2.49163583e-02,  2.05598306e-02, -2.36844621e-03,
       -1.03093972e-02,  4.66534793e-02,  2.48438418e-02, -1.56605430e-02,
       -1.07810348e-02,  1.11055642e-03, -2.72567361e-03,  2.16117781e-02,
       -2.23331414e-02, -4.21095220e-03, -7.18788709e-03,  2.73612514e-03,
       -2.09477171e-02,  1.20199919e-02,  4.29112557e-03,  1.14516146e-01,
        1.94068849e-02,  2.56641693e-02, -7.78234156e-04, -1.19918799e-02,
        1.12227974e-02, -1.12327628e-01, -3.03490795e-02,  1.58793479e-02,
        2.07514153e-03, -8.20735120e-04, -1.13206096e-02,  2.85740644e-02,
        3.02011315e-02, -

In [19]:

def store_file(file_id: str, original_file_path: str, file_type: str):
    # Create storage directory if it doesn't exist
    os.makedirs(FILE_STORING_PATH, exist_ok=True)
    
    # Get the original file extension
    original_path = Path(original_file_path)
    file_extension = original_path.suffix.lower()
    
    # Handle files without extensions
    if not file_extension:
        if file_type == 'image':
            file_extension = '.jpg'  # Default image format
        else:
            file_extension = '.txt'  # Default text format
    
    # Create storage path with file_id + original extension
    storage_path = os.path.join(FILE_STORING_PATH, f"{file_id}{file_extension}")
    
    # Copy the file to storage
    try:
        if file_type == 'image':
            # For images, we use the processed version to maintain RGB format
            with Image.open(original_file_path) as img:
                img = img.convert('RGB')
                img.save(storage_path)
            print(f"Stored image: {storage_path}")
        else:
            # For text files, copy the original content
            shutil.copy2(original_file_path, storage_path)
            print(f"Stored text: {storage_path}")
    except Exception as e:
        print(f"Error storing file {original_file_path}: {e}")
        return None
    
    return storage_path


In [27]:
import os
import json
import faiss
import numpy as np
from pathlib import Path

INDEX_FILE = os.path.join(VECTOR_DB_PATH, "file_index.faiss")
METADATA_FILE = os.path.join(VECTOR_DB_PATH, "index_metadata.json")

# Initialize global variables
index = None
metadata_list = []

def load_or_create_vector_db():
    global index, metadata_list
    os.makedirs(VECTOR_DB_PATH, exist_ok=True)
    # Check if database exists
    if os.path.exists(INDEX_FILE) and os.path.exists(METADATA_FILE):
        print("Loading existing vector database...")
        try:
            # Load FAISS index
            index = faiss.read_index(INDEX_FILE)
            
            # Load metadata
            with open(METADATA_FILE, 'r') as f:
                metadata_list = json.load(f)
                
            print(f"Loaded vector DB with {index.ntotal} entries")
        except Exception as e:
            print(f"Error loading vector DB: {e}")
            create_new_vector_db()
    else:
        print("Creating new vector database...")
        create_new_vector_db()
        
    return index, metadata_list

def create_new_vector_db():
    """Create a new empty vector database"""
    global index, metadata_list
    dimension = 512  # CLIP embedding dimension
    index = faiss.IndexFlatL2(dimension)
    metadata_list = []
    save_vector_db()
    print(f"Created new vector DB with dimension {dimension}")

def save_vector_db():
    """Save the vector database to disk"""
    try:
        # Save FAISS index
        faiss.write_index(index, INDEX_FILE)
        
        # Save metadata
        with open(METADATA_FILE, 'w') as f:
            json.dump(metadata_list, f, indent=2)
            
        print(f"Saved vector DB with {index.ntotal} entries")
    except Exception as e:
        print(f"Error saving vector DB: {e}")

def store_embedding(embedding: np.ndarray, meta_data: dict):
    """
    Store embedding and metadata in vector database
    
    Args:
        embedding: Embedding vector (1D numpy array)
        meta_data: Metadata dictionary
    """
    global index, metadata_list
    
    # Ensure embedding is in correct format
    if len(embedding.shape) == 1:
        embedding = embedding.reshape(1, -1)
    
    # Add to index
    index.add(embedding.astype('float32'))
    
    # Add to metadata
    metadata_list.append(meta_data)
    
    # Save to disk
    save_vector_db()
    
    print(f"Stored embedding for: {meta_data['file_name']}")

from typing import List, Dict, Tuple

# Update the store_Vdb function
def store_Vdb(embedding: np.ndarray, meta_data: dict):
    """Store in vector database (actual implementation)"""
    global index, metadata_list
    
    # Load DB if not already loaded
    if index is None:
        index, metadata_list = load_or_create_vector_db()
    
    # Store the embedding
    store_embedding(embedding, meta_data)
    
    return True
    
def retrieve_all_embeddings():
    """
    Retrieve all stored embeddings and metadata
    
    Returns:
        tuple: (embeddings, metadata_list)
    """
    global index, metadata_list
    
    if index.ntotal == 0:
        return np.array([]), []
    
    # Retrieve all embeddings
    all_embeddings = index.reconstruct_n(0, index.ntotal)
    
    return all_embeddings, metadata_list

def search_similar(query_embedding: np.ndarray, k: int = 5):
    """
    Search for similar vectors in the database
    
    Args:
        query_embedding: Embedding vector to compare against
        k: Number of results to return
        
    Returns:
        list: Metadata of top k matches
    """
    global index, metadata_list
    
    if index.ntotal == 0:
        return []
    
    # Prepare query vector
    if len(query_embedding.shape) == 1:
        query_embedding = query_embedding.reshape(1, -1)
    
    # Perform search
    distances, indices = index.search(query_embedding.astype('float32'), k)
    
    # Get metadata for results
    results = []
    for i in indices[0]:
        if i >= 0:  # FAISS returns -1 for invalid indices
            results.append(metadata_list[i])
    
    return results



# New function for searching files
def search_files(query: str, k: int = 5) -> List[Dict]:
    """
    Search files based on text query
    
    Args:
        query: Text query to search for
        k: Number of results to return
        
    Returns:
        List of metadata dictionaries for matching files
    """
    # Load models and DB
    _, _, _, emb_processor, emb_model = load_models()
    if index is None:
        index, metadata_list = load_or_create_vector_db()
    
    # Convert query to embedding
    query_embedding = text_to_vector(query, emb_model, emb_processor)
    
    # Search vector DB
    results = search_similar(query_embedding, k)
    
    # Format results
    for result in results:
        result['stored_file'] = os.path.join(
            FILE_STORING_PATH, 
            f"{result['file_id']}{Path(result['file_path']).suffix}"
        )
    
    return results

    


In [22]:
# TODO: storing file in DB + file_id
storage_path = store_file(file_id, original_file_path =loading_file_path , file_type =file_type)
storage_path

Stored image: ../files_DB/sea_54380609.png


'../files_DB/sea_54380609.png'

In [28]:
# TODO: store: text_embedding,metadata
metadata = {
    'file_id': file_id,
    'file_name': file_name,
    'file_path': str(Path(file_path).resolve()),
    'file_type': file_type,
    'content': content,
    'storage_path' : storage_path
}

load_or_create_vector_db()
store_Vdb(emb_vec, metadata)


Loading existing vector database...
Loaded vector DB with 1 entries
Saved vector DB with 2 entries
Stored embedding for: sea.png


True

# Semantic Search

In [8]:
import os
import torch
import json
import faiss
from pathlib import Path
from transformers import CLIPProcessor, CLIPModel
import numpy as np

# Get the directory of the current script
LOCAL_RETRIEVAL_DIR =   "../models/embedding"
VECTOR_DB_PATH =   "../vectorDB"
INDEX_FILE =  "file_index.faiss"
METADATA_FILE = "index_metadata.json"


index = None
metadata_list = []
device = "cuda" if torch.cuda.is_available() else "cpu"

def load_models():
    emb_processor = CLIPProcessor.from_pretrained(LOCAL_RETRIEVAL_DIR)
    emb_model     = CLIPModel.from_pretrained(LOCAL_RETRIEVAL_DIR).to(device)
    return emb_processor, emb_model

def text_to_vector(text: str, emb_model, emb_processor) -> np.ndarray:
    inputs = emb_processor(text=text, return_tensors="pt", padding=True).to(device)
    with torch.no_grad():
        text_features = emb_model.get_text_features(**inputs)
    # Normalize to unit vector (L2 norm)
    text_features /= text_features.norm(dim=-1, keepdim=True)
    return text_features.cpu().numpy().squeeze()

def load_or_create_vector_db():
    global index, metadata_list
    os.makedirs(VECTOR_DB_PATH, exist_ok=True)
    # Check if database exists
    if os.path.exists(INDEX_FILE) and os.path.exists(METADATA_FILE):
        print("Loading existing vector database...")
        try:
            # Load FAISS index
            index = faiss.read_index(INDEX_FILE)
            
            # Load metadata
            with open(METADATA_FILE, 'r') as f:
                metadata_list = json.load(f)
                
            print(f"Loaded vector DB with {index.ntotal} entries")
        except Exception as e:
            print(f"Error loading vector DB: {e}")
            create_new_vector_db()
    else:
        print("Creating new vector database...")
        create_new_vector_db()
        
    return index, metadata_list
def create_new_vector_db():
    """Create a new empty vector database"""
    global index, metadata_list
    dimension = 512  # CLIP embedding dimension
    index = faiss.IndexFlatL2(dimension)
    metadata_list = []
    save_vector_db()
    print(f"Created new vector DB with dimension {dimension}")

def save_vector_db():
    """Save the vector database to disk"""
    try:
        # Save FAISS index
        faiss.write_index(index, INDEX_FILE)
        
        # Save metadata
        with open(METADATA_FILE, 'w') as f:
            json.dump(metadata_list, f, indent=2)
            
        print(f"Saved vector DB with {index.ntotal} entries")
    except Exception as e:
        print(f"Error saving vector DB: {e}")

def search_similar(query_embedding: np.ndarray, k: int = 5):
    """
    Search for similar vectors in the database
    
    Args:
        query_embedding: Embedding vector to compare against
        k: Number of results to return
        
    Returns:
        list: Metadata of top k matches
    """
    global index, metadata_list
    
    if index.ntotal == 0:
        return []
    
    # Prepare query vector
    if len(query_embedding.shape) == 1:
        query_embedding = query_embedding.reshape(1, -1)
    
    # Perform search
    distances, indices = index.search(query_embedding.astype('float32'), k)
    
    # Get metadata for results
    results = []
    for i in indices[0]:
        if i >= 0:  # FAISS returns -1 for invalid indices
            results.append(metadata_list[i]['file_path'])
    
    return results



def semantic_search_engine(search_query: str):
    load_or_create_vector_db()
    emb_processor, emb_model = load_models()
    search_emb = text_to_vector(search_query, emb_model, emb_processor)
    relevant_files_paths = search_similar(search_emb)
    return relevant_files_paths




In [10]:
search_query = "sea with blue sky"
semantic_search_engine(search_query)


Loading existing vector database...
Loaded vector DB with 0 entries


[]