In [None]:
# 패키지 임포트
import dataiku  # Dataiku 라이브러리 임포트
from dataiku import pandasutils as pdu  # Pandas 유틸리티를 Dataiku에서 임포트
from dataiku.snowpark import DkuSnowpark  # Dataiku Snowpark 클래스 임포트

import pandas as pd  # Pandas 데이터 처리 라이브러리 임포트
import torch  # PyTorch 딥러닝 라이브러리 임포트
import importlib  # 동적 모듈 임포트를 위한 라이브러리
import json  # JSON 데이터 처리를 위한 라이브러리
import subprocess  # 하위 프로세스를 관리하기 위한 라이브러리
import requests  # HTTP 요청을 위한 라이브러리
import logging  # 로깅을 위한 라이브러리

import transformers  # Hugging Face의 Transformers 라이브러리 임포트
from transformers import pipeline  # 파이프라인 API 임포트
from transformers import AutoTokenizer, AutoModelForCausalLM  # 자동 토크나이저 및 대화형 모델 클래스 임포트
from langchain.embeddings import HuggingFaceEmbeddings  # Hugging Face 임베딩을 위한 LangChain 라이브러리 임포트

from snowflake.ml.registry import model_registry  # Snowflake ML 모델 레지스트리 임포트
from snowflake.ml.model import deploy_platforms  # Snowflake ML 모델 배포 플랫폼 임포트
from snowflake.ml.model.models import huggingface_pipeline  # Hugging Face 파이프라인 모델 임포트
from snowflake.ml.model.models import llm  # LLM(대형 언어 모델) 관련 임포트
from snowflake.ml.model import custom_model  # 사용자 정의 모델 관련 임포트
from snowflake.ml.model import model_signature  # 모델 서명 관련 임포트

from snowflake.snowpark.functions import lit  # Snowpark 함수에서 리터럴(literal) 함수 임포트
import snowflake.connector  # Snowflake 데이터베이스 연결을 위한 라이브러리 임포트

# 로깅 설정
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

In [None]:
# 모델 이름, GPU 수, 최대 토큰 수 등을 제어하는 파라미터 설정

# Snowflake 모델 레지스트리에 LLM을 배포할 수 있는 데이터 아이쿠 관리 UI에서의 Snowflake 연결 이름
snowflake_connection_name = "spcs-access-only"  # Snowflake 연결 이름

# 배포할 주요 LLM의 이름
main_llm_model_name = "ZEPHYRBeta"  # 또는 "LLAMA2" 또는 "FALCON" 또는 "ZEPHYRBeta" 또는 "Phi2"

# 주요 LLM을 배포할 SPCS 컴퓨트 풀 - Snowflake 측에서 설정되어 있어야 함
main_llm_compute_pool = "DATAIKU_GPU_NV_S_MODEL_COMPUTE_POOL"  # 메인 LLM을 배포할 SPCS 컴퓨트 풀 이름

embedding_llm_model_name = "MiniLM-L6-v2"  # 배포할 임베딩 LLM 모델 이름

# 임베딩 LLM을 배포할 SPCS 컴퓨트 풀 - Snowflake 측에서 설정되어 있어야 함
embedding_llm_compute_pool = "DATAIKU_CPU_X64_XS_EMBED_COMPUTE_POOL" 

# LLM을 등록할 Snowflake 모델 레지스트리 DB 및 스키마
database_name = "DATAIKU_SPCS"  # Snowflake 모델 레지스트리 데이터베이스 이름
schema_name = "MODEL_REGISTRY"  # Snowflake 모델 레지스트리 스키마 이름

external_access_integrations = ["SNOWFLAKE_EGRESS_ACCESS_INTEGRATION"]  # Snowflake 외부 접근 통합 이름

# LLM과 관련된 추가 파라미터 설정

main_llm_num_gpus = 1  # 메인 LLM에 사용 가능한 GPU 수

main_llm_max_instances = 1  # 메인 LLM의 최대 인스턴스 수

embedding_llm_num_gpus = 0  # 임베딩 LLM에 사용 가능한 GPU 수

embedding_llm_max_instances = 1  # 임베딩 LLM의 최대 인스턴스 수

max_new_tokens = 200  # 메인 LLM에서 생성할 최대 새로운 토큰 수

# Hugging Face 토큰 가져오기
# hugging_face_token = "<YOUR_HF_TOKEN>"
hugging_face_token = dataiku.get_custom_variables()["hugging_face_token"]

In [None]:
# Snowpark의 DSS 래퍼 생성
dku_snowpark = DkuSnowpark()
snowpark_session = dku_snowpark.get_session(snowflake_connection_name)

In [None]:
def deploy_main_llm_to_spcs(modelname, snowparksession, computepool, maxnewtokens, numgpus, maxinstances, databasename, schemaname, hftoken, externalaccessintegrations):
    """
    Snowpark ML을 사용하여 SPCS에 채팅 완료 LLM 엔드포인트를 배포합니다.
    """
    
    # 모델 이름과 실제 HuggingFace 이름, 토크나이저(필요한 경우), pip/conda 의존성을 매핑
    model_name_hf_mapping = {
        "FALCON": {
            "hf_name": "tiiuae/falcon-7b-instruct",  # HuggingFace 모델 이름
            "tokenizer": AutoTokenizer.from_pretrained("tiiuae/falcon-7b-instruct"),  # 토크나이저
            "token": None,  # 인증 토큰 (필요한 경우)
            "deployment_name": "falcon_7b_predict",  # 배포 이름
            "pip_requirements": ["einops","snowflake-snowpark-python==1.9.0"],  # pip 요구 사항
            "conda_dependencies": None  # conda 의존성
        },
        "LLAMA2": {
            "hf_name": "meta-llama/Llama-2-7b-chat-hf",
            "tokenizer": None,
            "token": hftoken,
            "deployment_name": "llama2_7b_predict",
            "pip_requirements": None,
            "conda_dependencies": ["snowflake-snowpark-python==1.9.0"]
        },
        "ZEPHYRBeta": {
            "hf_name": "HuggingFaceH4/zephyr-7b-beta",
            "tokenizer": None,
            "token": None,
            "deployment_name": "zephyr_beta_7b_predict",
            "pip_requirements": None,
            "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.34.1"]
        },
        "Phi2": {
            "hf_name": "microsoft/phi-2",
            "tokenizer": None,
            "token": None,
            "deployment_name": "phi2_predict",
            "pip_requirements": None,
            "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.37.1"]
        }
    }

    # 모델 이름에 대한 Hugging Face 이름, 토크나이저, 토큰을 가져오기
    hf_model_name = model_name_hf_mapping[modelname]["hf_name"]
    tokenizer = model_name_hf_mapping[modelname]["tokenizer"]
    token = model_name_hf_mapping[modelname]["token"]

    # HuggingFace 파이프라인 모델을 가져옴
    hf_model = huggingface_pipeline.HuggingFacePipelineModel(
        task="text-generation",  # 작업 유형
        model=hf_model_name,  # 모델 이름
        tokenizer=tokenizer,  # 토크나이저
        token=token,  # 인증 토큰
        trust_remote_code=True,  # 원격 코드 신뢰 설정
        return_full_text=False,  # 전체 텍스트 반환 여부
        max_new_tokens=maxnewtokens,  # 생성할 최대 새로운 토큰 수
        device_map="auto",  # 장치 맵 자동 설정
        model_kwargs={"load_in_8bit": True}  # 모델 로딩 옵션
    )

    # Snowflake 모델 레지스트리 가져오기 (존재하지 않으면 생성)
    registry = model_registry.ModelRegistry(
        session=snowparksession,
        database_name=databasename,
        schema_name=schemaname,
        create_if_not_exists=True # 존재하지 않으면 생성
    )

    # 모델 버전 1
    model_version = "1"

    # 레지스트리에 이전 모델 버전 삭제 시도
    try:
        registry.delete_model(model_name=modelname, model_version=model_version)
    except:
        pass # 삭제 중 오류가 발생하면 무시

    # Snowflake 모델 레지스트리에 모델 기록하기
    hf_model_registry = registry.log_model(
        model_name=modelname,
        model_version=model_version,
        model=hf_model,
        pip_requirements=model_name_hf_mapping[modelname]["pip_requirements"],
        conda_dependencies=model_name_hf_mapping[modelname]["conda_dependencies"]
    )

    # 레지스트리에서 SPCS 컴퓨트 풀에 모델 배포
    deployed_model = hf_model_registry.deploy(
        deployment_name=model_name_hf_mapping[modelname]["deployment_name"],
        platform=deploy_platforms.TargetPlatform.SNOWPARK_CONTAINER_SERVICES,
        options={
            "compute_pool": computepool,  # 컴퓨트 풀 이름
            "num_gpus": numgpus,  # 사용할 GPU 수
            "max_instances": maxinstances,  # 최대 인스턴스 수
            "enable_ingress": True,  # 인그레스 활성화 여부
            "external_access_integrations": externalaccessintegrations  # 외부 접근 통합
        }
    )

    print("메인 LLM을 SPCS에 배포했습니다.")
    return deployed_model # 배포된 모델 반환

In [None]:
def deploy_embedding_llm_to_spcs(modelname, snowparksession, computepool, numgpus, maxinstances, databasename, schemaname, hftoken, externalaccessintegrations):
    """
    Snowpark ML을 사용하여 SPCS에 임베딩 LLM 엔드포인트를 배포합니다.
    """
    
    # 모델 이름 (현재는 하나만 작성됨)과 Hugging Face 이름 및 pip/conda 의존성을 매핑
    model_name_hf_mapping = {
        "MiniLM-L6-v2": {
            "hf_name": "sentence-transformers/all-MiniLM-L6-v2",  # HuggingFace 모델 이름
            "deployment_name": "MiniLM_L6_v2_embed",  # 배포 이름
            "pip_requirements": None,  # pip 요구 사항
            "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.34.1", "sentence-transformers==2.2.2", "langchain"]  # conda 의존성
        }
    }

    # 모델 이름이 "MiniLM-L6-v2"인 경우 HuggingFace 모델 이름 설정
    if modelname == "MiniLM-L6-v2":
        hf_model_name = "sentence-transformers/all-MiniLM-L6-v2"

    # LLM을 감싸고 SPCS 표준에 맞게 조정하는 사용자 정의 임베딩 LLM 클래스 생성
    class EmbeddingLLMCustom(custom_model.CustomModel):
        def __init__(self, context: custom_model.ModelContext) -> None:
            super().__init__(context)  # 부모 클래스 초기화

            # HuggingFace 임베딩 초기화
            self.embeddings = HuggingFaceEmbeddings(
                model_name=model_name_hf_mapping[modelname]["hf_name"],  # HuggingFace 모델 이름
            )

        @custom_model.inference_api  # 추론 API 데코레이터
        def predict(self, X: pd.DataFrame) -> pd.DataFrame:
            # 입력 텍스트를 임베딩하는 내부 함수 정의
            def _embed(input_text: str) -> str:
                query_result = self.embeddings.embed_query(input_text)  # 쿼리 결과 생성
                return str(query_result)  # 결과를 문자열로 반환

            # 입력 데이터 프레임의 각 입력에 대해 _embed 함수 호출
            res_df = pd.DataFrame({"outputs": pd.Series.apply(X["inputs"], _embed)})
            return res_df  # 결과 데이터 프레임 반환


    # 사용자 정의 LLM 클래스 가져오기
    hf_model = EmbeddingLLMCustom(custom_model.ModelContext())

    # Snowflake 모델 레지스트리 가져오기 (존재하지 않으면 생성)
    registry = model_registry.ModelRegistry(
        session=snowparksession,
        database_name=databasename,
        schema_name=schemaname,
        create_if_not_exists=True  # 존재하지 않으면 생성
    )

    # 모델 버전 1
    model_version = "1"

    # 레지스트리에 이미 모델이 존재하면 이전 버전을 삭제
    try:
        registry.delete_model(model_name=modelname, model_version=model_version)
    except:
        pass  # 삭제 중 오류가 발생하면 무시

    # Snowflake 모델 레지스트리에 모델 기록하기
    hf_model_registry = registry.log_model(
        model_name=modelname,
        model_version=model_version,
        model=hf_model,
        conda_dependencies=model_name_hf_mapping[modelname]["conda_dependencies"],
        signatures={
            "predict": model_signature.ModelSignature(
                inputs=[model_signature.FeatureSpec(name="inputs", dtype=model_signature.DataType.STRING)],
                outputs=[model_signature.FeatureSpec(name="outputs", dtype=model_signature.DataType.STRING)],
            )
        }
    )

    # 레지스트리에서 선택한 SPCS 컴퓨트 풀에 모델을 배포
    deployed_model = hf_model_registry.deploy(
        deployment_name=model_name_hf_mapping[modelname]["deployment_name"],  # 배포 이름
        platform=deploy_platforms.TargetPlatform.SNOWPARK_CONTAINER_SERVICES,  # 배포 플랫폼
        options={
            "compute_pool": computepool,  # 컴퓨트 풀 이름
            "max_instances": maxinstances,  # 최대 인스턴스 수
            "num_gpus": numgpus,  # 사용할 GPU 수
            "enable_ingress": True,  # 인그레스 활성화 여부
            "external_access_integrations": externalaccessintegrations  # 외부 접근 통합
        }
    )

    print("SPCS에 임베딩 LLM을 배포했습니다.")  # 배포 완료 메시지 출력

    return deployed_model  # 배포된 모델 반환

In [None]:
# 메인 LLM을 SPCS에 배포
main_llm_on_spcs = deploy_main_llm_to_spcs(
    main_llm_model_name,  # 배포할 메인 LLM 모델 이름
    snowpark_session,  # Snowpark 세션
    main_llm_compute_pool,  # 메인 LLM을 배포할 컴퓨트 풀
    max_new_tokens,  # 메인 LLM이 생성할 최대 새로운 토큰 수
    main_llm_num_gpus,  # 메인 LLM에 사용할 GPU 수
    main_llm_max_instances,  # 메인 LLM의 최대 인스턴스 수
    database_name,  # 모델 레지스트리 데이터베이스 이름
    schema_name,  # 모델 레지스트리 스키마 이름
    hugging_face_token,  # Hugging Face API 토큰
    external_access_integrations  # 외부 접근 통합 이름
)

In [None]:
# 임베딩 LLM을 SPCS에 배포
embedding_llm_on_spcs = deploy_embedding_llm_to_spcs(
    embedding_llm_model_name,  # 배포할 임베딩 LLM 모델 이름
    snowpark_session,  # Snowpark 세션
    embedding_llm_compute_pool,  # 임베딩 LLM을 배포할 컴퓨트 풀
    embedding_llm_num_gpus,  # 임베딩 LLM에 사용할 GPU 수
    embedding_llm_max_instances,  # 임베딩 LLM의 최대 인스턴스 수
    database_name,  # 모델 레지스트리 데이터베이스 이름
    schema_name,  # 모델 레지스트리 스키마 이름
    hugging_face_token,  # Hugging Face API 토큰
    external_access_integrations  # 외부 접근 통합 이름
)

In [None]:
# 메인 및 임베딩 LLM의 SPCS에서의 공개 URL 가져오기
def get_llm_endpoint_urls(snowparksession, main_llm, embedding_llm):
    try:
        # 메인 LLM 서비스 이름 가져오기
        main_llm_service_name = main_llm['details']['service_info']['name']
        # 서비스의 엔드포인트를 보여주는 쿼리 생성
        show_services_query = "SHOW ENDPOINTS IN SERVICE " + main_llm_service_name
        # 쿼리 실행 결과 가져오기
        show_services_query_result = snowparksession.sql(show_services_query)
        service = show_services_query_result.collect()[0]  # 첫 번째 서비스 정보 추출
        spcs_service_url = service['ingress_url']  # 서비스의 인그레스 URL 가져오기
        spcs_service_url_full = "https://" + spcs_service_url  # 전체 URL 생성
        print("채팅 완료 LLM URL: " + spcs_service_url_full)  # 메인 LLM URL 출력
    except:
        # URL 생성 중 오류 발생 시 메시지 출력
        print("채팅 완료 URL이 없습니다. 공개 URL 생성에 1분 정도 걸릴 수 있습니다... 다시 시도해 주세요.")

    try:
        # 임베딩 LLM 서비스 이름 가져오기
        embedding_llm_service_name = embedding_llm['details']['service_info']['name']
        # 서비스의 엔드포인트를 보여주는 쿼리 생성
        show_services_query = "SHOW ENDPOINTS IN SERVICE " + embedding_llm_service_name
        # 쿼리 실행 결과 가져오기
        show_services_query_result = snowparksession.sql(show_services_query)
        service = show_services_query_result.collect()[0]  # 첫 번째 서비스 정보 추출
        spcs_service_url = service['ingress_url']  # 서비스의 인그레스 URL 가져오기
        spcs_service_url_full = "https://" + spcs_service_url  # 전체 URL 생성
        print("텍스트 임베딩 LLM URL: " + spcs_service_url_full)  # 임베딩 LLM URL 출력
    except:
        # URL 생성 중 오류 발생 시 메시지 출력
        print("텍스트 임베딩 URL이 없습니다. 공개 URL 생성에 1분 정도 걸릴 수 있습니다... 다시 시도해 주세요.")

In [None]:
# LLM 엔드포인트 URL 가져오기
get_llm_endpoint_urls(snowpark_session, main_llm_on_spcs, embedding_llm_on_spcs)