# Imports and helper functions

In [1]:
import pandas as pd
from swifter import swifter
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
import numpy as np
from awswrangler import config, athena
from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder,OneHotEncoder,FunctionTransformer, MinMaxScaler
from sklearn.impute import SimpleImputer
from category_encoders import TargetEncoder

from feature_engine.encoding import CountFrequencyEncoder
from sklearn.model_selection import train_test_split, GridSearchCV, cross_validate, StratifiedKFold
from sklearn.base import TransformerMixin
from sklearn import model_selection, metrics
import os
import s3fs
import boto3
import io
import json
from dotenv import load_dotenv
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.sklearn.processing import SKLearnProcessor, ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.tuner import ContinuousParameter, IntegerParameter
from sagemaker import Session
from sagemaker.workflow.properties import PropertyFile
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep 
from sagemaker.workflow.functions import JsonGet, Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables


load_dotenv()

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
region = os.getenv('region')
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=region
)
config.boto3_session = session

s3 = s3fs.S3FileSystem(
    anon=False, key=AWS_ACCESS_KEY_ID, secret=AWS_SECRET_ACCESS_KEY)
sm_client = session.client("sagemaker")
sagemaker_session = Session()


def data_description(df):
    print('Variables:\n\n{}'.format(df.dtypes), end='\n\n')
    print('Number of rows {}'.format(df.shape[0]), end='\n\n')
    print('Number of columns {}'.format(df.shape[1]), end='\n\n')
    print('NA analysis'.format(end='\n'))
    for i in df.columns:
        print('column {}: {} {}'.format(i,df[i].isna().any(), df[i].isna().sum()))

def consult_table_athena(database, table, boto3_session=None):
    config.aws_profile = 'default'
    config.region = 'us-east-1'

    query = f"SELECT * FROM {database}.{table}"

    df = athena.read_sql_query(query, database=database, boto3_session=boto3_session)

    return df


def unique_values_columns(df):
    """
    Display unique values for each object (or string) column in a DataFrame.
    
    Parameters:
    - df (DataFrame): Input DataFrame
    
    Returns:
    - dict: A dictionary with column names as keys and unique values as lists.
    """
    
    # Filter out only object or string type columns
    object_cols = df.select_dtypes(include=['object']).columns
    
    # Get unique values for each object column
    unique_values = {col: df[col].unique().tolist() for col in object_cols}
    
    return unique_values

def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2    
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)    
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df


def predict_in_batches(predictor, data, batch_size=5000):
    predictions = []
    for i in range(0, len(data), batch_size):
        # Extract the batch from the DataFrame
        batch = data.iloc[i:i + batch_size]

        # Convert the batch to a CSV string
        batch_csv = batch.to_csv(header=False, index=False).encode('utf-8')

        # Make a prediction
        batch_predictions = predictor.predict(batch_csv, initial_args={"ContentType": "text/csv"})
        predictions.append(batch_predictions.decode('utf-8'))
    return predictions

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/dbcordeiro@sefaz.al.gov.br/.config/sagemaker/config.yaml


INFO:botocore.credentials:Found credentials in environment variables.


# Data dictionary

**SEM_NOT** - Semana Epidemiológica do preenchimento da ficha de notificação

**SEM_PRI** - Semana Epidemiológica dos Primeiros Sintomas

**SG_UF_NOT** - abela com código e siglas das UF padronizados pelo IBGE.

**ID_REGIONA** - Regional de Saúde de Notificação Código (IBGE)

    Tabela com código e nomes Regional de Saúde
    das Regionais de Saúde dos onde está
    municípios de notificação localizado o
    padronizados pelo IBGE.
    Município realizou
    a notificação.

**CO_REGIONA** - Regional de Saúde de Notificação Código (IBGE)

    Tabela com código e nomes Regional de Saúde
    das Regionais de Saúde dos onde está
    municípios de notificação localizado o
    padronizados pelo IBGE.
    Município realizou
    a notificação.
    
**ID_MUNICIP** - Município Código (IBGE)

    Tabela com código e nomes Município onde
    dos
    Municípios está localizada a
    padronizados pelo IBGE.
    Unidade que
    realizou a
    notificação.

**CO_MUN_NOT** - Município Código (IBGE)

    Tabela com código e nomes Município onde
    dos
    Municípios está localizada a
    padronizados pelo IBGE.
    Unidade que
    realizou a
    notificação.

**CS_SEXO** - Sexo do paciente


**NU_IDADE_N** - Idade ou 
       Idade informada
       pelo paciente
       quando não se sabe
       a data de nascimento
       Na falta desse dado é registrada a idade aparente
       
       Se digitado a data de nascimento a idade é calculada automaticamente pelo sistema considerando o intervalo entre a data de nascimento e a datraa dos primeiros sintomas
       
       idade deve ser <= 150
       
    
**TP_IDADE** - Tipo de idade 1 - Dia, 2 - Mes, 3 - Ano

    Se digitado a data de nascimento, o campo Idade/Tipo é calculado e
    preenchido automaticamente pelo sistema: considerando o intervalo entre a
    data de nascimento e a data dos primeiros sintomas.
    Se a diferença for de 0 a 30 dias, o sistema grava em Idade = (nº dias) e em
    Tipo = 1-Dia. Por exemplo: se Data de nascimento = 05/12/2012 e Data dos
    1ºs sintomas = 11/12/2012, então Idade = 6 e Tipo = 1-Dia.
    Se a diferença for de 1 a 11 meses, o sistema grava em Idade = (nº meses) e
    em Tipo = 2-Mês. Por exemplo: se Data de nascimento = 05/10/2012 e Data
    dos 1ºs sintomas = 11/12/2012, então Idade = 2 e Tipo = 2-Mês.
    Se a diferença for maior ou igual a 12 meses, o sistema grava em Idade = (nº
    anos) e em Tipo = 3-Ano. Por exemplo: se Data de nascimento = 05/10/2011
    e Data dos 1ºs sintomas = 11/12/2012, então Idade = 1 e Tipo = 3-Ano.

**COD_IDADE** - Não existe a descrição no dicionario de dados.

**CS_GESTANT** - Gestante
    1-1º Trimestre
    2-2º Trimestre
    3-3º Trimestre
    4-Idade Gestacional
    Ignorada
    5-Não
    6-Não se aplica
    9-Ignorado
    
    Idade gestacional
    da paciente.
    
    Se selecionado categoria 2-Feminino no campo Sexo.
    Se selecionado sexo igual a Masculino ou a idade for menor ou igual a 9 anos
    o campo é preenchido automaticamente com 6-Não se aplica.
    Se selecionado sexo igual a Feminino e idade for maior que 9 anos, o campo
    não pode ser preenchido com 6-Não se aplica.

**CS_RACA** - Raça/Cor
    1-Branca
    2-Preta
    3-Amarela
    4-Parda
    5-Indígena
    9-Ignorado
    
    Cor ou raça declarada pelo
    paciente:
    Branca; Preta;
    Amarela; Parda
    (pessoa que se
    declarou mulata,
    cabocla, cafuza,
    mameluca ou
    mestiça de preto
    com pessoa de
    outra cor ou raça);
    e, Indígena.
    
**CS_ESCOL_N** - Escolaridade

    0-Sem escolaridade/
    1-Fundamental 1º ciclo (1ª a paciente.5ª série)
    2-Fundamental 2º ciclo (6ª a Para os níveis9ª série)fundamental e
    3- Médio (1º ao 3º ano)
    4-Superior
    5-Não se aplica
    9-Ignorado

**SG_UF** - UF

    Unidade federativa de residencia do paciente

**CS_ZONA** - Zona geografica de residencia do paciente

    1-Urbana
    2-Rural
    3-Periurbana
    9-Ignorado

**SURTO_SG** - Não está no dicionario

**NOSOCOMIAL** - Trata-se de caso nosocomial (infecção adquirida no hospital)?
    
    1-Sim
    2-Não
    9-Ignorado

**AVE_SUINO** - Paciente trabalha ou tem contato direto com aves, suínos, ou outro animal?
    
    1-Sim
    2-Não
    9-Ignorado

**FEBRE** - Sinais e Sintomas/Febre
    
    1-Sim
    2-Não
    9-Ignorado

**TOSSE** - Sinais e Sintomas/Tosse
    
    1-Sim
    2-Não

**GARGANTA** - Sinais e Sintomas/Dor de Garganta

    1-Sim
    2-Não
    9-Ignorado

**DISPNEIA** - Sinais e Sintomas/Dispneia

PESQUISA: Dispneia é o nome que se dá para a sensação de falta de ar.

    1-Sim
    2-Não
    9-Ignorado

**DESC_RESP** - Sinais e sintomas/desconforto respiratorio

    1-Sim
    2-Não
    9-Ignorado

**SATURACAO** - Sinais e Sintomas/Saturação O2< 95%

    1-Sim
    2-Não
    9-Ignorado

**DIARREIA** - Sinais e Sintomas/Diarreia

    1-Sim
    2-Não
    9-Ignorado

**VOMITO** - Sinais e Sintomas/Vômito

    1-Sim
    2-Não
    9-Ignorado

**OUTRO_SIN** - Sinais e Sintomas/Outros

    1-Sim
    2-Não
    9-Ignorado

**OUTRO_DES** - Sinais e sintomas/outros(String)

    Campo varchar

**PUERPERA** - Fatores de risco/ Puérpera
    
    Paciente é puérpera ou parturiente (mulher que pariu recentemente – até 45 dias do parto)?


    1-Sim
    2-Não
    9-Ignorado


**FATOR_RISC** - Fatores de risco(POSSUI ALGUM?)
    
    1-Sim
    2-Não
    9-Ignorado

**CARDIOPATI** - Fatores de risco/Cardiovascular Crônica Doença

    1-Sim
    2-Não
    9-Ignorado

**HEMATOLOGI** - Fatores de risco/Doença hematologica cronica

Paciente possui
Doença
Hematológica
Crônica?

    1-Sim
    2-Não
    9-Ignorado
    
**SIND_DOWN** - Fatores de risco/ Síndrome de  Down

    1-Sim
    2-Não
    9-Ignorado

**HEPATICA** - Fatores de risco/ Doença Hepática Crônica

Paciente possui
Doença Hepática
Crônica?

    1-Sim
    2-Não
    9-Ignorado

**ASMA** - Fatores de risco/ Asma

Paciente possui
Asma?

    1-Sim
    2-Não
    9-Ignorado

**DIABETES** - Fatores de risco/ Diabetes mellitus

Paciente possui
Diabetes mellitus?

    1-Sim
    2-Não
    9-Ignorado

**NEUROLOGIC** - Fatores de risco/doença Neurológica Crônica

    1-Sim
    2-Não
    9-Ignorado

**PNEUMOPATI** - Fatores de risco/outra Pneumatopatia Crônica

Pneumopatia é um termo que se refere em geral às doenças que afetam os pulmões

    1-Sim
    2-Não
    9-Ignorado

**IMUNODEPRE** - Fatores de risco/ Imunodeficiência ou Imunodepressão

Paciente possui
Imunodeficiência
ou Imunodepressão
(diminuição da
função do sistema
imunológico)?

    1-Sim
    2-Não
    9-Ignorado

**RENAL** - Fatores de risco/ Doença Renal Crônica

    1-Sim
    2-Não
    9-Ignorado

**OBESIDADE** - Fatores de risco/ Obesidade

    1-Sim
    2-Não
    9-Ignorado

**OBES_IMC** - Fatores de (Descrição IMC)
Valor  do IMC

Campo habilitado somente se selecionado 1 (Sim) no campo OBESIDADE

**OUT_MORBI** - Fatores de risco/ Outros

Paciente possui
outro(s) fator(es)
de risco?

    1-Sim
    2-Não
    9-Ignorado
    

**MORB_DESC** - Fatores de risco/outros(descrição)
Habilitado se selecionado categoria 1-Sim em Fatores de risco/Outros. OUT_MORBI

Varchar(30)

**VACINA** - Recebeu vacina contra Gripe na última campanha?

Informar se o
paciente foi
vacinado contra
gripe na última
campanha, após
verificar a
documentação /
caderneta.
Campo Essencial
VACINA
Caso o paciente
não tenha a
caderneta,
direcionar a
pergunta para ele
ou responsável e
preencher o campo
com o código
correspondente a
resposta.

    1-Sim
    2-Não
    9-Ignorado

**MAE_VAC** - Se < 6 meses: a mãe recebeu a vacina?

Se paciente < 6
meses, a mãe
recebeu vacina?

    1-Sim
    2-Não
    9-Ignorado

**M_AMAMENTA** - Se < 6 meses: a mãe amamenta a criança?

Se paciente < 6
meses, a mãe
amamenta a
criança?

    1-Sim
    2-Não
    9-Ignorado

**ANTIVIRAL** - Usou antiviral para gripe?

    1-Sim
    2-Não
    9-Ignorado

**TP_ANTIVIR** - Qual antiviral?

Habilitado se campo 42(ANTIVIRAL) Usou antiviral para gripe? for igual a 1. 


    1- Oseltamivir
    2- Zanamivir
    3- Outro, especifique

**HOSPITAL** - Houve internação?

Caso o campo não seja igual a 1 – Sim o sistema emitirá um aviso indicando
que não atende a definição de caso.

    1-Sim
    2-Não
    9-Ignorado

**UTI** - Internado em UTI?

    1-Sim
    2-Não
    9-Ignorado

**SUPORT_VEN** - Uso de suporte ventilatório?

suporte ventilatório no intuito de facilitar as trocas gasosas, reduzir a fadiga muscular, reduzir a dispnéia, melhorar a ventilação alveolar, aumentar a complacência pulmonar e a capacidade residual funcional, diminuir a morbidade e manter as vias aéreas abertas sem a necessidade de instituir uma via aérea 

    1-Sim, invasivo
    2-Sim, não invasivo
    3-Não
    9-Ignorado



**RAIOX_RES** - Raio X de Tórax

    1-Normal
    2-Infiltrado intersticial
    3-Consolidação
    4-Misto
    5-Outro
    6-Não realizado
    9-Ignorado

**RAIOX_OUT** - Raio X de Tórax/ Outro (especificar)

Informar o
resultado do RX de
tórax se
selecionado a
opção 5-Outro.

Habilitado de campo 57(RAIOX_RES) - Raio X de Tórax = 5 (Outro).

varchar(30)

**AMOSTRA** - Coletou amostra?

Foi realizado coleta 
de amostra para
realização de teste
diagnóstico?

1-Sim
2-Não
9-Ignorado

**TP_AMOSTRA** - Tipo de amostra

    1-Secreção de Naso-orofaringe
    2-Lavado Broco-alveolar
    3-Tecido post-mortem
    4-Outra, qual?
    5-LCR
    9-Ignorado


**OUT_AMOST** - Tipo de amostra/Outra

Descrição do tipo
da amostra clínica,
caso diferente das
listadas nas
categorias do
campo.

Campo habilitado se selecionado categoria 4-Outra, qual em Tipo de
amostra. em coluna TP_AMOSTRA

vachar(30)

<span style="color:red">**CLASSI_FIN**</span> - Classificação final do caso -- TARGET



    1-SRAG por influenza
    2-SRAG por outro vírus respiratório
    3-SRAG por outro agente etiologico qual
    4-SRAG não especificado
    5-SRAG por covid-19


**HISTO_VGM** - Não existe no dicionario de dados

**PAC_COCBO** - Ocupação

Tabela com código da
Ocupação da Classificação
Brasileira de Ocupações
(CBO).

Ocupação
profissional do
paciente

**PAC_DSCBO** - Ocupação

Tabela com código da
Ocupação da Classificação
Brasileira de Ocupações
(CBO).

Ocupação
profissional do
paciente

**OUT_ANIM** - Paciente trabalha ou tem contato  direto com aves, suínos/Outro animal (especificar)

Informar o animal
que o paciente
teve contato se
selecionado a
opção 3.

Habilitado de campo 33(AVE_SUINO) - Contato com outro animal = 3 (Outro).

**DOR_ABD** - Sinais e Sintomas/Dor abdominal

    1-Sim
    2-Não
    9-Ignorado

**FADIGA** - Sinais e Sintomas/Fadiga

    1-Sim
    2-Não
    9-Ignorado


**PERD_OLFT** - Sinais e Sintomas/Perda do Olfato

    1-Sim
    2-Não
    9-Ignorado

**PERD_PALA** - Sinais e Sintomas/Perda do Paladar

    1-Sim
    2-Não
    9-Ignorado

**TOMO_RES** - Aspecto Tomografia

    1-Tipico covid-19
    2- Indeterminado covid-19
    3- Atípico covid-19
    4- Negativo para Pneumonia
    5- Outro
    6-Não realizado
    9-Ignorado


**TOMO_OUT** - Aspecto Tomografia/Outro (especificar)

Informar o
resultado da
tomografia se
selecionado a
opção 5-Outro

Habilitado de campo 59 (TOMO_RES )- Aspecto Tomografia = 5 (Outro

varchar(100)

**VACINA_COV** - Recebeu vacina COVID-19?

    1-Sim
    2-Não
    9-Ignorado

**DOSE_1_COV** - Data 1ª dose da vacina COVID-19

Habilitado se campo 36(VACINA_COV) - Recebeu vacina COVID-19? for igual a 1.

Date
DD/MM/AAAA

**DOSE_2_COV** - Data 2ª dose da vacina COVID-19

Habilitado se campo 36(VACINA_COV) - Recebeu vacina COVID-19? for igual a 1.

Date
DD/MM/AAAA

**DOSE_REF** - Data da dose reforço da vacina COVID-19

Habilitado se campo 36(VACINA_COV) - Recebeu vacina COVID-19? for igual a 1.

Date
DD/MM/AAAA

**FNT_IN_COV** - Fonte dos dados/informação sobre a vacina COVID-19

Campo preenchido de acordo com a fonte dos dados/informação sobre a
vacina COVID-19, se foi digitada manualmente ou recuperada via
integração com a Base Nacional de Vacinação

    1- Manual
    2- Integração

**DELTA_UTI** - Não está no dicionario de dados

**ID** - Não esta no dicionario de dados

# Creation of model group package registry

In [None]:
# model_package_group_name = "respiratory-virus-classification"
# model_package_group_input = {
#     "ModelPackageGroupName": model_package_group_name,
#     "ModelPackageGroupDescription": "Respiratory virus classification project"
# }

# create_model_package_group_response = sm_client.create_model_package_group(
#     **model_package_group_input
# )

# Loading Data

In [2]:
# Loading raw data from athena
# Balancing classes
# Separate train and test data


# DATABASE = 'respiratory_db'
# TABLE = 'table_respiratory_traintrain_data'
# df = consult_table_athena(DATABASE, TABLE, boto3_session=session)

# class1 = df[df['classi_fin'] == 1].sample(31437, random_state=42)
# class2 = df[df['classi_fin'] == 2]
# class3 = df[df['classi_fin'] == 3]
# class4 = df[df['classi_fin'] == 4].sample(31437, random_state=42)
# class5 = df[df['classi_fin'] == 5].sample(31437, random_state=42)
# df_raw = pd.concat([class1, class2, class3, class4, class5], ignore_index=True)

# train, test = train_test_split(df_raw, test_size=0.15, random_state=42)

# df.to_parquet(s3.open("s3://sagemaker-traintest-respiratory-classification/data/raw/data.parquet", "wb"), index=False)
# train.to_parquet(s3.open("s3://sagemaker-traintest-respiratory-classification/data/raw/train.parquet", "wb"), index=False)
# train.to_parquet(s3.open("s3://sagemaker-traintest-respiratory-classification/data/raw/test.parquet", "wb"), index=False)


In [2]:
df = pd.read_parquet(s3.open("s3://sagemaker-traintest-respiratory-classification/data/raw/train.parquet", "rb"))

## Data description

In [3]:
df.columns = df.columns.str.lower()

In [5]:
data_description(df)

Variables:

sem_not         Int64
sem_pri         Int64
sg_uf_not      string
id_regiona     string
co_regiona    float64
id_municip     string
co_mun_not      Int64
cs_sexo        string
nu_idade_n      Int64
tp_idade        Int64
cod_idade       Int64
cs_gestant      Int64
cs_raca         Int64
cs_escol_n    float64
sg_uf          string
cs_zona       float64
surto_sg      float64
nosocomial    float64
ave_suino     float64
febre         float64
tosse         float64
garganta      float64
dispneia      float64
desc_resp     float64
saturacao     float64
diarreia      float64
vomito        float64
outro_sin     float64
outro_des      string
puerpera      float64
fator_risc      Int64
cardiopati    float64
hematologi    float64
sind_down     float64
hepatica      float64
asma          float64
diabetes      float64
neurologic    float64
pneumopati    float64
imunodepre    float64
renal         float64
obesidade     float64
obes_imc       string
out_morbi     float64
morb_desc      strin

In [4]:
df = reduce_mem_usage(df)

Mem. usage decreased to 36.62 Mb (43.7% reduction)


In [36]:
df.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
sem_not,109395.0,22.70233,14.467374,1.0,11.0,21.0,33.0,52.0
sem_pri,109395.0,22.54224,14.828901,1.0,10.0,20.0,33.0,52.0
co_regiona,97422.0,,,1331.0,1345.0,1382.0,1519.0,6256.0
co_mun_not,109395.0,353770.2,86174.568385,110002.0,311830.0,354130.0,411370.0,530010.0
nu_idade_n,109395.0,42.62514,30.546156,0.0,7.0,48.0,69.0,115.0
tp_idade,109395.0,2.837616,0.396031,1.0,3.0,3.0,3.0,3.0
cod_idade,109395.0,2880.241,412.158356,1000.0,3003.0,3048.0,3069.0,3115.0
cs_gestant,109395.0,5.817213,0.804062,0.0,5.0,6.0,6.0,9.0
cs_raca,109395.0,3.534695,2.88798,1.0,1.0,4.0,4.0,9.0
cs_escol_n,71783.0,,0.0,0.0,2.0,5.0,9.0,9.0


### Data Cleaning / NA analysis / Outliers analysis

In [5]:
# Sexo have Male, F and I, lets take off the I value has there are just 188 lines.
df = df[df['cs_sexo'] != 'I']
df['cs_sexo'] = df['cs_sexo'].astype('category')

df = df.drop('delta_uti', axis=1)

# Negative ages are excluded
df = df[~df['nu_idade_n'] <= 0]

# Remove demographic categories

df = df.drop(
    [
    'sg_uf_not',
    'id_regiona',
    'co_regiona',
    'id_municip',
    'co_mun_not',
    'sg_uf',
    'cod_idade',
    'cs_escol_n'
    ], axis=1
)

#replacing out of range values with the mode
df['tp_amostra'].replace(df['tp_amostra'].max(),df['tp_amostra'].mode()[0], inplace=True)

df['dor_abd'] = df['dor_abd'].swifter.apply(
    lambda x: df['dor_abd'].mode()[0] if x not in (1,2)
    else x
)

df['perd_olft'] = df['perd_olft'].swifter.apply(
    lambda x: df['perd_olft'].mode()[0] if x not in (1,2)
    else x
)
df['vacina_cov'] = df['vacina_cov'].swifter.apply(
    lambda x: df['vacina_cov'].mode()[0] if x not in (1,2)
    else x
)

df['perd_pala'] = df['perd_pala'].swifter.apply(
    lambda x: df['perd_pala'].mode()[0] if x not in (1,2)
    else x
)

float_cols = df.select_dtypes(['float16','float32','int64','float64']).columns
for col in float_cols:
    mode_value = df[col].mode()[0]
    df[col] = df[col].fillna(mode_value)

df[float_cols] = df[float_cols].astype('int8')

int_cols = df.select_dtypes(['int64','int8']).drop(['sem_pri','nu_idade_n'], axis=1).columns
df[int_cols] = df[int_cols].astype('category')

Dask Apply:   0%|          | 0/24 [00:00<?, ?it/s]

Dask Apply:   0%|          | 0/24 [00:00<?, ?it/s]

Dask Apply:   0%|          | 0/24 [00:00<?, ?it/s]

Dask Apply:   0%|          | 0/24 [00:00<?, ?it/s]

### Feature selection to remove unimportant features

In [6]:
# #Excluding text columns
# predictors = df.drop('classi_fin', axis=1)
# predictors = predictors.drop(predictors.select_dtypes(['object','string']), axis=1)
# response = df['classi_fin']

# X_train, X_val, y_train,y_val = train_test_split(
#     predictors, response, random_state=50, stratify=response
# )

# numerical_cols = predictors.select_dtypes(['int8']).columns.tolist()
# categorical_cols = predictors.select_dtypes(['category']).columns.tolist()

# categorical_imputer = ColumnTransformer(
#     [('cat_imputer', SimpleImputer(strategy='most_frequent'), categorical_cols)],
#     remainder='drop'
# )


# X_train_transformed = pd.DataFrame(
#     categorical_imputer.fit_transform(X_train),
#     columns=categorical_cols,
#     index=X_train.index
# )
# X_train_transformed = pd.concat([X_train_transformed,X_train[numerical_cols]],axis=1)
# X_train_transformed[categorical_cols] = X_train_transformed[categorical_cols].astype('category')

# X_val_transformed = pd.DataFrame(
#     categorical_imputer.transform(X_val),
#     columns=categorical_cols,
#     index=X_val.index
# )
# X_val_transformed = pd.concat([X_val_transformed,X_val[numerical_cols]],axis=1)
# X_val_transformed[categorical_cols] = X_val_transformed[categorical_cols].astype('category')

# categorical_transformer = Pipeline(
#     steps=[
#         ('encoder', CountFrequencyEncoder(encoding_method='frequency'))
#     ]
# )

# numerical_transformer = Pipeline(
#     steps=[
#         ('encoder', MinMaxScaler())
#     ]    
# )

# preprocessor = ColumnTransformer(
#     [
#         ('cat', categorical_transformer, categorical_cols),
#         ('num', numerical_transformer, numerical_cols)
#     ]   
# )

# X_train_transformed = pd.DataFrame(
#     preprocessor.fit_transform(X_train),
#     columns=predictors.columns,
#     index=X_train_transformed.index
# )

# X_val_transformed = pd.DataFrame(
#     preprocessor.transform(X_val),
#     columns=predictors.columns,
#     index=X_val_transformed.index
# )

# encoder = LabelEncoder()
# y_train = encoder.fit_transform(y_train)
# y_val = encoder.transform(y_val)

# classifiers = [
#     RandomForestClassifier(n_jobs=-1, random_state=42, class_weight='balanced_subsample'),
#     XGBClassifier(n_jobs=-1, random_state=42,objective='multi:softax', max_delta_step=1),
#     lgbm.LGBMClassifier(n_jobs=-1,  random_state=42, class_weight='balanced')
# ]

# for classifier in classifiers:
#     pipeline_1 = Pipeline(
#         steps= [
#         ('feature_selection', SelectFromModel(estimator=classifier))
#         ]
#     )
#     pipeline_2 = Pipeline(
#         steps= [
#         ('feature_selection', RFE(estimator=classifier))
#         ]
#     )
    
#     permutation_score = permutation_importance(
#         classifier.fit(X_train_transformed,y_train), X_val_transformed, y_val,
#         random_state=42, scoring='f1_weighted', n_repeats=10
#     )

#     importance = pd.DataFrame(
#         {'features':X_train_transformed.columns, 
#         'f1_weighted':permutation_score['importances_mean']}).sort_values(by='f1_weighted', ascending=False
#     )

#     print(
#         'model: {} \n features selected based on feature importance:{} \n\n'.format(pipeline_1['feature_selection'].estimator,
#         pipeline_1.fit(X_train_transformed,y_train).get_feature_names_out(input_features=None)) 
#         )
#     print(
#         'model: {} \n features_selected based on RFE:{} \n\n'.format(pipeline_2['feature_selection'].estimator,
#         pipeline_2.fit(X_train_transformed,y_train).get_feature_names_out(input_features=None))
#         )
#     print(importance, '\n\n\n')



#Features choosen to continue
feature_selected = [
    'id', 'sem_pri', 'nu_idade_n', 'saturacao', 'antiviral',
    'tp_antivir', 'hospital', 'classi_fin', 'uti','raiox_res', 
    'dor_abd', 'perd_olft', 'tomo_res','cs_raca', 'cs_zona', 
    'perd_pala','vacina_cov'
]

df = df[feature_selected]
#Replace 9 value (label is ignored) to mode of each column
df_category = (
    df.select_dtypes(include='category')
    .columns.to_list()
)
column_modes = df[df_category].mode().iloc[0]
for col_name in df_category:
    df[col_name] = df[col_name].replace(9, column_modes[col_name])
    

# #Save cleaned data in S3
# df.to_csv(
#     s3.open("s3://sagemaker-traintest-respiratory-classification/data/cleaned/df_cleaned.csv", "wb"),
#     index=False
# )

# Data preparation, hyperparameter tunning and training model pipeline

## Parameters definitions

In [80]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
inference_instance_type = ParameterString(name="InferenceInstanceType", default_value="ml.c5.xlarge")
hpo_tuner_instance_type = ParameterString(name="HPOTunerScriptInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
role = ParameterString(name='ExecutionRole', default_value=os.getenv('SAGEMAKER_IAM_ROLE'))
default_bucket = ParameterString(name="DefaultS3Bucket", default_value=os.getenv("S3"))
baseline_model_objective_value = ParameterFloat(name='BaselineModelObjectiveValue', default_value=0.4)
image_uri_tuning = ParameterString(name="ImageURITuning", default_value=os.getenv("ECR_IMAGE_TUNING"))
image_uri_model = ParameterString(name="ImageURIModel", default_value=os.getenv("ECR_IMAGE_MODEL"))
k = ParameterInteger(name="KFold", default_value=2)
max_jobs = ParameterInteger(name="MaxTrainingJobs", default_value=6)
max_parallel_jobs = ParameterInteger(name="MaxParallelTrainingJobs", default_value=2)
eta = ParameterFloat(name="eta", default_value=0.1)
max_depth = ParameterInteger(name="MaxDepth", default_value=2)
gamma = ParameterInteger(name="Gamma", default_value=1)
min_child_weight = ParameterInteger(name="MinChildWeight", default_value=5)
subsample = ParameterFloat(name="Subsample", default_value=1.0)
model_package_group_name = ParameterString(name="ModelGroupName", default_value=f"respiratory-virus-classification")

# Variables / Constants used throughout the pipeline
framework_version_sklearn = f"1.2-1"
# s3_bucket_base_path=f"s3://{default_bucket.default_value}"
# s3_bucket_base_path_train = f"{s3_bucket_base_path}/data/train"
# s3_bucket_base_path_test = f"{s3_bucket_base_path}/data/test"
# s3_bucket_base_path_cleaned = f"{s3_bucket_base_path}/data/cleaned"
# s3_bucket_base_path_processed = f"{s3_bucket_base_path}/data/processed"
# s3_bucket_base_path_evaluation = f"{s3_bucket_base_path}/evaluation"
# s3_bucket_base_path_jobinfo = f"{s3_bucket_base_path}/jobinfo"
# s3_bucket_base_path_output = f"{s3_bucket_base_path}/output"

s3_bucket_base_path = Join(on="", values=["s3://", default_bucket.expr])
s3_bucket_base_path_train = Join(on="", values=[s3_bucket_base_path, "/data/train"])
s3_bucket_base_path_test = Join(on="", values=[s3_bucket_base_path, "/data/test"])
s3_bucket_base_path_cleaned = Join(on="", values=[s3_bucket_base_path, "/data/cleaned"])
s3_bucket_base_path_processed = Join(on="", values=[s3_bucket_base_path, "/data/processed"])
s3_bucket_base_path_evaluation = Join(on="", values=[s3_bucket_base_path, "/evaluation"])
s3_bucket_base_path_jobinfo = Join(on="", values=[s3_bucket_base_path, "/jobinfo"])
s3_bucket_base_path_output = Join(on="", values=[s3_bucket_base_path, "/output"])

## Hyperparameter tunning with Cross-validation step

### Preprocessing Step

In [81]:
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version_sklearn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="prepocessing_kfold_split",
    role=role
)

preprocessing_kfold_split_step = ProcessingStep(
    name="PreprocessingCVStep",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            input_name="cleaned_data",
            source="s3://sagemaker-traintest-respiratory-classification/data/cleaned",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/data/train/",
            destination="s3://sagemaker-traintest-respiratory-classification/data/train"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/data/test/",
            destination="s3://sagemaker-traintest-respiratory-classification/data/test"
        )
    ],
    code="../src/hyperparameter_tuning/preprocessing_cv.py"
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### Hyperparameter tunning with CV

In [82]:
script_tuner = ScriptProcessor(
    image_uri=image_uri_tuning,
    command=["python3"],
    instance_type=hpo_tuner_instance_type,
    instance_count=processing_instance_count,
    base_job_name="KFoldCrossValidationHyperParameterTuner",
    role=role
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

jobinfo = PropertyFile(
    name="JobInfo", output_name="jobinfo", path="jobinfo.json"
)


hyperparam_tunning_cv_step = ProcessingStep(
    name="HyperParameterTuningStep",
    processor=script_tuner,
    code="../src/hyperparameter_tuning/cv_hyperparameter_tunning.py",
    outputs=[
        ProcessingOutput(
            output_name="evaluation", 
            source="/opt/ml/processing/evaluation", 
            destination=s3_bucket_base_path_evaluation
        ),
        ProcessingOutput(
            output_name="jobinfo", 
            source="/opt/ml/processing/jobinfo", 
            destination=s3_bucket_base_path_jobinfo
        )
    ],
    job_arguments=[
        "-k", k.to_string(),
        "--image-uri-tuning", image_uri_tuning,
        "--train", s3_bucket_base_path_train, 
        "--test", s3_bucket_base_path_test,
        "--instance-type", training_instance_type,
        "--instance-count", training_instance_count.to_string(),
        "--output-path", s3_bucket_base_path_output,
        "--max-jobs", max_jobs.to_string(),
        "--max-parallel-jobs" , max_parallel_jobs.to_string(),
        "--region", region,
        "--role", role
    ],
    property_files=[evaluation_report],
    depends_on=["PreprocessingCVStep"]
)

## Training Model Step

### Preprocessing Step

In [83]:
preprocessing_step = ProcessingStep(
    name="PreprocessingStep",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            input_name="CleanedData",
            source="s3://sagemaker-traintest-respiratory-classification/data/cleaned",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="X_train_transformed",
            source="/opt/ml/processing/output/X_transformed",
            destination="s3://sagemaker-traintest-respiratory-classification/data/processed"
        ),
        ProcessingOutput(
            output_name="y_train_transformed",
            source="/opt/ml/processing/output/y_transformed",
            destination="s3://sagemaker-traintest-respiratory-classification/data/processed"            
        ),
        ProcessingOutput(
            output_name="preprocessor",
            source="/opt/ml/processing/output/preprocessor",
            destination="s3://sagemaker-traintest-respiratory-classification/estimator/preprocessor"              
        )
    ],
    code="../src/model/preprocessing.py"
)


### Training final model with hyperparameters tunned

In [84]:
model_train_estimator = SKLearn(
    entry_point="train_final_model.py",
    image_uri=image_uri_model,
    instance_type=training_instance_type,
    source_dir="../src/model",
    output_path=s3_bucket_base_path_output,
    role=role
)

model_training_step = TrainingStep(
    name="ModelTrainingStep",
    estimator=model_train_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=s3_bucket_base_path_processed,
            content_type="text/csv"
        ),
        "jobinfo": TrainingInput(
            s3_data=s3_bucket_base_path_jobinfo,
            content_type="application/json"
        )
    },
    depends_on=["PreprocessingStep"]
)



# Register Model With Model Registry

In [85]:
model = Model(
    image_uri=model_train_estimator.image_uri,
    model_data=model_training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role
)

In [86]:
s3_uri = s3_uri = Join(on="", values=[hyperparam_tunning_cv_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"],"/evaluation.json"])
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=s3_uri,
        content_type="application/json"
    )
)


register_model_step = RegisterModel(
    name="RegisterModelStep",
    estimator=model_train_estimator,
    model_data=hyperparam_tunning_cv_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"],
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)



# Condition Step

In [87]:
condition_evaluate_model = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name="HyperParameterTuningStep",
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.F1_score.value"
    ),
    right=baseline_model_objective_value
)

condition_step = ConditionStep(
    name="ModelEvaluationStep",
    conditions=[condition_evaluate_model],
    if_steps=[preprocessing_step, model_training_step, register_model_step],
    else_steps=[]
)

# Final Pipeline

In [27]:
# check the step error
# sm_client.list_pipeline_execution_steps(PipelineExecutionArn="arn:aws:sagemaker:us-east-1:513734873949:pipeline/CrossValidationTunningPipeline/execution/0lqbi5tgjyt5")

In [88]:
pipeline_name = f"CrossValidationTunningPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        inference_instance_type,
        hpo_tuner_instance_type,
        model_approval_status,
        role,
        default_bucket,
        baseline_model_objective_value,
        image_uri_model,
        image_uri_tuning,
        k,
        max_jobs,
        max_parallel_jobs,
        eta,
        max_depth,
        gamma,
        min_child_weight,
        subsample,
        model_package_group_name,
        framework_version_sklearn,
        s3_bucket_base_path,
        s3_bucket_base_path_train,
        s3_bucket_base_path_test,
        s3_bucket_base_path_cleaned,
        s3_bucket_base_path_processed,
        s3_bucket_base_path_evaluation,
        s3_bucket_base_path_jobinfo,
        s3_bucket_base_path_output
    ],
    pipeline_experiment_config=PipelineExperimentConfig(
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[preprocessing_kfold_split_step, hyperparam_tunning_cv_step, condition_step]
)

In [89]:
pipeline.upsert(role_arn=role.default_value)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:513734873949:pipeline/CrossValidationTunningPipeline',
 'ResponseMetadata': {'RequestId': '6277174c-f16a-4226-ad1f-bf424e283887',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6277174c-f16a-4226-ad1f-bf424e283887',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '98',
   'date': 'Mon, 05 Feb 2024 15:55:53 GMT'},
  'RetryAttempts': 0}}

In [12]:
execution = pipeline.start()

execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:513734873949:pipeline/CrossValidationTunningPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:513734873949:pipeline/CrossValidationTunningPipeline/execution/0lqbi5tgjyt5',
 'PipelineExecutionDisplayName': 'execution-1706786704162',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 2, 1, 8, 25, 4, 97000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 1, 8, 25, 4, 97000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '3701608e-9412-46f8-be84-0347e20f0733',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3701608e-9412-46f8-be84-0347e20f0733',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '425',
   'date': 'Thu, 01 Feb 2024 11:25:04 GMT'},
  'RetryAttempts': 0}}

In [13]:
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"