In [None]:
# === 1. K√ñRNYEZET BE√ÅLL√çT√ÅSA ===
# K√∂nyvt√°rak telep√≠t√©se √©s import√°l√°sa
!pip install -U torch sentence-transformers accelerate pyarrow pandas tqdm transformers

import pandas as pd
import numpy as np
import gc
import json
import pyarrow as pa
import pyarrow.parquet as pq
from sentence_transformers import SentenceTransformer
import torch
import time
from tqdm.auto import tqdm
from typing import List
from pathlib import Path
import os

# !!! CRITICAL FIX: mitigate PyTorch GPU memory fragmentation !!!
# Following the suggestion in the error message, set this environment variable
# so that PyTorch manages GPU memory more flexibly.
# It has to be in place before any other PyTorch operation runs.
# NOTE(review): `torch` is already imported above; presumably the CUDA allocator
# only reads this variable on first CUDA use - confirm, or move the assignment
# ahead of the import.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# GPU tuning for the A100: turn on cuDNN benchmark mode and allow TF32 matmuls.
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True

# Report whether CUDA is usable and, if it is, the name of device 0.
print(f"CUDA el√©rhet≈ë: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# === 2. KONFIGURÁCIÓ ===
# RunPod felhő környezethez igazított konfiguráció.

from pathlib import Path
import logging
import csv
import sys

# --- RAISE THE CSV FIELD SIZE LIMIT ---
# Start from sys.maxsize and divide the candidate by 10 each time
# csv.field_size_limit() rejects it with OverflowError. If a ValueError or
# TypeError escapes instead, fall back to a fixed limit of 1,000,000,000.
try:
    max_int = sys.maxsize
    while True:
        try:
            csv.field_size_limit(max_int)
            break
        except OverflowError:
            max_int = int(max_int / 10)
except (ValueError, TypeError):
    csv.field_size_limit(1_000_000_000)

# Input CSV and output Parquet locations.
INPUT_CSV_PATH = Path("/workspace/cleaned_data_for_embedding.csv")
OUTPUT_PARQUET_PATH = Path("/workspace/documents_with_embeddings.parquet")

# Embedding model and the vector width expected from it.
MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"
EMBEDDING_DIMENSION = 1024
# !!! FINAL SAFETY MEASURE: REDUCED BATCH SIZE !!!
BATCH_SIZE = 128  # Lowered further, from 256 to 128

# Notebook-wide logger used by every later cell.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Log the CSV limit that was actually accepted (called without arguments,
# csv.field_size_limit() returns the current limit).
current_limit = csv.field_size_limit()
logger.info(f"CSV field size limit be√°ll√≠tva: {current_limit:,}")

# Record the effective configuration of this run.
logger.info(f"Input: {INPUT_CSV_PATH}")
logger.info(f"Output: {OUTPUT_PARQUET_PATH}")
logger.info(f"Modell: {MODEL_NAME}")
logger.info(f"Batch m√©ret: {BATCH_SIZE}")

In [None]:
# === 3. EMBEDDING GENERÁTOR OSZTÁLY ===
# Ez az osztály tiszta és önálló, csak az embedding generálásra fókuszál.
class EmbeddingGenerator:
    """Produce fixed-width float32 embeddings with a SentenceTransformer model.

    The model is loaded on demand by ``load_model()``. ``generate_embeddings()``
    then encodes a list of texts (requesting normalized vectors from the model)
    and returns an array of shape ``(len(texts), dimension)``.
    """

    def __init__(self, model_name: str, batch_size: int, dimension: int, device: str = 'cuda'):
        """Store the settings; the model itself is not loaded here.

        Args:
            model_name: Model identifier handed to ``SentenceTransformer``.
            batch_size: Batch size forwarded to ``model.encode``.
            dimension: Width every returned embedding must have.
            device: Requested device; replaced by ``'cpu'`` when CUDA is unavailable.
        """
        self.model_name = model_name
        self.batch_size = batch_size
        self.dimension = dimension
        self.device = device if torch.cuda.is_available() else 'cpu'
        self.model = None
        logger.info(f"Gener√°tor inicializ√°lva a(z) '{self.device}' eszk√∂z√∂n.")

    def load_model(self):
        """Load and warm up the model; does nothing if it is already loaded.

        Raises:
            Exception: Whatever loading or the warm-up raised, re-raised after
                logging. ``self.model`` is reset to ``None`` in that case so a
                later call retries instead of reporting "already loaded".
        """
        if self.model is not None:
            logger.info("Modell m√°r be van t√∂ltve.")
            return
        try:
            logger.info(f"'{self.model_name}' modell bet√∂lt√©se...")
            # NOTE: trust_remote_code=True allows the model repository to supply
            # its own Python code; only use it with a model source you trust.
            self.model = SentenceTransformer(self.model_name, device=self.device, trust_remote_code=True)
            self._warmup_model()
            logger.info("Modell sikeresen bet√∂ltve √©s bemeleg√≠tve.")
        except Exception as e:
            logger.error(f"Modell bet√∂lt√©si hiba: {e}")
            # Never keep a model whose warm-up failed: drop it and free its memory.
            self.model = None
            self._cleanup_memory()
            raise

    def _warmup_model(self):
        """Run one tiny encode call, then release temporary memory."""
        logger.info("Modell bemeleg√≠t√©se...")
        self.generate_embeddings(["meleg√≠t√©s"])
        self._cleanup_memory()
        logger.info("Bemeleg√≠t√©s k√©sz.")

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Encode ``texts`` into a float32 array of shape ``(len(texts), dimension)``.

        Args:
            texts: Texts to embed. An empty list yields an empty ``(0, dimension)``
                array without calling the model.

        Returns:
            The embeddings as ``np.float32``. If the model returns wider vectors
            they are cut to ``dimension`` columns and re-normalized to unit length.

        Raises:
            RuntimeError: The model has not been loaded.
            ValueError: The model returns fewer than ``dimension`` columns.
            Exception: Any error raised by the model, re-raised after logging.
        """
        if self.model is None:
            raise RuntimeError("A modell nincs bet√∂ltve. H√≠vd meg a load_model() met√≥dust.")
        # Nothing to encode: return an empty array that still has the right width.
        if len(texts) == 0:
            return np.empty((0, self.dimension), dtype=np.float32)
        try:
            embeddings = self.model.encode(
                texts,
                batch_size=self.batch_size,
                normalize_embeddings=True,
                show_progress_bar=True,  # Show a progress bar on the console
                convert_to_numpy=True
            )
            actual_dimension = embeddings.shape[1]
            if actual_dimension < self.dimension:
                # Slicing cannot widen the vectors, so refuse to return
                # embeddings that silently have the wrong width.
                raise ValueError(
                    f"Model returned {actual_dimension}-dimensional embeddings, "
                    f"fewer than the expected {self.dimension}."
                )
            if actual_dimension > self.dimension:
                logger.warning(f"V√°ratlan embedding dimenzi√≥: {embeddings.shape[1]}. Korrekci√≥ {self.dimension}-ra.")
                embeddings = embeddings[:, :self.dimension]
                # Cutting columns off a unit vector shortens it; restore unit
                # length (all-zero rows are left untouched).
                norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
                embeddings = embeddings / np.where(norms == 0, 1.0, norms)
            return embeddings.astype(np.float32)
        except Exception as e:
            # Log extra context so the failing input is easier to identify.
            problematic_text_snippet = str(texts[0])[:200] if texts else "√úres a sz√∂veg lista"
            logger.error("!!! KRITIKUS HIBA AZ EMBEDDING GENER√ÅL√ÅSKOR !!!")
            logger.error(f"Hiba√ºzenet: {e}")
            logger.error(f"A hib√°t okoz√≥ batch els≈ë sz√∂veg√©nek r√©szlete (els≈ë 200 karakter): '{problematic_text_snippet}'")

            # Re-raise so the caller still gets the full traceback.
            raise

    def _cleanup_memory(self):
        """Run the garbage collector and, when CUDA is available, empty its cache."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

In [None]:
# === 4. FŐ FELDOLGOZÁSI FOLYAMAT ===
def create_metadata_json(row: pd.Series) -> str:
    """Serialize the metadata fields of one row into a JSON object string.

    Every entry of ``row`` except ``'text'`` and ``'embedding'`` counts as
    metadata. Missing values are left out, the remaining values are converted
    with ``str``, and non-ASCII characters are written unescaped.

    Args:
        row: One record, indexed by column name.

    Returns:
        A JSON object mapping each remaining column name to its string value.
    """
    excluded = ['text', 'embedding']
    metadata = row[~row.index.isin(excluded)].dropna()
    as_strings = {}
    for key, value in metadata.to_dict().items():
        as_strings[key] = str(value)
    return json.dumps(as_strings, ensure_ascii=False)

def main():
    """Run the whole pipeline: read the CSV, embed the texts, write the Parquet file.

    Steps:
        1. Read ``INPUT_CSV_PATH`` and check that ``doc_id`` and ``text`` exist.
        2. Build the per-row metadata JSON with ``create_metadata_json``.
        3. Embed the ``text`` column chunk by chunk with ``EmbeddingGenerator``.
        4. Write ``doc_id``, ``text``, ``embedding`` and ``metadata_json`` to
           ``OUTPUT_PARQUET_PATH`` and print a run summary.

    Returns early, after logging a warning, when the CSV contains no rows.

    Raises:
        FileNotFoundError: The input CSV does not exist.
        KeyError: A required column (``doc_id`` or ``text``) is missing.
        RuntimeError: The number of embeddings differs from the number of rows.
    """
    logger.info("Feldolgoz√°s ind√≠t√°sa...")
    start_time = time.time()

    # Read the input data
    if not INPUT_CSV_PATH.exists():
        error_msg = f"Hiba: A bemeneti f√°jl nem tal√°lhat√≥: {INPUT_CSV_PATH}"
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    logger.info(f"Bemeneti CSV beolvas√°sa: {INPUT_CSV_PATH}")
    df = pd.read_csv(INPUT_CSV_PATH, engine='python', quoting=csv.QUOTE_ALL, on_bad_lines='warn')
    logger.info(f"{len(df):,} sor sikeresen beolvasva.")

    # Fail fast: 'doc_id' is only needed when the output is assembled, so
    # without this check a missing column would surface only after the whole
    # (expensive) embedding run had finished.
    missing_columns = [col for col in ('doc_id', 'text') if col not in df.columns]
    if missing_columns:
        error_msg = f"Input CSV is missing required column(s): {missing_columns}"
        logger.error(error_msg)
        raise KeyError(error_msg)

    # Extract and clean the texts
    df['text'] = df['text'].fillna('')
    texts_to_process = df['text'].astype(str).tolist()

    if not texts_to_process:
        logger.warning("Nincs feldolgozand√≥ sz√∂veg a bemeneti f√°jlban.")
        return

    # Build the metadata before the embedding column exists: the helper skips
    # 'embedding' anyway, and this keeps the vectors out of every row that
    # apply() has to materialize.
    tqdm.pandas(desc="Metaadat JSON gener√°l√°sa")
    df['metadata_json'] = df.progress_apply(create_metadata_json, axis=1)

    # Set up the embedding generator
    generator = EmbeddingGenerator(MODEL_NAME, BATCH_SIZE, EMBEDDING_DIMENSION)
    generator.load_model()

    # --- CHUNKED PROCESSING ---
    logger.info("Embedding gener√°l√°s megkezd√©se mem√≥riahat√©kony, darabolt m√≥dszerrel.")
    all_embeddings = []
    # Chunk size kept small as a safety measure
    processing_chunk_size = 4096

    for i in tqdm(range(0, len(texts_to_process), processing_chunk_size), desc="Adatdarabok feldolgoz√°sa"):
        batch_texts = texts_to_process[i:i + processing_chunk_size]
        batch_embeddings = generator.generate_embeddings(batch_texts)
        all_embeddings.append(batch_embeddings)

        # Free GPU memory after every chunk
        generator._cleanup_memory()

    # Join the embeddings of all chunks
    embeddings = np.concatenate(all_embeddings, axis=0)

    # Attach the results to the DataFrame
    if len(embeddings) != len(df):
        error_msg = f"KRITIKUS HIBA: Az embeddingek sz√°ma ({len(embeddings)}) nem egyezik a DataFrame sorainak sz√°m√°val ({len(df)}). A program le√°ll."
        logger.error(error_msg)
        # Raise instead of returning so a "Run All" stops here rather than
        # going on to validate an output file left over from an earlier run.
        raise RuntimeError(error_msg)
    df['embedding'] = list(embeddings)

    # Assemble the output frame and save it as Parquet
    final_df = df[['doc_id', 'text', 'embedding', 'metadata_json']]
    OUTPUT_PARQUET_PATH.parent.mkdir(parents=True, exist_ok=True)
    logger.info(f"Eredm√©nyek ment√©se a Parquet f√°jlba: {OUTPUT_PARQUET_PATH}")
    final_df.to_parquet(OUTPUT_PARQUET_PATH, index=False, compression='snappy')

    total_rows_processed = len(final_df)
    total_time_seconds = time.time() - start_time
    rows_per_second = total_rows_processed / total_time_seconds if total_time_seconds > 0 else 0

    # Summary
    print("\n" + "="*50)
    print("‚úÖ FELDOLGOZ√ÅS BEFEJEZVE")
    print(f"üìÑ Kimeneti f√°jl: {OUTPUT_PARQUET_PATH}")
    print(f"‚è±Ô∏è Teljes id≈ë: {total_time_seconds:.2f} m√°sodperc ({total_time_seconds / 60:.2f} perc)")
    print(f"üìä Feldolgozott sorok: {total_rows_processed:,}")
    print(f"‚ö° √Åtlagos sebess√©g: {rows_per_second:.2f} sor/mp")
    print("="*50)

# Run the main pipeline
main()

In [None]:
# === 5. VALIDATION ===
# Sanity-check the Parquet file written by main(): report its size, row count
# and columns, and confirm the stored embeddings have the configured width.
logger.info("Kimeneti Parquet f√°jl valid√°l√°sa...")

if OUTPUT_PARQUET_PATH.exists():
    try:
        parquet_file = pq.ParquetFile(OUTPUT_PARQUET_PATH)
        file_num_rows = parquet_file.metadata.num_rows
        file_size_mb = OUTPUT_PARQUET_PATH.stat().st_size / (1024 * 1024)

        # Stream only the first few rows instead of loading the whole file
        # (every embedding included) just to display a five-row sample.
        first_batch = next(parquet_file.iter_batches(batch_size=5), None)
        if first_batch is None:
            raise ValueError("The Parquet file contains no rows.")
        df_sample = first_batch.to_pandas()
        sample_embedding = df_sample['embedding'].iloc[0]

        # Only report success when the stored vectors have the expected width.
        if len(sample_embedding) != EMBEDDING_DIMENSION:
            raise ValueError(
                f"Embedding dimension {len(sample_embedding)} does not match "
                f"the expected {EMBEDDING_DIMENSION}."
            )

        print("\n‚úÖ VALID√ÅCI√ì SIKERES!")
        print(f"  F√°jl m√©ret: {file_size_mb:.2f} MB")
        print(f"  Sorok sz√°ma: {file_num_rows:,}")
        print(f"  Oszlopok: {df_sample.columns.tolist()}")
        print(f"  Els≈ë embedding dimenzi√≥ja: {len(sample_embedding)}")
        print("\n--- Minta Adatsor ---")
        display(df_sample)

    except Exception as e:
        # Report the problem without raising, so the cell always finishes.
        logger.error(f"Hiba a Parquet f√°jl valid√°l√°sa k√∂zben: {e}")
        print(f"\n‚ùå HIBA a valid√°ci√≥ sor√°n: {e}")
else:
    logger.error("A kimeneti Parquet f√°jl nem j√∂tt l√©tre.")
    print("\n‚ùå HIBA: A kimeneti f√°jl nem tal√°lhat√≥!")