In [1]:
import os
from dotenv import load_dotenv
import pandas as pd
import snowflake.snowpark.functions as F
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

In [2]:
# Load .env before anything else so that every value defined there is already
# in the process environment when the login options and the SNOWFLAKE_*
# settings below are read.
# NOTE(review): whether SnowflakeLoginOptions() itself consults environment
# variables depends on the installed snowflake-ml version - confirm.
load_dotenv()

session = Session.builder.configs(SnowflakeLoginOptions()).getOrCreate()

# Local data directory: <parent of the current working directory>/data
data_dir = os.path.join(os.path.abspath(os.path.join(os.getcwd(), os.pardir)), 'data')

SF_DATABASE:str = os.getenv('SNOWFLAKE_DATABASE')
SF_SCHEMA:str = os.getenv('SNOWFLAKE_SCHEMA')

# os.getenv() returns None for an unset variable; fail here with a clear
# message instead of with a TypeError when the SQL strings are built later.
_missing_settings = [
    name
    for name, value in (
        ('SNOWFLAKE_DATABASE', SF_DATABASE),
        ('SNOWFLAKE_SCHEMA', SF_SCHEMA),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. 


In [3]:
# The database is expected to exist already (its CREATE DATABASE statement is
# disabled); only the schema is created on demand before switching to it.
use_database_stmt = "USE " + SF_DATABASE
session.sql(use_database_stmt).collect()

create_schema_stmt = "CREATE SCHEMA IF NOT EXISTS " + SF_SCHEMA
session.sql(create_schema_stmt).collect()

use_schema_stmt = "USE SCHEMA " + SF_SCHEMA
session.sql(use_schema_stmt).collect()


[Row(status='Statement executed successfully.')]

In [4]:
# Create the ML_DATA stage unless it is already there.
create_stage_stmt = "CREATE STAGE if not exists ML_DATA"
session.sql(create_stage_stmt).collect()

[Row(status='ML_DATA already exists, statement succeeded.')]

In [7]:
# Download the Titanic dataset from the seaborn-data repository and
# upper-case its column names.
titanic = pd.read_csv(
    "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv"
)
titanic.columns = [c.upper() for c in titanic.columns]

# Write into the very directory that is created here, rather than into a
# separately hard-coded relative path that only matches while the working
# directory stays unchanged.
raw_dir = os.path.join(data_dir, 'raw')
os.makedirs(raw_dir, exist_ok=True)
titanic.to_csv(os.path.join(raw_dir, "titanic.csv"), index=False)

In [8]:
# Upload the locally saved raw CSV to the stage, replacing any earlier copy.
raw_csv_local = "../data/raw/titanic.csv"
raw_stage_location = "@ml_data/raw"
session.file.put(raw_csv_local, raw_stage_location, overwrite=True)

[PutResult(source='titanic.csv', target='titanic.csv.gz', source_size=57018, target_size=6528, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

In [9]:
# Configure a Snowpark reader for the staged CSV (schema inference and header
# parsing switched on), build the DataFrame from it and preview the rows.
csv_reader = session.read.option("infer_schema", True)
csv_reader = csv_reader.option("PARSE_HEADER", True)
titanic_df = csv_reader.csv("@ml_data/raw/titanic.csv")
titanic_df.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SURVIVED"  |"PCLASS"  |"SEX"   |"AGE"  |"SIBSP"  |"PARCH"  |"FARE"   |"EMBARKED"  |"CLASS"  |"WHO"  |"ADULT_MALE"  |"DECK"  |"EMBARK_TOWN"  |"ALIVE"  |"ALONE"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0           |3         |male    |22.00  |1        |0        |7.2500   |S           |Third    |man    |True          |NULL    |Southampton    |False    |False    |
|1           |1         |female  |38.00  |1        |0        |71.2833  |C           |First    |woman  |False         |C       |Cherbourg      |True     |False    |
|1           |3         |female  |26.00  |0        |0        |7.9250   |S           |Third    |woman  |False         |NULL    |Southampton    |True     |True     |
|1           |1 

In [14]:
def fix_values(column):
    """Build a cleaned-up, upper-cased expression for the named column.

    Every run of characters other than ASCII letters and digits is replaced
    with a single underscore, and the result is converted to upper case.

    Args:
        column: Name of the column to clean.

    Returns:
        The Snowpark column expression; nothing is evaluated here.
    """
    source_column = F.col(column)
    sanitized = F.regexp_replace(source_column, "[^a-zA-Z0-9]+", "_")
    return F.upper(sanitized)


# Run fix_values over each of the text columns in turn, then preview the result.
text_columns = ("SEX", "EMBARKED", "CLASS", "WHO", "EMBARK_TOWN")
for column_name in text_columns:
    cleaned_expr = fix_values(column_name)
    titanic_df = titanic_df.with_column(column_name, cleaned_expr)

titanic_df.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SURVIVED"  |"PCLASS"  |"AGE"  |"SIBSP"  |"PARCH"  |"FARE"   |"ADULT_MALE"  |"DECK"  |"ALIVE"  |"ALONE"  |"SEX"   |"EMBARKED"  |"CLASS"  |"WHO"  |"EMBARK_TOWN"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0           |3         |22.00  |1        |0        |7.2500   |True          |NULL    |False    |False    |MALE    |S           |THIRD    |MAN    |SOUTHAMPTON    |
|1           |1         |38.00  |1        |0        |71.2833  |False         |C       |True     |False    |FEMALE  |C           |FIRST    |WOMAN  |CHERBOURG      |
|1           |3         |26.00  |0        |0        |7.9250   |False         |NULL    |True     |True     |FEMALE  |S           |THIRD    |WOMAN  |SOUTHAMPTON    |
|1           |1 

In [28]:
# Inspect a handful of rows as Row objects; collecting the whole table here
# would pull every row to the client and flood the notebook output.
titanic_df.limit(10).collect()

[Row(SURVIVED=0, PCLASS=3, AGE=Decimal('22.00'), SIBSP=1, PARCH=0, FARE=Decimal('7.2500'), ADULT_MALE=True, DECK=None, ALIVE=False, ALONE=False, SEX='MALE', EMBARKED='S', CLASS='THIRD', WHO='MAN', EMBARK_TOWN='SOUTHAMPTON'),
 Row(SURVIVED=1, PCLASS=1, AGE=Decimal('38.00'), SIBSP=1, PARCH=0, FARE=Decimal('71.2833'), ADULT_MALE=False, DECK='C', ALIVE=True, ALONE=False, SEX='FEMALE', EMBARKED='C', CLASS='FIRST', WHO='WOMAN', EMBARK_TOWN='CHERBOURG'),
 Row(SURVIVED=1, PCLASS=3, AGE=Decimal('26.00'), SIBSP=0, PARCH=0, FARE=Decimal('7.9250'), ADULT_MALE=False, DECK=None, ALIVE=True, ALONE=True, SEX='FEMALE', EMBARKED='S', CLASS='THIRD', WHO='WOMAN', EMBARK_TOWN='SOUTHAMPTON'),
 Row(SURVIVED=1, PCLASS=1, AGE=Decimal('35.00'), SIBSP=1, PARCH=0, FARE=Decimal('53.1000'), ADULT_MALE=False, DECK='C', ALIVE=True, ALONE=False, SEX='FEMALE', EMBARKED='S', CLASS='FIRST', WHO='WOMAN', EMBARK_TOWN='SOUTHAMPTON'),
 Row(SURVIVED=0, PCLASS=3, AGE=Decimal('35.00'), SIBSP=0, PARCH=0, FARE=Decimal('8.0500'), 

In [43]:
# os.makedirs() returns None, so build the directory path first and keep it
# in cleaned_dir instead of binding the (always None) return value.
cleaned_dir = os.path.join(data_dir, "cleaned")
os.makedirs(cleaned_dir, exist_ok=True)
cleaned_csv_path = os.path.join(cleaned_dir, "titanic.csv")

# Convert to Pandas so we can store it locally
titanic_pd = pd.DataFrame(titanic_df.collect())
titanic_pd.to_csv(cleaned_csv_path, index=False)

# Publish the cleaned data twice: as a file on the stage and as a table.
session.file.put(cleaned_csv_path, "@ml_data/cleaned", overwrite=True)
titanic_df.write.mode("overwrite").save_as_table("titanic")