# EXTRACT

In [1]:
import awswrangler as awr
import logging
import pandas as pd
import os

logging.basicConfig(
    level=logging.INFO,  # Exibe mensagens a partir de INFO
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler()  # Garante logs no console
    ]
)

logging.info('\n ----------------------------------------------------------------------------------')
logging.info('\n Executando Rotina - RELATORIO ATIVACOES PENDENTES')

class Extract:

    # FUNÇÃO CONSTRUTORA
    def __init__(self):
        #self.path = os.path.dirname(__file__) #atributo de instância que obtém o caminho do diretório dirname
        self.path = r"C:\Users\raphael.almeida\Documents\Processos\placas_acompanhamento"

    # FUNÇÃO PARA EXTRAÇÃO DE DADOS DE: ATIVAÇÕES
    def extract_all_ativacoes(self):
        """
        Retorna um DataFrame com os dados de ativações.
        """
        try:
            dir_query = os.path.join(self.path, 'sql', 'all_boards_ATIVOS.sql')
            with open(dir_query, 'r') as file:
                query = file.read()
            df_ativacoes = awr.athena.read_sql_query(query, database='silver')
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info('\n Relatorio ativacoes (Vivante)  - Dados Extraidos com sucesso!')
            return df_ativacoes
        except Exception as e:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info(f'\n Falha ao Extrair relatorio ativacoes (Viavante): {e}')
            return None

    # FUNÇÃO PARA EXTRAÇÃO DE DADOS DE: CANCELAMENTO INTEGRAL
    def extract_all_cancelamentos(self):
        """
        Retorna um DataFrame com os dados de cancelamentos.
        """
        try:
            dir_query = os.path.join(self.path, 'sql', 'all_boards_CANCELADOS.sql')
            with open(dir_query, 'r') as file:
                query = file.read()
            df_cancelamentos = awr.athena.read_sql_query(query, database='silver')
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info('\n Relatorio cancelamentos  - Dados Extraidos com sucesso!')
            return df_cancelamentos
        except Exception as e:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info(f'\n Falha ao Extrair relatorio cancelamentos: {e}')
            return None

    # FUNÇÃO PARA EXTRAÇÃO DE DADOS DE: CONFERÊNCIA
    def extract_conf_boards(self):
        """
        Retorna um DataFrame com os dados de conferência.
        """
        try:
            dir_query = os.path.join(self.path, 'sql', 'listagem_mestra.sql')
            with open(dir_query, 'r') as file:
                query = file.read()
            df_conferencia = awr.athena.read_sql_query(query, database='silver')
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info('\n Relatorio conferência  - Dados Extraidos com sucesso!')
            return df_conferencia
        except Exception as e:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info(f'\n Falha ao Extrair relatorio conferência: {e}')
            return None

# Exemplo de como declarar as variáveis dos returns dos métodos para usar depois:
# Basta instanciar a classe e chamar os métodos, atribuindo o retorno a variáveis:

extract = Extract()
df_ativacoes = extract.extract_all_ativacoes()
df_cancelamentos = extract.extract_all_cancelamentos()
df_conferencia = extract.extract_conf_boards()

# Agora você pode usar df_ativacoes, df_cancelamentos, df_conferencia normalmente no seu código.


2025-08-22 09:01:11,392 - INFO - 
 ----------------------------------------------------------------------------------
2025-08-22 09:01:11,392 - INFO - 
 Executando Rotina - RELATORIO ATIVACOES PENDENTES
2025-08-22 09:01:11,414 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:13,314 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:14,726 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:17,081 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:18,002 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:21,001 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:22,933 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09:01:24,802 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-22 09

In [2]:
df_ativacoes[df_ativacoes['empresa'] == 'Tag'].head()

Unnamed: 0,placa,chassi,id_placa,id_veiculo,id_carroceria,matricula,conjunto,unidade,consultor,status,...,empresa,coverage,beneficio,categoria,tipo_categoria,data_ativacao_beneficio,status_beneficio,data_atualizacao,consultor_ativo,id_beneficio
0,MKC0G31,9534N8243BR157808,1651,1651,,277,274,UNIDADE JOINVILLE,Ana Beatriz Prioste,ATIVO,...,Tag,678515,Rastreador p/ Veículo,PADRÃO,PADRÃO,2025-08-06,ATIVO,2025-08-15,S,44
1,BTO1021,9BM6931084B375803,1676,1676,,278,277,UNIDADE CAMPO GRANDE,Daiane Cristina Veiga da Silva,ATIVO,...,Tag,678592,Assistência a Reparos (COMPLEMENTO),CARROCERIA BAÚ,PADRÃO - 4%,2025-08-08,ATIVO,2025-08-15,S,39
2,IKK6B93,9EP07082021000077,19,0,19.0,2,4,UNIDADE CASCAVEL,Thiago Rosa De Figueiredo,ATIVO,...,Tag,676370,Rastreador p/ Reboque,PADRÃO,PADRÃO,2025-08-06,ATIVO,2025-08-13,S,45
3,BDE7E35,9BVRG40D3LE870664,19,19,,2,4,UNIDADE CASCAVEL,Thiago Rosa De Figueiredo,ATIVO,...,Tag,676367,Rastreador p/ Veículo,PADRÃO,PADRÃO,2025-08-06,ATIVO,2025-08-13,S,44
4,BDE7E35,9BVRG40D3LE870664,19,19,,2,4,UNIDADE CASCAVEL,Thiago Rosa De Figueiredo,ATIVO,...,Tag,676366,APP,PADRÃO,PADRÃO,2025-08-06,ATIVO,2025-08-13,S,48


# TRANSFORM

In [30]:
# IMPORTANDO MÓDULOS E PACOTES
import pandas as pd
import numpy as np
import datetime as dt
import logging

logging.basicConfig(
    level=logging.INFO,  # Exibe mensagens a partir de INFO
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler()  # Garante logs no console
    ]
)



def board_status_treatment(df, df_conf):

    try:
        status_filter_list = ['CANCELADO', 'CANCELADA', 'FINALIZADO', 'FINALIZADA', 'NAO RENOVADO']

        if df is None or df.empty:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info('DataFrame vazio, retornando DataFrame vazio')
            return pd.DataFrame()

        if df_conf is None or df_conf.empty:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info('DataFrame de conferência vazio, retornando DataFrame original')
            return df

        required_columns = ['placa', 'chassi', 'empresa', 'status']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info(f'Colunas necessárias ausentes no DataFrame: {missing_columns}')
            return df

        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Shape do DataFrame antes do tratamento: {df.shape}')

        df['status_beneficio'] = 'NOVO'
        df['migration_from'] = np.nan

        df_conf_grouped = df_conf.groupby(['chassi', 'beneficio']).agg({
            'empresa': list,
            'data_ativacao_beneficio': list,
            'status_beneficio': list
        }).reset_index()

        conf_dict = df_conf_grouped.set_index(['chassi', 'beneficio']).to_dict('index')

        for chassi, beneficio in df[['chassi', 'beneficio']].drop_duplicates().values:
            if (chassi, beneficio) in conf_dict:
                conf_data = conf_dict[(chassi, beneficio)]

                if len(conf_data['empresa']) > 1:
                    dates = sorted([d for d in conf_data['data_ativacao_beneficio'] if pd.notna(d)])

                    if len(dates) > 1:
                        penultima_data = dates[-2]
                        idx_penultima = conf_data['data_ativacao_beneficio'].index(penultima_data)
                        status_penultimo = conf_data['status_beneficio'][idx_penultima]
                        empresa_penultima = conf_data['empresa'][idx_penultima]

                        mask = (df['chassi'] == chassi) & (df['beneficio'] == beneficio)

                        if status_penultimo not in status_filter_list:
                            if empresa_penultima != df.loc[mask, 'empresa'].iloc[0]:
                                df.loc[mask, 'status_beneficio'] = 'MIGRAÇÃO'
                                df.loc[mask, 'migration_from'] = empresa_penultima
                            else:
                                df.loc[mask, 'status_beneficio'] = 'RENOVAÇÃO'
                        else:
                            df.loc[mask, 'status_beneficio'] = 'REATIVAÇÃO'

        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Processamento concluído com sucesso!')

        return df

    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Falha no tratamento de status das placas ativadas: {str(e)}')
        return df


try:
    today = dt.date.today()
    yesterday = today - dt.timedelta(days=1)
    friday = today - dt.timedelta(days=3)
    df_final_ativacoes = pd.DataFrame()

    # Tratamento dos benefícios e filtragem
    try:
        df_ativacoes['data_ativacao_beneficio'] = pd.to_datetime(df_ativacoes['data_ativacao_beneficio']).dt.date
        df_ativacoes['beneficio'] = df_ativacoes['beneficio'].replace(
            'REPARAÇÃO OU REPOSIÇÃO DO VEÍCULO', 'CASCO (VEÍCULO)'
        ).replace(
            'REPARAÇÃO OU REPOSIÇÃO DO (SEMI)REBOQUE', 'CASCO (R/SR)'
        ).replace(
            'REPARAÇÃO OU REPOSIÇÃO DO COMPLEMENTO', 'CASCO (COMPLEMENTO)'
        )
        df_ativacoes['migration_from'] = np.nan
        df_ativacoes = df_ativacoes.loc[df_ativacoes['beneficio'].str.contains('(CASCO|TERCEIRO|Assistência a reparos)', regex=True, case = False)]

        df_conferencia['beneficio'] = df_conferencia['beneficio'].replace(
            'REPARAÇÃO OU REPOSIÇÃO DO VEÍCULO', 'CASCO (VEÍCULO)'
        ).replace(
            'REPARAÇÃO OU REPOSIÇÃO DO (SEMI)REBOQUE', 'CASCO (R/SR)'
        ).replace(
            'REPARAÇÃO OU REPOSIÇÃO DO COMPLEMENTO', 'CASCO (COMPLEMENTO)'
        )
    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Falha ao filtrar dados de cancelamentos referente ao dia anterior: {e}')

    # Cancelamentos do dia anterior ou desde sexta
    try:
        df_cancelamentos['data_cancelamento'] = pd.to_datetime(df_cancelamentos['data_cancelamento']).dt.date
        if today.weekday() == 0:
            df_cancelamentos = df_cancelamentos[df_cancelamentos['data_cancelamento'].between(friday, today)]
        else:
            df_cancelamentos = df_cancelamentos[df_cancelamentos['data_cancelamento'] == yesterday]

        df_cancelamentos = df_cancelamentos.sort_values(by='data_cancelamento', ascending=True)
    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Falha ao filtrar dados de cancelamentos referente ao dia anterior: {e}')

    # Ativações do dia anterior ou desde sexta
    try:
        if not df_ativacoes.empty:
            if today.weekday() != 0:
                df_ativacoes_menos_ontem = df_ativacoes.loc[
                    ~(df_ativacoes['data_ativacao_beneficio'].isin([yesterday, today]))
                ]
                df_ativacoes_dia_anterior = df_ativacoes[df_ativacoes['data_ativacao_beneficio'] == yesterday]
            else:
                df_ativacoes_menos_ontem = df_ativacoes.loc[
                    (df_ativacoes['data_ativacao_beneficio'] < friday)
                ]
                df_ativacoes_dia_anterior = df_ativacoes[
                    df_ativacoes['data_ativacao_beneficio'].between(friday, yesterday)
                ]

            df_ativacoes_dia_anterior_tratado = board_status_treatment(
                df=df_ativacoes_dia_anterior, df_conf=df_conferencia
            )
            df_ativacoes_atualizado = pd.concat([df_ativacoes_menos_ontem, df_ativacoes_dia_anterior_tratado])

            logging.info('\n ----------------------------------------------------------------------------------')
            logging.info(f'Número de registros ativos na carteira tratado com os dados do dia anterior.')
    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Falha ao incluir registros ativos referente ao dia anterior na contagem . Revise o código: {e}')

    # Último tratamento do DataFrame de ativação
    try:
        df_final_ativacoes = df_ativacoes_atualizado[[
            'placa', 'chassi', 'id_placa', 'id_veiculo', 'id_carroceria', 'matricula', 'conjunto', 'unidade',
            'consultor', 'status_beneficio', 'cliente', 'data_registro', 'data_ativacao_beneficio', 'suporte',
            'data_filtro', 'empresa', 'migration_from'
        ]]
        df_final_ativacoes = df_final_ativacoes.drop_duplicates(subset='chassi')

        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Processo final de tratamento de dataframe de ativação realizado com sucesso!')
    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        print(f'Falha ao tratar o dataframe de ativação: {e}')

    # Tratando dados nulos nos DataFrames
    try:
        df_final_ativacoes['placa'] = df_final_ativacoes['placa'].fillna('SEM-PLACA')
        df_final_ativacoes['chassi'] = df_final_ativacoes['chassi'].fillna('NULL')
        df_final_ativacoes['id_placa'] = df_final_ativacoes['id_placa'].fillna(0)
        df_final_ativacoes['id_veiculo'] = df_final_ativacoes['id_veiculo'].fillna(0)
        df_final_ativacoes['id_carroceria'] = df_final_ativacoes['id_carroceria'].fillna(0)
        df_final_ativacoes['matricula'] = df_final_ativacoes['matricula'].fillna(0)
        df_final_ativacoes['conjunto'] = df_final_ativacoes['conjunto'].fillna(0)
        df_final_ativacoes['unidade'] = df_final_ativacoes['unidade'].fillna('NULL')
        df_final_ativacoes['consultor'] = df_final_ativacoes['consultor'].fillna('NULL')
        df_final_ativacoes['status'] = df_final_ativacoes.get('status', pd.Series(['NULL']*len(df_final_ativacoes))).fillna('NULL')
        df_final_ativacoes['cliente'] = df_final_ativacoes['cliente'].fillna('NULL')
        df_final_ativacoes['data_registro'] = df_final_ativacoes['data_registro'].fillna(pd.Timestamp('1900-01-01'))
        if 'data_ativacao' in df_final_ativacoes.columns:
            df_final_ativacoes['data_ativacao'] = df_final_ativacoes['data_ativacao'].fillna(pd.Timestamp('1900-01-01'))
        df_final_ativacoes['suporte'] = df_final_ativacoes['suporte'].fillna('NULL')
        df_final_ativacoes['data_filtro'] = df_final_ativacoes['data_filtro'].fillna(pd.Timestamp('1900-01-01'))
        df_final_ativacoes['empresa'] = df_final_ativacoes['empresa'].fillna('NULL')

        df_cancelamentos['placa'] = df_cancelamentos['placa'].fillna('SEM-PLACA')
        df_cancelamentos['chassi'] = df_cancelamentos['chassi'].fillna('NULL')
        df_cancelamentos['id_placa'] = df_cancelamentos['id_placa'].fillna(0)
        df_cancelamentos['id_veiculo'] = df_cancelamentos['id_veiculo'].fillna(0)
        df_cancelamentos['id_carroceria'] = df_cancelamentos['id_carroceria'].fillna(0)
        df_cancelamentos['matricula'] = df_cancelamentos['matricula'].fillna(0)
        df_cancelamentos['conjunto'] = df_cancelamentos['conjunto'].fillna(0)
        df_cancelamentos['unidade'] = df_cancelamentos['unidade'].fillna('NULL')
        df_cancelamentos['status'] = df_cancelamentos['status'].fillna('NULL')
        df_cancelamentos['cliente'] = df_cancelamentos['cliente'].fillna('NULL')
        df_cancelamentos['data'] = df_cancelamentos['data'].fillna(pd.Timestamp('1900-01-01'))
        df_cancelamentos['data_cancelamento'] = df_cancelamentos['data_cancelamento'].fillna(pd.Timestamp('1900-01-01'))
        df_cancelamentos['usuario_cancelamento'] = df_cancelamentos['usuario_cancelamento'].fillna('NULL')
        df_cancelamentos['data_filtro'] = df_cancelamentos['data_filtro'].fillna(pd.Timestamp('1900-01-01'))
        df_cancelamentos['empresa'] = df_cancelamentos['empresa'].fillna('NULL')

        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info('\n Processo de Transformacao de Dados concluido com sucesso!')
    except Exception as e:
        logging.info('\n ----------------------------------------------------------------------------------')
        logging.info(f'Falha ao realizar tratamento de dados: {e}')

except Exception as e:
    logging.info('\n ----------------------------------------------------------------------------------')
    logging.info(f'Falha ao realizar tratamento de dados: {e}')



  df_ativacoes = df_ativacoes.loc[df_ativacoes['beneficio'].str.contains('(CASCO|TERCEIRO|Assistência a reparos)', regex=True, case = False)]
2025-08-22 09:48:47,038 - INFO - 
 ----------------------------------------------------------------------------------
2025-08-22 09:48:47,038 - INFO - Shape do DataFrame antes do tratamento: (368, 26)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['status_beneficio'] = 'NOVO'
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['migration_from'] = np.nan
  df.loc[mask, 'migration_from'] = empresa_penultima
2025-08-22 0

In [32]:
df_final_ativacoes.shape[0]

32830

In [29]:
df_final_ativacoes = df_ativacoes_atualizado[[
    'placa', 'chassi', 'id_placa', 'id_veiculo', 'id_carroceria', 'matricula', 'conjunto', 'unidade',
    'consultor', 'status_beneficio', 'cliente', 'data', 'data_ativacao_beneficio', 'suporte',
    'data_filtro', 'empresa', 'migration_from'
]]
df_final_ativacoes = df_final_ativacoes.drop_duplicates(subset='chassi')

KeyError: "['data'] not in index"