In [171]:
# USER_FLAG = "--user"
# !pip3 install {USER_FLAG} kfp --upgrade
# !pip3 install {USER_FLAG} google_cloud_pipeline_components --upgrade
# !pip3 install {USER_FLAG} 'apache-beam[gcp]'

## Importação das Bibliotecas

In [172]:
from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2.dsl import (pipeline,
                        Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp.v2 import compiler


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip

In [173]:
# Cloud Storage URI kept as a notebook-level constant.
# NOTE(review): presumably used as the pipeline root (artifact location) by the
# pipeline definition / job submission further down the notebook — confirm.
PIPELINE_ROOT = 'gs://pipeline-ecommerce'

# 1. Componente: Captura dos Dados

In [174]:
@component(packages_to_install=["google-cloud-bigquery","db-dtypes", "pandas"],
          base_image="python:3.10.6",
          output_component_file="captura_dados.yaml")
def captura_dados():
    """Rebuild the cleaned e-commerce table in BigQuery from the raw table.

    Runs a single ``CREATE OR REPLACE TABLE ... AS SELECT`` statement that
    copies the raw table into a date-partitioned table, dropping rows with a
    future invoice date, a NULL customer or description, a unit price below
    0.04, a set of excluded countries and stock codes, and customer 16446.

    The component takes no inputs and produces no KFP outputs; its only effect
    is the table written to BigQuery.
    """
    # Imports live inside the function because a KFP component body is
    # serialized and executed in isolation from the notebook namespace.
    import logging

    import pandas as pd
    from google.cloud import bigquery

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    TABLE_RAW_ID = "ecommerce"
    TABLE_ID = "ecommerce_cds"

    # The return annotation used to be ``Union[str, pd.DataFrame]`` although
    # ``Union`` was never imported here, which raised NameError when this
    # ``def`` was evaluated. The helper only ever returns a DataFrame.
    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """Validate a query with a dry run, execute it and return its rows.

        Args:
            sql: SQL statement to execute in BigQuery.
            project_name: Project passed to the BigQuery client.

        Returns:
            The query result converted to a pandas DataFrame.
        """
        bq_client = bigquery.Client(project=project_name)

        # Dry run first so an invalid statement fails before any real job starts.
        dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=dry_run_config)

        # The dry run succeeded: run the statement for real.
        query_job = bq_client.query(sql, job_config=bigquery.QueryJobConfig())
        job_id = query_job.job_id

        # result() blocks until the job finishes; the rows are then materialized.
        df = query_job.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")
        return df


    query = f"""
                CREATE OR REPLACE TABLE
               `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` (InvoiceNo STRING,
                StockCode STRING,
                Description STRING,
                Quantity INT64,
                InvoiceDate DATE,
                UnitPrice FLOAT64,
                CustomerID FLOAT64,
                Country STRING)
            PARTITION BY
              InvoiceDate AS (
              WITH
                not_nulls AS (
                SELECT
                  *
                FROM
                  `{PROJECT_ID}.{DATASET_ID}.{TABLE_RAW_ID}`
                WHERE
                  InvoiceDate <= CURRENT_DATE()
                  AND CustomerID IS NOT NULL
                  AND Description IS NOT NULL),
                filtering_features AS (
                SELECT
                  *
                FROM
                  not_nulls
                WHERE
                  UnitPrice >= 0.04
                  AND Country NOT IN ('European Community',
                    'Unspecified')
                  AND StockCode NOT IN ('POST',
                    'D',
                    'DOT',
                    'M',
                    'S',
                    'AMAZONFEE',
                    'm',
                    'DCGSSBOY',
                    'DCGSSGIRL',
                    'PADS',
                    'B',
                    'CRUK')
                  AND CustomerID != 16446)
              SELECT
                *
              FROM
                filtering_features);
    """

    run_bq_query(query, project_name=PROJECT_ID)
    logging.info(f'Tabela criada: {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}')

In [175]:
# NOTE(review): scratch cell — prints a fixed string and nothing visible in this
# notebook depends on it; consider deleting it from the final notebook.
print('teste')

teste


# 2. Componente: Preparação dos dados

In [176]:
@component(packages_to_install=["pandas", "google-cloud-bigquery", "db-dtypes", "pandas-gbq"],
    base_image="python:3.10.6",
    output_component_file="data_preparation_ecommerce.yaml")
def data_preparation():
    """Load the cleaned e-commerce table, fix column types and split it.

    Reads the cleaned table from BigQuery, converts ``CustomerID`` to int and
    ``InvoiceDate`` to datetime, then writes three temporary tables (each one
    replaced on every run): the main data restricted to the modelling columns,
    the purchases (``Quantity >= 0``) and the returns (``Quantity < 0``).

    The component has no KFP inputs or outputs. It requires the
    ``CLOUD_ML_PROJECT_ID`` environment variable, which is used as the
    ``project_id`` of every pandas-gbq call.
    """
    import os
    import logging
    from typing import Optional, Tuple

    import pandas as pd
    import pandas_gbq

    logging.info('Iniciando o componente')

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    TABLE_ID = "ecommerce_cds"
    TABLE_FILTERED_TEMP_ID = 'temp_data_filtered'
    TABLE_PURCHASES_TEMP_ID = 'temp_data_purchases'
    TABLE_RETURNS_TEMP_ID = 'temp_data_returns'
    PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]

    def keep_features(dataframe: pd.DataFrame, keep_columns: list) -> pd.DataFrame:
        """Return the DataFrame restricted to the given columns.

        Args:
            dataframe (pd.DataFrame): DataFrame to select from.
            keep_columns (list): Names of the columns to keep.

        Returns:
            pd.DataFrame: A DataFrame containing only ``keep_columns``.
        """
        return dataframe[keep_columns]

    def column_to_int(dataframe: pd.DataFrame, column_name: str) -> bool:
        """Convert a column to ``int`` in place.

        Args:
            dataframe (pd.DataFrame): DataFrame to modify.
            column_name (str): Name of the column to convert.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ``ValueError``/``TypeError`` (the column is then left untouched).
        """
        try:
            dataframe[column_name] = dataframe[column_name].astype(int)
        except (ValueError, TypeError):
            # Missing values or non-numeric content cannot be cast to int.
            return False

        return True

    def column_to_date(dataframe: pd.DataFrame, column_name: str, date_format: Optional[str] = None) -> bool:
        """Convert a column to datetime in place.

        Args:
            dataframe (pd.DataFrame): DataFrame to modify.
            column_name (str): Name of the column to convert.
            date_format (str, optional): Explicit format forwarded to
                ``pd.to_datetime``; when omitted pandas infers the format.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ``ValueError``/``TypeError`` (the column is then left untouched).
        """
        try:
            if date_format:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name], format=date_format)
            else:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name])
        except (ValueError, TypeError):
            # Unparseable values leave the column as it was.
            return False

        return True


    def change_column_type(dataframe_raw: pd.DataFrame):
        """Cast ``CustomerID`` to int and ``InvoiceDate`` to datetime in place.

        A failed conversion is logged as a warning instead of being silently
        ignored; the column keeps its original dtype in that case.

        Args:
            dataframe_raw: DataFrame to modify.

        Returns:
            None.
        """
        if not column_to_int(dataframe_raw, 'CustomerID'):
            logging.warning('Nao foi possivel converter a coluna CustomerID para inteiro')
        if not column_to_date(dataframe_raw, 'InvoiceDate'):
            logging.warning('Nao foi possivel converter a coluna InvoiceDate para data')

    def filtering_features(dataframe_raw: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Split the data into main, purchases and returns frames.

        Args:
            dataframe_raw: DataFrame with the sales data.

        Returns:
            ``(df_filtered, df_purchases, df_returns)`` where ``df_filtered``
            keeps the modelling columns of every row, ``df_purchases`` keeps
            all columns of rows with ``Quantity >= 0`` and ``df_returns`` keeps
            ``CustomerID``/``Quantity`` of rows with ``Quantity < 0``.
        """
        # Negative quantities are returns; everything else is a purchase.
        df_returns = dataframe_raw.loc[dataframe_raw['Quantity'] < 0, ['CustomerID',
                                                                       'Quantity']]
        df_purchases = dataframe_raw.loc[dataframe_raw['Quantity'] >= 0, :]

        # Main data: every row, restricted to the modelling columns.
        df_filtered = keep_features(dataframe_raw, ['InvoiceNo', 'StockCode', 'Quantity',
                                                    'InvoiceDate', 'UnitPrice',
                                                    'CustomerID', 'Country'])

        return df_filtered, df_purchases, df_returns

    def run_data_preparation(dataframe_raw: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Convert column types, then split the data.

        Args:
            dataframe_raw (pd.DataFrame): DataFrame with the sales data; its
                ``CustomerID`` and ``InvoiceDate`` columns are converted in place.

        Returns:
            The ``(df_filtered, df_purchases, df_returns)`` tuple produced by
            ``filtering_features``.
        """
        change_column_type(dataframe_raw)
        return filtering_features(dataframe_raw)

    query_sql = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
                    WHERE InvoiceDate <= CURRENT_DATE """

    # pandas.read_gbq is deprecated (pandas 2.2+) in favour of pandas_gbq.read_gbq.
    # The query is passed positionally because the name of the first parameter
    # differs between pandas-gbq versions.
    data = pandas_gbq.read_gbq(query_sql,
                               project_id=PROJECT_NUMBER)
    logging.info(f'Tabela carregada: `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`')

    df_filtered, df_purchases, df_returns = run_data_preparation(data)

    pandas_gbq.to_gbq(df_filtered, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_FILTERED_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')
    pandas_gbq.to_gbq(df_purchases, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_PURCHASES_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')
    pandas_gbq.to_gbq(df_returns, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_RETURNS_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')

    logging.info(f'Tabelas criadas no BigQuery: {TABLE_FILTERED_TEMP_ID} e {TABLE_PURCHASES_TEMP_ID} e {TABLE_RETURNS_TEMP_ID}')

# 3. Componente: Feature Engineering

In [177]:
@component(packages_to_install=["pandas", "google-cloud-bigquery", "db-dtypes", "pandas-gbq", "google-cloud"],
    base_image="python:3.10.6",
    output_component_file="feature_engineering_ecommerce.yaml")
def feature_engineering():
    """Build per-customer features and store them in BigQuery.

    Reads the three temporary tables written by the data-preparation step and
    computes, per ``CustomerID``: gross revenue, recency in days, number of
    purchased items, purchase frequency and quantity returned.

    * If the feature table does not exist yet, it is created from all
      customers and then rebuilt with two extra columns: ``values``
      (``generate_uuid()``) and ``timestamp`` (``current_timestamp()``).
    * If it already exists, features are computed only for customers that
      have rows dated today in the raw table; those rows are appended and the
      ``values``/``timestamp`` columns are refreshed for those customers.
      When there is no such customer the component logs and returns.

    The component has no KFP inputs or outputs. It requires the
    ``CLOUD_ML_PROJECT_ID`` environment variable, which is used as the
    ``project_id`` of every pandas-gbq call.
    """
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    import pandas as pd
    import pandas_gbq
    import os
    import logging
    from functools import reduce
    from typing import Optional

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    TABLE_ID = 'dados_engenharia_features'
    TABLE_RAW_ID = "ecommerce"
    TABLE_FILTERED_TEMP_ID = 'temp_data_filtered'
    TABLE_PURCHASES_TEMP_ID = 'temp_data_purchases'
    TABLE_RETURNS_TEMP_ID = 'temp_data_returns'
    PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]

    logging.info('Iniciando o componente')

    def column_to_date(dataframe: pd.DataFrame, column_name: str, date_format: Optional[str] = None) -> bool:
        """Convert a column to datetime in place.

        Args:
            dataframe (pd.DataFrame): DataFrame to modify.
            column_name (str): Name of the column to convert.
            date_format (str, optional): Explicit format forwarded to
                ``pd.to_datetime``; when omitted pandas infers the format.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ``ValueError``/``TypeError`` (the column is then left untouched).
        """
        try:
            if date_format:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name], format=date_format)
            else:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name])
        except (ValueError, TypeError):
            # Unparseable values leave the column as it was.
            return False

        return True

    def keep_features(dataframe: pd.DataFrame, keep_columns: list) -> pd.DataFrame:
        """Return the DataFrame restricted to the given columns.

        Args:
            dataframe (pd.DataFrame): DataFrame to select from.
            keep_columns (list): Names of the columns to keep.

        Returns:
            pd.DataFrame: A DataFrame containing only ``keep_columns``.
        """
        return dataframe[keep_columns]

    def calculate_gross_revenue(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """Sum ``Quantity * UnitPrice`` per customer.

        The input frame is not modified: the revenue column is added to a copy
        (``assign``), so passing a filtered slice neither triggers a
        SettingWithCopyWarning nor leaks a new column to the caller.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with the columns
                'CustomerID', 'Quantity' and 'UnitPrice'.

        Returns:
            pd.DataFrame: Columns 'CustomerID' and 'gross_revenue'.

        Raises:
            ValueError: If any of the required columns is missing.
        """
        required_columns = {'CustomerID', 'Quantity', 'UnitPrice'}
        missing_columns = required_columns - set(dataframe_purchases.columns)
        if missing_columns:
            raise ValueError(f"O DataFrame de entrada está faltando as seguintes colunas: {missing_columns}")

        with_revenue = dataframe_purchases.assign(
            gross_revenue=dataframe_purchases['Quantity'] * dataframe_purchases['UnitPrice'])
        grouped_df = with_revenue.groupby('CustomerID').agg({'gross_revenue': 'sum'}).reset_index()

        return grouped_df

    def create_recency(dataframe_purchases: pd.DataFrame, dataframe_filtered: pd.DataFrame) -> pd.DataFrame:
        """Compute the days elapsed since each customer's last purchase.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'CustomerID'
                and 'InvoiceDate'.
            dataframe_filtered (pd.DataFrame): Frame whose latest
                'InvoiceDate' is used as the reference date.

        Returns:
            pd.DataFrame: Columns 'CustomerID' and 'recency_days'.
        """
        # Last purchase date per customer.
        df_recency = dataframe_purchases.loc[:, ['CustomerID', 'InvoiceDate']].groupby('CustomerID').max().reset_index()

        # Recency is measured against the most recent date in the filtered data.
        df_recency.loc[:, 'recency_days'] = (dataframe_filtered['InvoiceDate'].max() - df_recency['InvoiceDate']).dt.days

        return df_recency[['CustomerID', 'recency_days']]

    def create_quantity_purchased(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """Count the purchase rows (non-null 'StockCode') of each customer.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'CustomerID'
                and 'StockCode'.

        Returns:
            pd.DataFrame: Columns 'CustomerID' and 'qty_products'.
        """
        qty_purchased = dataframe_purchases.loc[:, ['CustomerID', 'StockCode']].groupby('CustomerID').count()

        # Turn CustomerID back into a column and give the count its final name.
        qty_purchased = qty_purchased.reset_index().rename(columns={'StockCode': 'qty_products'})

        return qty_purchased

    def create_freq_purchases(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """Compute each customer's purchase frequency (invoices per day).

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'CustomerID',
                'InvoiceNo' and 'InvoiceDate'.

        Returns:
            pd.DataFrame: One row per customer with 'CustomerID', the helper
            columns 'max_', 'min_', 'days_', 'buy_' and the 'frequency' column.
        """
        # Per customer: first/last purchase, length of that window in days
        # (inclusive, hence the +1) and number of distinct invoice rows.
        df_aux = (dataframe_purchases[['CustomerID', 'InvoiceNo', 'InvoiceDate']]
                  .drop_duplicates()
                  .groupby('CustomerID')
                  .agg(max_=('InvoiceDate', 'max'),
                       min_=('InvoiceDate', 'min'),
                       days_=('InvoiceDate', lambda x: ((x.max() - x.min()).days) + 1),
                       buy_=('InvoiceNo', 'count'))
                  .reset_index())

        # Frequency = invoices / days in window, guarding against a zero window.
        df_aux['frequency'] = df_aux[['buy_', 'days_']].apply(
            lambda x: x['buy_'] / x['days_'] if x['days_'] != 0 else 0, axis=1)

        return df_aux

    def create_qty_returns(dataframe_returns: pd.DataFrame) -> pd.DataFrame:
        """Compute the total quantity returned by each customer.

        Args:
            dataframe_returns: Returns with 'CustomerID' and 'Quantity'
                (quantities are negative for returns).

        Returns:
            A DataFrame with 'CustomerID' and 'qty_returns', where
            'qty_returns' is the summed quantity with its sign flipped.
        """
        df_returns = dataframe_returns[['CustomerID', 'Quantity']].groupby('CustomerID').sum().reset_index().rename(columns={'Quantity': 'qty_returns'})
        df_returns['qty_returns'] = df_returns['qty_returns']* -1

        return df_returns

    def run_feature_engineering(dataframe_filtered: pd.DataFrame, dataframe_purchases: pd.DataFrame, dataframe_returns: pd.DataFrame) -> pd.DataFrame:
        """Compute every feature and merge them into one row per customer.

        Args:
            dataframe_filtered: Main data; defines the set of customers.
            dataframe_purchases: Purchase rows.
            dataframe_returns: Return rows (may be empty).

        Returns:
            A DataFrame with 'CustomerID', 'gross_revenue', 'recency_days',
            'qty_products', 'frequency' and 'qty_returns'. Customers without
            returns get ``qty_returns = 0``; rows with any other missing
            feature are dropped.

        Raises:
            ValueError: If ``dataframe_filtered`` or ``dataframe_purchases``
                is empty, or if a required column is missing.
        """
        if dataframe_filtered.empty:
            raise ValueError("Input DataFrame 'dataframe_filtered' is empty")
        if dataframe_purchases.empty:
            raise ValueError("Input DataFrame 'dataframe_purchases' is empty")

        required_columns = ['CustomerID', 'InvoiceDate', 'StockCode', 'Quantity', 'UnitPrice']
        for df, name in zip([dataframe_filtered, dataframe_purchases], ['dataframe_filtered', 'dataframe_purchases']):
            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing columns {missing_columns} in input DataFrame '{name}'")
        if 'CustomerID' not in dataframe_returns.columns:
            raise ValueError("Column 'CustomerID' not found in input DataFrame 'dataframe_returns'")
        if 'Quantity' not in dataframe_returns.columns:
            raise ValueError("Column 'Quantity' not found in input DataFrame 'dataframe_returns'")

        # One feature frame per metric, all keyed by CustomerID.
        df_fengi = keep_features(dataframe_filtered, ['CustomerID']).drop_duplicates(ignore_index=True)
        gross_revenue = calculate_gross_revenue(dataframe_purchases)
        df_recency = create_recency(dataframe_purchases, dataframe_filtered)
        df_qty_products = create_quantity_purchased(dataframe_purchases)
        df_freq = create_freq_purchases(dataframe_purchases)
        returns = create_qty_returns(dataframe_returns)

        # Left-join everything onto the customer list.
        dfs = [df_fengi, gross_revenue, df_recency, df_qty_products, df_freq, returns]
        df_fengi = reduce(lambda left,right: pd.merge(left, right, on='CustomerID', how='left'), dfs)

        # A customer with no return rows simply returned nothing.
        df_fengi['qty_returns'] = df_fengi['qty_returns'].fillna(0)

        features = ['CustomerID', 'gross_revenue', 'recency_days', 'qty_products', 'frequency', 'qty_returns']
        return keep_features(df_fengi, features).dropna()

    def table_exists(dataset_table_id: str) -> bool:
        """Return True when the given BigQuery table exists, False on NotFound."""
        client = bigquery.Client()

        try:
            client.get_table(dataset_table_id)  # Make an API request.
            return True
        except NotFound:
            return False

    def save_to_bigquery(dataframe: pd.DataFrame, project_name: str, dataset_table_name: str):
        """Load a DataFrame into a BigQuery table and wait for the load job."""
        client = bigquery.Client(project=project_name)

        job = client.load_table_from_dataframe(dataframe, dataset_table_name)
        job.result()

    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """Validate a query with a dry run, execute it and return its rows.

        Args:
            sql: SQL statement to execute in BigQuery.
            project_name: Project passed to the BigQuery client.

        Returns:
            The query result converted to a pandas DataFrame.
        """
        bq_client = bigquery.Client(project=project_name)

        # Dry run first so an invalid statement fails before any real job starts.
        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=job_config)

        # The dry run succeeded: run the statement for real.
        job_config = bigquery.QueryJobConfig()
        client_result = bq_client.query(sql, job_config=job_config)

        job_id = client_result.job_id

        # result() blocks until the job finishes; the rows are then materialized.
        df = client_result.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")
        return df

    # pandas.read_gbq is deprecated (pandas 2.2+) in favour of pandas_gbq.read_gbq.
    # Queries are passed positionally because the name of the first parameter
    # differs between pandas-gbq versions.
    logging.info('Carregando as tabelas da preparacao de dados')
    query_filtered = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_FILTERED_TEMP_ID}`
                    WHERE InvoiceDate <= CURRENT_TIMESTAMP() """
    df_filtered = pandas_gbq.read_gbq(query_filtered,
                                      project_id=PROJECT_NUMBER)

    query_purchases = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_PURCHASES_TEMP_ID}`
                    WHERE InvoiceDate <= CURRENT_TIMESTAMP() """
    df_purchases = pandas_gbq.read_gbq(query_purchases,
                                       project_id=PROJECT_NUMBER)

    query_returns = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_RETURNS_TEMP_ID}`"""
    df_returns = pandas_gbq.read_gbq(query_returns,
                                     project_id=PROJECT_NUMBER)

    logging.info('Transformando a coluna InvoiceDate para o tipo DATE')
    column_to_date(df_filtered, 'InvoiceDate')
    column_to_date(df_purchases, 'InvoiceDate')

    logging.info(f'Iniciando a verificacao de existencia da tabela: {DATASET_ID}.{TABLE_ID}')
    if table_exists(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"):
        logging.info('Tabela existente, inicia insercao de novos dados')

        sql_new_customers = f"""SELECT
                                      DISTINCT CustomerID
                                    FROM
                                      `{PROJECT_ID}.{DATASET_ID}.{TABLE_RAW_ID}`
                                    WHERE
                                      InvoiceDate = CURRENT_DATE()"""
        new_customer_ids = pandas_gbq.read_gbq(sql_new_customers, project_id=PROJECT_NUMBER)['CustomerID']

        # The raw table is unfiltered, so NULL ids can show up here; they would
        # become the literal `nan` in the SQL below. Casting to int also keeps
        # the ids comparable with the int CustomerID of the prepared tables.
        # NOTE(review): assumes CustomerID is numeric in the raw table — confirm.
        new_customers = sorted(set(new_customer_ids.dropna().astype(int).tolist()))

        if not new_customers:
            # Nothing to append today; an empty IN () list would be invalid SQL
            # and the feature computation rejects empty inputs.
            logging.info('Nenhum cliente novo encontrado para a data atual; nada a inserir')
            return

        df_fengi = run_feature_engineering(df_filtered.loc[df_filtered['CustomerID'].isin(new_customers)],
                                           df_purchases.loc[df_purchases['CustomerID'].isin(new_customers)],
                                           df_returns.loc[df_returns['CustomerID'].isin(new_customers)])

        pandas_gbq.to_gbq(df_fengi, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', project_id=PROJECT_NUMBER, if_exists='append')

        # Join the ids explicitly: str(tuple(...)) renders a single id as
        # "(123,)", which is not a valid SQL list.
        customer_id_list = ", ".join(str(customer_id) for customer_id in new_customers)
        sql_update_new_customer = f"""
                                        UPDATE `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
                                        SET values = generate_uuid(),
                                        timestamp = current_timestamp()
                                        WHERE CustomerID IN ({customer_id_list})"""
        logging.info(sql_update_new_customer)
        run_bq_query(sql_update_new_customer, project_name=PROJECT_ID)
    else:
        # First run: create the table from every customer, then rebuild it
        # with the entity id (`values`) and feature time (`timestamp`) columns.
        logging.info('Tabela nao existente, cria a tabela e inicia insercao dos dados')
        df_fengi = run_feature_engineering(df_filtered, df_purchases, df_returns)
        pandas_gbq.to_gbq(df_fengi, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', project_id=PROJECT_NUMBER, if_exists='fail')
        query = f"""CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` as (
                    SELECT
                        *,
                        generate_uuid() as values,
                        current_timestamp() as timestamp,
                    FROM 
                        `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`);"""
        run_bq_query(query, project_name=PROJECT_ID)

# 4. Componente: Feature Store

In [178]:
@component(packages_to_install=["google-cloud-aiplatform", "pyarrow"],
    base_image="python:3.10.6",
    output_component_file="feature_store.yaml")
def create_feature_store():
    """Ensure the Vertex AI feature store exists and ingest the feature table.

    Steps:
      1. Get the featurestore, creating it when it does not exist.
      2. Get the ``values`` entity type, creating it when it does not exist.
      3. Create whichever of the configured features are still missing.
      4. Ingest the BigQuery feature table into the entity type, using the
         ``values`` column as entity id and ``timestamp`` as feature time.

    Only a "not found" response triggers creation; any other API error
    (permissions, quota, ...) propagates instead of being masked.

    The component has no KFP inputs or outputs. It requires the
    ``CLOUD_ML_PROJECT_ID`` environment variable, which is passed to
    ``aiplatform.init`` as the project.
    """
    import os
    import logging

    from google.api_core import exceptions as api_exceptions
    from google.cloud import aiplatform
    #https://medium.com/google-cloud/how-do-you-use-feature-store-in-the-mlops-process-on-vertex-ai-802ddca2cac4
    #https://www.youtube.com/watch?v=jXD8Sfx4hvQ

    logging.info('Iniciando o componente')

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    TABLE_ID = 'dados_engenharia_features'
    FEATURESTORE_ID="ecommerce_feature_store"
    VALUES_ENTITY_ID = "values"
    VALUES_BQ_SOURCE_URI = f"bq://{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    FEATURE_TIME = 'timestamp'
    REGION = "us-central1"

    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    aiplatform.init(project = project_number, location= REGION)

    try:
        # Reuse the featurestore when it already exists.
        ecommerce_feature_store = aiplatform.Featurestore(f"{FEATURESTORE_ID}")
        logging.info(f"""A feature store {FEATURESTORE_ID} ja existe.""")
    except api_exceptions.NotFound:
        logging.info(f"""Criando a feature store: {FEATURESTORE_ID}.""")
        ecommerce_feature_store = aiplatform.Featurestore.create(
            featurestore_id=f"{FEATURESTORE_ID}",
            online_store_fixed_node_count=1,
            sync=True,
        )

    try:
        # Reuse the entity type when it already exists.
        values_entity_type = ecommerce_feature_store.get_entity_type(entity_type_id=VALUES_ENTITY_ID)
    except api_exceptions.NotFound:
        values_entity_type = ecommerce_feature_store.create_entity_type(
            entity_type_id=VALUES_ENTITY_ID, description="Values Entity", sync=True
        )

    # NOTE(review): the value types below must be compatible with the column
    # types of the BigQuery source table — confirm against the table schema.
    values_feature_configs = {
                                "gross_revenue": {
                                    "value_type": "DOUBLE",
                                    "description": "Gross Revenue",
                                    "labels": {"status": "passed"},
                                },
                                "recency_days": {
                                    "value_type": "DOUBLE",
                                    "description": "Recency Days",
                                    "labels": {"status": "passed"},
                                },
                                "qty_products": {
                                    "value_type": "DOUBLE",
                                    "description": "Quantity products",
                                    "labels": {"status": "passed"},
                                },
                                "frequency": {
                                    "value_type": "DOUBLE",
                                    "description": "Frequency",
                                    "labels": {"status": "passed"},
                                },
                                "qty_returns": {
                                    "value_type": "INT64",
                                    "description": "Quantity returns",
                                    "labels": {"status": "passed"},
                            }}

    # Create only the features that are not there yet, so that re-running the
    # component against an existing entity type does not request duplicates.
    existing_feature_ids = {feature.name for feature in values_entity_type.list_features()}
    missing_feature_configs = {
        feature_id: config
        for feature_id, config in values_feature_configs.items()
        if feature_id not in existing_feature_ids
    }
    if missing_feature_configs:
        values_entity_type.batch_create_features(
            feature_configs=missing_feature_configs, sync=True
        )

    # Ingest every feature currently defined on the entity type.
    VALUES_FEATURES_IDS = [feature.name for feature in values_entity_type.list_features()]

    logging.info(f"""Ingerindo os dados na feature store: {FEATURESTORE_ID}.""")
    values_entity_type.ingest_from_bq(
                                        feature_ids=VALUES_FEATURES_IDS,
                                        feature_time=FEATURE_TIME,
                                        bq_source_uri=VALUES_BQ_SOURCE_URI,
                                        entity_id_field=VALUES_ENTITY_ID,
                                        disable_online_serving=True,
                                        worker_count=2,
                                        sync=True,
                                    )
    # enable api: https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview?project=343941956592%22
    #https://aiinpractice.com/gcp-mlops-vertex-ai-feature-store/
    #https://medium.com/hacking-talent/vertexais-feature-store-for-dummies-3d798b45ece4

# 5. Componente: Utilizar feature store batch

In [179]:
@component(packages_to_install=["google-cloud-aiplatform",
                                "google-cloud-bigquery",
                                "db-dtypes",
                                "pandas"],
    base_image="python:3.10.6",
    output_component_file="batch_serve_fs.yaml")
def create_batch_serve_fs():
    """Batch-serve feature-store values into the BigQuery training table.

    Steps:
      1. (Re)create the read-instances table (entity id + timestamp) from the
         engineered-features table.
      2. Drop the destination training table if a previous run left it behind.
      3. Ask the feature store to batch-serve every feature of the ``values``
         entity type into the training table.

    Raises:
        KeyError: if the ``CLOUD_ML_PROJECT_ID`` environment variable is unset.
    """
    # Imports live inside the function because a KFP lightweight component
    # must be fully self-contained.
    import os
    import logging

    import pandas as pd
    from google.cloud import bigquery
    from google.cloud import aiplatform

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    REGION = "us-central1"
    TABLE_INSTANCES_ID = "read_instances"
    TABLE_ID = 'dados_engenharia_features'
    # "*" requests every feature registered under the "values" entity type.
    SERVING_FEATURE_IDS = {"values": ["*"]}
    TABLE_TRAIN_ID = "dados_treinamento"
    TRAIN_TABLE_REF = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_TRAIN_ID}"
    TRAIN_TABLE_URI = f"bq://{TRAIN_TABLE_REF}"
    READ_INSTANCES_REF = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_INSTANCES_ID}"
    FEATURE_STORE_NAME = 'ecommerce_feature_store'

    # presumably injected by the Vertex AI Pipelines runtime — TODO confirm
    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    aiplatform.init(project = project_number, location = REGION)

    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """
        Validate a BigQuery statement with a dry run, execute it and return the rows.
        Args:
            sql: SQL statement, as a string, to execute in BigQuery
            project_name: project used to create the BigQuery client
        Returns:
            df: DataFrame with the rows produced by the statement
        """

        bq_client = bigquery.Client(project=project_name)

        # Dry run first so that an invalid statement fails before the real job starts.
        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=job_config)

        # Dry run succeeded: execute the statement for real.
        job_config = bigquery.QueryJobConfig()
        client_result = bq_client.query(sql, job_config=job_config)

        job_id = client_result.job_id

        # Block until the job finishes, then materialise the result.
        df = client_result.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")
        return df

    read_instances_query = f"""
                CREATE OR REPLACE TABLE `{READ_INSTANCES_REF}` as (
                    SELECT   
                        values,
                        timestamp,
                    FROM 
                        `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` 
                );
                """

    logging.info("Criando a tabela de instancia")
    run_bq_query(read_instances_query, project_name=PROJECT_ID)

    # The batch-read API cannot overwrite an existing destination table, so a
    # table left by a previous pipeline run would make this step fail.
    logging.info(f"Removendo a tabela de destino, caso exista: {TRAIN_TABLE_REF}")
    bigquery.Client(project=PROJECT_ID).delete_table(TRAIN_TABLE_REF, not_found_ok=True)

    logging.info(f"Iniciando o fornecimento das features da: {FEATURE_STORE_NAME}")
    ecommerce_feature_store = aiplatform.Featurestore(featurestore_name=FEATURE_STORE_NAME)

    logging.info(f"Executando o comando para o destino: {TRAIN_TABLE_URI} a partir da tabela: {TABLE_INSTANCES_ID}")
    ecommerce_feature_store.batch_serve_to_bq(
        bq_destination_output_uri=TRAIN_TABLE_URI,
        serving_feature_ids=SERVING_FEATURE_IDS,
        read_instances_uri=f"bq://{READ_INSTANCES_REF}",
    )

# 6. Componente: Treinamento do Modelo

In [180]:
@component(packages_to_install=["google-cloud-aiplatform",
                                "pandas",
                                "pyarrow",
                                "scikit-learn",
                                "google-cloud-bigquery",
                                "db-dtypes"],
    base_image="python:3.10.6",
    output_component_file="model_train.yaml")
def model_train(model: Output[Artifact]):
    """Fit the customer-clustering pipeline and store it as a pickle artifact.

    Loads the training table from BigQuery, fits a PCA -> MinMaxScaler ->
    KMeans pipeline on ``FEATURES`` and writes it to ``<model.path>/model.pkl``.

    Args:
        model: Output artifact. Receives the ``framework`` and ``containerSpec``
            metadata entries; its ``path`` is used as the directory for the pickle.

    Raises:
        KeyError: if ``CLOUD_ML_PROJECT_ID`` is unset or a feature column is
            missing from the training table.
    """
    # NOTE(review): the consumer (batch_prediction) declares Input[Model];
    # consider typing this output as Output[Model] for clearer lineage.

    # Imports live inside the function because a KFP lightweight component
    # must be fully self-contained.
    import os
    import pickle
    import pathlib
    import logging

    import pandas as pd

    from google.cloud import bigquery
    from sklearn.cluster import KMeans
    from google.cloud import aiplatform
    from sklearn.pipeline import Pipeline
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import MinMaxScaler

    logging.info("Iniciando o componente")

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    REGION = "us-central1"
    TABLE_TRAIN_ID = "dados_treinamento"
    FEATURES = ['qty_products', 'qty_returns', 'frequency','recency_days']
    DEPLOY_VERSION = "sklearn-cpu.1-0"
    FRAMEWORK = "scikit-learn"
    # Multi-region prefix of the prediction image registry ("us" for "us-central1").
    REGION_SPLITTED = REGION.split("-")[0]
    DEPLOY_IMAGE = f"{REGION_SPLITTED}-docker.pkg.dev/vertex-ai/prediction/{DEPLOY_VERSION}:latest"
    MODEL_NAME = 'model.pkl'
    N_CLUSTERS = 8

    # presumably injected by the Vertex AI Pipelines runtime — TODO confirm
    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    aiplatform.init(project = project_number, location = REGION)

    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """
        Validate a BigQuery statement with a dry run, execute it and return the rows.
        Args:
            sql: SQL statement, as a string, to execute in BigQuery
            project_name: project used to create the BigQuery client
        Returns:
            df: DataFrame with the rows produced by the statement
        """

        bq_client = bigquery.Client(project=project_name)

        # Dry run first so that an invalid statement fails before the real job starts.
        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=job_config)

        # Dry run succeeded: execute the statement for real.
        job_config = bigquery.QueryJobConfig()
        client_result = bq_client.query(sql, job_config=job_config)

        job_id = client_result.job_id

        # Block until the job finishes, then materialise the result.
        df = client_result.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")
        return df

    logging.info("Carregando dados para o treinamento")
    df = run_bq_query(f"select * from `{PROJECT_ID}.{DATASET_ID}.{TABLE_TRAIN_ID}`", project_name=PROJECT_ID)

    logging.info("Iniciando o treinamento")

    X = df[FEATURES].copy()

    # NOTE(review): PCA runs on the unscaled features and the scaler is applied
    # to the principal components afterwards. PCA is scale-sensitive, so confirm
    # this ordering is intentional before swapping the two steps.
    model_pipeline = Pipeline(
        [
            ("pca", PCA(n_components=2)),
            ("scaler", MinMaxScaler()),
            ("clustering", KMeans(n_clusters=N_CLUSTERS, random_state=42)),
        ]
    )

    # Clustering is unsupervised: every step ignores `y`, so no target column
    # is read from the table or passed to fit().
    model_pipeline.fit(X)

    logging.info("Criando o modelo output")
    model.metadata["framework"] = FRAMEWORK
    model.metadata["containerSpec"] = {
        "imageUri": DEPLOY_IMAGE
    }

    # The artifact path is used as a directory. Tolerate a missing parent and a
    # directory left behind by a retried execution.
    output_dir = pathlib.Path(model.path)
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(output_dir / MODEL_NAME, "wb") as file:
        pickle.dump(model_pipeline, file)

# 7. Componente: Predição

In [181]:
@component(packages_to_install=["google-cloud-aiplatform",
                               "google-cloud-bigquery",
                                "db-dtypes",
                                "pandas",
                                "scikit-learn",
                                "pandas-gbq"
                               ],
    base_image="python:3.10.6",
    output_component_file="batch_prediction.yaml")
def batch_prediction(model: Input[Model]):
    """Assign a cluster label to every row of the input table and save the result.

    Loads the pickled pipeline from ``<model.path>/model.pkl``, reads the rows
    to score from BigQuery, adds a ``Clusters`` column with the predicted labels
    and replaces the predictions table with the result.

    Args:
        model: Input artifact whose ``path`` is the directory holding ``model.pkl``.

    Raises:
        KeyError: if ``CLOUD_ML_PROJECT_ID`` is unset or a feature column is
            missing from the input table.
        FileNotFoundError: if the pickle is not present under ``model.path``.
    """
    # Imports live inside the function because a KFP lightweight component
    # must be fully self-contained.
    import os
    import pickle
    import logging

    import pandas_gbq
    from google.cloud import aiplatform

    PROJECT_ID = "comunidade-ds-420801"
    DATASET_ID = "tabela_teste"
    REGION = "us-central1"
    TABLE_TO_PREDICT_ID = "dados_treinamento"
    MODEL_NAME = 'model.pkl'
    TABLE_SAVE_PREDICTIONS_ID = "dados_preditos"
    FEATURES = ['qty_products', 'qty_returns', 'frequency','recency_days']  # same order as in training
    SQL_TO_PREDICT_DATA = f"""SELECT * 
                        FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_TO_PREDICT_ID}`"""
    # presumably injected by the Vertex AI Pipelines runtime — TODO confirm
    project_number = os.environ["CLOUD_ML_PROJECT_ID"]

    aiplatform.init(project = project_number, location = REGION)

    logging.info("Iniciando o componente")

    logging.info(f"Carregando o modelo: {MODEL_NAME}")
    file_name = model.path + f"/{MODEL_NAME}"
    # NOTE(review): pickle.load executes arbitrary code; only acceptable while
    # the artifact is produced by this same pipeline.
    with open(file_name, "rb") as file:
        model_pipeline = pickle.load(file)

    logging.info("Carregando dados a serem preditos")
    # pandas.read_gbq is deprecated (pandas >= 2.2) in favour of pandas_gbq,
    # which this component already installs and uses for the write below.
    predict_data = pandas_gbq.read_gbq(SQL_TO_PREDICT_DATA, project_id=project_number)

    logging.info(f"Iniciando a predicao dos dados: {PROJECT_ID}.{DATASET_ID}.{TABLE_TO_PREDICT_ID}")
    labels = model_pipeline.predict(predict_data[FEATURES])
    predict_data['Clusters'] = labels

    logging.info(f"Substituindo os dados preditos: {PROJECT_ID}.{DATASET_ID}.{TABLE_SAVE_PREDICTIONS_ID}")
    pandas_gbq.to_gbq(predict_data, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_SAVE_PREDICTIONS_ID}', project_id=project_number, if_exists='replace')

# 8. Pipeline

In [182]:
# GCS location handed to @pipeline(pipeline_root=...) below.
# Re-declares, with the same value, the constant set near the top of the notebook.
PIPELINE_ROOT = 'gs://pipeline-ecommerce'

In [183]:
@pipeline(pipeline_root=PIPELINE_ROOT,
         name="ecommerce-clustering-pipeline")
def ecommerce_clustering_pipeline():
    """Wire the e-commerce clustering components into a sequential pipeline.

    Order: data capture -> preparation -> feature engineering -> feature store
    creation -> batch serving -> training. Model upload and batch prediction
    both follow training.
    """
    PROJECT_ID = "comunidade-ds-420801"
    REGION = "us-central1"
    MODEL_NAME = "ecommerce-clustering"

    # The data components exchange no artifacts, so their order is enforced
    # purely with explicit .after() dependencies.
    capture_task = captura_dados()
    preparation_task = data_preparation().after(capture_task)
    engineering_task = feature_engineering().after(preparation_task)
    store_task = create_feature_store().after(engineering_task)
    serving_task = create_batch_serve_fs().after(store_task)
    training_task = model_train().after(serving_task)

    # The trained artifact is not wired into the upload; only ordering is enforced.
    gcc_aip.ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=f"{MODEL_NAME}",
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        #unmanaged_container_model=model_train_op.outputs["model"],
    ).after(training_task)

    # Consuming the "model" output makes prediction depend on training.
    batch_prediction(model=training_task.outputs["model"])

In [186]:
# Compile the pipeline function into the JSON spec that the job cell submits.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path='ecommerce_pipeline.json',
    pipeline_func=ecommerce_clustering_pipeline,
)

In [187]:
# Build a Vertex AI pipeline job from the compiled spec. Caching is disabled,
# so each submission re-executes every step.
job = pipeline_jobs.PipelineJob(
    enable_caching=False,
    template_path="ecommerce_pipeline.json",
    display_name="ecommerce-pipeline",
)

# sync=False: submit the run without blocking the notebook on its completion.
job.run(sync=False)

In [None]:
# Cleanup cell: DESTRUCTIVE. Deletes the feature store used by the pipeline;
# force=True also removes the entity types and features it contains.
from google.cloud.aiplatform import Feature, Featurestore
fs = Featurestore(
     featurestore_name="ecommerce_feature_store",
     # Must be the project the pipeline components run in; the previous value
     # ("gcp-vertex") matched no other resource in this notebook.
     project="comunidade-ds-420801",
     location="us-central1",
)
fs.delete(force=True)