In [29]:
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import VariantType
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import StructType, StructField, FloatType
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
import os
import json

In [2]:
# Snowflake connection settings, read from SNOWFLAKE_<KEY> environment variables.
# os.getenv returns None for any variable that is not set.
connection_parameters = {
    key: os.getenv(f"SNOWFLAKE_{key.upper()}")
    for key in ("account", "user", "password", "schema", "database", "role", "warehouse")
}

# Open the Snowpark session that every later cell uses.
session = Session.builder.configs(connection_parameters).create()


In [3]:

# Show which database/schema and warehouse the session is currently bound to.
schema_name = session.get_fully_qualified_current_schema()
print(f"Current Database and schema: {schema_name}")
warehouse_name = session.get_current_warehouse()
print(f"Current Warehouse: {warehouse_name}")

Current Database and schema: "MLOPS"."ADVERTISING"
Current Warehouse: "COMPUTE_WH"


In [4]:
# Reference the ADVERTISING table through the session.
# NOTE(review): `ad_df` is not used by any later cell visible here; the
# prediction cell builds its own `advertising_df` from the same table.
ad_df = session.table("ADVERTISING")


In [5]:
# (Re)create the ml_models stage that will hold the trained model file.
# No file format is attached to the stage.
create_stage_sql = """
CREATE OR REPLACE STAGE ml_models
"""
session.sql(create_stage_sql).collect()



[Row(status='Stage area ML_MODELS successfully created.')]

In [9]:
# Resize the session's current warehouse to XSMALL.
#
# get_current_warehouse() returns the identifier already wrapped in double
# quotes (e.g. "COMPUTE_WH"), so it is passed to SQL unchanged. Stripping the
# quotes would break warehouse names that are case-sensitive or contain
# special characters.
current_warehouse = session.get_current_warehouse()
if current_warehouse is None:
    # Fail with an actionable message instead of a TypeError on None.
    raise RuntimeError(
        "No warehouse is active for this session; set SNOWFLAKE_WAREHOUSE before resizing."
    )
session.sql(
    f"ALTER WAREHOUSE {current_warehouse} SET WAREHOUSE_SIZE=XSMALL;"
).collect()

[Row(status='Statement executed successfully.')]

In [39]:

# SQL that (re)creates the `train` stored procedure. The Python handler fits a
# polynomial linear-regression pipeline on the ADVERTISING table, writes the
# fitted model to the @ml_models stage as model.joblib, and returns the best
# hyper-parameters together with train/test R2 scores as a VARIANT.
create_procedure_sql = """
CREATE OR REPLACE PROCEDURE train()
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.11
  PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'joblib')
  HANDLER = 'main'
AS $$
import os
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump

# Handler: trains the model, stages it, and returns a metrics dictionary.
def main(session):
  # Pull the full table into pandas and split features from the target.
  df = session.table('ADVERTISING').to_pandas()
  X = df[['TV', 'RADIO', 'NEWSPAPER']]
  y = df['SALES']
  # Fixed random_state keeps the 80/20 split reproducible between runs.
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

  # Polynomial expansion followed by scaling, applied to all three features.
  numeric_features = ['TV', 'RADIO', 'NEWSPAPER']
  numeric_transformer = Pipeline(steps=[('poly', PolynomialFeatures()), ('scaler', StandardScaler())])
  preprocessor = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features)])
  pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('classifier', LinearRegression(n_jobs=-1))])

  # Define parameter grid for GridSearchCV
  param_grid = {
      'preprocessor__num__poly__degree': [2, 3],
      'classifier__fit_intercept': [True, False]
  }

  # 10-fold cross-validated search over the grid above.
  model = GridSearchCV(pipeline, param_grid=param_grid, n_jobs=-1, cv=10)
  model.fit(X_train, y_train)

  model_file = os.path.join('/tmp', 'model.joblib')
  dump(model, model_file)
  # auto_compress=False keeps the staged file name as model.joblib, which is the
  # exact path the predict_sales UDF imports; the default would gzip the upload
  # and stage it as model.joblib.gz instead.
  session.file.put(model_file, "@ml_models", auto_compress=False, overwrite=True)

  return {"Best parameters": model.best_params_, "R2 score on Train": model.score(X_train, y_train), "R2 score on Test": model.score(X_test, y_test)}
$$;
"""
session.sql(create_procedure_sql).collect()

[Row(status='Function TRAIN successfully created.')]

In [None]:
# Run the train() stored procedure and print the VARIANT it returns.
train_result_df = session.sql("CALL train()")
train_result_df.show()


In [None]:
# Resize the session's current warehouse to LARGE.
#
# get_current_warehouse() returns the identifier already wrapped in double
# quotes (e.g. "COMPUTE_WH"), so it is passed to SQL unchanged. Stripping the
# quotes would break warehouse names that are case-sensitive or contain
# special characters.
current_warehouse = session.get_current_warehouse()
if current_warehouse is None:
    # Fail with an actionable message instead of a TypeError on None.
    raise RuntimeError(
        "No warehouse is active for this session; set SNOWFLAKE_WAREHOUSE before resizing."
    )
session.sql(
    f"ALTER WAREHOUSE {current_warehouse} SET WAREHOUSE_SIZE=LARGE;"
).collect()


In [51]:
from snowflake.snowpark.functions import udf
import snowflake.snowpark.types as T

# Holds the deserialized model between calls so the joblib file is read from
# disk only on the first invocation in a given Python process, not once per row.
_PREDICT_SALES_MODEL_CACHE = {}


# Define the UDF function
def predict_sales(tv: float, radio: float, newspaper: float) -> float:
    """Predict sales for a single row of advertising features.

    Args:
        tv: Value of the TV feature.
        radio: Value of the RADIO feature.
        newspaper: Value of the NEWSPAPER feature.

    Returns:
        The model's prediction for the row, converted to a Python float.
    """
    # Imports live inside the function so they are resolved where the UDF runs.
    import os
    import sys
    from joblib import load
    import pandas as pd

    model = _PREDICT_SALES_MODEL_CACHE.get("model")
    if model is None:
        # Specify the import directory for the Snowflake stage files
        IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
        import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
        model_path = os.path.join(import_dir, 'model.joblib')
        # NOTE: joblib.load unpickles the file; only load models from a trusted stage.
        model = load(model_path)
        _PREDICT_SALES_MODEL_CACHE["model"] = model

    # Column names must match the ones the pipeline was fitted on.
    input_data = pd.DataFrame([[tv, radio, newspaper]], columns=['TV', 'RADIO', 'NEWSPAPER'])
    prediction = model.predict(input_data)[0]

    return float(prediction)

# Register the UDF
session.udf.register(
    func=predict_sales, 
    name="predict_sales", 
    stage_location="@ml_models",
    input_types=[T.FloatType(), T.FloatType(), T.FloatType()],
    return_type=T.FloatType(),
    replace=True, 
    is_permanent=True, 
    imports=['@ml_models/model.joblib'],
    packages=['scikit-learn', 'pandas', 'joblib']
)

<snowflake.snowpark.udf.UserDefinedFunction at 0x76c59e949d10>

In [52]:
from snowflake.snowpark.functions import col
import snowflake.snowpark.functions as F

# Select the three feature columns from ADVERTISING and append the output of
# the predict_sales UDF, called with those same columns, as PREDICTED_SALES.
advertising_df = session.table('ADVERTISING')
feature_columns = [col(name) for name in ('TV', 'RADIO', 'NEWSPAPER')]
predicted_sales = F.call_udf('predict_sales', *feature_columns).alias('PREDICTED_SALES')
predicted_sales_df = advertising_df.select(*feature_columns, predicted_sales)


In [54]:
# Print the feature columns alongside PREDICTED_SALES for the first rows.
predicted_sales_df.show()

------------------------------------------------------
|"TV"   |"RADIO"  |"NEWSPAPER"  |"PREDICTED_SALES"   |
------------------------------------------------------
|230.1  |37.8     |69.2         |21.886417875690817  |
|44.5   |39.3     |45.1         |10.372262452131155  |
|17.2   |45.9     |69.3         |9.113870015216659   |
|151.5  |41.3     |58.5         |18.388258741022366  |
|180.8  |10.8     |58.4         |16.125779196227914  |
|8.7    |48.9     |75.0         |8.805982700924098   |
|57.5   |32.8     |23.5         |10.576290497207951  |
|120.2  |19.6     |11.6         |13.689984286660831  |
|8.6    |2.1      |1.0          |5.743650332055633   |
|199.8  |2.6      |21.2         |16.197702901058705  |
------------------------------------------------------



Bad pipe message: %s [b'0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\nHost: localhost:39381\r\nUs', b'-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.']
Bad pipe message: %s [b'0.0 Safari/537.36\r\nAccept-Encoding: gzip, defla']
Bad pipe message: %s [b', br, zstd\r\nAccept-Language: en-GB,en-US;q=0.9,en;q=0.8,de-DE;q=0.7,de;q=0.6,az-AZ;q=0.5,az;q=0.4\r\nCache-Control: m', b'-age=0\r\nReferer: https://github.com/\r\nX-Request-ID: 71d2a66a6d2b1980a9952c5c330abb83\r\nX-Real-IP:']
Bad pipe message: %s [b'0.240.0.10\r\nX-Forwarded-Port: 4', b'\r\nX-Forwarded-Scheme: https\r\nX-Original-URI: /\r\nX-S', b'eme: https\r\nDNT: 1\r\nsec-fetch-site: cross-site\r\nsec-fetch-mode: navigate\r\nsec-fetch-dest: document']
Bad pipe message: %s [b'sec-ch-ua: "']
Bad pipe message: %s [b't/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"\r\nsec-ch-ua-mobi', b': ?0\r\nsec-ch-ua-platform: "Windows"\r\npr