# House Price Prediction

In [21]:
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor

from snowflake.snowpark import Session
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
session = Session.builder.configs(SnowflakeLoginOptions("sanju")).create()
session.use_warehouse("ADHOC_WH")
session.use_database("DATA_ALCHEMIST")
session.use_schema("CORTEX")


Load CSV into Snowflake

In [None]:
# Load the data
housing_df = pd.read_csv("../../data/housing.csv")
housing_df.columns = housing_df.columns.str.upper()
session.write_pandas(df=housing_df,
                     table_name="HOUSING",
                     overwrite=True,
                     auto_create_table=True)

Connect to Snowflake and get 10% data from HOUSING table

In [10]:
df = session.table("HOUSING").sample(frac=0.10).to_pandas()
df.shape

X = df.loc[:, df.columns != 'MEDIAN_HOUSE_VALUE']
y = df['MEDIAN_HOUSE_VALUE']
X, y
# numerical and categorical attributes
num_attribs = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
       'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME']  
cat_attribs = ['OCEAN_PROXIMITY']  

Training function

In [None]:
def fit_pipeline(X, y, cat_attribs, num_attribs):
    # numerical pipeline
    num_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy="median")),
    ('std_scaler', StandardScaler())
    ])

    # categorical pipeline
    cat_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # ColumnTransformer
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', num_pipeline, num_attribs),  
            ('cat', cat_pipeline, cat_attribs)  
        ])

    # Create the final pipeline with the preprocessor and the model
    model = Pipeline([
        ('preprocessor', preprocessor),
        ('model', RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1))
    ])
    # Above code is equivalent to below:
    # model = Pipeline([
    #     ('preprocessor', ColumnTransformer([
    #         ( "num", Pipeline([
    #             ('imputer', SimpleImputer(strategy="median")),
    #             ('std_scaler', StandardScaler())]), num_attribs),
    #         ( "cat", Pipeline(steps=[
    #             ('imputer', SimpleImputer(strategy='most_frequent')),
    #             ('onehot', OneHotEncoder(handle_unknown='ignore'))]), cat_attribs)])),
    #     ('model', RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1))])
    model.fit(X, y)
    return model

model = fit_pipeline(X, y, cat_attribs, num_attribs)
# Sample prediction
print(f"Actual Value: {y[0]} \nPredicted Value {model.predict(X)[0]}")
model


Actual Value: 155500.0 
Predicted Value 154155.01


Save model in an internal stage

In [28]:
def save_model(session, model, stage_name, stage_path, model_file):
    import io, joblib

    input_stream = io.BytesIO()
    input_stream.name = model_file
    joblib.dump(model, input_stream)

    model_path = f'{stage_name}/{stage_path}/{model_file}'
    session.file.put_stream(input_stream, model_path, overwrite=True)
    return model_path

Generic Training function

In [29]:

def train_model(
    session: Session,           # Snowflake session object
    training_table: str,        # Table name that has the data to be used for training and test
    target_col: str,            # name of the target column
    save_stage: str) -> dict:   # name of the stage to save the fitted pipline object
    
    from datetime import datetime
    import numpy as np
    from snowflake.snowpark import types as T
    from sklearn.metrics import mean_squared_error
    from sklearn.metrics import classification_report, confusion_matrix

    now = datetime.now() # Get the date and time when this is strated
    
    # Get the training table and split into a training and test Snowpark DataFrames
    snowdf_train, snowdf_test = session.table(training_table).random_split([0.8, 0.2], seed=82) # use seed to make the split repeatable

    # Get the categorical and numeric column names
    cat_attribs = [c.name for c in snowdf_train.schema.fields
        if (type(c.datatype) == T.StringType) & (c.name != target_col)]
    numeric_types = [T.DecimalType, T.LongType, T.DoubleType, T.FloatType, T.IntegerType]
    num_attribs = [c.name for c in snowdf_train.schema.fields
        if (type(c.datatype) in numeric_types) & (c.name != target_col)]

    # save the train and test sets as time stamped tables in Snowflake 
    #table_suffix = now.strftime("%Y%m%d%H%M%S")
    train_table_name = training_table + '_TRAIN'
    snowdf_train.write.mode("overwrite").save_as_table(train_table_name)
    test_table_name = training_table + '_TEST'
    snowdf_test.write.mode("overwrite").save_as_table(test_table_name)

    pd_train = snowdf_train.to_pandas()
    
    X_train = pd_train.loc[:, pd_train.columns != target_col]
    y_train = pd_train[target_col]
    
    # Fit the model (pipeline)
    housing_price_prediction_model = fit_pipeline(X_train, y_train, cat_attribs, num_attribs)

    # save the full pipeline including the model
    # Save the model to stage
    save_path = now.strftime("%Y-%m-%d-%H%M%S")
    object_saved_path = save_model(session, housing_price_prediction_model,
        f"@{save_stage}/models", save_path, 'housing_price_RandForrest.joblib')

    # predict on the test set and return the root mean squared error (RMSE)
    pd_test = snowdf_test.to_pandas()
    
    X_test = pd_test.loc[:, pd_train.columns != target_col]
    y_test = pd_test[target_col]
    
    housing_predictions = housing_price_prediction_model.predict(X_test)
    lin_mse = mean_squared_error(y_test, housing_predictions)
    lin_rmse = np.sqrt(lin_mse)

    # Create a dict to return with test metrics and the path to the saved model pipeline
    ret_dict = {
        "MSE": lin_mse,
        "RMSE": lin_rmse,
        "model_path": object_saved_path,
        "train_table": train_table_name,
        "test_table": test_table_name }
    return ret_dict

Create and call stored proc for model training

In [30]:
session.clear_packages()
session.clear_imports()

session.add_packages('snowflake-snowpark-python', 'scikit-learn',
    'pandas', 'numpy', 'joblib', 'cachetools')

from snowflake.snowpark import functions as F
housing_price_prediction_sp = F.sproc(func=train_model, name="housing_price_prediction_sp" ,
    replace=True, is_permanent=True,
    stage_location="int_stage/sp", session=session)

ret = housing_price_prediction_sp(session, "HOUSING", "MEDIAN_HOUSE_VALUE", "int_stage")

import json
return_dict = json.loads(ret)
print(return_dict)

session.sql("ls @int_stage").show(max_width=150)



{'MSE': 2450365551.71328, 'RMSE': 49501.16717526244, 'model_path': '@int_stage/models/2025-04-29-154207/housing_price_RandForrest.joblib', 'test_table': 'HOUSING_TEST', 'train_table': 'HOUSING_TRAIN'}
--------------------------------------------------------------------------------------------------------------------------------------------------------
|"name"                                                                  |"size"    |"md5"                             |"last_modified"                |
--------------------------------------------------------------------------------------------------------------------------------------------------------
|int_stage/models/2025-04-29-144518/housing_price_RandForrest.joblib.gz  |27983248  |e38f31257d74a0a6dd56f9acdd7c2b58  |Tue, 29 Apr 2025 21:46:20 GMT  |
|int_stage/models/2025-04-29-153710/housing_price_RandForrest.joblib.gz  |27983248  |f078fe588811fbe526a12de0ff79f065  |Tue, 29 Apr 2025 22:38:12 GMT  |
|int_stage/models/2025-04-29-15385