# Embedding Models Server for RAG Evaluation

This notebook sets up an API endpoint for embedding models that can be accessed remotely. It leverages Colab's A100 GPU for computation and exposes the embeddings through a simple Flask API, made public through a Cloudflare tunnel (via `flask-cloudflared`).

In [None]:
# Install required packages
!pip install flask pyngrok transformers sentence-transformers FlagEmbedding flask-cloudflared -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/163.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━[0m [32m153.6/163.8 kB[0m [31m4.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.5/491.5 kB[0m [31m14.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m100.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m89.4 MB/s[0m e

In [None]:
# Report whether PyTorch can see a CUDA device and, if so, which one
import torch

has_cuda = torch.cuda.is_available()
print(f"CUDA available: {has_cuda}")
if has_cuda:
    print(f"GPU name: {torch.cuda.get_device_name(0)}")

CUDA available: True
GPU name: NVIDIA A100-SXM4-40GB


In [None]:
# Resolve the bearer token that clients must send with every API request.
# Preference order: an AUTH_TOKEN already present in the environment, otherwise
# a freshly generated random token. A fixed placeholder is never used: the
# server is reachable through a public tunnel, so a well-known default value
# would let anyone call it.
# To pin your own token, set the AUTH_TOKEN environment variable before
# running this cell.
import os
import secrets

AUTH_TOKEN = os.environ.get('AUTH_TOKEN') or secrets.token_urlsafe(32)

# Mirror the token into the environment so re-running this cell keeps the
# same value instead of generating a new one
os.environ['AUTH_TOKEN'] = AUTH_TOKEN

In [None]:
# Load embedding models
from sentence_transformers import SentenceTransformer
import torch
import time
from FlagEmbedding import BGEM3FlagModel

# Registry of loaded embedding models, keyed by the short name used to select
# them (filled in further down in this cell)
models = {}

# Generic loader for SentenceTransformer checkpoints
def load_model(name, model_id):
    """Load a SentenceTransformer model and print how long loading took.

    Args:
        name: Label used only in the progress messages.
        model_id: Identifier passed to ``SentenceTransformer``.

    Returns:
        The loaded model, moved to the CUDA device when one is available.
    """
    print(f"Loading {name} model...")
    began = time.time()

    encoder = SentenceTransformer(model_id)
    on_gpu = torch.cuda.is_available()
    if on_gpu:
        # Place the weights on the GPU
        encoder = encoder.to(torch.device("cuda"))

    elapsed = time.time() - began
    print(f"Loaded {name} in {elapsed:.2f} seconds")
    return encoder

# Loader for the Nomic Embed checkpoint
def load_nomic_model():
    """Load ``nomic-ai/nomic-embed-text-v2-moe`` and print how long it took.

    The checkpoint is loaded with ``trust_remote_code=True``.

    Returns:
        The loaded model, moved to the CUDA device when one is available.
    """
    print("Loading Nomic Embed model...")
    began = time.time()

    encoder = SentenceTransformer(
        "nomic-ai/nomic-embed-text-v2-moe",
        trust_remote_code=True,
    )
    on_gpu = torch.cuda.is_available()
    if on_gpu:
        # Place the weights on the GPU
        encoder = encoder.to(torch.device("cuda"))

    elapsed = time.time() - began
    print(f"Loaded Nomic Embed in {elapsed:.2f} seconds")
    return encoder

# Loader for the BGE-M3 checkpoint
def load_bge_m3_model():
    """Load ``BAAI/bge-m3`` through ``BGEM3FlagModel`` and print the load time.

    The model is created with ``use_fp16=True``.

    Returns:
        The ``BGEM3FlagModel`` instance.
    """
    print("Loading BGE-M3 model...")
    began = time.time()
    encoder = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
    elapsed = time.time() - began
    print(f"Loaded BGE-M3 in {elapsed:.2f} seconds")
    return encoder

# Fill the model registry; the keys are the names the API accepts
models["e5"] = load_model("E5", "intfloat/multilingual-e5-large")
models["nomic"] = load_nomic_model()
models["bge-m3"] = load_bge_m3_model()

# Smoke test: embed one sentence with every model and print the result shapes
test_text = "This is a test sentence for embedding models."
print("Testing models with a sample text...")

# E5 is given the bare string
e5_embedding = models["e5"].encode(test_text)
print(f"E5 embedding shape: {e5_embedding.shape}")

# Nomic is given a one-element batch and the "passage" prompt
nomic_embedding = models["nomic"].encode([test_text], prompt_name="passage")
print(f"Nomic embedding shape: {nomic_embedding.shape}")

# BGE-M3 returns a dict; the dense vectors live under 'dense_vecs'
bge_m3_output = models["bge-m3"].encode(test_text)
bge_m3_embedding = bge_m3_output['dense_vecs']
if isinstance(bge_m3_embedding, list):
    # A plain list has no .shape, so report its length instead
    bge_m3_shape = len(bge_m3_embedding)
else:
    bge_m3_shape = bge_m3_embedding.shape
print(f"BGE-M3 embedding shape: {bge_m3_shape}")

In [None]:
# Import necessary libraries
from flask import Flask, request, jsonify
from flask_cloudflared import run_with_cloudflared
import numpy as np
import time
import traceback
import logging

# Set up logging: INFO level on the root logger, plus a logger for this module
# that the request handlers use to report errors
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create the Flask app that hosts the /health and /embed routes
app = Flask(__name__)
run_with_cloudflared(app)  # This is key - it adds the Cloudflare tunnel functionality, so app.run() starts the tunnel along with Flask

# Authentication middleware
@app.before_request
def authenticate():
    """Reject any request that does not carry the expected bearer token.

    Registered as a ``before_request`` hook. Requests to ``/health`` are
    exempt. All other requests must send
    ``Authorization: Bearer <AUTH_TOKEN>``.

    Returns:
        ``None`` to let the request proceed, or a ``(response, 401)`` pair
        when the ``Authorization`` header is missing or does not match.
    """
    # Imported locally so this function does not depend on the cell's imports
    import hmac

    if request.path == '/health':
        return None  # Skip auth for health check

    supplied = request.headers.get('Authorization') or ""
    expected = f"Bearer {AUTH_TOKEN}"

    # Constant-time comparison, so response timing does not reveal how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return jsonify({"error": "Unauthorized"}), 401
    return None

# Liveness probe; the authentication hook lets this path through without a token
@app.route('/health', methods=['GET'])
def health_check():
    """Report that the server is up and list the names of the loaded models."""
    available = list(models)
    payload = {"status": "healthy", "models": available}
    return jsonify(payload)

# Embedding endpoint
@app.route('/embed', methods=['POST'])
def embed():
    """Embed one or more texts with the requested model.

    Expects a JSON object with:
        model: Key of a loaded model (one of the keys of ``models``).
        texts: A string, or a list of strings, to embed.
        is_query: Optional flag. When truthy, the Nomic and E5 models encode
            the texts as queries instead of passages. Not used for BGE-M3.

    Returns:
        200 with ``embeddings``, ``model``, ``processing_time`` (seconds) and
        ``dimensions``; 400 with an ``error`` message for a malformed request;
        500 with ``error`` and ``traceback`` if encoding fails.
    """
    # silent=True yields None for a missing or unparsable JSON body, so a bad
    # request is answered with 400 instead of surfacing as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Strictly require all parameters
    if 'model' not in data:
        return jsonify({"error": "Missing 'model' in request"}), 400

    if 'texts' not in data:
        return jsonify({"error": "Missing 'texts' in request"}), 400

    model_name = data['model']
    texts = data['texts']

    # Validate model. The type check comes first because a non-string value
    # (e.g. a list) may be unhashable and would make the lookup itself raise.
    if not isinstance(model_name, str) or model_name not in models:
        return jsonify({"error": f"Model '{model_name}' not found. Available models: {list(models.keys())}"}), 400

    # Handle single text or list of texts
    if isinstance(texts, str):
        texts = [texts]
    if not isinstance(texts, list) or not all(isinstance(t, str) for t in texts):
        return jsonify({"error": "'texts' must be a string or a list of strings"}), 400

    is_query = data.get('is_query', False)  # Default to document/passage mode

    try:
        model = models[model_name]
        start_time = time.time()

        # Model-specific encoding logic
        if not texts:
            # Nothing to encode; skip the model call entirely
            embeddings = []

        elif model_name == "bge-m3":
            # BGE-M3 returns a dict; the dense vectors live under 'dense_vecs'
            embeddings = model.encode(texts)['dense_vecs']

        elif model_name == "nomic":
            # Nomic Embed takes its task instruction via prompt_name
            prompt_name = "query" if is_query else "passage"
            embeddings = model.encode(texts, prompt_name=prompt_name)

        elif model_name.startswith("e5"):
            # E5 takes its task instruction as a text prefix
            prefix = "query: " if is_query else "passage: "
            embeddings = model.encode([prefix + t for t in texts])

        else:
            # Any other registered model: encode without model-specific
            # handling, so `embeddings` is always bound
            embeddings = model.encode(texts)

        # Convert to plain nested lists for JSON serialization; np.asarray
        # also covers a list of per-text arrays
        embeddings = np.asarray(embeddings).tolist()

        processing_time = time.time() - start_time

        return jsonify({
            "embeddings": embeddings,
            "model": model_name,
            "processing_time": processing_time,
            "dimensions": len(embeddings[0]) if embeddings else 0
        })

    except Exception as e:
        # Boundary handler: log the failure and report it to the caller as JSON
        error_traceback = traceback.format_exc()
        logger.error("Error: %s\n%s", e, error_traceback)
        return jsonify({
            "error": str(e),
            "traceback": error_traceback
        }), 500

# Entry point: print the startup banner, then serve until stopped
if __name__ == '__main__':
    # Show the token and tunnel notes before the server takes over the output
    banner = (
        "\n\n===== IMPORTANT =====",
        f"Auth Token: {AUTH_TOKEN}",
        "====================\n",
        "The Cloudflare Tunnel URL will appear below once the server starts",
        "Your server will remain active as long as this notebook is running",
    )
    for line in banner:
        print(line)

    # Starts both Flask and the Cloudflare tunnel; the port can be customized.
    # This call blocks until the app is stopped.
    app.run(host='0.0.0.0', port=5000)  # metrics_port will be chosen randomly