<a href="https://colab.research.google.com/github/giuseppefutia/cdl2025/blob/master/vLLM_%2B_Qwen_Embedding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installation, Importing, Configuration

In [1]:
%%capture
# Install the notebook's runtime dependencies: FastAPI + uvicorn (proxy API),
# vLLM (model server) and pyngrok (public tunnel).
# NOTE(review): no versions are pinned, so each run installs whatever pip
# resolves at that moment — consider pinning for reproducibility.
!pip install -qq fastapi uvicorn
!pip install -qq vllm
!pip install -qq pyngrok

In [2]:
from __future__ import annotations

# Standard library
import os
import socket
import subprocess
import sys
import textwrap
import time
from contextlib import contextmanager
from dataclasses import dataclass
from getpass import getpass
from typing import Iterable, Generator, Optional, Sequence

# Third-party
import torch
from pyngrok import ngrok
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import OpenAI
from huggingface_hub import login
from google.colab import userdata
import uvicorn

In [3]:
# Both tokens must be saved in Colab's Secrets panel under these exact names.
HF_TOKEN, NGROK_TOKEN = (
    userdata.get(secret_name) for secret_name in ("HF_TOKEN", "NGROK_TOKEN")
)

# Mirror them into the process environment (inherited by subprocesses started later).
os.environ.update(HF_TOKEN=HF_TOKEN, NGROK_TOKEN=NGROK_TOKEN)

# Authenticate this session against the Hugging Face Hub.
login(HF_TOKEN)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [4]:
# Register the ngrok auth token with the ngrok CLI. `$NGROK_TOKEN` resolves to
# the token loaded in the previous cell, where it is both a Python variable and
# an exported environment variable.
!ngrok config add-authtoken "$NGROK_TOKEN"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [5]:
# Notebook-wide settings; the `# @param` markers are Colab form annotations.
# NOTE: the next code cell reassigns MODEL, VLLM_HOST, VLLM_PORT, MAX_MODEL_LEN
# and TENSOR_PARALLEL_SIZE, so for those names the values set here are replaced
# before vLLM is launched. API_HOST and API_PORT are only defined here.
MODEL = 'google/medgemma-4b-it' # @param {type:'string'}
VLLM_HOST = '0.0.0.0' # @param {type:'string'}
VLLM_PORT = 8000 # @param {type:'integer'}
API_HOST = '127.0.0.1' # @param {type:'string'}
API_PORT = 8001 # @param {type:'integer'}
MAX_MODEL_LEN = 8192 # @param {type:'integer'}
TENSOR_PARALLEL_SIZE = 1 # @param {type:'integer'}

# Embedding Model Deployment with vLLM

In [6]:
# ----------------------------
# Config (simple, safe defaults for Colab)
# ----------------------------
from dataclasses import dataclass
from typing import Optional, Sequence, Generator, Tuple, List
import subprocess, time, socket, torch
from contextlib import contextmanager
from threading import Thread

# Embedding-specific settings. These reassign names that the earlier parameter
# cell also sets; the values below are the ones in effect from this cell onward.
MODEL = "Alibaba-NLP/gte-Qwen2-7B-instruct"
VLLM_HOST = "0.0.0.0"
VLLM_PORT = 8000
MAX_MODEL_LEN = 32768        # gte-Qwen2-7B-instruct supports 32k context
TENSOR_PARALLEL_SIZE = 1

@dataclass
class VLLMConfig:
    """Launch settings for a `vllm serve` subprocess.

    Every field defaults to the module-level constant of the matching name
    (where one exists), so `VLLMConfig()` describes the notebook's default
    embedding deployment.
    """
    model: str = MODEL                                # model id passed to `vllm serve`
    host: str = VLLM_HOST                             # address the server binds to (--host)
    port: int = VLLM_PORT                             # TCP port the server binds to (--port)
    max_model_len: int = MAX_MODEL_LEN                # forwarded as --max-model-len
    tensor_parallel_size: int = TENSOR_PARALLEL_SIZE  # forwarded as --tensor-parallel-size
    trust_remote_code: bool = True                    # adds --trust-remote-code when True
    download_dir: Optional[str] = None                # forwarded as --download-dir when set
    task: str = "embed"                 # <- key change for embeddings
    served_model_name: Optional[str] = None  # optional: alias exposed by server

    @property
    def base_url(self) -> str:
        """Return the HTTP URL a local client should use to reach the server.

        `host` is a *bind* address. The wildcard addresses ("0.0.0.0", "::")
        are valid to listen on but are not portable destinations, so they are
        mapped to the matching loopback address. Bare IPv6 literals are wrapped
        in brackets so the result is a well-formed URL.
        """
        wildcard_to_loopback = {"0.0.0.0": "127.0.0.1", "::": "::1"}
        connect_host = wildcard_to_loopback.get(self.host, self.host)
        if ":" in connect_host and not connect_host.startswith("["):
            connect_host = f"[{connect_host}]"
        return f"http://{connect_host}:{self.port}"

# ----------------------------
# Small helpers (no filesystem/log files)
# ----------------------------
def select_dtype() -> str:
    """Choose the dtype string to pass to vLLM based on the visible hardware.

    Returns:
        "float32" when CUDA is unavailable; otherwise "bfloat16" if device 0
        reports a compute-capability major version of 8 or higher, else
        "float16".
    """
    if not torch.cuda.is_available():
        return "float32"

    capability_major = torch.cuda.get_device_capability(0)[0]
    if capability_major >= 8:
        return "bfloat16"
    return "float16"

def wait_for_port(host: str, port: int, timeout: float = 120.0, interval: float = 0.5) -> bool:
    """Poll a TCP port until it accepts a connection or the timeout elapses.

    Args:
        host: Address to probe. A wildcard bind address ("0.0.0.0" or "") is
            probed via 127.0.0.1, because the wildcard is not a portable
            connect target.
        port: TCP port to probe.
        timeout: Overall budget in seconds; a value <= 0 returns False
            without attempting a connection.
        interval: Pause in seconds between failed attempts.

    Returns:
        True as soon as a connection succeeds, False once the budget is spent.
    """
    connect_host = "127.0.0.1" if host in ("0.0.0.0", "") else host

    # Use the monotonic clock so wall-clock adjustments cannot stretch or
    # shrink the deadline.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Never let a single attempt run past the overall deadline.
            s.settimeout(min(1.0, remaining))
            if s.connect_ex((connect_host, port)) == 0:
                return True
        # Likewise cap the pause so the call does not overshoot `timeout`.
        time.sleep(min(interval, max(0.0, deadline - time.monotonic())))

def build_args(cfg: VLLMConfig, dtype: str) -> Sequence[str]:
    """Assemble the `vllm serve` command line for the given configuration.

    Args:
        cfg: Launch settings; numeric fields are stringified for the CLI.
        dtype: Value for the `--dtype` flag.

    Returns:
        The argv list: fixed flags first, then the optional flags that the
        configuration enables.
    """
    always_present = (
        ("--task", cfg.task),  # tells vLLM this is an embedding model
        ("--dtype", dtype),
        ("--max-model-len", str(cfg.max_model_len)),
        ("--tensor-parallel-size", str(cfg.tensor_parallel_size)),
        ("--host", cfg.host),
        ("--port", str(cfg.port)),
    )

    command = ["vllm", "serve", cfg.model]
    for flag, value in always_present:
        command.extend((flag, value))

    # Optional flags are emitted only when the matching field is truthy.
    if cfg.trust_remote_code:
        command.append("--trust-remote-code")
    if cfg.download_dir:
        command.extend(["--download-dir", cfg.download_dir])
    if cfg.served_model_name:
        command.extend(["--served-model-name", cfg.served_model_name])
    return command

# ----------------------------
# Log streaming helpers
# ----------------------------
def _pump(pipe, tag: str, sink: Optional[List[str]] = None):
    """Read a subprocess pipe line-by-line and stream it to stdout (and optional sink)."""
    try:
        for line in iter(pipe.readline, ""):
            if not line:
                break
            print(f"[vLLM {tag}] {line}", end="")   # stream to cell output
            if sink is not None:
                sink.append(f"[{tag}] {line}")
    except Exception as e:
        print(f"[vLLM LOG ERROR {tag}] {e}")
    finally:
        try:
            pipe.close()
        except Exception:
            pass

class VLLMProcess:
    """Handle around the vLLM `Popen` object that also keeps its logs in memory.

    On construction, two daemon threads are started that run `_pump` over the
    child's stdout and stderr; both append into the shared `logs` list.
    """
    def __init__(self, proc: subprocess.Popen):
        self.proc = proc
        self.logs: List[str] = []

        # One pump thread per stream; both are created before either starts.
        tagged_streams = ((proc.stdout, "OUT"), (proc.stderr, "ERR"))
        self._t_out, self._t_err = (
            Thread(target=_pump, args=(stream, tag, self.logs), daemon=True)
            for stream, tag in tagged_streams
        )
        for pump_thread in (self._t_out, self._t_err):
            pump_thread.start()

    @property
    def pid(self) -> int:
        """Process id of the wrapped child."""
        return self.proc.pid

    def wait(self, timeout: Optional[float] = None) -> int:
        """Delegate to the wrapped process's `wait` and return its result."""
        return self.proc.wait(timeout=timeout)

# ----------------------------
# Start/stop + context manager (with live logs)
# ----------------------------
def start_vllm(cfg: VLLMConfig) -> VLLMProcess:
    """Spawn `vllm serve` for `cfg` and return a handle that streams its logs.

    The child runs in its own session, with stdout and stderr piped as
    line-buffered UTF-8 text (undecodable bytes are replaced).
    """
    popen_options = dict(
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        encoding="utf-8",
        errors="replace",
        start_new_session=True,
        bufsize=1,          # line-buffered for timely streaming
    )
    command = build_args(cfg, select_dtype())
    child = subprocess.Popen(command, **popen_options)
    return VLLMProcess(child)

def stop_vllm(p: Optional[VLLMProcess]) -> None:
    """Stop the vLLM server and everything it spawned (best-effort).

    The child is launched in its own session, so it leads a process group.
    Signalling only the leader can leave its worker processes running, so the
    whole group is signalled when possible, with a fallback to signalling just
    the child. SIGTERM is tried first; if the child has not exited after 10
    seconds, SIGKILL follows and the child is reaped.

    Args:
        p: Handle returned by `start_vllm`, or None (no-op).

    Failures are reported on stdout instead of raised, so this stays safe to
    call from `finally` blocks.
    """
    if not p:
        return

    import os
    import signal

    proc = p.proc

    def _signal_group(sig) -> bool:
        """Send `sig` to the child's process group; return False if not possible."""
        try:
            pgid = os.getpgid(proc.pid)
            if pgid == os.getpgid(0):
                # Child shares our group: never signal ourselves.
                return False
            os.killpg(pgid, sig)
            return True
        except (AttributeError, OSError):
            # No process groups on this platform, or the child is already gone.
            return False

    try:
        if not _signal_group(signal.SIGTERM):
            proc.terminate()
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            if not _signal_group(getattr(signal, "SIGKILL", signal.SIGTERM)):
                proc.kill()
            # Reap the child so it does not linger as a zombie.
            proc.wait(timeout=10)
    except Exception as exc:
        print(f"[vLLM] error while stopping process (pid={getattr(proc, 'pid', '?')}): {exc}")

@contextmanager
def run_vllm(cfg: VLLMConfig) -> Generator[Tuple[str, VLLMProcess], None, None]:
    """Start vLLM, stream logs live, yield (base_url, process), then clean up.

    Startup is watched in short polling windows so that a server which
    crashes while loading is reported right away instead of after the full
    startup budget.

    Raises:
        RuntimeError: if the port is already taken before launch, if the vLLM
            process exits during startup, or if the port does not open within
            the startup budget.
    """
    startup_budget_s = 600.0
    poll_window_s = 2.0

    # A listener that already owns the port would make the readiness check
    # succeed against the wrong (stale) server.
    if wait_for_port(cfg.host, cfg.port, timeout=0.2, interval=0.1):
        raise RuntimeError(
            f"Port {cfg.port} is already in use; stop the previous server before starting vLLM."
        )

    p = start_vllm(cfg)
    try:
        deadline = time.monotonic() + startup_budget_s
        while not wait_for_port(cfg.host, cfg.port, timeout=poll_window_s):
            exit_code = p.proc.poll()
            if exit_code is not None:
                raise RuntimeError(
                    f"vLLM exited during startup (exit code {exit_code}); see the streamed logs above."
                )
            if time.monotonic() >= deadline:
                raise RuntimeError("vLLM didn't become ready on time. (Port check failed)")
        print(f"vLLM up at {cfg.base_url} (pid={p.pid})")
        yield (cfg.base_url, p)
    finally:
        # Single cleanup point for every exit path (success, error, interrupt).
        stop_vllm(p)


In [None]:
# Smoke test: boot vLLM, embed two sentences, print the length of each vector.
cfg = VLLMConfig(model=MODEL, port=VLLM_PORT)

with run_vllm(cfg) as (base_url, p):  # p: VLLMProcess handle (.proc, .pid, .logs)
    print("Server is ready at:", base_url)

    # Requests name the served alias when one is configured, otherwise the model id.
    served_name = cfg.served_model_name if cfg.served_model_name else cfg.model

    # OpenAI-compatible client aimed at the local vLLM server (api_key is ignored by vLLM).
    client = OpenAI(api_key="EMPTY", base_url=f"{base_url}/v1")

    out = client.embeddings.create(
        input=[
            "A cozy coffee shop with brick walls.",
            "The capital of Italy is Rome."
        ],
        model=served_name,  # e.g. "Alibaba-NLP/gte-Qwen2-7B-instruct"
    )

    # One length per input sentence, in request order.
    print(*(len(out.data[position].embedding) for position in (0, 1)))


[vLLM ERR] 2025-11-20 07:35:52.133636: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[vLLM ERR] 2025-11-20 07:35:52.151724: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[vLLM ERR] E0000 00:00:1763624152.174065    1683 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[vLLM ERR] E0000 00:00:1763624152.183589    1683 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[vLLM ERR] W0000 00:00:1763624152.200572    1683 computation_placer.cc:177] computation placer alre

In [None]:
from typing import List, Optional, Union
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from openai import OpenAI

# ---------- Pydantic models ----------
# Request body accepted by the embedding endpoints of the proxy app.
class EmbeddingRequest(BaseModel):
    # A single string or a list of strings; the endpoints wrap a lone string
    # in a one-element list before calling the embeddings API
    input: Union[str, List[str]]
    # Optional per-request model override; when unset the endpoints fall back
    # to the model_name the app was built with
    model: Optional[str] = None
    # If True, embeddings are L2-normalized before being returned
    normalize: bool = False

class MeanPoolRequest(EmbeddingRequest):
    # Inherits every field unchanged. Used as the body of POST /embed/mean,
    # which returns one mean-pooled vector for all inputs; with normalize=True
    # the normalization is applied to that pooled vector.
    pass

# ---------- App factory ----------
def build_app(client: OpenAI, model_name: str) -> FastAPI:
    """
    Build a FastAPI app that proxies to a vLLM embeddings server via the
    OpenAI-compatible SDK, e.g. client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

    Args:
        client: OpenAI SDK client pointed at the vLLM server.
        model_name: Model used when a request does not override it.

    Returns:
        The app, exposing:
          GET  /healthz     -> {"status", "model"}
          POST /embed       -> one embedding per input
          POST /embed/mean  -> a single vector, the dimension-wise mean of all inputs

    Both POST endpoints answer 422 when `input` is an empty list.
    """
    # Local import: the notebook's import cells bring in FastAPI itself but not
    # HTTPException.
    from fastapi import HTTPException

    app = FastAPI(title="vLLM Embeddings Proxy")

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],   # tighten for production
        # Credentials must stay off while origins are wildcarded: the CORS
        # middleware docs require explicit origins when credentials are
        # allowed, and this API uses neither cookies nor auth headers.
        allow_credentials=False,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/healthz")
    def healthz():
        """Liveness probe; also reports the default model name."""
        return {"status": "ok", "model": model_name}

    # -------- Embedding helpers --------
    def _to_list(x: Union[str, List[str]]) -> List[str]:
        """Wrap a lone string in a list; pass lists through unchanged."""
        return [x] if isinstance(x, str) else x

    def _l2_normalize(vec: List[float]) -> List[float]:
        """Scale `vec` to unit L2 norm; a zero-norm vector becomes all zeros."""
        # Avoid pulling in numpy just for this
        norm = sum(v * v for v in vec) ** 0.5
        if norm > 0:
            return [v / norm for v in vec]
        return [0.0] * len(vec)

    def _embed_texts(req: EmbeddingRequest):
        """Validate the request, call the embeddings API, return (inputs, resp, vectors)."""
        inputs: List[str] = _to_list(req.input)
        if not inputs:
            # Reject locally instead of forwarding a request that cannot succeed.
            raise HTTPException(status_code=422, detail="`input` must contain at least one string")

        resp = client.embeddings.create(
            model=req.model or model_name,
            input=inputs,
        )
        # vLLM follows OpenAI schema: resp.data is a list matching inputs order
        vectors = [d.embedding for d in resp.data]
        return inputs, resp, vectors

    # -------- Endpoints --------
    @app.post("/embed")
    def embed(req: EmbeddingRequest):
        """Return one embedding per input, optionally L2-normalized."""
        inputs, resp, vectors = _embed_texts(req)
        if req.normalize:
            vectors = [_l2_normalize(v) for v in vectors]

        payload = {
            "model": resp.model,
            "num_inputs": len(inputs),
            "embedding_dims": len(vectors[0]) if vectors else 0,
            "data": vectors,  # List[List[float]]
        }
        return JSONResponse(payload)

    @app.post("/embed/mean")
    def embed_mean(req: MeanPoolRequest):
        """Return the dimension-wise mean of all input embeddings as one vector."""
        inputs, resp, vectors = _embed_texts(req)

        # Explicit check rather than `assert`, which is stripped under `python -O`.
        dims = len(vectors[0]) if vectors else 0
        if any(len(v) != dims for v in vectors):
            raise HTTPException(status_code=502, detail="Inconsistent embedding dimensions")

        # Mean-pool across inputs (dimension-wise); empty when no vectors came back.
        mean_vec: List[float] = [sum(column) / len(vectors) for column in zip(*vectors)]

        if req.normalize and mean_vec:
            mean_vec = _l2_normalize(mean_vec)

        payload = {
            "model": resp.model,
            "inputs": len(inputs),
            "embedding_dims": len(mean_vec),
            "embedding": mean_vec,  # List[float]
        }
        return JSONResponse(payload)

    return app


In [None]:
cfg = VLLMConfig(model=MODEL, port=VLLM_PORT)
with run_vllm(cfg) as (base_url, proc):   # 'proc' is the VLLMProcess; keep the name as in your original

    client = OpenAI(base_url=f"{base_url}/v1", api_key="EMPTY")
    model_name = cfg.served_model_name or cfg.model

    public_url = ngrok.connect(API_PORT).public_url
    # print(f" * ngrok tunnel \"{public_url}\" -> \"{API_HOST}:{API_PORT}\"")

    app = build_app(client, model_name)
    config = uvicorn.Config(app, host=API_HOST, port=API_PORT, log_level="info")
    server = uvicorn.Server(config)

    await server.serve()