![](../../jpg/stock_small.jpg)

The third notebook builds on what you learned in Parts 1 and 2. It implements an end-to-end [ML use-case](#ML-integration-Using-Weka-Machine-Learning) including data ingestion, ETL/ELT transformations, model training, model scoring, and result visualization.



# Prerequisites

To get started you need a Snowflake account and read/write access to a database. If you do not have a Snowflake account, you can sign up for a [free trial](https://signup.snowflake.com/). It doesn't even require a credit card.


# Quick Start

First, we have to set up the environment for our notebook. The instructions for setting up the environment are in the Snowpark documentation in section [configuring-the-jupyter-notebook-for-snowpark](https://docs.snowflake.com/en/developer-guide/snowpark/quickstart-jupyter.html#configuring-the-jupyter-notebook-for-snowpark).


## Step 1

Configure the notebook to use a Maven repository for a library that Snowpark depends on.

In [None]:
import sys.process._
val osgeoRepo = coursierapi.MavenRepository.of("https://repo.osgeo.org/repository/release")
interp.repositories() ++= Seq(osgeoRepo)

## Step 2

Create a directory (if it doesn't exist) for temporary files created by the [REPL](https://ammonite.io/#Ammonite-REPL) environment. To avoid any side effects from previous runs, we also delete any files in that directory.

**Note: Make sure that you have the operating system permissions to create a directory in that location.**

**Note: If you are using multiple notebooks, you’ll need to create and configure a separate REPL class directory for each notebook.**

In [None]:
import ammonite.ops._
import ammonite.ops.ImplicitWd._

// This folder is used to store generated REPL classes, which will later be used in UDFs.
// Please provide an empty folder path. This is essential for Snowpark UDFs to work
val replClassPath = pwd+"/repl_classes"

// Delete any old files in the directory.
import sys.process._
s"rm -rf $replClassPath" !

// Create the REPL class folder.
import sys.process._
s"mkdir -p $replClassPath" !

## Step 3

Configure the compiler for the Scala REPL. This does the following:
- Configures the compiler to generate classes for the REPL in the directory that you created earlier.
- Configures the compiler to wrap code entered in the REPL in classes, rather than in objects.
- Adds the directory that you created earlier as a dependency of the REPL interpreter.

In [None]:
// Generate all REPL classes in REPL class folder
interp.configureCompiler(_.settings.outputDirs.setSingleOutput(replClassPath))
interp.configureCompiler(_.settings.Yreplclassbased.value = true)
interp.load.cp(os.Path(replClassPath))

## Step 4
Import the Snowpark library from Maven.

In [None]:
import $ivy.`com.snowflake:snowpark:0.8.0`

To create a session, we need to authenticate ourselves to the Snowflake instance. Though it might be tempting to just override the authentication variables below with hard-coded values, it's not considered best practice to do so. If you share your version of the notebook, you might disclose your credentials by mistake to the recipient. Even worse, if you upload your notebook to a public code repository, you might advertise your credentials to the whole world. To prevent that, you should keep your credentials in an external file (like we are doing here).

Copy the credentials template file creds/template_credentials.txt to creds/credentials.txt and update the copy with your credentials, so that they stay on your local machine.

Even better would be to switch from user/password authentication to [private key authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth.html). In that case, put your key files into the same directory or update the location in your credentials file.

In [None]:
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

val session = Session.builder.configFile("creds/credentials.txt").create

## Step 5
Add the Ammonite kernel classes as dependencies for your UDF.

In [None]:
def addClass(session: Session, className: String): String = {
  var cls1 = Class.forName(className)
  val resourceName = "/" + cls1.getName().replace(".", "/") + ".class"
  val url = cls1.getResource(resourceName)
  val path = url.getPath().split(":").last.split("!").head
  session.addDependency(path)
  path
}
addClass(session, "ammonite.repl.ReplBridge$")
addClass(session, "ammonite.interp.api.APIHolder")
addClass(session, "pprint.TPrintColors")
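The path handling inside `addClass` is easier to follow in isolation. The sketch below applies the same two `split` calls to plain strings. The example paths are made up, and `dependencyPath` is a hypothetical helper name, not part of Snowpark or Ammonite.

```scala
// Hypothetical helper: mirrors the string handling in addClass above.
// For a class loaded from a JAR, the URL path typically looks like
// "file:/some/dir/lib.jar!/pkg/Cls.class"; for a class directory it is a plain path.
def dependencyPath(urlPath: String): String =
  urlPath.split(":").last.split("!").head

// Class inside a JAR: only the JAR location is kept.
val fromJar = dependencyPath("file:/home/user/lib/ammonite-repl.jar!/ammonite/repl/ReplBridge$.class")

// Class in a directory: the path is returned unchanged.
val fromDir = dependencyPath("/home/user/repl_classes/pkg/Cls.class")

println(fromJar) // /home/user/lib/ammonite-repl.jar
println(fromDir) // /home/user/repl_classes/pkg/Cls.class
```

Because the logic keeps only the text after the last `:`, a path that contains a colon elsewhere would be cut short.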

## Step 6

For this exercise we need these additional libraries.
- [plotly-scala (Plotly for Scala)](https://github.com/alexarchambault/plotly-scala)
- [Weka Machine Learning](https://www.cs.waikato.ac.nz/ml/weka/index.html)


In [None]:
import $ivy.`nz.ac.waikato.cms.weka:weka-stable:3.8.5`
import $ivy.`org.plotly-scala::plotly-almond:0.8.1`

# ML integration Using Weka Machine Learning

The following example is based on the [Kaggle Customer Churn Dataset](https://www.kaggle.com/blastchar/telco-customer-churn). The dataset provides information about Telco customers and the goal is to predict customer churn, i.e. whether a customer is at risk of cancelling their service. To predict customer churn, we will use the [Weka Machine Learning Library](https://waikato.github.io/weka-wiki/).

The example is structured in 6 steps.

- [Define configuration parameters](#Configuration)
- [Data preparation](#Data-Preparation)
- [Build user defined functions on top of the Weka API](#User-Defined-Functions)
- [Train a model using a classification algorithm (locally)](#Model-Training)
- [Bulk scoring on dataset in Snowflake (remote)](#Scoring)
- [Visualize result (confusion matrix)](#Result-Visualization)

For ease of use, the dataset has been included in this repo but could be downloaded from Kaggle as well. 

## Configuration

The code in this example is fully configurable. 

- Tables: Import/Clean/Train/Test/Out; tables are fully qualified
- Exclusion List: Columns to be excluded from the prediction/inference step
- Key Column: Column name uniquely identifying a row to be scored
- Predict Column: Column to be predicted
- Model additional Parameters: Name of the Attribute Relation File Format (ARFF), Model Type (Algorithm), Number of Splits, Name & Location of the file holding the serialized model

This example is pre-configured but you could bring your own data, change the parameters listed above, and start building your own models.

In [None]:
// Parameters for run:
val trainDb="DEMO_DB"
val trainSchema="SANDBOX_RFEHRMANN"
val importTable = List(trainDb,trainSchema,"TELCO_IMPORT")
val cleanTable = List(trainDb,trainSchema,"TELCO_CLEAN")
val trainTable = List(trainDb,trainSchema,"TELCO_TRAIN")
val testTable = List(trainDb,trainSchema,"TELCO_TEST")
val outTable = List(trainDb,trainSchema,"TELCO_OUT" )
val trainExcludeCols = List("CUSTOMERID")
val scoreKeyColumn = "CUSTOMERID"
val predictColumn = "CHURN"
val modelTypeName = "trees.J48"
val modelARFFName = "telco_churn"
val modelTrainingSplits = 2             
val persistedModelFilename = "TelcoModelOut"
val persistedModelPath = "extraJars"
val persistedModel = persistedModelPath + "/" + persistedModelFilename
/////////////////////////////////////////////////////////////////////////////////////////
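Each table parameter above is a list of database, schema, and table name. Read as a SQL name, such a list corresponds to the dotted, fully qualified form. The sketch below only illustrates that correspondence; `qualifiedName` is a hypothetical helper, and how Snowpark validates or quotes the individual parts is not shown here.

```scala
// Hypothetical helper: joins the parts of a multi-part identifier with dots.
def qualifiedName(parts: Seq[String]): String = {
  require(parts.nonEmpty && parts.forall(_.nonEmpty), "every identifier part must be non-empty")
  parts.mkString(".")
}

val exampleTable = List("DEMO_DB", "SANDBOX_RFEHRMANN", "TELCO_IMPORT")
println(qualifiedName(exampleTable)) // DEMO_DB.SANDBOX_RFEHRMANN.TELCO_IMPORT
```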

## Data Preparation

Data prep can be a very time-consuming process when building an ML pipeline. And this is not just in terms of time spent on the actual processing but also in terms of developer time spent on building the process. In the following cells, you can see how easy it is to perform the necessary steps of loading data, cleansing the data, and splitting the data into training and test partitions.

### Data Import

The data import step assumes that the dataset is stored in CSV format in the directory data/telco. Change the location if your files are stored elsewhere. The data file, in this case *WA_Fn-UseC_-Telco-Customer-Churn.csv.gz*, is uploaded into a Snowflake user stage via a *put* command. Importing the file into Snowflake is as easy as reading the file into a Snowflake DataFrame and writing the DataFrame into a Snowflake table. All we need is the file structure, i.e. table schema, and the file location.

In [None]:

import com.snowflake.snowpark.types._

session.sql("""
     put file://data/telco/WA_Fn-UseC_-Telco-Customer-Churn.csv.gz @~/upload/ overwrite=true
  """).show()

val TelcoSchema=StructType(
  Seq(
      StructField("CUSTOMERID",StringType,true),
      StructField("GENDER",StringType,true),
      StructField("SENIORCITIZEN",StringType,true),
      StructField("PARTNER",StringType,true),
      StructField("DEPENDENTS",StringType,true),
      StructField("TENURE",StringType,true),
      StructField("PHONESERVICE",StringType,true),
      StructField("MULTIPLELINES",StringType,true),
      StructField("INTERNETSERVICE",StringType,true),
      StructField("ONLINESECURITY",StringType,true),
      StructField("ONLINEBACKUP",StringType,true),
      StructField("DEVICEPROTECTION",StringType,true),
      StructField("TECHSUPPORT",StringType,true),
      StructField("STREAMINGTV",StringType,true),
      StructField("STREAMINGMOVIES",StringType,true),
      StructField("CONTRACT",StringType,true),
      StructField("PAPERLESSBILLING",StringType,true),
      StructField("PAYMENTMETHOD",StringType,true),
      StructField("MONTHLYCHARGES",StringType,true),
      StructField("TOTALCHARGES",StringType,true),
      StructField("CHURN",StringType,true)
  )
)
val importDf = session
   .read
   .option("skip_header", 1)
   .schema(TelcoSchema)
   .csv("@~/upload/WA_Fn-UseC_-Telco-Customer-Churn.csv.gz")

importDf
   .write
   .mode(SaveMode.Overwrite)
   .saveAsTable(importTable)


### Transformations and Feature Generation

Transformation and feature generation usually require many more steps and effort. In this particular case, the data is already pretty clean and one-hot encoding is done automatically by Weka: it takes all string variables and one-hot-encodes their values. The only requirement is that string values cannot include spaces. Numbers (currently of datatype string in the import table) need to be cast to number types (i.e. double, integer, ...); otherwise Weka would treat them as categorical values and one-hot-encode them as well.

The cell below does exactly that. It replaces spaces in string values and casts numbers to *Integer* and *Double* types.

In [None]:
val cleanDf=session.table(importTable)
   .select(
         replace(col("CUSTOMERID"),lit(' '),lit('-')).name("CUSTOMERID")
         ,replace(col("GENDER"),lit(' '),lit('-')).name("GENDER")
         ,col("SENIORCITIZEN").cast(IntegerType).name("SENIORCITIZEN")
         ,replace(col("PARTNER"),lit(' '),lit('-')).name("PARTNER")
         ,replace(col("DEPENDENTS"),lit(' '),lit('-')).name("DEPENDENTS")
         ,col("TENURE").cast(IntegerType).name("TENURE")
         ,replace(col("PHONESERVICE"),lit(' '),lit('-')).name("PHONESERVICE")
         ,replace(col("MULTIPLELINES"),lit(' '),lit('-')).name("MULTIPLELINES")
         ,replace(col("INTERNETSERVICE"),lit(' '),lit('-')).name("INTERNETSERVICE")
         ,replace(col("ONLINESECURITY"),lit(' '),lit('-')).name("ONLINESECURITY")
         ,replace(col("ONLINEBACKUP"),lit(' '),lit('-')).name("ONLINEBACKUP")
         ,replace(col("DEVICEPROTECTION"),lit(' '),lit('-')).name("DEVICEPROTECTION")
         ,replace(col("TECHSUPPORT"),lit(' '),lit('-')).name("TECHSUPPORT")
         ,replace(col("STREAMINGTV"),lit(' '),lit('-')).name("STREAMINGTV")
         ,replace(col("STREAMINGMOVIES"),lit(' '),lit('-')).name("STREAMINGMOVIES")
         ,replace(col("CONTRACT"),lit(' '),lit('-')).name("CONTRACT")
         ,replace(col("PAPERLESSBILLING"),lit(' '),lit('-')).name("PAPERLESSBILLING")
         ,replace(col("PAYMENTMETHOD"),lit(' '),lit('-')).name("PAYMENTMETHOD")           
         ,col("MONTHLYCHARGES").cast(DoubleType).name("MONTHLYCHARGES")
         ,replace(col("TOTALCHARGES"),lit(' '),lit('0')).cast(DoubleType).name("TOTALCHARGES")           
         ,replace(col("CHURN"),lit(' '),lit('-')).name("CHURN") )

cleanDf
  .write
  .mode(SaveMode.Overwrite)
  .saveAsTable(cleanTable)
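To make the three kinds of column expressions in the cell above concrete, here is a plain-Scala sketch of the same rules applied to single values. The function names and sample values are illustrative assumptions; the actual cleaning runs inside Snowflake through the DataFrame expressions shown above.

```scala
// Categorical columns: every space becomes a hyphen.
def cleanCategory(value: String): String = value.replace(' ', '-')

// Integer columns such as TENURE: parse the string as a whole number.
def cleanInteger(value: String): Int = value.toInt

// TOTALCHARGES: every space becomes '0' before the value is parsed,
// so a blank value (a single space) turns into 0.0.
def cleanTotalCharges(value: String): Double = value.replace(' ', '0').toDouble

println(cleanCategory("Fiber optic")) // Fiber-optic
println(cleanInteger("12"))           // 12
println(cleanTotalCharges(" "))       // 0.0
println(cleanTotalCharges("29.85"))   // 29.85
```

Note that the replacement applies to every space in the value, not only to values that consist of a single space.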

### Sample

For training and testing purposes we split the dataset into a: 

- training dataset (66% of the data)
- test dataset (34% of the data)

and write both datasets back into Snowflake as tables. 

For the purpose of this example, we use a random split. 

In [None]:
val weights:Array[Double]=Array(66.0,34.0)
val allDataSplitDf=session.table(cleanTable)
     .randomSplit(weights)

allDataSplitDf(0)
   .write
   .mode(SaveMode.Overwrite)
   .saveAsTable(trainTable)

allDataSplitDf(1)
   .write
   .mode(SaveMode.Overwrite)
   .saveAsTable(testTable)
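The idea behind a weighted random split can be sketched in plain Scala. This is a conceptual illustration only, not Snowpark's implementation: `weightedSplit` and its `seed` parameter are hypothetical, and the weights are normalized so that `66.0` and `34.0` act as 66% and 34%.

```scala
import scala.util.Random

// Conceptual sketch: each row is assigned to a partition by comparing a
// uniform random draw against the cumulative, normalized weights.
def weightedSplit[A](rows: Seq[A], weights: Seq[Double], seed: Long): Seq[Seq[A]] = {
  require(weights.nonEmpty && weights.forall(_ > 0), "weights must be positive")
  val total = weights.sum
  // Upper bound of each partition on the [0, 1) interval, e.g. 0.66 and 1.0.
  val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total)
  val rng = new Random(seed)
  val assigned = rows.map { row =>
    val draw = rng.nextDouble()
    val idx = bounds.indexWhere(bound => draw < bound)
    // Fall back to the last partition if rounding left the final bound below 1.0.
    (if (idx >= 0) idx else weights.length - 1, row)
  }
  weights.indices.map(i => assigned.filter(_._1 == i).map(_._2))
}

val parts = weightedSplit(1 to 1000, Seq(66.0, 34.0), seed = 42L)
// The individual sizes depend on the random draws; together they cover all rows.
println(parts.map(_.size))
```

Because the assignment is random, the partitions are only approximately 66% and 34% of the data.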


## User Defined Functions

In comparison to Part 2 of this series, the code for these UDFs is much more advanced. The code is split into two major parts: the code that executes on the client (local) machine, and the code that executes on the Snowflake (remote) side.
From a high-level perspective, we need to take the training data, serialize it (getArffRows), and pass it as a parameter to the training step (trainAndSaveModel).

### Remote Side Helper Functions

On the Snowflake (remote) side we only score the model. Model training is done on the client (docker container running the notebook server). Therefore, the remote class is very straightforward. There is just one important detail. The two files needed for scoring, i.e. the model file and the ARFF header file, should be loaded only once during the initialization of the WekaRemoteHelper class. You can find additional detail on this pattern in the Snowflake documentation for writing [UDF initialization code](https://docs.snowflake.com/en/developer-guide/snowpark/creating-udfs.html#writing-initialization-code-for-a-udf).

In [None]:
class WekaRemoteHelper (persistedModel:String) extends Serializable {
    import java.io.BufferedInputStream
    import java.io.ByteArrayInputStream
    import java.io.ObjectInputStream
    import java.io.BufferedReader
    import java.io.StringReader

    import weka.classifiers.Classifier
    import weka.core.Instances

    private lazy val arffHdr:String = {
        val persistedModelResource = "/" + persistedModel + ".arff"
        val sourceArff = new BufferedInputStream(getClass.getResourceAsStream(persistedModelResource))
        Stream.continually(sourceArff.read).takeWhile(-1 !=).map(_.toChar).mkString
    }

    private lazy val cls: Classifier = {
        val sourceMdlResource =  "/" + persistedModel + ".mdl"
        val sourceMdl = new BufferedInputStream(getClass.getResourceAsStream(sourceMdlResource))
        val trainedModel: Array[Byte] = Stream.continually(sourceMdl.read).takeWhile(-1 !=).map(_.toByte).toArray
        new ObjectInputStream(new ByteArrayInputStream(trainedModel))
               .readObject().asInstanceOf[Classifier]
    }

    def scoreRow (rowArff: String) : (String)  = {
        val scoreThis = new Instances(new BufferedReader(new StringReader(arffHdr + rowArff)))
        scoreThis.setClassIndex(scoreThis.numAttributes() - 1)
        // Only a single row (instance (0)) to score per UDF call
        val res = cls.classifyInstance(scoreThis.instance(0))
        scoreThis.classAttribute().value(res.toInt)
    }
    
}
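The `lazy val` members of `WekaRemoteHelper` are evaluated on first access and then cached, which is what keeps the model and header from being read again for every row. A minimal sketch of that behavior, using a hypothetical `LazyResource` class in which a counter stands in for the expensive load:

```scala
// Hypothetical stand-in for WekaRemoteHelper: the expensive load is replaced
// by a counter so that the single initialization becomes visible.
class LazyResource extends Serializable {
  var loadCount = 0

  private lazy val resource: String = {
    loadCount += 1 // runs on first access only
    "loaded"
  }

  def use(row: String): String = resource + ":" + row
}

val helper = new LazyResource
val results = Seq("a", "b", "c").map(row => helper.use(row))
println(results)          // List(loaded:a, loaded:b, loaded:c)
println(helper.loadCount) // 1
```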

### Client Side Helper Functions

The Weka client side helper Object includes:
- model building functions and sub-functions (trainAndSaveModel, calculateAccuracy, crossValidationSplit, classify, saveModel, loadModel)
- ARFF serialization functions (getArffHeader, getArffRows)
- row serialization function (concatAllCol)
- the model building steps and serialization functions for the ARFF header and data rows

In [None]:
object WekaClientHelper {
    import java.io.BufferedOutputStream
    import java.io.FileOutputStream
    import java.io.ByteArrayOutputStream
    import java.io.ObjectInputStream
    import java.io.ByteArrayInputStream
    import java.io.StringReader
    import java.io.BufferedReader
    import java.io.BufferedInputStream
    import java.io.FileInputStream

    import java.util.Base64
    import java.util.ArrayList

    import weka.classifiers.Classifier
    import weka.classifiers.Evaluation
    import weka.classifiers.evaluation.NominalPrediction
    import weka.classifiers.evaluation.NumericPrediction
    import weka.classifiers.functions.GaussianProcesses
    import weka.classifiers.rules.DecisionTable
    import weka.classifiers.rules.PART
    import weka.classifiers.trees.DecisionStump
    import weka.classifiers.functions.Logistic
    import weka.classifiers.bayes.NaiveBayesUpdateable
    import weka.classifiers.trees.J48
    import weka.classifiers.trees.RandomForest
    import weka.classifiers.evaluation.Prediction
    import weka.classifiers.functions.LinearRegression

    import weka.core.DenseInstance
    import weka.core.Instance
    import weka.core.Instances

    import java.security.MessageDigest

    import com.snowflake.snowpark.SaveMode.Overwrite

    def md5(s: String) = {
      MessageDigest.getInstance("MD5").digest(s.getBytes)
    }

    def concatAllCol(df : DataFrame, trainExcludeCols : List[String] ) : Column = {
        val colList = df.drop(trainExcludeCols).schema.map(c => c.dataType.typeName match {
              case "Long"   => coalesce(callBuiltin("to_varchar", df.col(c.name)), lit('?'))
              case "Double" => coalesce(callBuiltin("to_varchar", df.col(c.name)), lit('?'))
              case "Date"   => coalesce(callBuiltin("to_varchar", df.col(c.name)), lit('?'))
              case "String" => coalesce(df.col(c.name),lit('?'))
              case _        => throw new UnsupportedOperationException("Column Type Not Handled Yet")         
          } 
        )
        concat_ws(lit(","), colList:_*)
    }

    def getArffHeader(df : DataFrame, resArr : Array[Seq[Any]], label : String) : String = {
        val header = (0 to df.schema.length - 1)
          .map(col => "@attribute "+df.schema.names(col) + {
              // Construct dictionary of categorical values for all STRING data, or tag NUMERIC data, in header
              df.schema.fields(col).dataType.typeName match {
                case "Long"   => " NUMERIC"
                case "Double" => " NUMERIC"
                case "Date"   => " DATE yyyy-MM-dd"
                case "String" => resArr.map( r => r(col)).distinct.mkString(" { ", ",", " }")
                case _        => throw new UnsupportedOperationException("Column Type Not Handled Yet")
              }
          })
          .mkString("\n")

        // return final header String
        "@relation " + label + "\n" + header + "\n@data\n"
    }

    def getArffRows(resArr : Array[Seq[Any]]) : String = {
      // Construct comma-delimited ARFF body of rows
      // Render any NULLs as ? for ARFF
      resArr.map(r => r.map(
        c => if ( c == null ) "?" else c.toString())
        .mkString(","))
        .mkString("\n")
    }      
   
    def classify(model : Classifier, trainingSet : Instances, testingSet : Instances)
        : (Evaluation, Array[Byte]) = {
            
        val evaluation = new Evaluation(trainingSet)
        model.buildClassifier(trainingSet)
        evaluation.evaluateModel(model, testingSet)
        val os = new ByteArrayOutputStream()
        weka.core.SerializationHelper.write(os, model)
        val bArrayOut = os.toByteArray()
        (evaluation, bArrayOut)
    }


    def crossValidationSplit(data : Instances, numberOfFolds : Int) : Array[Array[Instances]] = {
        val split = Array.ofDim[Instances](2, numberOfFolds)
        for (i <- (0 to numberOfFolds-1)) {
            split(0)(i) = data.trainCV(numberOfFolds, i)
            split(1)(i) = data.testCV(numberOfFolds, i)
        }
        split
    }

    def calculateAccuracy(predictions : ArrayList[Prediction]) : Double = {
        // Count correct predictions, starting from zero.
        var correct : Int = 0
        predictions.forEach( p => {
            val np = p.asInstanceOf[NominalPrediction]
            if (np.predicted() == np.actual()) correct += 1
        })
        100.0 * correct / predictions.size()
    } 

    def trainAndTestWithData(modelType : String, inData : String, nSplits : Int) 
        : (String, Array[Byte]) = {

        //val data = new Instances(new BufferedReader(new StringReader(inData)))
        val inputString = new StringReader(inData)
        val dataFile = new BufferedReader(inputString)
        val data = new Instances(dataFile)

        data.setClassIndex(data.numAttributes() - 1)

        // Do N-split cross validation
        val split = crossValidationSplit(data, nSplits)
        // Separate split into training and testing arrays
        val (trainingSplits, testingSplits) = (split(0), split(1))

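        val model  = modelType match {
            case "trees.J48" => new J48()
            case "trees.RandomForest" => new RandomForest()
            case "functions.Logistic" =>  new Logistic()
            case "bayes.NaiveBayesUpdateable" =>  new NaiveBayesUpdateable()
            case "functions.LinearRegression" => new LinearRegression()
            // Fail with a clear message instead of a MatchError for unknown model types.
            case other => throw new IllegalArgumentException("Unsupported model type: " + other)
        }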

        var predictions = new ArrayList[Prediction]()
        var finalModel = new Array[Byte](0)

        for (i <- (0 to trainingSplits.length-1)) {
            val (eval, modelBytes) = classify(model, trainingSplits(i), testingSplits(i))
            predictions.addAll(eval.predictions())
            finalModel = modelBytes
            // Uncomment to see the summary for each training-testing pair.
            // println(eval.toSummaryString())
        }

        val accuracy = calculateAccuracy(predictions)

        println("Accuracy of " + model.getClass().getSimpleName() + ": "
              + f"$accuracy%.2f%%"
              //+ String.format("%.2f%%", accuracy)
              + "\n---------------------------------")
        val outval = "InputLength="+inData.length() + ", Accuracy of " + model.getClass().getSimpleName() + "= " + f"$accuracy%.2f%%" + " "
        val json_1 = "{ \"model_stats\": \"" + outval + "\"}"

        (json_1, finalModel)
    }
    
    def trainAndSaveModel ( trainingSplits : Int, arffRelationName : String, modelTypeName : String, trainExcludeCols : List [String], trainTable : List[String], persistedModel : String ) : (String,  String) = {
      // Retrieve all columns from the table except the row-key 
      val df = session.table(trainTable).drop(trainExcludeCols)
      // get results as a scala array of sequences
      val resArr = df.collect().map( r => r.toSeq)
      val arffHdr = getArffHeader(df, resArr, arffRelationName)
      val (resultStr, modelBinary) = trainAndTestWithData(modelTypeName, arffHdr + getArffRows(resArr),  trainingSplits)
      // Serialize the model into the resources folder
      saveModel(persistedModel, modelBinary, arffHdr)
      (resultStr,  arffHdr)
    } 

    def saveModel(persistedModel : String, modelBinary : Array[Byte], arffHdr : String) : Unit = {
      val targetMdl = new BufferedOutputStream( new FileOutputStream(persistedModel +".mdl") )
      try modelBinary.foreach( targetMdl.write(_) ) finally targetMdl.close
      val targetArff = new BufferedOutputStream( new FileOutputStream(persistedModel +".arff") )
      try arffHdr.foreach( targetArff.write(_) ) finally targetArff.close
    }

    def loadModel(persistedModelFileName : String ) : (Array[Byte], String) = {
      val sourceMdl = new BufferedInputStream( new FileInputStream(persistedModelFileName+".mdl") )
      val mdl = Stream.continually(sourceMdl.read).takeWhile(-1 !=).map(_.toByte).toArray
      val sourceArff = new BufferedInputStream( new FileInputStream(persistedModelFileName+".arff") )
      val arff = Stream.continually(sourceArff.read).takeWhile(-1 !=).map(_.toChar).mkString

      (mdl, arff)
    }
}
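To show what `getArffHeader` and `getArffRows` produce, here is a simplified, Snowpark-free sketch of the same text layout. The types `ArffType`, `NumericAttr`, and `NominalAttr` and the sample column values are assumptions made for illustration; the real helper derives the attribute types from the DataFrame schema and the nominal values from the data.

```scala
// Hypothetical, simplified column description: numeric, or nominal with a fixed value set.
sealed trait ArffType
case object NumericAttr extends ArffType
final case class NominalAttr(values: Seq[String]) extends ArffType

def arffHeader(relation: String, columns: Seq[(String, ArffType)]): String = {
  val attributes = columns.map {
    case (name, NumericAttr)         => "@attribute " + name + " NUMERIC"
    case (name, NominalAttr(values)) => "@attribute " + name + values.mkString(" { ", ",", " }")
  }
  "@relation " + relation + "\n" + attributes.mkString("\n") + "\n@data\n"
}

// NULLs become "?", the ARFF marker for a missing value.
def arffRow(values: Seq[Any]): String =
  values.map(v => if (v == null) "?" else v.toString).mkString(",")

val header = arffHeader("telco_churn", Seq(
  "TENURE"   -> NumericAttr,
  "CONTRACT" -> NominalAttr(Seq("Month-to-month", "One-year")),
  "CHURN"    -> NominalAttr(Seq("No", "Yes"))
))
val document = header + arffRow(Seq(12, null, "No"))
println(document)
// @relation telco_churn
// @attribute TENURE NUMERIC
// @attribute CONTRACT { Month-to-month,One-year }
// @attribute CHURN { No,Yes }
// @data
// 12,?,No
```

The remote `scoreRow` function relies on the same layout: it appends one serialized row to the stored header and parses the result as a one-instance dataset.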

## Model Training

The model training method takes these parameters:

- Number of training data splits
- Name of the ARFF Relation (only visible in the ARFF Header)
- Model type (classification algorithm)
- List of columns to exclude from training
- Name of the training table
- Path name of the serialized model file

It returns:

- Model stats such as input length and accuracy
- ARFF Header
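The number of splits controls how the training data is divided into training and testing folds. In `trainAndTestWithData`, the accuracy is computed over the predictions of all folds, while the persisted model is the one built on the last fold. The sketch below illustrates the fold idea with row indices only; `kFoldIndices` is a hypothetical helper, and Weka's `trainCV`/`testCV` select the rows of each fold differently.

```scala
// Conceptual sketch: row i belongs to the test set of fold (i % k) and to the
// training set of every other fold.
def kFoldIndices(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
  require(k >= 2 && k <= n, "need 2 <= k <= n")
  (0 until k).map { fold =>
    val (test, train) = (0 until n).partition(i => i % k == fold)
    (train, test)
  }
}

val folds = kFoldIndices(n = 6, k = 2)
folds.foreach { case (train, test) =>
  println("train=" + train.mkString(",") + " test=" + test.mkString(","))
}
// train=1,3,5 test=0,2,4
// train=0,2,4 test=1,3,5
```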



In [None]:

import ammonite.ops._
import ammonite.ops.ImplicitWd._
import sys.process._

// This folder holds the serialized model and the ARFF header, which are bundled into a JAR below.
val extraJars = pwd+"/"+persistedModelPath

// Delete files from previous runs. The directory itself is removed because
// sys.process does not expand shell wildcards such as "*".
s"rm -rf $extraJars".!

// Re-create the empty folder.
s"mkdir -p $extraJars".!

In [None]:

val (trainingResults, myArffHdr) = WekaClientHelper
   .trainAndSaveModel(modelTrainingSplits,modelARFFName, modelTypeName, trainExcludeCols , trainTable, persistedModel )


After completing the training step, the model has been stored on local storage and it's ready for bundling into a JAR file. In addition to bundling the files into a JAR file, the following step also creates a dependency to the newly created JAR file so Snowpark will automatically upload the JAR file to Snowflake.

In [None]:


%%('bash, "-c", "jar cf $PPATH/$NAME.jar $PPATH/$NAME.arff $PPATH/$NAME.mdl", PPATH=persistedModelPath, NAME=persistedModelFilename)
%%('bash, "-c", "jar tf $PPATH/$NAME.jar ", PPATH=persistedModelPath, NAME=persistedModelFilename)

session.addDependency(persistedModel+".jar")  // External JAR
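The entry names inside the JAR matter: `WekaRemoteHelper` loads its resources as `"/" + persistedModel + ".arff"` and `"/" + persistedModel + ".mdl"`, so the entries have to keep the `extraJars/` prefix. The sketch below reproduces that layout with the JDK's `java.util.jar` classes instead of the `jar` command. The helper names are hypothetical and the file contents are placeholders, not a real model.

```scala
import java.io.{File, FileOutputStream}
import java.util.jar.{JarEntry, JarFile, JarOutputStream}
import scala.collection.JavaConverters._

// Write the given (entry name -> content) pairs into a JAR file.
def writeJar(jarFile: File, entries: Seq[(String, Array[Byte])]): Unit = {
  val out = new JarOutputStream(new FileOutputStream(jarFile))
  try {
    entries.foreach { case (name, bytes) =>
      out.putNextEntry(new JarEntry(name))
      out.write(bytes)
      out.closeEntry()
    }
  } finally out.close()
}

// List the entry names, comparable to `jar tf`.
def listJar(jarFile: File): List[String] = {
  val jar = new JarFile(jarFile)
  try jar.entries().asScala.map(_.getName).toList finally jar.close()
}

val demoJar = File.createTempFile("TelcoModelOut", ".jar")
demoJar.deleteOnExit()
writeJar(demoJar, Seq(
  "extraJars/TelcoModelOut.arff" -> "@relation telco_churn\n".getBytes("UTF-8"),
  "extraJars/TelcoModelOut.mdl"  -> Array[Byte](1, 2, 3) // placeholder bytes, not a real model
))
val entryNames = listJar(demoJar)
println(entryNames) // List(extraJars/TelcoModelOut.arff, extraJars/TelcoModelOut.mdl)
```

Unlike this sketch, the `jar` command also adds a manifest entry, so its listing contains additional `META-INF` lines.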

## Scoring

The final step before scoring the test dataset is to instruct Snowpark to create a new UDF so the scoring function is available in Snowflake. In contrast to the UDFs in Part 2, this time we follow the initialization pattern mentioned above. This pattern is useful when a UDF has some initialization code (like in this example when the model file needs to be loaded) and ensures that the initialization code is executed only once.

In [None]:
addClass(session,"weka.classifiers.Classifier")

val scoreCls = new WekaRemoteHelper(persistedModel)

val scoreUdf = session.udf.registerTemporary((r:String) => scoreCls.scoreRow(r))


We are now ready to score the test dataset. Scoring the test data is now as easy as:

- creating a DataFrame for the test table
- adding a column for the serialized representation of the data row
- adding a column for the result returned by the scoring function
- storing the DataFrame in a table in Snowflake

In [None]:
val testDf = session.table(testTable)

val resultDf = testDf
    .withColumns(Seq("ARFFDATA", "PREDICTION"), Seq(WekaClientHelper.concatAllCol(testDf,trainExcludeCols), scoreUdf(WekaClientHelper.concatAllCol(testDf,trainExcludeCols))))
    .select(scoreKeyColumn, "ARFFDATA", predictColumn, "PREDICTION")

resultDf.show()

val res = resultDf.write.mode(SaveMode.Overwrite).saveAsTable(outTable)

## Result Visualization

The easiest way to evaluate the results from the scoring step is to create a confusion matrix. The query below uses a pivot function. Its result is a 2 x 2 matrix with counts of correct predictions (yes-yes/no-no) and incorrect predictions (yes-no/no-yes).

In [None]:
import com.snowflake.snowpark.types._

val pivotValues = Seq("No","Yes")

val resultDf=session.table(outTable)
    .select(col(predictColumn),col("PREDICTION"))
    .pivot(col(predictColumn),pivotValues)
    .agg(count(col(predictColumn)))
    .sort(col("PREDICTION"))

resultDf
    .schema

val data=resultDf
    .collect()
    .map(r=>r.toSeq.drop(1).map(r=>r.toString().toDouble))

In [None]:
// Render the confusion matrix as a heatmap.
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._

plot(Seq(
    Heatmap()
      .withZ(
        Seq(
          data(0),
          data(1),
        )
      )
      .withColorscale(ColorScale.NamedScale("YlGnBu"))
      .withX(pivotValues)
      .withY(pivotValues))
    ,Layout(
         title="Confusion Matrix",
         xaxis=Axis(title=predictColumn),
         yaxis=Axis(title="Prediction"))
  )
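Beyond the plot, a few summary numbers can be derived from the same 2 x 2 matrix. The sketch below assumes the layout produced by the pivot query above (rows are predictions, columns are actual values, both in the order No, Yes). The function names are hypothetical and the counts are made up for illustration; they are not results from the Telco dataset.

```scala
// Share of all rows where prediction and actual value agree.
def accuracy(m: Seq[Seq[Double]]): Double =
  (m(0)(0) + m(1)(1)) / m.map(_.sum).sum

// Of all rows predicted "Yes", the share that actually churned.
def precisionYes(m: Seq[Seq[Double]]): Double = m(1)(1) / m(1).sum

// Of all rows that actually churned, the share predicted "Yes".
def recallYes(m: Seq[Seq[Double]]): Double = m(1)(1) / (m(0)(1) + m(1)(1))

// Made-up counts: row 0 = predicted No, row 1 = predicted Yes.
val example = Seq(Seq(50.0, 10.0), Seq(5.0, 35.0))
println(accuracy(example))     // 0.85
println(precisionYes(example)) // 0.875
println(recallYes(example))
```

With the notebook's own result, the functions could be called as `accuracy(data.toSeq)`.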

# Conclusion

This concludes this three-part series on Snowpark on Snowflake. In this series, we have used:

- The Quick Start guide to set up a Jupyter Notebook for Snowpark on Snowflake
- Snowpark DataFrames to simplify ETL/ELT/data engineering tasks
- Simple and complex UDFs to execute custom code and third party libraries directly in Snowflake
- A powerful graphics package to visualize results

We have seen how Snowpark brings the ability to execute complex ETL/ELT pipelines directly in Snowflake via the Snowflake DataFrame API. In contrast to processing data with Spark, which requires a Spark or EMR cluster, Snowpark does not require additional infrastructure. Lastly, with the ability to code in Scala we bring custom logic as well as third party libraries to the data rather than the data to the processing tier.