In [None]:
# === 1. KÖRNYEZET BEÁLLÍTÁSA ===
# Könyvtárak telepítése és importálása
%pip install -U torch sentence-transformers accelerate pyarrow pandas tqdm transformers

import pandas as pd
import numpy as np
import gc
import json
import pyarrow as pa
import pyarrow.parquet as pq
from sentence_transformers import SentenceTransformer
import torch
import time
import logging
from tqdm.auto import tqdm
from typing import List
from pathlib import Path
import os

# GPU optimalizáció A100-hoz
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True

print(f"CUDA elérhető: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# === 2. KONFIGURÁCIÓ ===
# Feltételezzük, hogy a notebook a 'notebooks' mappában van.
# A projekt gyökere a szülő mappa.
project_root = Path.cwd().parent

# Bemeneti útvonalak
# Elsődleges: egyetlen, nagy CSV fájl
UNIFIED_CSV_PATH = project_root / "processed_data" / "cleaned_data_for_embedding.csv"
# Fallback: a chunk-olt CSV-ket tartalmazó mappa
CHUNKED_INPUT_DIR = project_root / "processed_data" / "chunked_cleaned"

# Kimenet: Egyetlen Parquet fájl
OUTPUT_PARQUET = project_root / "processed_data" / "documents_with_embeddings.parquet"

# Modell és feldolgozási paraméterek A100-ra optimalizálva
MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"
EMBEDDING_DIMENSION = 1024
BATCH_SIZE = 256  # A100-on magasabb batch méretet használhatunk
UNIFIED_CSV_CHUNK_SIZE = 10000 # Mekkora darabokban olvassuk a nagy CSV-t

# Logging beállítása
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

logger.info(f"Elsődleges input: {UNIFIED_CSV_PATH}")
logger.info(f"Fallback input: {CHUNKED_INPUT_DIR}")
logger.info(f"Output fájl: {OUTPUT_PARQUET}")
logger.info(f"Modell: {MODEL_NAME}")
logger.info(f"Batch méret: {BATCH_SIZE}")

In [None]:
# === 3. BEMENETI MÓD MEGHATÁROZÁSA ===
INPUT_MODE = None
if UNIFIED_CSV_PATH.exists():
    INPUT_MODE = 'UNIFIED'
    logger.info(f"✅ Elsődleges mód kiválasztva: Egyetlen CSV fájl feldolgozása ({UNIFIED_CSV_PATH}).")
elif CHUNKED_INPUT_DIR.exists() and any(CHUNKED_INPUT_DIR.glob('*.csv')):
    INPUT_MODE = 'CHUNKED'
    logger.info(f"⚠️  Fallback mód kiválasztva: Chunk-olt CSV-k feldolgozása ({CHUNKED_INPUT_DIR}).")
else:
    error_msg = f"Hiba: Sem az elsődleges input ({UNIFIED_CSV_PATH}), sem a fallback input ({CHUNKED_INPUT_DIR}) nem található."
    logger.error(error_msg)
    raise FileNotFoundError(error_msg)

In [None]:
# === 4. EMBEDDING GENERÁTOR OSZTÁLY ===
# Ez az osztály tiszta és önálló, csak az embedding generálásra fókuszál.
class EmbeddingGenerator:
    def __init__(self, model_name: str, batch_size: int, dimension: int, device: str = 'cuda'):
        self.model_name = model_name
        self.batch_size = batch_size
        self.dimension = dimension
        self.device = device if torch.cuda.is_available() else 'cpu'
        self.model = None
        logger.info(f"Generátor inicializálva a(z) '{self.device}' eszközön.")

    def load_model(self):
        if self.model is not None:
            logger.info("Modell már be van töltve.")
            return
        try:
            logger.info(f"'{self.model_name}' modell betöltése...")
            self.model = SentenceTransformer(self.model_name, device=self.device, trust_remote_code=True)
            self._warmup_model()
            logger.info("Modell sikeresen betöltve és bemelegítve.")
        except Exception as e:
            logger.error(f"Modell betöltési hiba: {e}")
            raise

    def _warmup_model(self):
        logger.info("Modell bemelegítése...")
        self.generate_embeddings(["melegítés"])
        self._cleanup_memory()
        logger.info("Bemelegítés kész.")

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        if self.model is None:
            raise RuntimeError("A modell nincs betöltve. Hívd meg a load_model() metódust.")
        try:
            embeddings = self.model.encode(texts, batch_size=self.batch_size, normalize_embeddings=True, show_progress_bar=False, convert_to_numpy=True)
            if embeddings.shape[1] != self.dimension: # Biztonsági ellenőrzés
                logger.warning(f"Váratlan embedding dimenzió: {embeddings.shape[1]}. Korrekció {self.dimension}-ra.")
                embeddings = embeddings[:, :self.dimension]
            return embeddings.astype(np.float32)
        except Exception as e:
            logger.error(f"Hiba az embedding generálás közben: {e}")
            return np.full((len(texts), self.dimension), np.nan, dtype=np.float32)

    def _cleanup_memory(self):
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

In [None]:
# === 5. FŐ FELDOLGOZÁSI FOLYAMAT ===
def create_metadata_json(row: pd.Series) -> str:
    metadata_cols = [col for col in row.index if col not in ['text', 'embedding']]
    metadata_dict = row[metadata_cols].dropna().to_dict()
    return json.dumps({k: str(v) for k, v in metadata_dict.items()}, ensure_ascii=False)

def process_and_write_chunk(df_chunk, generator, parquet_writer):
    """Feldolgoz egy dataframe chunk-ot és kiírja a Parquet fájlba."""
    texts_to_process = df_chunk['text'].dropna().astype(str).tolist()
    if not texts_to_process:
        return 0

    embeddings = generator.generate_embeddings(texts_to_process)
    df_chunk['embedding'] = list(embeddings)
    df_chunk['metadata_json'] = df_chunk.apply(create_metadata_json, axis=1)

    final_df = df_chunk[['doc_id', 'text', 'embedding', 'metadata_json']]
    pa_table = pa.Table.from_pandas(final_df, preserve_index=False)
    
    parquet_writer.write_table(pa_table)
    return len(final_df)

def main():
    logger.info(f"Feldolgozás indítása '{INPUT_MODE}' módban...")
    start_time = time.time()

    generator = EmbeddingGenerator(MODEL_NAME, BATCH_SIZE, EMBEDDING_DIMENSION)
    generator.load_model()
    
    total_rows_processed = 0
    parquet_writer = None

    try:
        # A séma meghatározásához beolvasunk egyetlen sort
        if INPUT_MODE == 'UNIFIED':
            schema_df = pd.read_csv(UNIFIED_CSV_PATH, nrows=1, engine='python')
        else: # CHUNKED
            chunk_files_list = sorted(CHUNKED_INPUT_DIR.glob("cleaned_chunk_*.csv"))
            schema_df = pd.read_csv(chunk_files_list[0], nrows=1, engine='python')

        schema_df['embedding'] = [np.zeros(EMBEDDING_DIMENSION, dtype=np.float32)]
        schema_df['metadata_json'] = ""
        output_schema = pa.Table.from_pandas(schema_df[['doc_id', 'text', 'embedding', 'metadata_json']], preserve_index=False).schema

        OUTPUT_PARQUET.parent.mkdir(parents=True, exist_ok=True)
        parquet_writer = pq.ParquetWriter(OUTPUT_PARQUET, output_schema, compression='snappy')

        # Feldolgozás a kiválasztott mód szerint
        if INPUT_MODE == 'UNIFIED':
            # A nagy CSV darabonkénti olvasása
            row_count = sum(1 for row in open(UNIFIED_CSV_PATH)) -1
            with tqdm(total=row_count, desc="Sorok feldolgozása", unit="sor") as pbar:
                for df_chunk in pd.read_csv(UNIFIED_CSV_PATH, chunksize=UNIFIED_CSV_CHUNK_SIZE, engine='python'):
                    rows_done = process_and_write_chunk(df_chunk, generator, parquet_writer)
                    total_rows_processed += rows_done
                    pbar.update(rows_done)
        
        elif INPUT_MODE == 'CHUNKED':
            chunk_files = sorted(CHUNKED_INPUT_DIR.glob("cleaned_chunk_*.csv"))
            for chunk_file in tqdm(chunk_files, desc="Chunk fájlok feldolgozása", unit="fájl"):
                df_chunk = pd.read_csv(chunk_file, engine='python')
                total_rows_processed += process_and_write_chunk(df_chunk, generator, parquet_writer)

    finally:
        if parquet_writer:
            parquet_writer.close()
            logger.info("Parquet író sikeresen lezárva.")

    # Összegzés
    total_time_seconds = time.time() - start_time
    rows_per_second = total_rows_processed / total_time_seconds if total_time_seconds > 0 else 0
    print("\n" + "="*50)
    print("✅ FELDOLGOZÁS BEFEJEZVE")
    print(f"📄 Kimeneti fájl: {OUTPUT_PARQUET}")
    print(f"⏱️ Teljes idő: {total_time_seconds / 60:.2f} perc")
    print(f"📊 Feldolgozott sorok: {total_rows_processed:,}")
    print(f"⚡ Átlagos sebesség: {rows_per_second:.2f} sor/mp")
    print("="*50)

# Fő folyamat futtatása
main()

In [None]:
# === 6. VALIDÁCIÓ ===
logger.info("Kimeneti Parquet fájl validálása...")

if OUTPUT_PARQUET.exists():
    try:
        parquet_file = pq.ParquetFile(OUTPUT_PARQUET)
        file_num_rows = parquet_file.metadata.num_rows
        file_size_mb = OUTPUT_PARQUET.stat().st_size / (1024 * 1024)
        
        df_sample = pd.read_parquet(OUTPUT_PARQUET, nrows=5)
        sample_embedding = df_sample['embedding'].iloc[0]
        
        print("\n✅ VALIDÁCIÓ SIKERES!")
        print(f"  Fájl méret: {file_size_mb:.2f} MB")
        print(f"  Sorok száma: {file_num_rows:,}")
        print(f"  Oszlopok: {df_sample.columns.tolist()}")
        print(f"  Első embedding dimenziója: {len(sample_embedding)}")
        print("\n--- Minta Adatsor ---")
        display(df_sample)
        
    except Exception as e:
        logger.error(f"Hiba a Parquet fájl validálása közben: {e}")
        print(f"\n❌ HIBA a validáció során: {e}")
else:
    logger.error("A kimeneti Parquet fájl nem jött létre.")
    print("\n❌ HIBA: A kimeneti fájl nem található!")