In [1]:
# 환경 변수와 자격증명 파일 경로 확인
import os, pathlib
from dotenv import load_dotenv

load_dotenv()
cred = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_PATH") or os.getenv(
    "GOOGLE_APPLICATION_CREDENTIALS"
)
print("GOOGLE_APPLICATION_CREDENTIALS:", cred)
print("Exists:", pathlib.Path(cred).exists() if cred else None)

GOOGLE_APPLICATION_CREDENTIALS: /root/project/Gourmet-with-GeminiEmbedding/secrets/vertex.json
Exists: True


In [2]:
import google.generativeai as genai
from dotenv import load_dotenv
import os
import pandas as pd
import json
import time
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import vertexai
from vertexai.language_models import TextEmbeddingModel, TextEmbeddingInput
from google.api_core import exceptions

In [4]:
# .env 로드
load_dotenv()

# .env 파일에서 Vertex AI 설정 값 로드
service_account_key_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_PATH")
project_id = os.getenv("PROJECT_ID")
location = os.getenv("LOCATION")

# 에러 유무 확인
if not all([service_account_key_path, project_id, location]):
    raise ValueError(
        "GOOGLE_APPLICATION_CREDENTIALS_PATH, PROJECT_ID, LOCATION 환경 변수를 .env 파일에 설정해주세요."
    )

# 서비스 계정 키를 환경 변수에 설정하여 인증
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path

# Vertex AI 객체 초기화
vertexai.init(project=project_id, location=location)

In [5]:
# 불러올 모델명 설정
model_name = "gemini-embedding-001"

In [6]:
# 데이터셋 경로
input_file = "../Dataset/states/dataset_fl_split100k.jsonl"
output_file = "../Dataset/states/fl_split100k"

# vertexAI api call 시 batch크기
BATCH_SIZE = 250
# api call 시 대기 시간(1초)
DELAY_BETWEEN_REQUESTS = 0
# 에러 발생시 대기시간
RETRY_DELAY = 60
# 같은 요청에 대한 최대 재시도 횟수
MAX_RETRIES = 5

In [7]:
# Vertex AI API 호출 함수
print(f"입력 파일에서 데이터를 로드합니다: {input_file}")
try:
    df_reviews = pd.read_json(input_file, lines=True)
    review_texts = df_reviews["text"].astype(str).tolist()
    total_reviews = len(review_texts)
    print(f"총 {total_reviews}개의 리뷰를 로드했습니다.")
except FileNotFoundError:
    print(f"오류: 입력 파일 '{input_file}'을 찾을 수 없습니다.")

입력 파일에서 데이터를 로드합니다: ../Dataset/states/dataset_fl_split100k.jsonl
총 100000개의 리뷰를 로드했습니다.


In [8]:
# 이어하기 기능
start_index = 0
if os.path.exists(output_file):
    try:
        with open(output_file, "r", encoding="utf-8") as f:
            processed_count = sum(1 for line in f)
        if processed_count > 0:
            start_index = processed_count
            print(
                f"기존 파일 '{output_file}'에 {processed_count}개의 임베딩이 저장되어 있습니다."
            )
            print(f"{processed_count}번째 리뷰부터 이어합니다.")
    except Exception as e:
        print(f"기존 출력 파일 처리 중 오류 발생: {e}. 처음부터 다시 시작합니다.")
        start_index = 0

In [9]:
# Vertex AI 모델 로드
print(f"Vertex AI 임베딩 모델을 로드합니다: {model_name}")
model = TextEmbeddingModel.from_pretrained(model_name)
print(f"\n'임베딩 모델 '{model_name}'을 로드했습니다.")

Vertex AI 임베딩 모델을 로드합니다: gemini-embedding-001





'임베딩 모델 'gemini-embedding-001'을 로드했습니다.


In [10]:
# TASK 리스트
TASK = [
    "RETRIEVAL_DOCUMENT",
    "QUESTION_ANSWERING",
    "FACT_VERIFICATION",
    "CODE_RETRIEVAL_QUERY",
    "RETRIEVAL_QUERY",
    "CLASSIFICATION",
    "CLUSTERING",
    "SEMANTIC_SIMILARITY",
]
print(TASK)

['RETRIEVAL_DOCUMENT', 'QUESTION_ANSWERING', 'FACT_VERIFICATION', 'CODE_RETRIEVAL_QUERY', 'RETRIEVAL_QUERY', 'CLASSIFICATION', 'CLUSTERING', 'SEMANTIC_SIMILARITY']


In [None]:
# 임베딩 진행
# file_mode = "a" if start_index > 0 else "w"

base_file = "../Dataset/states/fl_split100k"

for task in TASK:
    file_mode = "w"
    start_index = 0
    output_file = f"{base_file}_{task}.jsonl"
    print(f"\n임베딩 결과를 '{output_file}' 파일에 저장합니다.")

    with open(output_file, file_mode, encoding="utf-8") as f_out:
        for i in tqdm(
            range(start_index, total_reviews, BATCH_SIZE),
            initial=start_index,
            total=total_reviews,
            desc=f"[{task}] 임베딩 진행률",
        ):
            batch_texts = review_texts[i : i + BATCH_SIZE]
            retries = 0

            while retries < MAX_RETRIES:
                try:
                    req = [TextEmbeddingInput(text, task) for text in batch_texts]

                    # 임베딩 모델 call 시 파라미터 지정
                    response = model.get_embeddings(
                        req,
                        auto_truncate=False,
                        output_dimensionality=3072,
                    )

                    # 임베딩 값만 추출
                    embeddings = [embedding.values for embedding in response]

                    # 임베딩 결과를 데이터프레임에 추가
                    batch_df = df_reviews.iloc[i : i + BATCH_SIZE].copy()
                    batch_df["embedding"] = embeddings[: len(batch_df)]

                    json_lines = batch_df.to_json(
                        orient="records", lines=True, force_ascii=False
                    )
                    f_out.write(json_lines)
                    f_out.flush()

                    break

                # api call 장애발생 핸들링
                except exceptions.ResourceExhausted as e:
                    retries += 1
                    # 할당량 초과시 대기
                    print(
                        f"\n[경고] Quota 초과 (429 에러) (배치 인덱스: {i}). {retries}/{MAX_RETRIES}번째 재시도. {RETRY_DELAY}초 후 다시 시도합니다."
                    )
                    time.sleep(RETRY_DELAY)
                except Exception as e:
                    # 문제가 있는 배치는 건너뜀
                    print(f"\n[오류] 처리 불가 (배치 인덱스: {i}): {e}")
                    break

            # 재시도 횟수 초과 시 건너뜀
            if retries == MAX_RETRIES:
                print(f"[실패] 배치 인덱스 {i}를 {MAX_RETRIES}번 재시도 후 건너뜁니다.")

            time.sleep(DELAY_BETWEEN_REQUESTS)

    print(f"\n임베딩 결과가 '{output_file}' 파일에 저장되었습니다.")


임베딩 결과를 '../Dataset/states/fl_split100k_RETRIEVAL_DOCUMENT.jsonl' 파일에 저장합니다.


[RETRIEVAL_DOCUMENT] 임베딩 진행률:   0%|          | 343/100000 [33:56<169:32:05,  6.12s/it]