In [None]:
%pip install tensorflow[and-cuda]==2.14

%pip install snowflake-snowpark-python==1.25.0 snowflake-ml-python==1.7.5 snowflake==1.0.5

In [None]:
import tensorflow as tf
print(tf.__version__)

In [None]:
-- One-time setup: create the feature store schema and give SYSADMIN the
-- privileges it needs. Must be run as ACCOUNTADMIN.
use role accountadmin;
create schema if not exists tensorflow_data.feature_store;
grant usage on database tensorflow_data to sysadmin;
grant usage on schema tensorflow_data.public to sysadmin;
grant usage on schema tensorflow_data.feature_store to sysadmin;
grant create dynamic table, create tag, create stage, create view on schema tensorflow_data.feature_store to sysadmin;
-- PENGUINS is (re)created by a later cell with CREATE OR REPLACE TABLE, so a
-- grant on the table itself fails on a fresh run and is lost on every
-- re-create. A future grant applies whenever the table is created.
grant select on future tables in schema tensorflow_data.public to sysadmin;


![Illustration of the penguin species in the dataset](https://www.tensorflow.org/tutorials/customization/images/penguins_ds_species.png)

In [None]:
-- Work inside TENSORFLOW_DATA.PUBLIC for the statements below.
USE DATABASE TENSORFLOW_DATA;
USE SCHEMA PUBLIC;

-- Sequence that supplies the numeric ID of each penguin row.
CREATE OR REPLACE SEQUENCE penguin_seq START = 1 INCREMENT = 1;

-- Target table for the penguins data; ID defaults to the next sequence value.
CREATE OR REPLACE TABLE PENGUINS (
    ID                NUMBER DEFAULT penguin_seq.NEXTVAL,
    SPECIES           VARCHAR(20),
    ISLAND            VARCHAR(20),
    BILL_LENGTH_MM    NUMBER(10,2),
    BILL_DEPTH_MM     NUMBER(10,2),
    FLIPPER_LENGTH_MM NUMBER(10,2),
    BODY_MASS_G       NUMBER(10,2),
    SEX               VARCHAR(20),
    YEAR              NUMBER(4,0)
)

In [None]:
-- Load penguins.csv from the RAW_DATA stage into PENGUINS.
-- The ID column is filled from penguin_seq; CSV columns $1..$8 map to the
-- remaining table columns in order. The header row is skipped, the listed
-- NA spellings are loaded as NULL, and the statement aborts on any error.
-- NOTE(review): the RAW_DATA stage is not created anywhere in the visible
-- notebook; it is assumed to exist already and to contain penguins.csv.
-- NOTE(review): penguin_seq is unqualified, so this relies on the current
-- schema being TENSORFLOW_DATA.PUBLIC.
COPY INTO "TENSORFLOW_DATA"."PUBLIC"."PENGUINS"
FROM (
    SELECT penguin_seq.NEXTVAL, $1, $2, $3, $4, $5, $6, $7, $8
    FROM '@"TENSORFLOW_DATA"."PUBLIC"."RAW_DATA"'
)
FILES = ('penguins.csv')
FILE_FORMAT = (
    TYPE=CSV,
    NULL_IF = ('NA', 'N/A', 'na', 'n/a')
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=TRUE,
    FIELD_OPTIONALLY_ENCLOSED_BY='"',
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
)
ON_ERROR=ABORT_STATEMENT;

In [None]:
-- Inspect the rows that were just loaded.
select * from TENSORFLOW_DATA.PUBLIC.PENGUINS;

In [None]:
-- Create the event table if it does not exist yet.
CREATE EVENT TABLE IF NOT EXISTS TENSORFLOW_DATA.PUBLIC.EVENTS;
-- Make it the event table of the TENSORFLOW_DATA database.
ALTER DATABASE TENSORFLOW_DATA SET EVENT_TABLE = TENSORFLOW_DATA.PUBLIC.EVENTS;
-- Set the database logging level to INFO.
ALTER DATABASE TENSORFLOW_DATA SET LOG_LEVEL = INFO;

In [None]:
import pandas as pd
import numpy as np
from snowflake.ml.data.data_connector import DataConnector

# Reuse the Snowflake session that is already active for this notebook
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
from snowflake.ml.feature_store import FeatureStore, CreationMode, Entity, FeatureView

# Open the feature store named "feature_store" in database "tensorflow_data",
# creating it on first use (CREATE_IF_NOT_EXIST).
fs = FeatureStore(
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
    session=session,
    database="tensorflow_data",
    name="feature_store",
    default_warehouse="QUICKSTART_WH",
)

In [None]:
# Entity joined on the "id" column; registered so feature views can use it.
entity = Entity(name="PENGUIN_ID", join_keys=["id"], desc="Penguin ID")
fs.register_entity(entity)

# List the entities now registered in the feature store
fs.list_entities().show()

In [None]:
-- Stage used as stage_location by the permanent UDF and UDTF defined below.
CREATE OR REPLACE STAGE TENSORFLOW_DATA.FEATURE_STORE.UDF_STAGE
    DIRECTORY = (ENABLE = TRUE)
    COMMENT = 'Stage for UDF functions';

In [None]:
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import IntegerType

@udf(
    name="penguin_species_to_int",
    return_type=IntegerType(),  # Specify the return data type
    is_permanent=True,
    replace=True,
    stage_location="@TENSORFLOW_DATA.FEATURE_STORE.UDF_STAGE",
)
def penguin_species_to_int(species: str):
    """Map a penguin species name to its integer class id.

    Returns 0 for "Adelie", 1 for "Gentoo", 2 for "Chinstrap" and -1 for any
    other value, including None.
    """
    # Kept local so the function has no dependency on notebook globals.
    species_codes = {"Adelie": 0, "Gentoo": 1, "Chinstrap": 2}
    return species_codes.get(species, -1)

In [None]:
from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, FloatType, StringType, DecimalType
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

@udtf(
    name="body_condition_pca",
    input_types=[StringType(20), StringType(20), DecimalType(10,2), DecimalType(10,2), DecimalType(10,2), DecimalType(10,2)],
    output_schema=StructType([
        StructField("t_species", StringType(20)),
        StructField("t_sex", StringType(20)),
        StructField("t_bill_length_mm", DecimalType(10,2)),
        StructField("t_bill_depth_mm", DecimalType(10,2)),
        StructField("t_flipper_length_mm", DecimalType(10,2)),
        StructField("t_body_mass_g", DecimalType(10,2)),
        StructField("t_body_condition_score", FloatType()),
    ]),
    replace=True,
    is_permanent=True,
    stage_location="@TENSORFLOW_DATA.FEATURE_STORE.UDF_STAGE",
    packages=['pandas', 'numpy', 'scikit-learn'],
)
class BodyConditionPCA:
    """UDTF handler that scores penguins by the first principal component of
    their four body measurements.

    Rows are buffered per partition. ``end_partition`` standardizes the
    measurements of the buffered rows, fits a one-component PCA on them and
    emits every buffered row together with its score, so a row's score depends
    on which other rows share its partition.
    """

    def __init__(self):
        # Complete rows seen so far in the current partition.
        self.rows = []

    def process(self, species: str, sex: str, bill_length: float, bill_depth: float,
                flipper_length: float, body_mass: float):
        """Buffer one input row; rows containing any None value are dropped."""
        record = [species, sex, bill_length, bill_depth, flipper_length, body_mass]
        if any(value is None for value in record):
            return
        self.rows.append(record)

    def end_partition(self):
        """Yield each buffered row followed by its PCA score as a float."""
        if not self.rows:
            return

        measurement_columns = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
        df = pd.DataFrame(self.rows, columns=['species', 'sex'] + measurement_columns)

        # Standardize the measurements, then project them onto one component.
        standardized = StandardScaler().fit_transform(df[measurement_columns])
        scores = PCA(n_components=1).fit_transform(standardized).ravel()

        # Buffered rows and scores are in the same order, so pair them directly.
        for record, score in zip(self.rows, scores):
            yield (*record, float(score))


In [None]:
-- Score every penguin with the BODY_CONDITION_PCA table function.
-- The function fits its PCA on the rows of one partition, so all rows are
-- sent to a single partition with PARTITION BY 1. Without an OVER clause
-- Snowflake chooses the partitioning itself and the scores would depend on it.
SELECT
  t.*
FROM
  TENSORFLOW_DATA.PUBLIC.PENGUINS as p,
  TABLE (
    BODY_CONDITION_PCA (
      p.SPECIES,
      p.SEX,
      p.BILL_LENGTH_MM,
      p.BILL_DEPTH_MM,
      p.FLIPPER_LENGTH_MM,
      p.BODY_MASS_G
    ) OVER (PARTITION BY 1)
  ) as t
ORDER BY
  t.t_body_condition_score DESC;

In [None]:
# Import required functions
from snowflake.snowpark import functions as F

# Start with the base table
df = session.table("TENSORFLOW_DATA.PUBLIC.PENGUINS")

# The UDTF fits its PCA on the rows of one partition, so route every row to a
# single partition. Without .over(...) Snowflake picks the partitioning and
# the scores would depend on how the rows happened to be split.
body_condition_call = F.call_table_function(
    "BODY_CONDITION_PCA",
    df["SPECIES"],
    df["SEX"],
    df["BILL_LENGTH_MM"],
    df["BILL_DEPTH_MM"],
    df["FLIPPER_LENGTH_MM"],
    df["BODY_MASS_G"],
).over(partition_by=F.lit(1))

# Apply the UDTF, keep only its output columns and sort by score
df_with_pca = (
    df.join_table_function(body_condition_call)
    .select(["t_SPECIES", "t_SEX", "t_BILL_LENGTH_MM", "t_BILL_DEPTH_MM",
             "t_FLIPPER_LENGTH_MM", "t_BODY_MASS_G", "t_BODY_CONDITION_SCORE"])
    .sort(F.col("t_BODY_CONDITION_SCORE").desc())
)

# Convert to pandas DataFrame for display
df_with_pca.to_pandas()


In [None]:
from snowflake.snowpark import functions as F

table_name = 'TENSORFLOW_DATA.PUBLIC.PENGUINS'

# Keep only rows without NULL values
snowpark_df = session.table(table_name).dropna()

# Measurements are cast to float; flipper length and body mass are also
# scaled down (by 10 and 100) as a simple normalization.
bill_length = F.col("BILL_LENGTH_MM").cast("float")
bill_depth = F.col("BILL_DEPTH_MM").cast("float")
flipper_length = F.col("FLIPPER_LENGTH_MM").cast("float") / 10
body_mass = F.col("BODY_MASS_G").cast("float") / 100

feature_df = snowpark_df.select(
    F.col("ID"),
    bill_length.alias("BILL_LENGTH_MM"),
    bill_depth.alias("BILL_DEPTH_MM"),
    flipper_length.alias("FLIPPER_LENGTH_MM"),
    body_mass.alias("BODY_MASS_G"),
    F.col("SPECIES"),
)

feature_df.show(n=5)

In [None]:
# Feature view over feature_df, keyed by the penguin entity and refreshed
# every 5 minutes.
pen_fv = FeatureView(
    name="Penguin_Data",
    desc="Penguin Data managed feature view",
    entities=[entity],
    feature_df=feature_df,
    refresh_freq='5 minutes',
)

# Register it as version "1", overwriting that version if it already exists.
pen_nn_fv = fs.register_feature_view(pen_fv, version="1", overwrite=True)

In [None]:
import streamlit as st
# Show the DDL behind the registered feature view. GET_DDL is called with
# 'DYNAMIC_TABLE' here; for a feature view backed by a view, use 'VIEW' instead.
fv_name = pen_nn_fv.fully_qualified_name()
ddl_rows = session.sql(f"""SELECT GET_DDL('DYNAMIC_TABLE', '{fv_name}');""").collect()
str_sql = ddl_rows[0][0]

st.text(str_sql)

In [None]:
# Datasets are Snowflake schema-level objects designed for machine learning
# workflows. A Dataset holds collections of data organized into versions,
# where each version is a materialized, immutable snapshot of the data with
# efficient data access and interoperability with popular deep learning
# frameworks.

# Spine: the IDs of all rows without NULLs; features are joined onto these keys.
spine_df = session.table("TENSORFLOW_DATA.PUBLIC.PENGUINS").dropna().select(F.col("ID"))

# Materialize the feature view's columns for the spine rows as a Dataset.
training_dataset = fs.generate_dataset(
    name="PENGUIN_TRAINING_DATASET",
    spine_df = spine_df,
    features=[pen_nn_fv],
)

In [None]:
-- List the datasets in the current database (includes the one generated above).
show datasets in database;

In [None]:
-- Show the 100 most recent entries of the event table.
-- NOTE(review): `events` is unqualified, so this relies on the current schema
-- being TENSORFLOW_DATA.PUBLIC, where the event table was created.
select * from events order by timestamp desc limit 100;

**Model Training**

In [None]:
import ray
from ray import train
from ray.train.tensorflow import TensorflowTrainer
from ray.train import Checkpoint, ScalingConfig
import tensorflow as tf
from tensorflow import keras

# Make Ray output less verbose.
# get_current() returns the shared context, so it is called on the class
# rather than on a freshly constructed DataContext instance.
context = ray.data.DataContext.get_current()
context.execution_options.verbose_progress = False
context.enable_operator_progress_bars = False
context.enable_progress_bars = False

In [None]:
# Wrap the training Dataset in a DataConnector
data_connector = DataConnector.from_dataset(training_dataset)

# Get the underlying Ray dataset.
# NOTE(review): `_ingestor` is a private attribute of DataConnector and may
# change between snowflake-ml-python versions; prefer a public accessor if the
# installed version offers one.
penguins_ray_ds = data_connector._ingestor.ray_ds

In [None]:
features = ["BILL_LENGTH_MM", "BILL_DEPTH_MM", "FLIPPER_LENGTH_MM" , "BODY_MASS_G"]
label = 'SPECIES'

def one_hot_encode_format_input(row):
    """Add the model input and target to one dataset row.

    Sets ``row['features']`` to the four measurements as floats, in the order
    bill length, bill depth, flipper length, body mass, and ``row['label']``
    to the one-hot encoding of ``row['SPECIES']`` over
    (Adelie, Gentoo, Chinstrap).

    Args:
        row: mapping with the four measurement columns and ``SPECIES``.

    Returns:
        The same ``row`` object, with ``features`` and ``label`` added.

    Raises:
        ValueError: if the species is not one of the three known classes, so
            that no row is emitted without a ``label``.
    """
    # Column names and encodings are kept local on purpose: the function runs
    # later inside Ray, after notebook globals may have been reassigned.
    measurement_columns = ('BILL_LENGTH_MM', 'BILL_DEPTH_MM', 'FLIPPER_LENGTH_MM', 'BODY_MASS_G')
    species_one_hot = {
        'Adelie': (1, 0, 0),
        'Gentoo': (0, 1, 0),
        'Chinstrap': (0, 0, 1),
    }

    species = row['SPECIES']
    if species not in species_one_hot:
        raise ValueError(
            f"Unexpected species {species!r}; expected one of {list(species_one_hot)}"
        )

    row['features'] = [float(row[column]) for column in measurement_columns]
    # A fresh list per row, so rows never share one mutable label object.
    row['label'] = list(species_one_hot[species])
    return row

# Build the model input from the source Ray dataset rather than from
# penguins_ray_ds itself, so re-running this cell gives the same result
# (a second pass over already-reduced rows would find no SPECIES column).
# Only the features and label arrays are kept.
penguins_ray_ds = (
    data_connector._ingestor.ray_ds
    .map(one_hot_encode_format_input)
    .select_columns(['features', 'label'])
)

In [None]:
# All resources Ray reports for the cluster (despite the name, not only GPUs)
known_to_ray_gpus = ray.cluster_resources()

# Number of GPUs among them; 0 when no "GPU" entry is reported
gpu_count = int(known_to_ray_gpus["GPU"]) if "GPU" in known_to_ray_gpus else 0

print("Total GPUS known to Ray:", gpu_count)

In [None]:
# Hold out 20% of the rows for testing; shuffled with a fixed seed (42)
train_rs, test_rs = penguins_ray_ds.train_test_split(test_size=0.2, shuffle=True, seed=42)  # Split into 2 datasets

In [None]:
import tensorflow
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Set TensorFlow's global random seed for reproducibility
tensorflow.random.set_seed(0)


![Diagram of the fully connected penguin classification network](https://www.tensorflow.org/tutorials/customization/images/full_network_penguin.png)

In [None]:
import os
import tempfile, json
import tensorflow as tf
from tensorflow.keras import optimizers

from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

# Class names; the list position is the class index used by the one-hot labels
penguin_classes = ['Adelie', 'Gentoo', 'Chinstrap']
# Model input columns; their count sets the width of the network's input
features = ["BILL_LENGTH_MM", "BILL_DEPTH_MM", "FLIPPER_LENGTH_MM" , "BODY_MASS_G"]
# NOTE(review): `label` is not read anywhere in the visible notebook, and no
# SPECIES_INT column is produced by the visible cells; confirm whether this
# assignment can be removed.
label = 'SPECIES_INT'

def build_model():
    """Build the (uncompiled) penguin classifier network.

    Two ReLU hidden layers of 10 units each, followed by a softmax output
    layer with one unit per entry of ``penguin_classes``. The input width is
    the number of entries in ``features``.
    """
    hidden_units = 10  # Number of hidden layer nodes
    return Sequential([
        Dense(hidden_units, input_dim=len(features), activation='relu'),
        Dense(hidden_units, input_dim=hidden_units, activation='relu'),
        Dense(len(penguin_classes), input_dim=hidden_units, activation='softmax'),
    ])

def train_loop_per_worker(config):
    """Training function executed on every Ray Train worker.

    Builds and compiles the classifier inside a MultiWorkerMirroredStrategy
    scope, then fits it on this worker's shard of the "train" dataset once per
    epoch. After each epoch the model and a small JSON file holding the epoch
    number are saved to a temporary directory and reported to Ray Train as a
    checkpoint, together with that epoch's loss and accuracy.

    Args:
        config: dict passed as ``train_loop_config``; must contain
            ``"num_epochs"``.
    """
    print(config)
    dataset_shard = train.get_dataset_shard("train")

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    with strategy.scope():
        model = build_model()
        # hyper-parameters for optimizer
        learning_rate = 0.001
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated in current Keras optimizers.
        opt = optimizers.Adam(learning_rate=learning_rate)

        model.compile(
            loss="categorical_crossentropy",
            optimizer=opt, metrics=["accuracy"]
        )

        # summary() prints the table itself and returns None, so it is not
        # wrapped in print().
        model.summary()

    tf_dataset = dataset_shard.to_tf(
        feature_columns='features',
        label_columns='label',
        batch_size=10
    )

    for epoch in range(config["num_epochs"]):
        # One fit() call per epoch, so a checkpoint can be reported after each.
        history = model.fit(tf_dataset)

        # saving model for later loading; report() is called while the
        # temporary directory still exists
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
            checkpoint_dict = os.path.join(temp_checkpoint_dir, "checkpoint.json")
            with open(checkpoint_dict, "w") as f:
                json.dump({"epoch": epoch}, f)
            checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

            train.report({"loss": history.history["loss"][0], "accuracy": history.history["accuracy"][0]}, checkpoint=checkpoint)

# One worker per GPU that Ray reports. When Ray reports none, fall back to a
# single CPU worker instead of requesting zero workers.
use_gpu = gpu_count > 0
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=max(gpu_count, 1), use_gpu=use_gpu),
    datasets={"train": train_rs},
    train_loop_config={"num_epochs": 50},
)

# Run the distributed training; `result` carries the reported metrics and checkpoint.
result = trainer.fit()

In [None]:
# Load the model saved in the checkpoint attached to the training result.
# as_directory() exposes the checkpoint contents as a local directory, and
# os.path.join builds the file path instead of string concatenation.
with result.checkpoint.as_directory() as checkpoint_dir:
    trained_model = tf.keras.models.load_model(os.path.join(checkpoint_dir, "model.keras"))

In [None]:
# NOTE(review): no batch size is passed to to_tf; the loop below reads one
# label per element (labels[0]), so it relies on one row per batch.
test_data = test_rs.to_tf(feature_columns="features", label_columns="label")

res = trained_model.predict(x=test_data)

np.set_printoptions(suppress=True)

# Pair every test element with the model's prediction for it
combine_dataset = tf.data.Dataset.zip((test_data,tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(res))))

# Collect one record per test row and build the DataFrame once at the end
# (growing a DataFrame with pd.concat inside the loop is quadratic).
records = []
for (_, labels), probabilities in combine_dataset:
    test_res = int(tf.argmax(labels[0]).numpy())
    predicted_res = int(tf.argmax(probabilities).numpy())
    records.append({
        'test_species': penguin_classes[test_res],
        'test_res': test_res,
        'predicted_species': penguin_classes[predicted_res],
        'predicted_res': predicted_res,
        'same_result': test_res == predicted_res,
    })

# Convert to pandas for Display
df = pd.DataFrame(records)

df


In [None]:
# Tensorflow doesn't have a built-in confusion matrix metric, so we'll use SciKit-Learn
import numpy as np
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt


# Plot the confusion matrix.
# Passing `labels` pins the matrix to one row and column per class, so the
# tick labels stay aligned even if a class is missing from the test split.
class_ids = np.arange(len(penguin_classes))
cm = confusion_matrix(df['test_res'].to_numpy(), df['predicted_res'].to_numpy(), labels=class_ids)

fig, ax = plt.subplots()
image = ax.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
fig.colorbar(image, ax=ax)
ax.set_xticks(class_ids)
ax.set_xticklabels(penguin_classes, rotation=85)
ax.set_yticks(class_ids)
ax.set_yticklabels(penguin_classes)
ax.set_xlabel("Predicted Species")
ax.set_ylabel("Actual Species")
ax.set_title("Penguin species confusion matrix")
plt.show()


In [None]:
from snowflake.ml.registry import Registry

# Model registry stored in the TENSORFLOW_DATA.FEATURE_STORE schema
reg = Registry(session=session, database_name="TENSORFLOW_DATA", schema_name="FEATURE_STORE")

In [None]:

# get one element (a features/label pair) from the dataset
element_input = next(test_data.take(1).as_numpy_iterator())

from snowflake.ml.model import model_signature

# Infer the signature from a real input and the model's own output for it.
# The dataset label (element_input[1]) is the one-hot training target, not
# what predict() returns, so it must not be used to describe the output.
sample_features = element_input[0]
sample_prediction = trained_model.predict(sample_features)
predict_signature = model_signature.infer_signature(input_data=sample_features, output_data=sample_prediction)

# NOTE(review): 'pengiun' looks like a typo for 'penguin'; it is left unchanged
# because the name identifies the registered model.
mv = reg.log_model(
    trained_model, 
    model_name ='tf_pengiun_model', 
    #version_name='v2', # Auto generate version name
    pip_requirements=["tensorflow[and-cuda]==2.14"],
    signatures={"predict": predict_signature},
    options={'relax_version': False},    
    target_platforms=['SNOWPARK_CONTAINER_SERVICES']
)
 