In [None]:
!pip install sentence-transformers faiss-cpu ujson tqdm

In [None]:
!pip install -U unsloth transformers accelerate bitsandbytes


In [None]:
import os
import json
import gc

import numpy as np
import pandas as pd
import torch

from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import pyarrow as pa
import pyarrow.parquet as pq


In [None]:
# Let the CUDA caching allocator use expandable segments to reduce
# fragmentation. PyTorch only accepts the literal "True"/"False" for this
# option (lower-case "true" is rejected when the allocator parses the config).
# The variable is read when the allocator is first used, so it has to be set
# before any tensor is placed on the GPU.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

# Input files (.json / .jsonl with a "text" field per record).
# These are placeholders: fill them in before running the embedding cell.
faq_input      = ""
product_input  = ""

# Outputs per corpus: a Parquet table (string columns + "embedding" list) and
# a dense embedding matrix file, row-aligned with the Parquet table.
faq_out_parquet     = ""
faq_out_npy         = ""
product_out_parquet = ""
product_out_npy     = ""

MODEL_NAME      = "BAAI/bge-m3"
DEVICE          = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE_INIT = 64     # first encode batch size; halved on CUDA OOM (floor 4)
SHARD_SIZE      = 2000   # rows encoded and written per shard
NORMALIZE       = True   # forwarded as normalize_embeddings to model.encode
MAX_SEQ_LENGTH  = 512    # assigned to model.max_seq_length before encoding

print(f"Device: {DEVICE}")

def load_any(path):
    """Load a list of records from a ``.jsonl`` or ``.json`` file.

    * ``.jsonl``: every non-blank line is parsed as one JSON value (one row).
    * ``.json``:
        - a top-level list is returned as-is;
        - an object with an ``"items"`` list returns that list;
        - an object whose values are all objects (a key -> record mapping)
          returns those values;
        - any other object is treated as a single record and wrapped in a
          one-element list.

    Args:
        path: File path; its (case-insensitive) extension selects the parser.

    Returns:
        list: The loaded records.

    Raises:
        ValueError: If the extension is neither ``.jsonl`` nor ``.json``, or
            the JSON top-level value is neither a list nor an object.
    """
    path_l = path.lower()
    # ".jsonl" must be tested first: it is not a suffix of ".json", but the
    # order keeps the intent explicit.
    if path_l.endswith(".jsonl"):
        with open(path, "r", encoding="utf-8") as f:
            return [json.loads(line) for line in (raw.strip() for raw in f) if line]
    if path_l.endswith(".json"):
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            items = data.get("items")
            if isinstance(items, list):
                return items
            values = list(data.values())
            # A mapping of key -> record: the records are the rows.
            if all(isinstance(v, dict) for v in values):
                return values
            # Otherwise the object itself is a single record.
            return [data]
        raise ValueError(
            f"Unsupported JSON top-level type in {path}: {type(data).__name__}"
        )
    raise ValueError(f"Unsupported file type: {path}")

def safe_encode(model, texts, batch_size, normalize=True):
    """Encode ``texts`` with ``model``, halving the batch size on CUDA OOM.

    Args:
        model: Object with a SentenceTransformer-style ``encode`` method.
        texts: List of strings to embed.
        batch_size: Batch size to try first.
        normalize: Forwarded as ``normalize_embeddings`` to ``model.encode``.

    Returns:
        tuple: ``(vecs, bs)`` where ``vecs`` is what ``model.encode`` returned
        (``convert_to_numpy=True``) and ``bs`` is the batch size that
        succeeded, so callers can carry it over to later calls.

    Raises:
        RuntimeError: Any non-OOM ``RuntimeError``, or a CUDA OOM once the
            batch size is already at the floor of 4 or ``DEVICE`` is not
            ``"cuda"``.
    """
    bs = batch_size
    while True:
        try:
            vecs = model.encode(
                texts,
                batch_size=bs,
                show_progress_bar=False,
                convert_to_numpy=True,
                normalize_embeddings=normalize,
            )
            return vecs, bs
        except RuntimeError as e:
            is_cuda_oom = "cuda out of memory" in str(e).lower()
            if not (is_cuda_oom and bs > 4 and DEVICE == "cuda"):
                raise
        # Recovery deliberately runs after the except clause. While the
        # exception is alive, its traceback keeps the failed batch's tensors
        # referenced, so empty_cache() inside the handler could not free them.
        bs = max(4, bs // 2)
        gc.collect()
        torch.cuda.empty_cache()
        print(f"OOM - giảm batch_size xuống {bs}")

def _meta_to_json(x):
    if isinstance(x, (dict, list)):
        return json.dumps(x, ensure_ascii=False)
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    return str(x)

def _coerce_df_strings(df_sh):
    for c in ["id","type","url","category","title","text","meta"]:
        if c not in df_sh.columns:
            df_sh[c] = None
    df_sh["meta"] = df_sh["meta"].apply(_meta_to_json)
    for c in ["id","type","url","category","title","text"]:
        df_sh[c] = df_sh[c].astype(str).replace("nan", "").fillna("")
    return df_sh

# Arrow schema shared by every Parquet shard: seven string columns, followed by
# the embedding stored as a variable-length list of float32.
PARQUET_SCHEMA = pa.schema(
    [
        pa.field(column, pa.string())
        for column in ("id", "type", "url", "category", "title", "text", "meta")
    ]
    + [pa.field("embedding", pa.list_(pa.float32()))]
)

def _shard_to_table(df_sh, vecs):
    """Build the Arrow table for one shard (string columns + embedding list).

    ``df_sh`` is normalized in place by ``_coerce_df_strings``; column names
    and types mirror ``PARQUET_SCHEMA`` so the shared ParquetWriter accepts it.
    """
    df_sh = _coerce_df_strings(df_sh)
    string_cols = ["id", "type", "url", "category", "title", "text", "meta"]
    arrays = [pa.array(df_sh[c], type=pa.string()) for c in string_cols]
    arrays.append(pa.array([v.tolist() for v in vecs], type=pa.list_(pa.float32())))
    return pa.Table.from_arrays(arrays, names=string_cols + ["embedding"])


def embed_streaming(INPUT_PATH, OUT_PARQUET, OUT_NPY, model):
    """Embed the ``text`` field of ``INPUT_PATH`` shard by shard.

    Records are read with ``load_any`` and encoded ``SHARD_SIZE`` rows at a
    time through ``safe_encode``. A batch size reduced after an OOM carries
    over to later shards. Results go to two row-aligned outputs:

    * ``OUT_PARQUET``: zstd-compressed Parquet following ``PARQUET_SCHEMA``.
    * ``OUT_NPY``: a real ``.npy`` file of shape ``(N, dim)``, float32,
      written through a memory map and readable with ``np.load``.

    Both outputs are created on the first shard (when ``dim`` becomes known).
    If an error occurs mid-run, they are still closed and flushed, but contain
    only the shards finished so far.

    Args:
        INPUT_PATH: ``.json``/``.jsonl`` input accepted by ``load_any``.
        OUT_PARQUET: Destination Parquet path.
        OUT_NPY: Destination ``.npy`` path.
        model: SentenceTransformer-style model used by ``safe_encode``.

    Raises:
        ValueError: If the records have no ``text`` field (or from ``load_any``).
        RuntimeError: If the encoder returns a row count that differs from the
            shard size (or from ``safe_encode``).
    """
    rows = load_any(INPUT_PATH)
    df = pd.DataFrame(rows)
    if "text" not in df.columns:
        raise ValueError(f"Thiếu cột 'text' trong dữ liệu: {INPUT_PATH}")

    print(f"Loading: {INPUT_PATH} | Rows: {len(df)}")

    # Best effort: not every model object accepts this attribute.
    try:
        model.max_seq_length = MAX_SEQ_LENGTH
    except Exception:
        pass

    N = len(df)
    writer = None
    memmap = None
    dim = None
    current_offset = 0
    batch_size = BATCH_SIZE_INIT

    try:
        for start in tqdm(range(0, N, SHARD_SIZE), desc=f"Embedding shards {os.path.basename(OUT_PARQUET)}"):
            end = min(N, start + SHARD_SIZE)
            df_sh = df.iloc[start:end].copy()
            text_col = df_sh["text"]
            # Missing text must not be embedded as the literal "None"/"nan".
            texts = text_col.where(text_col.notna(), "").astype(str).tolist()

            vecs, batch_size = safe_encode(model, texts, batch_size, normalize=NORMALIZE)
            vecs = np.asarray(vecs, dtype=np.float32)
            if vecs.ndim != 2 or vecs.shape[0] != len(df_sh):
                raise RuntimeError(
                    f"Encoder returned shape {vecs.shape} for a shard of {len(df_sh)} rows"
                )

            if dim is None:
                dim = vecs.shape[1]
                # open_memmap writes a proper .npy header. A plain np.memmap
                # writes raw bytes that np.load cannot read back.
                memmap = np.lib.format.open_memmap(
                    OUT_NPY, mode="w+", dtype=np.float32, shape=(N, dim)
                )
                writer = pq.ParquetWriter(OUT_PARQUET, schema=PARQUET_SCHEMA, compression="zstd")
                print(f"embedding_dim = {dim}")
                print(f"Init writers {OUT_PARQUET} | {OUT_NPY}")

            memmap[current_offset:current_offset + len(df_sh)] = vecs
            writer.write_table(_shard_to_table(df_sh, vecs))
            current_offset += len(df_sh)

            del df_sh, texts, vecs
            if DEVICE == "cuda":
                torch.cuda.empty_cache()
            gc.collect()
    finally:
        # Always finalize, so a failure mid-run does not leave an open Parquet
        # writer (no footer written) or unflushed memmap pages.
        if writer is not None:
            writer.close()
        if memmap is not None:
            memmap.flush()
            del memmap

    print(f"DONE {OUT_PARQUET} | {OUT_NPY}  (rows={N}, dim={dim})")

# Fail fast on unset or missing paths. The config cell ships with empty
# placeholders, and otherwise the error would only appear after the (large)
# model has been loaded.
_unset = [
    name
    for name, value in [
        ("faq_input", faq_input),
        ("product_input", product_input),
        ("faq_out_parquet", faq_out_parquet),
        ("faq_out_npy", faq_out_npy),
        ("product_out_parquet", product_out_parquet),
        ("product_out_npy", product_out_npy),
    ]
    if not value
]
if _unset:
    raise ValueError("Set these paths in the config cell first: " + ", ".join(_unset))
for _path in (faq_input, product_input):
    if not os.path.isfile(_path):
        raise FileNotFoundError(f"Input file not found: {_path}")

# Delete outputs from any previous run before regenerating them.
for p in [faq_out_parquet, faq_out_npy, product_out_parquet, product_out_npy]:
    try:
        os.remove(p)
    except FileNotFoundError:
        pass

model = SentenceTransformer(MODEL_NAME, device=DEVICE)
print(f"Loaded model: {MODEL_NAME} | max_seq_length={getattr(model,'max_seq_length','N/A')}")

embed_streaming(faq_input, faq_out_parquet, faq_out_npy, model)
embed_streaming(product_input, product_out_parquet, product_out_npy, model)

print("All done.")


In [None]:
def parquet_embeddings_to_npy(parquet_path, npy_path):
    """Rebuild a float32 ``.npy`` matrix from a Parquet ``embedding`` column.

    Args:
        parquet_path: Parquet file with an ``embedding`` list column.
        npy_path: Destination passed to ``np.save``.

    Returns:
        tuple: ``(df, E)``, the loaded DataFrame and the ``(rows, dim)`` float32
        matrix that was saved.

    Raises:
        ValueError: If the file has no rows or the embeddings differ in length.
    """
    df = pd.read_parquet(parquet_path)
    emb = df["embedding"].to_numpy()
    if len(emb) == 0:
        raise ValueError(f"No embeddings in {parquet_path}")
    lengths = {len(v) for v in emb}
    if len(lengths) != 1:
        raise ValueError(f"Inconsistent embedding lengths in {parquet_path}: {sorted(lengths)}")
    E = np.vstack(emb).astype("float32")
    np.save(npy_path, E)
    return df, E


# NOTE(review): this writes faq_vectors_bge.npy from faq_vectors_bge_new.parquet,
# while the verification cell loads faq_vectors_bge.parquet together with
# faq_vectors_bge_fixed.npy. Confirm the intended file pairing.
faq_df, E_faq = parquet_embeddings_to_npy("faq_vectors_bge_new.parquet", "faq_vectors_bge.npy")
prod_df, E_prod = parquet_embeddings_to_npy("product_vectors_bge.parquet", "product_vectors_bge.npy")


In [None]:
from numpy.linalg import norm

# Parquet / .npy pairs to sanity-check; each pair should align row for row.
faq_parquet = "faq_vectors_bge.parquet"
faq_npy     = "faq_vectors_bge_fixed.npy"

product_parquet = "product_vectors_bge.parquet"
product_npy     = "product_vectors_bge.npy"

# NOTE(review): these reassign the embedding cell's output paths. They only
# affect that cell if it is re-run after this one (hidden notebook state).
faq_out_parquet     = "faq_vectors_bge_new.parquet"
faq_out_npy         = "faq_vectors_bge_new.npy"
product_out_parquet = "product_vectors_bge.parquet"
product_out_npy     = "product_vectors_bge.npy"

for p in [faq_parquet, faq_npy, product_parquet, product_npy]:
    print(p, "OK" if os.path.exists(p) else "MISSING")

faq_df  = pd.read_parquet(faq_parquet)
# Embeddings are plain float arrays: keep allow_pickle off, because unpickling
# a data file can execute arbitrary code.
E_faq   = np.load(faq_npy)
prod_df = pd.read_parquet(product_parquet)
E_prod  = np.load(product_npy)

print(f"FAQ shape:     {E_faq.shape}")
print(f"Product shape: {E_prod.shape}")

for label, frame, matrix in [("FAQ", faq_df, E_faq), ("Product", prod_df, E_prod)]:
    if len(frame) != matrix.shape[0]:
        print(f"WARNING: {label} parquet has {len(frame)} rows but npy has {matrix.shape[0]}")

L2_faq  = norm(E_faq, axis=1)
L2_prod = norm(E_prod, axis=1)

# Embeddings were encoded with normalize_embeddings=NORMALIZE; when that is
# True, every row norm should be close to 1.0.
for label, l2 in [("FAQ", L2_faq), ("Product", L2_prod)]:
    if l2.size:
        print(f"{label} L2 norm: min={l2.min():.4f} max={l2.max():.4f}")

