In [None]:
# Package Imports
import dataiku
from dataiku import pandasutils as pdu
from dataiku.snowpark import DkuSnowpark

import pandas as pd
import torch
import importlib
import json
import subprocess
import requests
import logging

import transformers
from transformers import pipeline
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.embeddings import HuggingFaceEmbeddings

from snowflake.ml.registry import model_registry
from snowflake.ml.model import deploy_platforms
from snowflake.ml.model.models import huggingface_pipeline
from snowflake.ml.model.models import llm
from snowflake.ml.model import custom_model
from snowflake.ml.model import model_signature

from snowflake.snowpark.functions import lit
import snowflake.connector

# Configure the root logger: basicConfig() attaches a default handler when the
# root logger has none, then the root threshold is lowered to INFO so that
# informational messages emitted through `logging` are not filtered out.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

In [None]:
# Params to control model name, num gpus, max tokens, etc.

# The name of the Snowflake connection (in Dataiku Admin UI) that has access to deploy LLMs to the Snowflake Model Registry
snowflake_connection_name = "spcs-access-only"

# The name of the main LLM you want to deploy: "FALCON", "LLAMA2", "ZEPHYRBeta" or "Phi2"
main_llm_model_name = "ZEPHYRBeta"

# SPCS compute pool to deploy the main LLM - note: must be set up on Snowflake side
main_llm_compute_pool = "DATAIKU_GPU_NV_S_MODEL_COMPUTE_POOL"

# The name of the embedding LLM you want to deploy (only "MiniLM-L6-v2" is mapped right now)
embedding_llm_model_name = "MiniLM-L6-v2"

# SPCS compute pool to deploy the embedding LLM - note: must be set up on Snowflake side
embedding_llm_compute_pool = "DATAIKU_CPU_X64_XS_EMBED_COMPUTE_POOL"

# Snowflake Model Registry DB and schema where you want to register the LLM
database_name = "DATAIKU_SPCS"
schema_name = "MODEL_REGISTRY"

# Snowflake external access integration names
external_access_integrations = ["SNOWFLAKE_EGRESS_ACCESS_INTEGRATION"]

# Num GPUs available for main LLM
main_llm_num_gpus = 1
# Num SPCS container instances of the main LLM
main_llm_max_instances = 1

# Num GPUs available for embedding LLM
embedding_llm_num_gpus = 0
# Num SPCS container instances of the embedding LLM
embedding_llm_max_instances = 1

# Max new tokens for the main LLM to generate
max_new_tokens = 200

# Hugging Face access token, read from the Dataiku custom variable "hugging_face_token".
# Do not paste the token into the notebook: define it as a project/global variable instead.
# `.get` yields None when the variable is not defined, so the notebook still runs for
# models that are deployed without a token (only the "LLAMA2" entry passes it on).
hugging_face_token = dataiku.get_custom_variables().get("hugging_face_token")

In [None]:
# Create the DSS wrapper around Snowpark, then open a Snowpark session on the
# Dataiku connection named by `snowflake_connection_name`. The session is what
# the deployment and endpoint-lookup functions in this notebook receive as
# their `snowparksession` argument.
dku_snowpark = DkuSnowpark()
snowpark_session = dku_snowpark.get_session(snowflake_connection_name)

In [None]:
def deploy_main_llm_to_spcs(modelname, snowparksession, computepool, maxnewtokens, numgpus, maxinstances, databasename, schemaname, hftoken, externalaccessintegrations):
    """
    Deploy a chat complete (text-generation) LLM endpoint to SPCS using Snowpark ML.

    The model is wrapped in a HuggingFace pipeline model, logged to the Snowflake
    Model Registry as version "1" (any existing version "1" of the same model name
    is deleted first), and deployed to Snowpark Container Services.

    Args:
        modelname: Key of the model to deploy: "FALCON", "LLAMA2", "ZEPHYRBeta" or "Phi2".
        snowparksession: Snowpark session used to open the model registry.
        computepool: Name of the SPCS compute pool to deploy to.
        maxnewtokens: Passed to the pipeline as ``max_new_tokens``.
        numgpus: Passed to the deployment as ``num_gpus``.
        maxinstances: Passed to the deployment as ``max_instances``.
        databasename: Database holding the model registry.
        schemaname: Schema holding the model registry.
        hftoken: Hugging Face token; only forwarded for the "LLAMA2" entry.
        externalaccessintegrations: List of external access integration names
            passed to the deployment.

    Returns:
        The value returned by the ``deploy`` call on the logged model.

    Raises:
        KeyError: If ``modelname`` is not one of the supported keys.
    """

    # Mapping of model name to the actual HuggingFace name, whether an explicit tokenizer
    # must be loaded, the HF token (if required), and pip/conda dependencies.
    # The tokenizer itself is NOT created here: building it eagerly would download the
    # FALCON tokenizer even when a different model has been selected.
    model_name_hf_mapping = {

        "FALCON": {"hf_name": "tiiuae/falcon-7b-instruct",
                   "load_tokenizer": True,
                   "token": None,
                   "deployment_name": "falcon_7b_predict",
                   "pip_requirements": ["einops","snowflake-snowpark-python==1.9.0"],
                   "conda_dependencies": None},

        "LLAMA2": {"hf_name": "meta-llama/Llama-2-7b-chat-hf",
                   "load_tokenizer": False,
                   "token": hftoken,
                   "deployment_name": "llama2_7b_predict",
                   "pip_requirements": None,
                   "conda_dependencies": ["snowflake-snowpark-python==1.9.0"]},

        "ZEPHYRBeta": {"hf_name": "HuggingFaceH4/zephyr-7b-beta",
                       "load_tokenizer": False,
                       "token": None,
                       "deployment_name": "zephyr_beta_7b_predict",
                       "pip_requirements": None,
                       "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.34.1"]},

        "Phi2": {"hf_name": "microsoft/phi-2",
                 "load_tokenizer": False,
                 "token": None,
                 "deployment_name": "phi2_predict",
                 "pip_requirements": None,
                 "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.37.1"]}

    }

    # Fail early, and with a readable message, before any download or registry call
    if modelname not in model_name_hf_mapping:
        supported_names = ", ".join(sorted(model_name_hf_mapping))
        raise KeyError(f"Unsupported main LLM model name {modelname!r}; supported names: {supported_names}")

    model_config = model_name_hf_mapping[modelname]
    hf_model_name = model_config["hf_name"]
    token = model_config["token"]

    # Only load a tokenizer for the selected model, and only if its entry asks for one
    tokenizer = AutoTokenizer.from_pretrained(hf_model_name) if model_config["load_tokenizer"] else None

    # Get a HuggingFace Pipeline Model
    hf_model = huggingface_pipeline.HuggingFacePipelineModel(task = "text-generation",
                                                             model = hf_model_name,
                                                             tokenizer = tokenizer,
                                                             token = token,
                                                             trust_remote_code = True,
                                                             return_full_text = False,
                                                             max_new_tokens = maxnewtokens,
                                                             device_map = "auto",
                                                             model_kwargs = {"load_in_8bit": True})

    # Get a Snowflake Model Registry (create if doesn't exist)
    registry = model_registry.ModelRegistry(session = snowparksession,
                                            database_name = databasename,
                                            schema_name = schemaname,
                                            create_if_not_exists = True)

    # Model version 1
    model_version = "1"

    # Delete a previous version of the model if it already exists in the registry.
    # Best effort: a failure (typically "no such model") must not stop the deployment,
    # but it is logged instead of being silently discarded.
    try:
        registry.delete_model(model_name = modelname,
                              model_version = model_version)
    except Exception as exc:
        logging.getLogger(__name__).info(
            "Could not delete model %s version %s from the registry (it may not exist yet): %r",
            modelname, model_version, exc)

    # Log the model to the Snowflake Model Registry
    hf_model_registry = registry.log_model(model_name = modelname,
                                           model_version = model_version,
                                           model = hf_model,
                                           pip_requirements = model_config["pip_requirements"],
                                           conda_dependencies = model_config["conda_dependencies"])

    # Deploy the model from the Registry to the SPCS compute pool chosen
    deployed_model = hf_model_registry.deploy(deployment_name = model_config["deployment_name"],
                                              platform = deploy_platforms.TargetPlatform.SNOWPARK_CONTAINER_SERVICES,
                                              options = {"compute_pool": computepool,
                                                         "num_gpus": numgpus,
                                                         "max_instances": maxinstances,
                                                         "enable_ingress": True,
                                                         "external_access_integrations": externalaccessintegrations
                                                        })

    print("Deployed Main LLM to SPCS")

    return deployed_model

In [None]:
def deploy_embedding_llm_to_spcs(modelname, snowparksession, computepool, numgpus, maxinstances, databasename, schemaname, hftoken, externalaccessintegrations):
    """
    Deploy a text embedding LLM endpoint to SPCS using Snowpark ML.

    The embedding model is wrapped in a custom model class, logged to the Snowflake
    Model Registry as version "1" (any existing version "1" of the same model name
    is deleted first), and deployed to Snowpark Container Services.

    Args:
        modelname: Key of the embedding model to deploy (only "MiniLM-L6-v2" right now).
        snowparksession: Snowpark session used to open the model registry.
        computepool: Name of the SPCS compute pool to deploy to.
        numgpus: Passed to the deployment as ``num_gpus``.
        maxinstances: Passed to the deployment as ``max_instances``.
        databasename: Database holding the model registry.
        schemaname: Schema holding the model registry.
        hftoken: Not used by this function; kept so the signature mirrors
            ``deploy_main_llm_to_spcs``.
        externalaccessintegrations: List of external access integration names
            passed to the deployment.

    Returns:
        The value returned by the ``deploy`` call on the logged model.

    Raises:
        KeyError: If ``modelname`` is not one of the supported keys.
    """

    # Mapping of model name (only 1 written right now) to the actual HuggingFace name and pip/conda dependencies
    model_name_hf_mapping = {
        "MiniLM-L6-v2": {"hf_name": "sentence-transformers/all-MiniLM-L6-v2",
                         "deployment_name": "MiniLM_L6_v2_embed",
                         "pip_requirements": None,
                         "conda_dependencies": ["snowflake-snowpark-python==1.9.0", "transformers==4.34.1",
                                                "sentence-transformers==2.2.2", "langchain"]
                        }
    }

    # Fail early, and with a readable message, before any model loading or registry call
    if modelname not in model_name_hf_mapping:
        supported_names = ", ".join(sorted(model_name_hf_mapping))
        raise KeyError(f"Unsupported embedding LLM model name {modelname!r}; supported names: {supported_names}")

    model_config = model_name_hf_mapping[modelname]
    hf_model_name = model_config["hf_name"]

    # Create a custom Embedding LLM Class to wrap around the LLM and conform it to SPCS standards
    class EmbeddingLLMCustom(custom_model.CustomModel):
        def __init__(self, context: custom_model.ModelContext) -> None:
            super().__init__(context)

            self.embeddings = HuggingFaceEmbeddings(
                model_name = hf_model_name,
            )

        @custom_model.inference_api
        def predict(self, X: pd.DataFrame) -> pd.DataFrame:
            """Embed each value of the "inputs" column; each embedding is returned as its string form in "outputs"."""
            def _embed(input_text: str) -> str:
                # The declared output signature is STRING, so the embedding is stringified
                return str(self.embeddings.embed_query(input_text))

            return pd.DataFrame({"outputs": X["inputs"].apply(_embed)})

    # Get the custom LLM Class
    hf_model = EmbeddingLLMCustom(custom_model.ModelContext())

    # Get a Snowflake Model Registry (create if doesn't exist)
    registry = model_registry.ModelRegistry(session = snowparksession,
                                            database_name = databasename,
                                            schema_name = schemaname,
                                            create_if_not_exists = True)

    # Model version 1
    model_version = "1"

    # Delete a previous version of the model if it already exists in the registry.
    # Best effort: a failure (typically "no such model") must not stop the deployment,
    # but it is logged instead of being silently discarded.
    try:
        registry.delete_model(model_name = modelname,
                              model_version = model_version)
    except Exception as exc:
        logging.getLogger(__name__).info(
            "Could not delete model %s version %s from the registry (it may not exist yet): %r",
            modelname, model_version, exc)

    # Log the model to the Snowflake Model Registry
    hf_model_registry = registry.log_model(
        model_name = modelname,
        model_version = model_version,
        model = hf_model,
        pip_requirements = model_config["pip_requirements"],
        conda_dependencies = model_config["conda_dependencies"],
        signatures = {
            "predict": model_signature.ModelSignature(
                inputs = [model_signature.FeatureSpec(name="inputs", dtype=model_signature.DataType.STRING)],
                outputs = [model_signature.FeatureSpec(name="outputs", dtype=model_signature.DataType.STRING)],
            )
        }
    )

    # Deploy the model from the Registry to the SPCS compute pool chosen
    deployed_model = hf_model_registry.deploy(deployment_name = model_config["deployment_name"],
                                              platform = deploy_platforms.TargetPlatform.SNOWPARK_CONTAINER_SERVICES,
                                              options={"compute_pool": computepool,
                                                       "max_instances": maxinstances,
                                                       "num_gpus": numgpus,
                                                       "enable_ingress": True,
                                                       "external_access_integrations": externalaccessintegrations
                                                      })

    print("Deployed Embedding LLM to SPCS")

    return deployed_model

In [None]:
# Deploy the main (chat completion) LLM to SPCS; arguments are passed by keyword
# so each notebook parameter is visibly matched to the function parameter it feeds.
main_llm_on_spcs = deploy_main_llm_to_spcs(
    modelname = main_llm_model_name,
    snowparksession = snowpark_session,
    computepool = main_llm_compute_pool,
    maxnewtokens = max_new_tokens,
    numgpus = main_llm_num_gpus,
    maxinstances = main_llm_max_instances,
    databasename = database_name,
    schemaname = schema_name,
    hftoken = hugging_face_token,
    externalaccessintegrations = external_access_integrations,
)


In [None]:
# Deploy the text embedding LLM to SPCS; arguments are passed by keyword
# so each notebook parameter is visibly matched to the function parameter it feeds.
embedding_llm_on_spcs = deploy_embedding_llm_to_spcs(
    modelname = embedding_llm_model_name,
    snowparksession = snowpark_session,
    computepool = embedding_llm_compute_pool,
    numgpus = embedding_llm_num_gpus,
    maxinstances = embedding_llm_max_instances,
    databasename = database_name,
    schemaname = schema_name,
    hftoken = hugging_face_token,
    externalaccessintegrations = external_access_integrations,
)


In [None]:
# Get the public URLs of the main and embedding LLMs on SPCS
def get_llm_endpoint_urls(snowparksession, main_llm, embedding_llm):
    """
    Print the public ingress URL of the main LLM service and of the embedding LLM service.

    For each deployment, the service name is read from
    ``deployment['details']['service_info']['name']``, ``SHOW ENDPOINTS IN SERVICE``
    is run for it, and the ``ingress_url`` of the first row is printed with an
    ``https://`` prefix. If anything in that lookup fails, a "try again" message is
    printed instead and the underlying error is logged as a warning. The two lookups
    are independent: a failure of the first does not prevent the second.

    Args:
        snowparksession: Snowpark session used to run the SHOW ENDPOINTS queries.
        main_llm: Deployment result of the main LLM.
        embedding_llm: Deployment result of the embedding LLM.

    Returns:
        None. The URLs are only printed.
    """

    def _print_endpoint_url(deployment, url_label, failure_message):
        """Look up and print one deployment's ingress URL, or print failure_message if the lookup fails."""
        try:
            service_name = deployment['details']['service_info']['name']
            show_endpoints_query = "SHOW ENDPOINTS IN SERVICE " + service_name
            first_endpoint = snowparksession.sql(show_endpoints_query).collect()[0]
            print(url_label + "https://" + first_endpoint['ingress_url'])
        except Exception as exc:
            # Keep the friendly hint, but do not hide the real cause (missing key,
            # no endpoint rows yet, SQL/permission error, ...)
            logging.getLogger(__name__).warning("Endpoint URL lookup failed: %r", exc)
            print(failure_message)

    _print_endpoint_url(main_llm,
                        "Chat completion LLM URL: ",
                        "No chat complete URL. Can take a minute to generate the public URL...try again")

    _print_endpoint_url(embedding_llm,
                        "Text embedding LLM URL: ",
                        "No text embedding URL. Can take a minute to generate the public URL...try again")

In [None]:
# Look up and print the public endpoint URLs of both deployed LLM services
get_llm_endpoint_urls(
    snowparksession = snowpark_session,
    main_llm = main_llm_on_spcs,
    embedding_llm = embedding_llm_on_spcs,
)