## Data Science Academy

### Projeto e Implementação de Plataforma de Dados com Snowflake

### Lab 5

### Automatizando Staging Para Pipeline de Machine Learning com Snowpark

## Carregando Snowpark e Outros Pacotes

In [None]:
# Imports
import os
import pandas as pd
import numpy as np
import snowflake.connector
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (IntegerType,
                                      StringType,
                                      StructField,
                                      StructType,
                                      DateType,
                                      BooleanType,
                                      DecimalType,
                                      FloatType,
                                      TimestampType,
                                      VariantType,
                                      ArrayType)
from snowflake.snowpark.context import get_active_session

## Carregando o Dataset e Fazendo o Split

In [None]:
# Carrega o dataset
df_dsa = pd.read_csv("dataset.csv")

In [None]:
# Split para criar índices e dividir os dados
indices = np.random.permutation(len(df_dsa))
split1 = int(len(df_dsa) / 3)
split2 = int(2 * len(df_dsa) / 3)

In [None]:
# Usa os splits para dividir o dataset em 3 partes
df1 = df_dsa.iloc[indices[:split1]]
df2 = df_dsa.iloc[indices[split1:split2]]
df3 = df_dsa.iloc[indices[split2:]]

In [None]:
# Salva os datasets
df1.to_csv("dataset1.csv", index = False)
df2.to_csv("dataset2.csv", index = False)
df3.to_csv("dataset3.csv", index = False)

## Criando a Sessão Snowpark e o Banco de Dados

In [None]:
# Cria a sessão Snowpark
session = get_active_session()

In [None]:
# Cria o banco de dados que vai receber os stages
session.sql("CREATE OR REPLACE DATABASE Database02").collect()

In [None]:
# Define o schema a ser usado
session.sql("USE SCHEMA PUBLIC;").collect()

In [None]:
# Verifica as informações
session.sql("SELECT current_warehouse(), current_database(), current_schema(), current_user(), current_role()").collect()

## Criando os Stages

In [None]:
# Define os nomes das áreas de stage
load_data_stage = "LOAD_DATA_STAGE"
model_stage = "MODEL_STAGE"
function_stage = "FUNCTION_STAGE"
package_stage = "PACKAGE_STAGE"

In [None]:
# Cria os stages
session.sql(f"CREATE OR REPLACE STAGE {load_data_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {model_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {function_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {package_stage}").collect()

In [None]:
# Cria uma sequence para numeração
session.sql("CREATE OR REPLACE SEQUENCE MODEL_SEQ START WITH 1 INCREMENT BY 1;").collect()

## Carregando os Dados no Stage

In [None]:
# Nomes dos arquivos
file_path1 = ("dataset1.csv")
file_path2 = ("dataset2.csv")
file_path3 = ("dataset3.csv")

In [None]:
# Coloca os arquivos na área de stage de dados
session.file.put(file_path1, f"@{load_data_stage}")
session.file.put(file_path2, f"@{load_data_stage}")
session.file.put(file_path3, f"@{load_data_stage}")

In [None]:
# Lista o conteúdo da área de stage de dados
files = session.sql(f"LIST @{load_data_stage}").collect()

In [None]:
# Loop
for file in files:
    print(file)

## Criando Tabela Para Detalhes do Treinamento de Modelos

In [None]:
# Schema para os metadados
schema_log = StructType(
    [
        StructField("training_date", TimestampType()),
        StructField("model_id", StringType()),
        StructField("model_name", StringType()),
        StructField("optimization", BooleanType()),
        StructField("training_table", StringType()),
        StructField("feature_columns", ArrayType(StringType())),
        StructField("accuracy", FloatType()),
        StructField("precision", FloatType()),
        StructField("recall", FloatType()),
        StructField("f1_score", FloatType()),
        StructField("auc_roc", FloatType()),
        StructField("TN", IntegerType()),
        StructField("FP", IntegerType()),
        StructField("FN", IntegerType()),
        StructField("TP", IntegerType()),
    ]
)

In [None]:
# Cria o dataframe a partir do schema
df_log = session.create_dataframe([], schema = schema_log)

In [None]:
df_log.write.mode("overwrite").save_as_table("MODEL_TRAINING_INFO")

## Criando Tabela Para Resultados da Inferência

In [None]:
# Define o schema
schema_inference = StructType(
    [
        StructField("inference_date", TimestampType()),
        StructField("model_id", StringType()),
        StructField("training_table", StringType()),
        StructField("test_table", StringType()),
        StructField("accuracy", FloatType()),
        StructField("precision", FloatType()),
        StructField("recall", FloatType()),
        StructField("f1_score", FloatType()),
        StructField("auc_roc", FloatType()),
        StructField("TN", IntegerType()),
        StructField("FP", IntegerType()),
        StructField("FN", IntegerType()),
        StructField("TP", IntegerType()),
    ]
)

In [None]:
# Cria o dataframe
df_inference = session.create_dataframe([], schema = schema_inference)

In [None]:
# Salva o dataframe como tabela
df_inference.write.mode("overwrite").save_as_table("INFERENCE_RESULTS")

## Criando Tabela Para Model Catalog

In [None]:
# Cria um objeto do tipo struct
schema_model = StructType([StructField("model_name", StringType(), True)])

In [None]:
# Nomes dos modelos que serão criados
model_names = [["Random Forest"], 
               ["XGBoost"], 
               ["K-Nearest Neighbors"], 
               ["Support Vector Machine"]]

In [None]:
# Cria o dataframe
df_models_table = session.create_dataframe(model_names, schema = schema_model)

In [None]:
# Salva o dataframe como tabela
df_models_table.write.mode("overwrite").save_as_table("MODEL_CATALOG")

## Testando o Processo de Carga de Dados

In [None]:
# Define o schema
schema = StructType(
    [
        StructField("age", IntegerType()),
        StructField("sex", IntegerType()),
        StructField("cp", IntegerType()),
        StructField("trestbps", DecimalType()),
        StructField("chol", IntegerType()),
        StructField("fbs", DecimalType()),
        StructField("restecg", DecimalType()),
        StructField("thalach", DecimalType()),
        StructField("exang", DecimalType()),
        StructField("oldpeak", DecimalType()),
        StructField("slope", DecimalType()),
        StructField("ca", DecimalType()),
        StructField("thal", IntegerType()),
        StructField("target", IntegerType()),
    ]
)

In [None]:
# Variáveis com nome de arquivo e tabela
file_name = "dataset1.csv"  
table_name = "TEST_TABLE"

In [None]:
# Define snowflake dataframe
df_dsa_dados = (
    session.read.schema(schema)
    .options({"FIELD_DELIMITER": ",", "SKIP_HEADER": 1})
    .csv(f"@{load_data_stage}/{file_name}")
)

In [None]:
df_dsa_dados.show(5)

In [None]:
# Carrega os dados na tabela
copied_into_result = df_dsa_dados.copy_into_table(table_name, 
                                                  force = True, 
                                                  on_error = "CONTINUE")

In [None]:
# Cria o dataframe
df_dsa_dados_teste = session.table(table_name)

In [None]:
# Lista o conteúdo como dataframe do pandas
df_dsa_dados_teste.limit(5).to_pandas()

## Automatizando o Processo de Carga de Dados

In [None]:
# Função de automação
def dsa_carrega_dados(session: Session, file_name: str, table_name: str) -> str:

    # Imports para garantir portabilidade
    from snowflake.snowpark.types import (StructType, StructField, IntegerType, DecimalType)

    # Schema para os dados
    schema_heart = StructType(
        [
            StructField("age", IntegerType()),
            StructField("sex", IntegerType()),
            StructField("cp", IntegerType()),
            StructField("trestbps", DecimalType(10, 2)),
            StructField("chol", IntegerType()),
            StructField("fbs", DecimalType(10, 2)),
            StructField("restecg", DecimalType(10, 2)),
            StructField("thalach", DecimalType(10, 2)),
            StructField("exang", DecimalType(10, 2)),
            StructField("oldpeak", DecimalType(10, 2)),
            StructField("slope", DecimalType(10, 2)),
            StructField("ca", DecimalType(10, 2)),
            StructField("thal", IntegerType()),
            StructField("target", IntegerType()),
        ]
    )

    try:

        # Lê o arquivo e carrega na tabela
        session.read.option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).option(
            "ON_ERROR", "CONTINUE"
        ).schema(schema_heart).csv(file_name).copy_into_table(table_name)

        return f"{file_name} carregado com sucesso na tabela '{table_name}'"

    except Exception as e:
        return f"Erro ao carregar os dados na tabela '{table_name}': {str(e)}"

In [None]:
# Variáveis para tabela de destino e arquivo
file_name = "@load_data_stage/dataset2.csv"  
table_name = "TEST_TABLE2"

In [None]:
# Executa a função de forma manual
dsa_carrega_dados(session, file_name, table_name)

In [None]:
# Cria o dataframe
df_dsa_dados_teste = session.table(table_name)

In [None]:
# Lista o conteúdo como dataframe do pandas
df_dsa_dados_teste.limit(5).to_pandas()

## Registrando a Stored Procedure Para Automação

In [None]:
# Registra a função como stored procedure
session.sproc.register(func = dsa_carrega_dados,
                       name = "proc_dsa_carrega_dados",
                       packages = ["snowflake-snowpark-python"],
                       is_permanent = True,
                       stage_location = f"@{function_stage}",
                       replace = True)

In [None]:
# Executa a função para cada arquivo
session.call("proc_dsa_carrega_dados", f"@{load_data_stage}/dataset1.csv", "DATA_TABLE_1")
session.call("proc_dsa_carrega_dados", f"@{load_data_stage}/dataset2.csv", "DATA_TABLE_2")
session.call("proc_dsa_carrega_dados", f"@{load_data_stage}/dataset3.csv", "DATA_TABLE_3")

In [None]:
# Drop das tabelas que não precisamos mais
session.sql(f"DROP TABLE IF EXISTS TEST_TABLE").collect()
session.sql(f"DROP TABLE IF EXISTS TEST_TABLE2").collect()

# Fim