In [1]:
# IMPORTS

import json
from typing import NamedTuple

from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import component

from kfp import dsl
from kfp.dsl import Output, Dataset

In [2]:
# GLOBAL VARIABLES
# Project, region and storage locations shared by the cells below.
PROJECT_ID = "vertexai-457414"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
BUCKET_URI = "gs://the_chustomer_churn/dataset/"  # @param {type:"string"}
# Pipeline root is the "pipelines/" folder directly under the bucket prefix.
PIPELINES_PATH = BUCKET_URI + "pipelines/"

In [3]:
# Point the Vertex AI SDK at the project/region configured above and use the
# bucket prefix as the staging location.
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=BUCKET_URI,
)

In [4]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas",
        "scikit-learn",
        "fsspec",
        "gcsfs",
    ],
)
def load_data(data_raw_path: str,
              df_train: Output[Dataset],
              df_test: Output[Dataset],
              df_val: Output[Dataset]):
    """Read the raw CSV and write stratified train/validation/test splits.

    Args:
        data_raw_path: Location of the raw CSV file (any path/URI that
            ``pandas.read_csv`` can open; ``gs://`` works through gcsfs).
        df_train: Output artifact receiving the training split as CSV.
        df_test: Output artifact receiving the test split as CSV.
        df_val: Output artifact receiving the validation split as CSV.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Split configuration.
    TEST_SIZE = 0.15  # 15% held out for test
    VAL_SIZE = 0.15   # 15% held out for validation
    TARGET_COLUMN = 'Churn'
    RANDOM_STATE = 42

    # Read from the path supplied by the pipeline. The original code ignored
    # this parameter and always read a hard-coded URI.
    dataset = pd.read_csv(data_raw_path)

    # Step 1: carve out the test split, stratified on the target.
    train_val_df, test_df = train_test_split(
        dataset,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=dataset[TARGET_COLUMN],
    )

    # Step 2: split the remaining 85% into train (70%) and validation (15%).
    # The validation fraction is re-expressed relative to train_val_df,
    # i.e. 15% / 85%.
    val_size_ratio = VAL_SIZE / (1 - TEST_SIZE)

    train_df, val_df = train_test_split(
        train_val_df,
        test_size=val_size_ratio,
        random_state=RANDOM_STATE,
        stratify=train_val_df[TARGET_COLUMN],  # stratify again on the target
    )

    train_df.to_csv(df_train.path, index=False)
    test_df.to_csv(df_test.path, index=False)
    val_df.to_csv(df_val.path, index=False)





In [5]:
from kfp.dsl import (
    Input, 
    Model, 
    Metrics, 
    component
)
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform",
        "pandas",
        "scikit-learn",
        "fsspec",
        "gcsfs",
    ],
)
def preprocess_features(
              df_train_csv: Input[Dataset],
              df_test_csv: Input[Dataset],
              df_val_csv: Input[Dataset],
              df_train_preprocessed: Output[Dataset],
              df_test_preprocessed: Output[Dataset],
              df_val_preprocessed: Output[Dataset]
              ):
    """Clean the three raw splits and write them back out as CSV.

    Cleaning only fixes types, fills missing values, maps the target and
    drops the identifier column; no scaling or encoding is fitted here.

    Args:
        df_train_csv: Raw training split (CSV).
        df_test_csv: Raw test split (CSV).
        df_val_csv: Raw validation split (CSV).
        df_train_preprocessed: Output for the cleaned training split.
        df_test_preprocessed: Output for the cleaned test split.
        df_val_preprocessed: Output for the cleaned validation split.
    """
    import pandas as pd

    def _clean(df_input):
        """Return a cleaned copy of ``df_input`` (the input is not mutated)."""
        df = df_input.copy()

        # --- Step 1: type correction and missing values ---
        # Non-numeric entries are coerced to NaN and then replaced by 0.
        # Assigning the result (instead of a chained inplace fillna) keeps
        # this working under pandas copy-on-write semantics.
        print("Limpando TotalCharges...")
        df['TotalCharges'] = pd.to_numeric(df['TotalCharges'], errors='coerce').fillna(0)

        # --- Step 2: map ONLY the target column ---
        # The target must be a single 0/1 column.
        if 'Churn' in df.columns:
            print("Mapeando coluna 'Churn' para 0/1...")
            df['Churn'] = df['Churn'].map({'Yes': 1, 'No': 0})

        # --- Step 3: categorical preparation ---
        # Cast SeniorCitizen to str so it is treated as a category downstream.
        if 'SeniorCitizen' in df.columns:
            df['SeniorCitizen'] = df['SeniorCitizen'].astype(str)

        # --- Step 4: column removal ---
        if 'customerID' in df.columns:
            df = df.drop(columns=['customerID'])

        return df

    df_train_raw = pd.read_csv(df_train_csv.path)
    df_test_raw = pd.read_csv(df_test_csv.path)
    # Read the validation split from its own artifact. The original code
    # loaded the test artifact here, so validation was a copy of test.
    df_val_raw = pd.read_csv(df_val_csv.path)

    df_train = _clean(df_train_raw)

    print("Processando dataset de VALIDAÇÃO...")
    df_val = _clean(df_val_raw)

    print("Processando dataset de TESTE...")
    df_test = _clean(df_test_raw)

    df_train.to_csv(df_train_preprocessed.path, index=False)
    df_test.to_csv(df_test_preprocessed.path, index=False)
    df_val.to_csv(df_val_preprocessed.path, index=False)


In [28]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform",
        "pandas",
        "scikit-learn",
        "joblib",
        "fsspec",
        "gcsfs",
    ],
)
def transformers(
              # Inputs
              df_train_csv: Input[Dataset],
              df_test_csv: Input[Dataset],
              df_val_csv: Input[Dataset],

              # Outputs
              y_train_out: Output[Dataset],
              X_train_out: Output[Dataset],
              y_val_out: Output[Dataset],
              X_val_out: Output[Dataset],
              y_test_out: Output[Dataset],
              X_test_out: Output[Dataset],

              # Artifacts
              preprocessor_artifact: Output[Model]
              ):
    """Fit a ColumnTransformer on the training split and apply it to all splits.

    Numeric columns go through ``MinMaxScaler`` and categorical columns
    through ``OneHotEncoder``. The transformer is fitted on the training
    features only and then applied to train, validation and test.

    Args:
        df_train_csv: Cleaned training split (CSV containing 'Churn').
        df_test_csv: Cleaned test split (CSV containing 'Churn').
        df_val_csv: Cleaned validation split (CSV containing 'Churn').
        y_train_out / y_val_out / y_test_out: Outputs for the target columns.
        X_train_out / X_val_out / X_test_out: Outputs for the transformed
            feature matrices.
        preprocessor_artifact: Output for the fitted ColumnTransformer,
            serialised with joblib.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import OneHotEncoder, MinMaxScaler

    df_train = pd.read_csv(df_train_csv.path)
    df_test = pd.read_csv(df_test_csv.path)
    # Read the validation split from its own artifact. The original code
    # loaded the test artifact here, so validation duplicated test.
    df_val = pd.read_csv(df_val_csv.path)

    # --- 1. Separate the target; what remains in each frame is X ---
    y_train = df_train.pop('Churn')
    y_val = df_val.pop('Churn')
    y_test = df_test.pop('Churn')

    X_train = df_train
    X_val = df_val
    X_test = df_test

    # --- 2. Numeric and categorical column lists ---
    numeric_features = ['tenure', 'MonthlyCharges', 'TotalCharges']

    categorical_features = [
        'gender', 'SeniorCitizen', 'Partner', 'Dependents',
        'PhoneService', 'MultipleLines', 'InternetService',
        'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
        'TechSupport', 'StreamingTV', 'StreamingMovies',
        'Contract', 'PaperlessBilling', 'PaymentMethod'
    ]

    # --- 3. Build the preprocessor ---
    numeric_transformer = MinMaxScaler()

    categorical_transformer = OneHotEncoder(
        drop='first',
        handle_unknown='ignore',   # categories unseen during fit are ignored
        sparse_output=False
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ],
        remainder='passthrough'
    )

    # --- 4. Fit on X_train ONLY, then transform every split ---
    print("\n'Fitando' (aprendendo) o preprocessor com X_train...")
    preprocessor.fit(X_train)

    print("Transformando X_train...")
    X_train_processed = preprocessor.transform(X_train)

    print("Transformando X_val...")
    X_val_processed = preprocessor.transform(X_val)

    print("Transformando X_test...")
    X_test_processed = preprocessor.transform(X_test)

    print("\n✅ Processamento com Pipeline concluído com sucesso!")
    print(f"Formato do X_train processado: {X_train_processed.shape}")
    print(f"Formato do X_val processado: {X_val_processed.shape}")
    print(f"Formato do X_test processado: {X_test_processed.shape}")

    # --- 5. Save the processed data to the outputs ---
    print("Salvando dados processados em CSV...")
    try:
        col_names = preprocessor.get_feature_names_out()
    except Exception:
        # Best-effort fallback: generic positional names if the transformer
        # cannot report its output feature names.
        col_names = [f"feature_{i}" for i in range(X_train_processed.shape[1])]

    pd.DataFrame(X_train_processed, columns=col_names).to_csv(X_train_out.path, index=False)
    pd.DataFrame(X_val_processed, columns=col_names).to_csv(X_val_out.path, index=False)
    pd.DataFrame(X_test_processed, columns=col_names).to_csv(X_test_out.path, index=False)

    y_train.to_csv(y_train_out.path, index=False)
    y_val.to_csv(y_val_out.path, index=False)
    y_test.to_csv(y_test_out.path, index=False)

    # --- 6. Persist the fitted preprocessor ---
    # The artifact was declared but never written in the original code, so
    # the same transformation could not be reapplied later.
    os.makedirs(os.path.dirname(preprocessor_artifact.path) or ".", exist_ok=True)
    joblib.dump(preprocessor, preprocessor_artifact.path)

    print("\n✅ Processamento e salvamento concluídos!")

In [29]:
from kfp.dsl import component, Input, Output, Model, Dataset

@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas", "scikit-learn", "joblib", "xgboost", "fsspec", "gcsfs"
    ],
)
def train_model(
    x_train_processed: Input[Dataset],
    y_train: Input[Dataset],
    x_val_processed: Input[Dataset],
    y_val: Input[Dataset],
    scale_pos_weight: float,
    n_estimators: int,
    random_state: int,
    eval_metric: str,
    objective: str,
    trained_model: Output[Model],
):
    """Train an XGBoost classifier and store it in ``trained_model``.

    Args:
        x_train_processed: Transformed training features (CSV).
        y_train: Training target (single-column CSV).
        x_val_processed: Transformed validation features (CSV).
        y_val: Validation target (single-column CSV).
        scale_pos_weight: Passed to ``XGBClassifier(scale_pos_weight=...)``.
        n_estimators: Passed to ``XGBClassifier(n_estimators=...)``.
        random_state: Passed to ``XGBClassifier(random_state=...)``.
        eval_metric: Passed to ``XGBClassifier(eval_metric=...)``.
        objective: Passed to ``XGBClassifier(objective=...)``.
        trained_model: Output artifact receiving the fitted model,
            serialised with joblib.
    """
    import os

    import joblib
    import pandas as pd
    import xgboost as xgb

    # --- read data ---
    X_train = pd.read_csv(x_train_processed.path)
    X_val = pd.read_csv(x_val_processed.path)
    # The target files hold one column; squeeze them to 1-D Series so the
    # estimator receives labels rather than a one-column frame.
    y_train_s = pd.read_csv(y_train.path).squeeze("columns")
    y_val_s = pd.read_csv(y_val.path).squeeze("columns")

    # --- model ---
    model = xgb.XGBClassifier(
        objective=objective,
        scale_pos_weight=scale_pos_weight,
        random_state=random_state,
        n_estimators=n_estimators,
        eval_metric=eval_metric,
        n_jobs=-1,
        tree_method="hist",
    )

    # --- training (no per-round logging) ---
    model.fit(X_train, y_train_s, eval_set=[(X_val, y_val_s)], verbose=False)

    # --- save ---
    # The dump was commented out in the original code, so the declared
    # output artifact stayed empty and the fitted model was discarded.
    os.makedirs(os.path.dirname(trained_model.path) or ".", exist_ok=True)
    joblib.dump(model, trained_model.path)

In [30]:
from kfp import dsl

@dsl.pipeline(
    name="customer",
    description="Pipeline from customer dataset",
    pipeline_root=PIPELINES_PATH,
)
def pipeline(
    # Pipeline input parameters
    data_raw_path: str = "gs://the_chustomer_churn/dataset/dataset.csv",

    # Training hyperparameters (defaults; can be overridden per run)
    hp_scale_pos_weight: float = 3.0,
    hp_n_estimators: int = 150,
    hp_random_state: int = 42
):
    # Wiring: load/split -> clean -> fit+apply transformers -> train.

    # Step 1: load the raw data and split it.
    split_step = load_data(data_raw_path=data_raw_path)
    raw_splits = split_step.outputs

    # Step 2: clean each split (e.g. TotalCharges).
    clean_step = preprocess_features(
        df_train_csv=raw_splits["df_train"],
        df_test_csv=raw_splits['df_test'],
        df_val_csv=raw_splits['df_val'],
    )
    clean_splits = clean_step.outputs

    # Step 3: apply the ColumnTransformer (MinMaxScaler, OneHotEncoder).
    encode_step = transformers(
        df_train_csv=clean_splits['df_train_preprocessed'],
        df_test_csv=clean_splits['df_test_preprocessed'],
        df_val_csv=clean_splits['df_val_preprocessed'],
    )
    encoded = encode_step.outputs

    # Step 4: train the model on the transformed data.
    fit_step = train_model(
        # Data comes from the transformer step.
        x_train_processed=encoded["X_train_out"],
        y_train=encoded["y_train_out"],
        x_val_processed=encoded["X_val_out"],
        y_val=encoded["y_val_out"],

        # Hyperparameters come from the pipeline parameters.
        scale_pos_weight=hp_scale_pos_weight,
        n_estimators=hp_n_estimators,
        random_state=hp_random_state,

        # Fixed settings.
        objective="binary:logistic",
        eval_metric="logloss",
    )

In [31]:
# Compile the pipeline definition into a local spec file for submission.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline.json",
)

In [32]:
# Reuse the configuration constants instead of repeating the literals, so
# project, region and pipeline root are defined in exactly one place.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

job = aiplatform.PipelineJob(
    display_name="custumer-dataset-v1",
    template_path="pipeline.json",  # spec written by the compile cell
    pipeline_root=PIPELINES_PATH,
)

# Submit the job; the captured output below shows its state being polled.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/customer-20251026224156?project=666534829864
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/customer-20251026224156 current state:
Pipel