In [None]:
# Install required libraries
!pip install snowflake-connector-python pandas
!pip install pandas 

In [None]:
# UNSUPPORTED BY SNOWFLAKE - CUSTOMER SUPPORTED ONLY

# Copyright (c) 2025 Snowflake Inc. All rights reserved.

In [None]:
# Import necessary libraries
#import snowflake.connector
import pandas as pd

# Use the new Container Services keypair authentication method
from generateJWT import JWTGenerator

# other supporting libraries
from datetime import timedelta
import argparse
import logging
import sys
import requests
import json


In [None]:
# Snowflake account identifier; passed to JWTGenerator and used to build the account URL
account = '<org>-<account>'
# User the JWT is generated for; upper-cased before it is handed to JWTGenerator
user = 'username'.upper()
# Role requested during the token exchange (sent as a "session:role:<role>" scope)
role = 'SPCS_ROLE'
# Path to the private key file passed to JWTGenerator
private_key_file_path = '/Users/rsa/rsa_key.pem'
# Host name of the service endpoint; used as the OAuth scope and as the host of the request URL
endpoint = '<generated-endpoint>.snowflakecomputing.app'
# NOTE(review): not referenced by any cell visible in this notebook
endpoint_path = ''

In [None]:
# Helper functions that connect to Snowflake
# Module logger shared by the helper functions below
logger = logging.getLogger(__name__)
# Both values are minutes: _get_token wraps each in timedelta(minutes=...) before
# passing them to JWTGenerator (lifetime first, then renewal_delay).
# NOTE(review): presumably token lifetime and the delay before renewal — confirm against generateJWT.
renewal_delay = 54
lifetime = 59

def _get_token():
  """Generate a key-pair JWT with JWTGenerator.

  Reads the module-level settings ``account``, ``user``, ``private_key_file_path``,
  ``lifetime`` and ``renewal_delay`` (the last two are converted from minutes to
  ``timedelta``).

  Returns:
    Whatever ``JWTGenerator(...).get_token()`` returns.
  """
  token = JWTGenerator(account, user, private_key_file_path,
                       timedelta(minutes=lifetime),
                       timedelta(minutes=renewal_delay)).get_token()
  # The JWT is a bearer credential: record that one was issued, never its value.
  logger.info("Key Pair JWT generated")
  return token

def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account):
  """Exchange a key-pair JWT for a Snowflake token at the account's /oauth/token URL.

  Args:
    token: JWT sent as the ``assertion`` of the jwt-bearer grant.
    role: Role added to the scope as ``session:role:<role>``; ``None`` omits it.
    endpoint: Endpoint host name, always part of the scope.
    snowflake_account_url: Base account URL; when empty/None the URL is built
      from ``snowflake_account`` instead.
    snowflake_account: Account identifier used only for the fallback URL.

  Returns:
    The response body text.

  Raises:
    AssertionError: if the response status code is not 200.
  """
  scope = endpoint if role is None else f'session:role:{role} {endpoint}'
  data = {
    'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
    'scope': scope,
    'assertion': token,
  }
  # Log only the non-secret parts of the request; the assertion is a credential.
  logger.info("token exchange scope: %s", scope)
  if snowflake_account_url:
    url = f'{snowflake_account_url}/oauth/token'
  else:
    url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token'
  logger.info("oauth url: %s", url)
  response = requests.post(url, data=data)
  # On success the body IS the token, so only the status is logged.
  logger.info("token exchange status: %s", response.status_code)
  if response.status_code != 200:
    logger.error("token exchange failed: %s", response.text)
    # Raised explicitly (not via `assert`) so the check survives `python -O`,
    # while callers that catch AssertionError keep working.
    raise AssertionError("unable to get snowflake token")
  return response.text

def connect_to_spcs(method, token, url, data=None):
  """Call the ingress endpoint with the Snowflake token and parse the response.

  Args:
    method: ``'POST'`` sends ``data`` as a JSON body; anything else issues a GET.
    token: Token placed in the ``Authorization: Snowflake Token="..."`` header.
    url: Full URL to call.
    data: JSON-serialisable payload for POST requests.

  Returns:
    ``None`` when the status code is not 200 (the body is printed);
    a DataFrame for JSON responses;
    a DataFrame for multi-line text (first line = column names, remaining
    lines = whitespace-separated rows);
    otherwise the raw response text.
  """
  # Create a request to the ingress endpoint with authz.
  headers = {'Authorization': f'Snowflake Token="{token}"'}
  if method == 'POST':
    # The body is JSON, so say so explicitly.
    post_headers = {**headers, 'Content-Type': 'application/json'}
    response = requests.post(f'{url}', headers=post_headers, data=json.dumps(data))
  else:
    response = requests.get(f'{url}', headers=headers)

  if response.status_code != 200:
    print(f"Error: {response.text}")
    return None

  # A response without a Content-Type header must not crash the parser.
  content_type = response.headers.get('content-type') or ''
  if content_type.startswith('application/json'):
    json_object = response.json()
    try:
      # A dict of scalars needs an explicit one-row index.
      return pd.DataFrame(json_object, index=[0])
    except ValueError:
      # More than one row (e.g. a batched request): let pandas derive the index.
      return pd.DataFrame(json_object)

  text = str(response.text)
  if "\n" not in text:
    # Single-line text is returned unchanged.
    return text

  # clean up some data responses and convert to dataframe
  print("response.text: " + text)
  # Accept both real newlines and escaped "\n" sequences as row separators.
  lines = text.replace("\\n", "\n").splitlines()
  rows = [line.split() for line in lines if line.strip()]
  if not rows:
    return pd.DataFrame()
  return pd.DataFrame(rows[1:], columns=rows[0])


In [None]:
# Generate the key-pair JWT, then exchange it for the token used on service requests
snowflake_account_url = f'https://{account}.snowflakecomputing.com'
token = _get_token()
snowflake_jwt = token_exchange(
  token,
  role=role,
  endpoint=endpoint,
  snowflake_account_url=snowflake_account_url,
  snowflake_account=account,
)


In [None]:
# reads the csv file using numpy (the first line is skipped as a header)
import numpy as np
# ndmin=2 keeps the result two-dimensional even when the file holds a single data
# row; without it loadtxt returns a 1-D array and df[:1] would select one value
# instead of one row.
df = np.loadtxt('Boston_Snowflake_NO_ID.csv', delimiter=',', skiprows=1, ndmin=2)

# for testing, just use the first row, otherwise we could batch the data
df = df[:1]

In [None]:
import pprint

# Show the rows that are about to be sent
print("Input data")
pprint.pprint(df)

# Prefix every row with its position, then turn the array into nested lists
data = {"data": np.column_stack((range(len(df)), df)).tolist()}

print("Payload for request")
pprint.pprint(data)

# POST the payload to the service's /predict path
spcs_url = f'https://{endpoint}/predict'
result = connect_to_spcs('POST', snowflake_jwt, spcs_url, data)

# display the result
print(result)


In [None]:
# Rather than the HTTP POST call, the SQL function for the model could be called using Snowpark
import getpass
import os

from snowflake.snowpark import Session

connection_parameters = {
  "account": "<org>-<account>",
  "user": "<username>",
  "role": "SPCS_ROLE",
  "database": "<database>",
  "schema": "<schema>",
  "warehouse": "xsmall",
  # Keep the credential out of the notebook: take it from the SNOWFLAKE_PASSWORD
  # environment variable, or prompt for it when the variable is not set.
  # NOTE(review): for key-pair authentication, supply the key through the
  # connector's key-pair options instead of "password" — confirm the option name.
  "password": os.environ.get("SNOWFLAKE_PASSWORD") or getpass.getpass("Snowflake password: "),
}

# Create a new session
session = Session.builder.configs(connection_parameters).create()

In [None]:
# Table to score, read through the Snowpark session created above;
# replace the <database>, <schema> and <table> placeholders with real names
input_dataframe = session.read.table('<database>.<schema>.<table>')

In [None]:

# Install snowflake-ml-python (imported below as snowflake.ml) and the connector with its pandas extra
# NOTE(review): this runs after earlier cells already import Snowflake packages — consider installing before any import
%pip install snowflake-ml-python "snowflake-connector-python[pandas]" --quiet


In [None]:
# Import the model registry from snowflake-ml-python
from snowflake.ml.registry import registry

# Open the registry in the given database/schema and pick the model version to call
reg = registry.Registry(session=session, database_name='<database>', schema_name='<schema>')
mv = reg.get_model('BOSTON_MODEL').version('GIANT_PUMA_1')
# Call the version's PREDICT function on the input table; the value is not stored,
# it is only the cell's last expression
mv.run(input_dataframe, function_name='PREDICT')

In [None]:
# Call PREDICT on the input table and keep what it returns for the following cells
results = mv.run(input_dataframe, function_name='PREDICT')

In [None]:
# show() prints the rows itself and returns None, so it must not be wrapped in
# print() (that would add a stray "None" line after the table)
results.show()


In [None]:
import pandas as pd

In [None]:
# alternative method to call the model using SQL
# (the adjacent string literals below are joined into a single statement)
sql = (
  'WITH mv AS MODEL "BOSTON_MODEL" VERSION "GIANT_PUMA_1" '
  'SELECT *,mv ! "PREDICT"(<column-names>) '
  'FROM <database>.<schema>.<table>'
)

In [None]:
# Hand the SQL statement to the Snowpark session; the result is kept in results_sql
results_sql = session.sql(sql)

In [None]:
# Show the output of the SQL call (results_sql), not the earlier mv.run() result;
# show() prints the rows itself and returns None, so it is not wrapped in print()
results_sql.show()
