## 1. Data Ingestion

The `diamonds` dataset is widely used in data science and machine learning. We use it to demonstrate Snowflake's native data science transformers, covering both their in-database functionality and their compatibility with Spark and pandas, on non-synthetic, statistically appropriate data that is well known to the ML community.

### Import Libraries

In [1]:
# [MB] To set up an explicit connection
import os
from dotenv import load_dotenv
from snowflake.snowpark import Session

# Snowpark for Python
from snowflake.snowpark import Column
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import DoubleType

In [2]:
# [MB] Load environment variables
load_dotenv()
USER = os.getenv('SNOWSQL_USER')
ACCOUNT = os.getenv('SNOWSQL_ACCOUNT')
PASSWORD = os.getenv('SNOWSQL_PWD')
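
`os.getenv` returns `None` when a variable is unset, so a missing entry in `.env` would only surface later as a connection failure. Below is a minimal sketch of an early check, assuming the same three variable names as in the cell above. The helper name `require_env` is hypothetical and is not part of Snowpark or python-dotenv.

```python
import os


def require_env(names, environ=None):
    """Return {name: value} for each name, raising if any is unset or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in names}


# Demonstration with an explicit mapping of placeholder values,
# so the example does not depend on the real environment.
demo_env = {
    "SNOWSQL_USER": "demo_user",
    "SNOWSQL_ACCOUNT": "demo_account",
    "SNOWSQL_PWD": "demo_pwd",
}
credentials = require_env(["SNOWSQL_USER", "SNOWSQL_ACCOUNT", "SNOWSQL_PWD"], demo_env)
```

In the notebook itself, calling `require_env([...])` without the second argument would read from `os.environ` after `load_dotenv()` has run.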

### Set up and establish a secure connection to Snowflake

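Snowflake Notebooks establish a Snowpark session automatically when the notebook is attached to a kernel. Because this notebook runs in Jupyter, we create the session explicitly and point it at the warehouse, database, and schema created during setup, which are used throughout this tutorial.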

In [3]:
# [MB] Can't run SQL inside Jupyter notebook. Instead set in following cell.
# -- Using Warehouse, Database, and Schema created during Setup
# USE WAREHOUSE ML_HOL_WH;
# USE DATABASE ML_HOL_DB;
# USE SCHEMA ML_HOL_SCHEMA;

In [4]:
# [MB] Create a Snowpark session explicitly
connection_parameters = {
    "account": ACCOUNT,
    "user": USER,
    "password": PASSWORD,
    "role": "ML_MODEL_HOL_USER",
    "warehouse": "ML_HOL_WH",
    "database": "ML_HOL_DB",
    "schema": "ML_HOL_SCHEMA"
}
session = Session.builder.configs(connection_parameters).create()

# Get Snowflake Session object
# session = get_active_session()
# session.sql_simplifier_enabled = True

# Add a query tag to the session.
# session.query_tag = {
#     "origin":"sf_sit-is",
#     "name":"e2e_ml_snowparkpython",
#     "version":{"major":1, "minor":0,},
#     "attributes":{"is_quickstart":1}
# }

# Current Environment Details
print('Connection Established with the following parameters:')
print('User      : {}'.format(session.get_current_user()))
print('Role      : {}'.format(session.get_current_role()))
print('Database  : {}'.format(session.get_current_database()))
print('Schema    : {}'.format(session.get_current_schema()))
print('Warehouse : {}'.format(session.get_current_warehouse()))

Connection Established with the following parameters:
User      : "matthewbain"
Role      : "ML_MODEL_HOL_USER"
Database  : "ML_HOL_DB"
Schema    : "ML_HOL_SCHEMA"
Warehouse : "ML_HOL_WH"


### Use the Snowpark DataFrame Reader to read in data from the externally staged `diamonds` CSV file 

In `setup.sql`, we staged the `diamonds.csv` file from an external S3 bucket. Now we can read it in.

For more information on loading data, see documentation on [snowflake.snowpark.DataFrameReader](https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameReader.html).

In [5]:
# Create a Snowpark DataFrame that is configured to load data
# from the CSV file. We can now infer schema from CSV files.
diamonds_df = session.read.options({
    "field_delimiter": ",",
    "field_optionally_enclosed_by": '"',
    "infer_schema": True,
    "parse_header": True
}).csv("@DIAMONDS_ASSETS")

diamonds_df.to_pandas()

Unnamed: 0,carat,cut,color,clarity,depth,table,price,x,y,z
0,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
1,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
2,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
3,0.29,Premium,I,VS2,62.4,58.0,334,4.20,4.23,2.63
4,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
...,...,...,...,...,...,...,...,...,...,...
53935,0.72,Ideal,D,SI1,60.8,57.0,2757,5.75,5.76,3.50
53936,0.72,Good,D,SI1,63.1,55.0,2757,5.69,5.75,3.61
53937,0.70,Very Good,D,SI1,62.8,60.0,2757,5.66,5.68,3.56
53938,0.86,Premium,H,SI2,61.0,58.0,2757,6.15,6.12,3.74
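
As a local analogy for the reader options used above (not a description of how Snowflake parses the file), Python's `csv` module exposes similar settings: `delimiter` plays the role of `field_delimiter`, `quotechar` that of `field_optionally_enclosed_by`, and `DictReader` takes the first row as the header, much like `parse_header`. The inline sample is hypothetical, modeled on two rows of the output above. The actual quoting used in `diamonds.csv` is not shown in this notebook.

```python
import csv
import io

# Hypothetical sample: a header and two rows modeled on the output above.
sample = (
    '"carat","cut","color","clarity","depth","table","price","x","y","z"\n'
    '0.23,"Ideal","E","SI2",61.5,55.0,326,3.95,3.98,2.43\n'
    '0.70,"Very Good","D","SI1",62.8,60.0,2757,5.66,5.68,3.56\n'
)

# Fields are comma-separated and may optionally be wrapped in double quotes.
reader = csv.DictReader(io.StringIO(sample), delimiter=",", quotechar='"')
rows = list(reader)

for row in rows:
    print(row["cut"], row["carat"], row["price"])
```

Unlike `infer_schema`, the `csv` module leaves every value as a string, so any type conversion would have to be done separately.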


In [6]:
# Look at descriptive stats on the DataFrame
diamonds_df.describe().to_pandas()

Unnamed: 0,SUMMARY,carat,cut,color,clarity,depth,table,price,x,y,z
0,min,0.2,Fair,D,I1,43.0,43.0,326.0,0.0,0.0,0.0
1,count,53940.0,53940,53940,53940,53940.0,53940.0,53940.0,53940.0,53940.0,53940.0
2,stddev,0.474011,,,,1.432621,2.234491,3989.439738,1.121761,1.142135,0.705699
3,max,5.01,Very Good,J,VVS2,79.0,95.0,18823.0,10.74,58.9,31.8
4,mean,0.79794,,,,61.749405,57.457184,3932.799722,5.731157,5.734526,3.538734


In [7]:
diamonds_df.columns

['"carat"',
 '"cut"',
 '"color"',
 '"clarity"',
 '"depth"',
 '"table"',
 '"price"',
 '"x"',
 '"y"',
 '"z"']

### Data cleaning

First, let's convert the column headers to uppercase with Snowpark DataFrame operations, so that the names are standardized when the columns are later written to a Snowflake table. The `table` column is renamed to `TABLE_PCT` at the same time.

In [8]:
# Force headers to uppercase
assert diamonds_df

for colname in diamonds_df.columns:
    if colname == '"table"':
        new_colname = "TABLE_PCT"
    else:
        new_colname = colname.upper()
    diamonds_df = diamonds_df.with_column_renamed(
        colname,
        new_colname
    )

diamonds_df.to_pandas()

Unnamed: 0,CARAT,CUT,COLOR,CLARITY,DEPTH,TABLE_PCT,PRICE,X,Y,Z
0,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
1,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
2,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
3,0.29,Premium,I,VS2,62.4,58.0,334,4.20,4.23,2.63
4,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
...,...,...,...,...,...,...,...,...,...,...
53935,0.72,Ideal,D,SI1,60.8,57.0,2757,5.75,5.76,3.50
53936,0.72,Good,D,SI1,63.1,55.0,2757,5.69,5.75,3.61
53937,0.70,Very Good,D,SI1,62.8,60.0,2757,5.66,5.68,3.56
53938,0.86,Premium,H,SI2,61.0,58.0,2757,6.15,6.12,3.74
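
The names returned by `diamonds_df.columns` earlier carry double quotes because the lowercase CSV headers were read as case-sensitive quoted identifiers. The renaming rule from the loop above can be sketched in plain Python. `target_name` is a hypothetical helper for illustration only. Unlike the loop, which passes the quoted uppercase name to Snowpark, it strips the quotes so the result is easy to inspect.

```python
def target_name(colname: str) -> str:
    """Map a quoted, lowercase column name to its uppercase replacement."""
    bare = colname.strip('"')
    # "table" gets a new name instead of being uppercased, mirroring the loop above.
    if bare == "table":
        return "TABLE_PCT"
    return bare.upper()


quoted_columns = ['"carat"', '"cut"', '"color"', '"clarity"', '"depth"',
                  '"table"', '"price"', '"x"', '"y"', '"z"']
renamed = [target_name(name) for name in quoted_columns]
print(renamed)
```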


Next, we standardize the category formatting for `CUT` using Snowpark DataFrame operations.

This way, when we write to a Snowflake table, the category names stay consistent whenever a Snowpark DataFrame reads them back. It also makes the categorical feature transformations easier to encode.

In [9]:
def normalize_values(column: str) -> Column:
    return F.upper(F.regexp_replace(F.col(column), '[^a-zA-Z0-9]+', '_'))


for col in ["CUT"]:
    diamonds_df = diamonds_df.with_column(col, normalize_values(col))

diamonds_df.to_pandas()

Unnamed: 0,CARAT,COLOR,CLARITY,DEPTH,TABLE_PCT,PRICE,X,Y,Z,CUT
0,0.23,E,SI2,61.5,55.0,326,3.95,3.98,2.43,IDEAL
1,0.21,E,SI1,59.8,61.0,326,3.89,3.84,2.31,PREMIUM
2,0.23,E,VS1,56.9,65.0,327,4.05,4.07,2.31,GOOD
3,0.29,I,VS2,62.4,58.0,334,4.20,4.23,2.63,PREMIUM
4,0.31,J,SI2,63.3,58.0,335,4.34,4.35,2.75,GOOD
...,...,...,...,...,...,...,...,...,...,...
53935,0.72,D,SI1,60.8,57.0,2757,5.75,5.76,3.50,IDEAL
53936,0.72,D,SI1,63.1,55.0,2757,5.69,5.75,3.61,GOOD
53937,0.70,D,SI1,62.8,60.0,2757,5.66,5.68,3.56,VERY_GOOD
53938,0.86,H,SI2,61.0,58.0,2757,6.15,6.12,3.74,PREMIUM
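
To see what the regular expression in `normalize_values` does to a single value, here is the same transformation in plain Python using `re`. This is a local illustration only: `normalize_value` is a hypothetical name, and Snowflake's regex engine may differ from Python's in edge cases not exercised here.

```python
import re


def normalize_value(value: str) -> str:
    """Replace each run of non-alphanumeric characters with '_' and uppercase."""
    return re.sub(r"[^a-zA-Z0-9]+", "_", value).upper()


# The cut grades that appear in this notebook's outputs.
cuts = ["Fair", "Good", "Very Good", "Premium", "Ideal"]
normalized = [normalize_value(cut) for cut in cuts]
print(normalized)  # ['FAIR', 'GOOD', 'VERY_GOOD', 'PREMIUM', 'IDEAL']
```

Only `Very Good` contains a non-alphanumeric character, so it is the only value that changes beyond casing.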


Check the schema.

In [10]:
list(diamonds_df.schema)

[StructField('CARAT', DecimalType(3, 2), nullable=True),
 StructField('COLOR', StringType(), nullable=True),
 StructField('CLARITY', StringType(), nullable=True),
 StructField('DEPTH', DecimalType(3, 1), nullable=True),
 StructField('TABLE_PCT', DecimalType(3, 1), nullable=True),
 StructField('PRICE', LongType(), nullable=True),
 StructField('X', DecimalType(4, 2), nullable=True),
 StructField('Y', DecimalType(4, 2), nullable=True),
 StructField('Z', DecimalType(4, 2), nullable=True),
 StructField('CUT', StringType(), nullable=True)]
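
In `DecimalType(precision, scale)`, precision is the total number of digits and scale is the number of digits after the decimal point. Below is a small sketch, using only the standard library, of the largest value each inferred type can hold. `max_decimal` is a hypothetical helper, not a Snowpark function.

```python
from decimal import Decimal


def max_decimal(precision: int, scale: int) -> Decimal:
    """Largest value representable with the given precision and scale."""
    # All digits set to 9, then shift the decimal point left by `scale` places.
    return Decimal(10 ** precision - 1).scaleb(-scale)


print(max_decimal(3, 2))  # 9.99
print(max_decimal(3, 1))  # 99.9
print(max_decimal(4, 2))  # 99.99
```

These bounds are consistent with the maxima reported by `describe()` earlier: 5.01 for `carat`, 79.0 for `depth`, and 58.9 for `y`.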

Finally, let's cast the decimal columns to `DoubleType()`, since `DecimalType()` isn't supported by Snowflake ML at the moment.

In [11]:
assert diamonds_df

for colname in ["CARAT", "X", "Y", "Z", "DEPTH", "TABLE_PCT"]:
    diamonds_df = diamonds_df.with_column(
        colname,
        diamonds_df[colname].cast(DoubleType())
    )

diamonds_df.to_pandas()

Unnamed: 0,COLOR,CLARITY,PRICE,CUT,CARAT,X,Y,Z,DEPTH,TABLE_PCT
0,E,SI2,326,IDEAL,0.23,3.95,3.98,2.43,61.5,55.0
1,E,SI1,326,PREMIUM,0.21,3.89,3.84,2.31,59.8,61.0
2,E,VS1,327,GOOD,0.23,4.05,4.07,2.31,56.9,65.0
3,I,VS2,334,PREMIUM,0.29,4.20,4.23,2.63,62.4,58.0
4,J,SI2,335,GOOD,0.31,4.34,4.35,2.75,63.3,58.0
...,...,...,...,...,...,...,...,...,...,...
53935,D,SI1,2757,IDEAL,0.72,5.75,5.76,3.50,60.8,57.0
53936,D,SI1,2757,GOOD,0.72,5.69,5.75,3.61,63.1,55.0
53937,D,SI1,2757,VERY_GOOD,0.70,5.66,5.68,3.56,62.8,60.0
53938,H,SI2,2757,PREMIUM,0.86,6.15,6.12,3.74,61.0,58.0


In [12]:
list(diamonds_df.schema)

[StructField('COLOR', StringType(), nullable=True),
 StructField('CLARITY', StringType(), nullable=True),
 StructField('PRICE', LongType(), nullable=True),
 StructField('CUT', StringType(), nullable=True),
 StructField('CARAT', DoubleType(), nullable=True),
 StructField('X', DoubleType(), nullable=True),
 StructField('Y', DoubleType(), nullable=True),
 StructField('Z', DoubleType(), nullable=True),
 StructField('DEPTH', DoubleType(), nullable=True),
 StructField('TABLE_PCT', DoubleType(), nullable=True)]

### Write cleaned data to a Snowflake table

In [13]:
diamonds_df.write.mode('overwrite').save_as_table('diamonds')

In the next notebook, we will perform data transformations with the Snowflake ML Preprocessing API for feature engineering. 