## 1. Data Ingestion

The `diamonds` dataset has been widely used in data science and machine learning. We will use it to demonstrate Snowflake's native data science transformers, covering both in-database functionality and compatibility with Spark and pandas, on real (non-synthetic), statistically appropriate data that is well known to the ML community.



### Import Libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.types import StructType, StructField, DoubleType, StringType
import snowflake.snowpark.functions as F

# data science libs
import numpy as np

# misc
import json

In [3]:
%pip install fosforio

In [1]:
from fosforio import snowflake

Connection manager service url initialised to http://fdc-project-manager:80/project-manager
If you need to update its value then update the variable CONNECTION_MANAGER_BASE_URL in os env.


In [8]:
%pip install snowflake

In [1]:
# `from fosforio import snowflake` binds the module to the name `snowflake`
# (the name `fosforio` itself is never defined), so call it through that name
conn = snowflake.get_connection("Snowflake_Rakesh")
# Current Environment Details
##print('\nConnection Established with the following parameters:')
##print('User                        : {}'.format(snowflake_connection.dbusername))
##print('Role                        : {}'.format(session.get_current_role()))
##print('Database                    : {}'.format(session.get_current_database()))
##print('Schema                      : {}'.format(session.get_current_schema()))
##print('Warehouse                   : {}'.format(session.get_current_warehouse()))
##print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
##print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

### Establish Secure Connection to Snowflake

*Other connection options include Username/Password, MFA, OAuth, Okta, SSO. For more information, refer to the [Python Connector](https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example) documentation.*

In [2]:
# Make a Snowpark Connection
# Create Snowflake Session object
# Use a context manager so the file handle is closed after reading
with open('connection.json') as f:
    connection_parameters = json.load(f)
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('\nConnection Established with the following parameters:')
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))


Connection Established with the following parameters:
User                        : SIKHADAS
Role                        : "ACCOUNTADMIN"
Database                    : "ML_HOL_DB"
Schema                      : "ML_HOL_SCHEMA"
Warehouse                   : "ML_HOL_WH"
Snowflake version           : 8.9.2
Snowpark for Python version : 1.12.0
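The session reads its parameters from `connection.json`, which this notebook does not show. A minimal sketch of the shape that file is assumed to have: the keys are standard Snowpark connection parameters accepted by `Session.builder.configs()`, the values are hypothetical placeholders (apart from the role, database, schema, and warehouse names printed above), and `missing_connection_keys` / `load_connection_parameters` are illustrative helpers, not part of Snowpark.

```python
import json

# Hypothetical placeholder values: the real connection.json is not shown here.
# Keys are standard Snowpark connection parameters for Session.builder.configs().
CONNECTION_TEMPLATE = {
    "account": "<account_identifier>",
    "user": "<user_name>",
    "password": "<password>",
    "role": "ACCOUNTADMIN",
    "warehouse": "ML_HOL_WH",
    "database": "ML_HOL_DB",
    "schema": "ML_HOL_SCHEMA",
}

# Assumption: the keys this notebook relies on (it prints role, database,
# schema, and warehouse right after connecting).
EXPECTED_KEYS = ("account", "user", "role", "warehouse", "database", "schema")


def missing_connection_keys(params: dict) -> list:
    """Return the expected keys absent from a connection-parameter dict."""
    return [key for key in EXPECTED_KEYS if key not in params]


def load_connection_parameters(path: str = "connection.json") -> dict:
    """Load the JSON file and fail fast if an expected key is missing."""
    with open(path) as f:
        params = json.load(f)
    missing = missing_connection_keys(params)
    if missing:
        raise KeyError(f"{path} is missing: {', '.join(missing)}")
    return params
```

Calling `load_connection_parameters()` in place of the bare `json.load` would surface an incomplete file before `Session.builder.configs(...).create()` runs, rather than only when the environment printout looks wrong.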


### Use the Snowpark DataFrame Reader to read in data from the externally staged `diamonds` CSV file 

In `setup.sql`, we staged the `diamonds.csv` file from an external S3 bucket. Now we can read it in.

For more information on loading data, see documentation on [snowflake.snowpark.DataFrameReader](https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameReader.html).




In [3]:
# Show the file before loading
session.sql("LS @DIAMONDS_ASSETS;").show()

# Create a Snowpark DataFrame that is configured to load data from the CSV file
# We can now infer schema from CSV files.
diamonds_df = session.read.options({"field_delimiter": ",",
                                    "field_optionally_enclosed_by": '"',
                                    "infer_schema": True,
                                    "parse_header": True}).csv("@DIAMONDS_ASSETS")

diamonds_df.show()

# Look at descriptive stats on the DataFrame
diamonds_df.describe().show()

-----------------------------------------------------------------------------------------------------------------------------------
|"name"                                              |"size"   |"md5"                             |"last_modified"                |
-----------------------------------------------------------------------------------------------------------------------------------
|s3://sfquickstarts/intro-to-machine-learning-wi...  |2772143  |4d3d1d4bbad5e0806dbaec425cf90196  |Mon, 10 Jul 2023 22:04:47 GMT  |
-----------------------------------------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------
|"carat"  |"cut"      |"color"  |"clarity"  |"depth"  |"table"  |"price"  |"x"   |"y"   |"z"   |
------------------------------------------------------------------------------------------------
|0.23     |Ideal      |E        |SI2        |61.
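The reader options map onto familiar CSV rules: `field_delimiter` is the column separator, `field_optionally_enclosed_by` is the quote character, `parse_header` takes column names from the first line, and `infer_schema` derives column types from the data. As a rough local analogue (an illustration only, not how Snowflake implements schema inference), Python's `csv` module can apply the same delimiter, quoting, and header rules to two rows of the `diamonds` data as printed in this notebook. The quotes around `Very Good` are added to exercise the enclosing character, and `infer_type` is a hypothetical helper that only distinguishes integers, decimals, and strings.

```python
import csv
import io
from decimal import Decimal, InvalidOperation

# Two rows of the diamonds data as printed in this notebook; the quotes around
# "Very Good" are added here to exercise the optional enclosing character.
SAMPLE_CSV = (
    "carat,cut,color,clarity,depth,table,price,x,y,z\n"
    "0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43\n"
    '0.24,"Very Good",J,VVS2,62.8,57.0,336,3.94,3.96,2.48\n'
)


def infer_type(values):
    """Simplified, hypothetical inference: int, then Decimal, else str."""
    try:
        for v in values:
            int(v)
        return int
    except ValueError:
        pass
    try:
        for v in values:
            Decimal(v)
        return Decimal
    except InvalidOperation:
        return str


def read_sample(text):
    # delimiter / quotechar play the roles of field_delimiter /
    # field_optionally_enclosed_by; DictReader takes column names from the
    # first line, like parse_header=True.
    reader = csv.DictReader(io.StringIO(text), delimiter=",", quotechar='"')
    rows = list(reader)
    schema = {name: infer_type([row[name] for row in rows])
              for name in reader.fieldnames}
    return rows, schema


rows, schema = read_sample(SAMPLE_CSV)
```

Here `schema["carat"]` comes out as `Decimal` and `schema["price"]` as `int`, the same split as the `DecimalType`/`LongType` columns in the schema check further down; Snowflake's own inference also chooses precision and scale, which this sketch ignores.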

In [4]:
diamonds_df.columns

['"carat"',
 '"cut"',
 '"color"',
 '"clarity"',
 '"depth"',
 '"table"',
 '"price"',
 '"x"',
 '"y"',
 '"z"']

### Data cleaning

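First, let's force the headers to uppercase using Snowpark DataFrame operations, so the column names are standardized when they are later written to a Snowflake table. The `table` column is renamed to `TABLE_PCT` instead, which avoids a column named after the reserved SQL keyword `TABLE`.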

In [5]:
# Force headers to uppercase; rename "table" to TABLE_PCT
for colname in diamonds_df.columns:
    if colname == '"table"':
        new_colname = "TABLE_PCT"
    else:
        new_colname = colname.upper()
    diamonds_df = diamonds_df.with_column_renamed(colname, new_colname)

diamonds_df.show()

----------------------------------------------------------------------------------------------------
|"CARAT"  |"CUT"      |"COLOR"  |"CLARITY"  |"DEPTH"  |"TABLE_PCT"  |"PRICE"  |"X"   |"Y"   |"Z"   |
----------------------------------------------------------------------------------------------------
|0.23     |Ideal      |E        |SI2        |61.5     |55.0         |326      |3.95  |3.98  |2.43  |
|0.21     |Premium    |E        |SI1        |59.8     |61.0         |326      |3.89  |3.84  |2.31  |
|0.23     |Good       |E        |VS1        |56.9     |65.0         |327      |4.05  |4.07  |2.31  |
|0.29     |Premium    |I        |VS2        |62.4     |58.0         |334      |4.20  |4.23  |2.63  |
|0.31     |Good       |J        |SI2        |63.3     |58.0         |335      |4.34  |4.35  |2.75  |
|0.24     |Very Good  |J        |VVS2       |62.8     |57.0         |336      |3.94  |3.96  |2.48  |
|0.24     |Very Good  |I        |VVS1       |62.3     |57.0         |336      |3.95  |3.98 
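Snowpark reports the lowercase CSV headers as quoted identifiers (`'"carat"'`), because unquoted Snowflake identifiers resolve to uppercase. The renaming rule in the loop above can be written as a plain-Python function over those names; `standardize_name` is a hypothetical helper used only to make the mapping explicit and testable without a session.

```python
# Hypothetical helper mirroring the rename loop: '"table"' -> TABLE_PCT,
# every other quoted lowercase name -> its uppercase form.
RENAMES = {'"table"': "TABLE_PCT"}


def standardize_name(colname: str) -> str:
    if colname in RENAMES:
        return RENAMES[colname]
    # Stripping the quotes is equivalent here: the quoted all-uppercase
    # identifier "CARAT" refers to the same column as unquoted CARAT.
    return colname.strip('"').upper()


# Column names as returned by diamonds_df.columns earlier in this notebook.
original = ['"carat"', '"cut"', '"color"', '"clarity"', '"depth"',
            '"table"', '"price"', '"x"', '"y"', '"z"']
new_names = [standardize_name(c) for c in original]
```

With the names computed up front, `diamonds_df.to_df(*new_names)` should apply the same positional rename in a single projection instead of ten chained `with_column_renamed` calls; treat that as an untested alternative.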

Next, we standardize the category formatting for `CUT` using Snowpark DataFrame operations.

This serves two purposes. First, category names such as `Very Good` are stored in one consistent form (`VERY_GOOD`) when we write to a Snowflake table, so there are no inconsistencies when a Snowpark DataFrame reads them back. Second, the categorical features will be easier to encode in later feature transformations.

In [6]:
def fix_values(column):
    # Replace each run of non-alphanumeric characters with "_", then uppercase
    return F.upper(F.regexp_replace(F.col(column), '[^a-zA-Z0-9]+', '_'))

for col in ["CUT"]:
    diamonds_df = diamonds_df.with_column(col, fix_values(col))

diamonds_df.show()

----------------------------------------------------------------------------------------------------
|"CARAT"  |"COLOR"  |"CLARITY"  |"DEPTH"  |"TABLE_PCT"  |"PRICE"  |"X"   |"Y"   |"Z"   |"CUT"      |
----------------------------------------------------------------------------------------------------
|0.23     |E        |SI2        |61.5     |55.0         |326      |3.95  |3.98  |2.43  |IDEAL      |
|0.21     |E        |SI1        |59.8     |61.0         |326      |3.89  |3.84  |2.31  |PREMIUM    |
|0.23     |E        |VS1        |56.9     |65.0         |327      |4.05  |4.07  |2.31  |GOOD       |
|0.29     |I        |VS2        |62.4     |58.0         |334      |4.20  |4.23  |2.63  |PREMIUM    |
|0.31     |J        |SI2        |63.3     |58.0         |335      |4.34  |4.35  |2.75  |GOOD       |
|0.24     |J        |VVS2       |62.8     |57.0         |336      |3.94  |3.96  |2.48  |VERY_GOOD  |
|0.24     |I        |VVS1       |62.3     |57.0         |336      |3.95  |3.98  |2.47  |VER
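The transformation in `fix_values` is an ordinary regular-expression rewrite, and Python's `re` module should behave the same as Snowflake's `REGEXP_REPLACE` for a simple character class like this one. A local sketch, assuming the five standard `cut` grades of the diamonds dataset (Fair, Good, Very Good, Premium, Ideal; only four appear in the rows shown), reproduces the mapping; `normalize_category` is a hypothetical name.

```python
import re

# Same pattern fix_values passes to REGEXP_REPLACE: any run of
# non-alphanumeric characters becomes a single underscore.
NON_ALNUM = re.compile(r"[^a-zA-Z0-9]+")


def normalize_category(value: str) -> str:
    return NON_ALNUM.sub("_", value).upper()


cuts = ["Fair", "Good", "Very Good", "Premium", "Ideal"]
normalized = [normalize_category(c) for c in cuts]
```

The `+` quantifier means a run of separators collapses into one underscore, so variants such as `Very  Good` or `very-good` would also map to `VERY_GOOD`.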

Check the schema.

In [7]:
list(diamonds_df.schema)

[StructField('CARAT', DecimalType(3, 2), nullable=True),
 StructField('COLOR', StringType(16777216), nullable=True),
 StructField('CLARITY', StringType(16777216), nullable=True),
 StructField('DEPTH', DecimalType(3, 1), nullable=True),
 StructField('TABLE_PCT', DecimalType(3, 1), nullable=True),
 StructField('PRICE', LongType(), nullable=True),
 StructField('X', DecimalType(4, 2), nullable=True),
 StructField('Y', DecimalType(4, 2), nullable=True),
 StructField('Z', DecimalType(4, 2), nullable=True),
 StructField('CUT', StringType(16777216), nullable=True)]

Finally, let's cast the decimal columns to `DoubleType()`, since `DecimalType()` isn't supported by Snowpark ML at the time of writing.

In [8]:
for colname in ["CARAT", "X", "Y", "Z", "DEPTH", "TABLE_PCT"]:
    diamonds_df = diamonds_df.with_column(colname, diamonds_df[colname].cast(DoubleType()))

diamonds_df.show()

----------------------------------------------------------------------------------------------------
|"COLOR"  |"CLARITY"  |"PRICE"  |"CUT"      |"CARAT"  |"X"   |"Y"   |"Z"   |"DEPTH"  |"TABLE_PCT"  |
----------------------------------------------------------------------------------------------------
|E        |SI2        |326      |IDEAL      |0.23     |3.95  |3.98  |2.43  |61.5     |55.0         |
|E        |SI1        |326      |PREMIUM    |0.21     |3.89  |3.84  |2.31  |59.8     |61.0         |
|E        |VS1        |327      |GOOD       |0.23     |4.05  |4.07  |2.31  |56.9     |65.0         |
|I        |VS2        |334      |PREMIUM    |0.29     |4.2   |4.23  |2.63  |62.4     |58.0         |
|J        |SI2        |335      |GOOD       |0.31     |4.34  |4.35  |2.75  |63.3     |58.0         |
|J        |VVS2       |336      |VERY_GOOD  |0.24     |3.94  |3.96  |2.48  |62.8     |57.0         |
|I        |VVS1       |336      |VERY_GOOD  |0.24     |3.95  |3.98  |2.47  |62.3     |57.0 
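Casting `DecimalType` to `DoubleType` trades exact base-10 storage with a fixed scale for binary floating point. That explains the one visible change in the output: `X` in the fourth row prints as `4.2` instead of `4.20`, because a double carries no scale. A plain-Python illustration with `decimal.Decimal` and `float` (an analogy to the Snowflake types, not Snowpark code; `cast_decimals_to_float` is a hypothetical helper) uses that same fourth row.

```python
from decimal import Decimal

# Decimal columns reported by the schema check, cast to double in the notebook.
DECIMAL_COLUMNS = ["CARAT", "X", "Y", "Z", "DEPTH", "TABLE_PCT"]


def cast_decimals_to_float(row: dict) -> dict:
    """Return a copy of row with the decimal columns converted to float."""
    return {k: float(v) if k in DECIMAL_COLUMNS else v for k, v in row.items()}


# Fourth row of the data shown above, with Decimal values at their original scale.
row = {"CARAT": Decimal("0.29"), "X": Decimal("4.20"), "Y": Decimal("4.23"),
       "Z": Decimal("2.63"), "DEPTH": Decimal("62.4"), "TABLE_PCT": Decimal("58.0"),
       "PRICE": 334, "CUT": "PREMIUM"}
casted = cast_decimals_to_float(row)

# The trailing zero belongs to the Decimal's scale, not to the float value.
print(str(row["X"]), str(casted["X"]))  # prints: 4.20 4.2
```

For values with two decimal places the rounding error of a double is far below the data's precision, which is why this cast is a reasonable trade for compatibility with Snowpark ML.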

### Write cleaned data to a Snowflake table

In [9]:
diamonds_df.write.mode('overwrite').save_as_table('diamonds')

In [10]:
session.close()