# Fluxo Novo

Ordem das células

- Importando base de dados
- Definindo lojas que estarão no relatório
- Definindo data do relatório
- Definindo emails que receberão o relatório
- Rodando o código para a criação do relatório
- Preparando o html
- Rodando Mock do disparador de email e disparando email

## Importando base de dados


In [20]:
# -> Rodar tudo para ter os dataframes corretos
from commom.database.queries.query_lojas import QUERY_LOJAS
from commom.database.queries.query_vendas_nf_novo import QUERY_VENDAS_NF_NOVO
from commom.database.queries.query_metas import QUERY_METAS
from commom.database.queries.query_parcelas import QUERY_PARCELAS
from commom.database.queries.query_vendas_nf import QUERY_VENDAS_NF
from commom.database.queries.query_vendedores import QUERY_VENDEDORES
from commom.database.queries.query_loja_vendedores import QUERY_LOJA_VENDEDORES
from commom.kpi_data.kpi_data_manager import DataHandler, KpiDataManager
from typing import Tuple
import numpy as np
import pandas as pd
import datetime
from datetime import timedelta
from innovation_messenger import Messenger
import os
from innovation_messenger import IEmailProperties
import missingno as msno

# Project helpers used to load the report inputs.
# NOTE(review): `manager` is not referenced anywhere else in the visible cells — confirm it is still needed.
manager = KpiDataManager()
handler = DataHandler()

# Data loading
# Option 1 (active): read every input from local pickle files.
# The `df_*` names suggest pandas DataFrames — TODO confirm against DataHandler.
df_fato_vendas = handler.read_from_local_pickle('df_fato_vendas.pkl')
df_lojas = handler.read_from_local_pickle('df_lojas.pkl')
df_vendedores = handler.read_from_local_pickle('df_vendedores.pkl')
df_vendas = handler.read_from_local_pickle('df_vendas.pkl')
df_parcelas = handler.read_from_local_pickle('df_parcelas.pkl')
df_metas = handler.read_from_local_pickle('df_metas.pkl')
df_vendedor_loja = handler.read_from_local_pickle('df_vendedor_loja.pkl')

# Option 2 (disabled): fetch the same inputs from BigQuery.
# NOTE(review): the first line below assigns to `df`, while the rest of the
# notebook reads `df_fato_vendas` — adjust the name before re-enabling.
# df = handler.read_from_bigquery(QUERY_VENDAS_NF_NOVO)
# df_lojas = handler.read_from_bigquery(QUERY_LOJAS)
# df_vendedores = handler.read_from_bigquery(QUERY_VENDEDORES)
# df_vendas = handler.read_from_bigquery(QUERY_VENDAS_NF)
# df_parcelas = handler.read_from_bigquery(QUERY_PARCELAS)
# df_metas = handler.read_from_bigquery(QUERY_METAS).rename(
#     columns={"cpf_vendedor": "cpf_vendedor_inteiro", "loja": "distributorId"}
# )
# df_vendedor_loja = handler.read_from_bigquery(QUERY_LOJA_VENDEDORES)

## Definindo lojas que estarão no relatório

In [21]:
# Store code the report is built for; later compared against the
# `distributorId` column of the KPI frames.
lojaAtual = '10H1'
print('loja', lojaAtual)

loja 10H1


## Definindo data do relatório

In [22]:
# Reference date of the report. The KPI builders and formatters below read
# this module-level name directly.
yesterday_date = datetime.datetime(2025,1,20)
print('dia escolhido ', yesterday_date)

dia escolhido  2025-01-20 00:00:00


## Rodando código para criação do relatório

In [23]:
# Echo the parameters chosen in the previous cells before building the report.
print('dia escolhido ', yesterday_date)
print('loja', lojaAtual)

def _abreviate_vendedor_name(full_name: str) -> str:
    full_name = full_name.replace("   ", " ")
    full_name = full_name.replace("  ", " ")

    if len(full_name.title().split(" ")) > 1:
        splited_name = full_name.title().split(" ")
        nome_completo = splited_name[0]
        splited_name.pop(0)

        for name in splited_name:
            nome_completo += " " + name[0] + "."

        return nome_completo

    return full_name

# ----------------------------------------
# ----------------------------------------

def build_df_vendas():
    """Build the sales frame used by every KPI computation.

    Reads the module-level ``df_fato_vendas`` (left unmodified), keeps the
    rows whose ``type`` is ``'Propria'`` and whose ``distributorId`` is not
    null, parses ``createdAt`` as datetime, adds ``year``/``month``/``day``
    columns derived from it and renames three columns.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    own_store_types = ['Propria']
    column_aliases = {
        'displayCode': 'displaycode',
        'vendedor_esforco': 'cpf_vendedor_inteiro',
        'esforco_data': 'invoice_date'
    }

    all_sales = df_fato_vendas.copy()
    is_own_store = all_sales["type"].isin(own_store_types)
    has_store = ~all_sales['distributorId'].isnull()

    pdv_sales = all_sales[is_own_store & has_store].copy()
    pdv_sales['createdAt'] = pd.to_datetime(pdv_sales['createdAt'])
    pdv_sales = pdv_sales.reset_index(drop=True)

    # Calendar parts come from the creation timestamp, not from the invoice date.
    created_at = pdv_sales['createdAt'].dt
    for calendar_part in ('year', 'month', 'day'):
        pdv_sales[calendar_part] = getattr(created_at, calendar_part)

    return pdv_sales.rename(columns=column_aliases)

# ----------------------------------------
# ----------------------------------------

# Pode ser melhorado para já devolver o cpf_vendedor com a loja e o nome
def build_vendedor():
    """Return the seller lookup table with abbreviated names.

    Reads the ``name`` and ``cpf`` columns of the module-level
    ``df_vendedores`` and returns them as ``vendedor`` (abbreviated with
    ``_abreviate_vendedor_name``) and ``cpf_vendedor_inteiro``.

    Returns:
        A new DataFrame with columns ``vendedor`` and
        ``cpf_vendedor_inteiro``; ``df_vendedores`` is not modified.
    """
    # Work on an explicit copy: renaming and assigning on a column slice is
    # the chained-assignment pattern pandas warns about
    # (SettingWithCopyWarning).
    df_nome_vendedor = df_vendedores[["name", "cpf"]].copy()
    df_nome_vendedor.columns = ["vendedor", "cpf_vendedor_inteiro"]
    df_nome_vendedor["vendedor"] = df_nome_vendedor["vendedor"].apply(_abreviate_vendedor_name)
    return df_nome_vendedor

# ----------------------------------------
# ----------------------------------------

def build_masks_meta_vendedor(periodo,df_metas:pd.DataFrame, date: datetime.datetime):
    """Build the boolean mask selecting seller goals in ``df_metas``.

    Args:
        periodo: Period label; only ``"Dia"`` adds a date restriction.
        df_metas: Goals frame with ``tipo_meta`` and ``date`` columns.
        date: Day to keep when ``periodo`` is ``"Dia"``.

    Returns:
        A boolean Series aligned with ``df_metas``: rows whose ``tipo_meta``
        is ``"Meta Vendedor PDV"`` and, for ``"Dia"``, whose ``date`` equals
        ``date``.
    """
    is_seller_goal = df_metas["tipo_meta"] == "Meta Vendedor PDV"
    if periodo != "Dia":
        return is_seller_goal

    is_requested_day = df_metas["date"] == date
    return is_seller_goal & is_requested_day

def build_kpis_dataframe_vendedor(kpi_period_:str, df_vendas_pdv_: pd.DataFrame):
    """Compute per-seller KPIs for one period.

    Reads the module-level ``df_metas`` (goals) and ``yesterday_date``
    (report date) in addition to its arguments.

    Changes compared with the previous implementation:

    * ``tkts_per_day`` (monthly), ``tkt_medio`` and ``PA`` now round the
      quotient; ``.round(2)`` used to be applied to the denominator only.
    * For ``"Dia"`` the sales are restricted to rows whose ``invoice_date``
      equals ``yesterday_date``, the same filter the store-level builder
      uses. Previously the daily rows carried whole-month sales compared
      against a single-day goal.
    * The group-by column list is no longer mutated in place.

    Args:
        kpi_period_: ``"Dia"``, ``"Mês"`` or ``"Ano"``.
        df_vendas_pdv_: Sales frame as produced by ``build_df_vendas``.

    Returns:
        One row per group with the summed measures, ``tkts``, ``amount``,
        ``meta``, ``days_of_work``, ``tkts_per_day``, ``type``, ``mkp``,
        ``tkt_medio``, ``meta_percentual``, ``PA`` and
        ``discount_percentual`` (plus ``Dia`` for the daily period).
        Ratios are not guarded against zero denominators.

    Raises:
        KeyError: If ``kpi_period_`` is not one of the three known periods.
    """
    maskmeta = build_masks_meta_vendedor(periodo=kpi_period_, df_metas=df_metas, date=yesterday_date)
    AGG_DICT = {"discount": "sum", "netValue": "sum", "cost": "sum", "store_cost": "sum"}

    GROUP_BY_COLUMNS_DICT = {
        "Ano": ["distributorId", "year", "cpf_vendedor_inteiro"],
        "Mês": ["distributorId", "year", "month", "cpf_vendedor_inteiro"],
        "Dia": ["distributorId", "year", "month", "cpf_vendedor_inteiro"],
    }

    group_by_columns = GROUP_BY_COLUMNS_DICT[kpi_period_]
    # Worked days are counted per seller regardless of store.
    seller_columns = [column for column in group_by_columns if column != "distributorId"]

    if kpi_period_ == "Dia":
        df_sales = df_vendas_pdv_[df_vendas_pdv_["invoice_date"] == yesterday_date]
    else:
        df_sales = df_vendas_pdv_

    sales_by_group = df_sales.groupby(group_by_columns)

    df_kpis: pd.DataFrame = sales_by_group.agg(AGG_DICT).reset_index()

    df_kpis = df_kpis.merge(
        sales_by_group["fiscalsParentOrder"].nunique().reset_index().rename(columns={"fiscalsParentOrder": "tkts"}),
        how="left",
        on=group_by_columns,
    )

    df_kpis = df_kpis.merge(
        sales_by_group["amount"].sum().reset_index(),
        how="left",
        on=group_by_columns,
    )

    df_kpis = df_kpis.merge(
        df_metas[maskmeta].groupby(group_by_columns)["meta"].sum().reset_index(),
        how="left",
        on=group_by_columns,
    )

    df_kpis = df_kpis.merge(
        df_sales.groupby(seller_columns)["invoice_date"]
        .nunique()
        .reset_index()
        .rename(columns={"invoice_date": "days_of_work"}),
        how="left",
        on=seller_columns,
    )

    if kpi_period_ == "Dia":
        df_kpis["Dia"] = yesterday_date.day

    if kpi_period_ == "Mês":
        df_kpis["tkts_per_day"] = (df_kpis["tkts"] / df_kpis["days_of_work"]).round(2)
    else:
        # NOTE(review): the yearly period also reports the raw ticket count
        # here, unlike the store-level builder — confirm this is intended.
        df_kpis["tkts_per_day"] = df_kpis["tkts"]

    df_kpis["type"] = kpi_period_

    df_kpis["mkp"] = (df_kpis["netValue"] / df_kpis["cost"]).round(2)
    df_kpis["tkt_medio"] = (df_kpis["netValue"] / df_kpis["tkts"]).round(2)
    df_kpis["meta_percentual"] = (df_kpis["netValue"] / df_kpis["meta"] * 100).round(2)
    df_kpis["PA"] = (df_kpis["amount"] / df_kpis["tkts"]).round(2)
    df_kpis["discount_percentual"] = ((df_kpis["discount"] / df_kpis["netValue"]) * 100).round(2)

    return df_kpis

# ----------------------------------------
# ----------------------------------------

def build_masks_meta_loja(periodo:str,  df_vendas_:pd.DataFrame ,df_metas:pd.DataFrame, date: datetime.datetime):
    """Build the sales mask and the store-goal mask for one period.

    The ``date`` argument is now honoured; the previous implementation
    compared against the module-level ``yesterday_date`` instead. The
    all-true sales mask is also built on ``df_vendas_``'s own index so it
    stays aligned when that index is not 0..n-1.

    Args:
        periodo: Period label; only ``"Dia"`` adds a date restriction.
        df_vendas_: Sales frame with an ``invoice_date`` column.
        df_metas: Goals frame with ``tipo_meta`` and ``date`` columns.
        date: Day to keep when ``periodo`` is ``"Dia"``.

    Returns:
        ``(mask_vendas, mask_meta)``: a boolean Series aligned with
        ``df_vendas_`` (all ``True`` unless ``periodo`` is ``"Dia"``) and a
        boolean Series aligned with ``df_metas`` selecting
        ``"Meta Loja PDV"`` rows, restricted to ``date`` for ``"Dia"``.
    """
    mask_meta = df_metas["tipo_meta"] == "Meta Loja PDV"
    mask_vendas = pd.Series(True, index=df_vendas_.index, dtype=bool)

    if periodo == "Dia":
        mask_vendas = df_vendas_["invoice_date"] == date
        mask_meta = mask_meta & (df_metas["date"] == date)

    return mask_vendas, mask_meta


def build_kpis_dataframe_loja(kpi_period_, df_vendas_pdv_:pd.DataFrame):
    """Compute per-store KPIs for one period.

    Reads the module-level ``df_metas`` (goals) and ``yesterday_date``
    (report date) in addition to its arguments. The sales mask is now built
    from the ``df_vendas_pdv_`` argument; the previous implementation used
    the module-level ``df_vendas_pdv`` and only worked while both names
    referred to the same frame.

    Args:
        kpi_period_: ``"Dia"``, ``"Mês"`` or ``"Ano"``.
        df_vendas_pdv_: Sales frame as produced by ``build_df_vendas``.

    Returns:
        One row per group with the summed measures, ``tkts``, ``amount``,
        ``meta``, ``tkts_per_day``, ``type``, ``mkp``, ``tkt_medio``,
        ``meta_percentual``, ``PA`` and ``discount_percentual`` (plus
        ``Dia`` for the daily period). Ratios are not guarded against zero
        denominators.

    Raises:
        KeyError: If ``kpi_period_`` is not one of the three known periods.
    """
    maskVendas, maskMeta = build_masks_meta_loja(
        periodo=kpi_period_, df_vendas_=df_vendas_pdv_, df_metas=df_metas, date=yesterday_date
    )
    AGG_DICT = {"discount": "sum", "netValue": "sum", "cost": "sum"}

    GROUP_BY_COLUMNS_DICT = {
        "Ano": ["distributorId", "year"],
        "Mês": ["distributorId", "year", "month"],
        "Dia": ["distributorId", "year", "month", 'day'],
    }

    group_by_columns = GROUP_BY_COLUMNS_DICT[kpi_period_]

    df_kpis: pd.DataFrame = df_vendas_pdv_[maskVendas].groupby(group_by_columns).agg(AGG_DICT).reset_index()

    # Ticket and item counts are computed on the unmasked sales and matched
    # to the masked totals through the group-by keys.
    sales_by_group = df_vendas_pdv_.groupby(group_by_columns)

    df_kpis = df_kpis.merge(
        sales_by_group["fiscalsParentOrder"].nunique().reset_index().rename(columns={"fiscalsParentOrder": "tkts"}),
        how="left",
        on=group_by_columns,
    )

    df_kpis = df_kpis.merge(
        sales_by_group["amount"].sum().reset_index(),
        how="left",
        on=group_by_columns,
    )

    df_kpis = df_kpis.merge(
        df_metas[maskMeta].groupby(group_by_columns)["meta"].sum().reset_index(),
        how="left",
        on=group_by_columns,
    )

    if kpi_period_ == "Dia":
        df_kpis["Dia"] = yesterday_date.day
        df_kpis["tkts_per_day"] = df_kpis["tkts"]
    elif kpi_period_ == "Mês":
        # Average over the days elapsed in the month up to the report date.
        df_kpis["tkts_per_day"] = df_kpis["tkts"] / yesterday_date.day
    elif kpi_period_ == "Ano":
        # Average over the days elapsed in the year up to the report date.
        df_kpis["tkts_per_day"] = df_kpis["tkts"] / yesterday_date.timetuple().tm_yday

    df_kpis["type"] = kpi_period_
    df_kpis["mkp"] = (df_kpis["netValue"] / df_kpis["cost"]).round(2)
    df_kpis["tkt_medio"] = (df_kpis["netValue"] / df_kpis["tkts"]).round(2)
    df_kpis["meta_percentual"] = (df_kpis["netValue"] / df_kpis["meta"] * 100).round(2)
    df_kpis["PA"] = (df_kpis["amount"] / df_kpis["tkts"]).round(2)
    df_kpis["discount_percentual"] = ((df_kpis["discount"] / df_kpis["netValue"]) * 100).round(2)

    return df_kpis

# ----------------------------------------
# ----------------------------------------

def format_kpi_loja(id_loja: str, df_kpi:pd.DataFrame):
    """Render one store's KPIs as a table of display strings.

    For the rows of ``df_kpi`` matching ``id_loja`` (nulls filled with 0),
    one row per ``type`` is kept (the one with the highest ``month``), the
    KPI columns are given their display labels and formatted as text, and
    the table is transposed so each period becomes a column.

    Args:
        id_loja: Store identifier matched against ``distributorId``.
        df_kpi: KPI frame holding ``distributorId``, ``type``, ``month`` and
            the KPI columns listed in the label mapping below.

    Returns:
        A DataFrame indexed by label with columns ``[1, 2, 0]``, or an empty
        DataFrame when those three columns are not all present.
    """
    money_template = "R$ {:,.0f}"
    percent_template = "{:,.0f}%"

    labels = {
        "netValue": "Venda Loja (PDV)",
        "meta": "Meta Loja (PDV)",
        "meta_percentual": "Meta % (PDV)",
        "mkp": "Markup",
        "tkts_per_day": "Tickets/Dia",
        "PA": "Peças por Atendimento",
        "tkt_medio": "Ticket Médio",
        "discount_percentual": "Desconto Médio %",
        "type": "Período",
    }

    templates = {
        "Venda Loja (PDV)": money_template,
        "Meta Loja (PDV)": money_template,
        "Meta % (PDV)": percent_template,
        "Markup": "{:,.2f}",
        "Tickets/Dia": "{:,.1f}",
        "Peças por Atendimento": "{:,.1f}",
        "Ticket Médio": money_template,
        "Desconto Médio %": percent_template,
    }

    store_rows = df_kpi[df_kpi["distributorId"] == id_loja].fillna(0).copy()
    latest_per_type = store_rows.groupby(["type"])["month"].idxmax()
    latest_rows = store_rows.loc[latest_per_type].drop_duplicates()

    report = latest_rows[list(labels)].round(2).rename(columns=labels)

    for column, template in templates.items():
        rendered = report[column].map(template.format)
        if template == money_template:
            # Swap the thousands separator from "," to ".".
            rendered = rendered.astype(str).str.replace(",", ".")
        report[column] = rendered

    try:
        # After the reset the periods sit at positions 0, 1, 2 in sorted
        # ``type`` order; [1, 2, 0] puts them in display order.
        return report.set_index("Período").reset_index().T[[1, 2, 0]]
    except KeyError:
        return pd.DataFrame()

# ----------------------------------------
# ----------------------------------------

def format_kpi_vendedor(id_loja:str, kpi_df_vendedor:pd.DataFrame, vendedor_loja_df:pd.DataFrame, periodo: str):
    """Render the per-seller KPI table of one store as display strings.

    Reads the module-level ``yesterday_date`` and calls ``build_vendedor``
    for seller names. The output is the same as before; this version drops
    the unused ``net_value_share`` column (a verbatim duplicate of
    ``normalized_net_value``), shares the code of the daily and monthly
    branches and no longer shadows the builtin ``format``.

    Args:
        id_loja: Store identifier matched against ``distributorId``.
        kpi_df_vendedor: Seller KPI frame (all periods concatenated).
        vendedor_loja_df: Seller-to-store mapping with ``distributorId`` and
            ``employeeCpf`` columns.
        periodo: ``"Dia"`` for the daily view; any other value yields the
            monthly view.

    Returns:
        A DataFrame whose first row holds the display labels and whose
        remaining rows hold one seller each, indexed by the first column
        (seller name), sorted by ``normalized_net_value`` descending.
    """
    format_money = "R$ {:,.0f}"
    format_percentage = "{:,.0f}%"
    is_daily = periodo == "Dia"

    # Tudo deveria vir do build vendedor -> ideally all of this would come
    # from build_vendedor.
    df_nome_vendedor = build_vendedor()
    store_cpfs = vendedor_loja_df.loc[
        vendedor_loja_df["distributorId"] == id_loja, "employeeCpf"
    ].tolist()

    df_vendedor = kpi_df_vendedor[kpi_df_vendedor["distributorId"] == id_loja].reset_index(drop=True)
    df_vendedor["Mês"] = df_vendedor["month"].fillna(0)

    mask_completa = (
        df_vendedor["type"].isin(["Dia" if is_daily else "Mês"])
        & (df_vendedor["Mês"] == yesterday_date.month)
        & (df_vendedor["year"] == yesterday_date.year)
    )
    if is_daily:
        mask_completa = mask_completa & (df_vendedor["Dia"] == yesterday_date.day)

    df_vendedor_filtro: pd.DataFrame = df_vendedor[mask_completa].reset_index(drop=True)

    # Each seller's sales as a percentage of the best seller in the period.
    # The maximum is taken before the store's CPF list is applied.
    df_vendedor_filtro["normalized_net_value"] = (
        df_vendedor_filtro["netValue"] * 100 / round(df_vendedor_filtro["netValue"].max(), 2)
    )

    df_vendedor_filtro = df_vendedor_filtro[df_vendedor_filtro["cpf_vendedor_inteiro"].isin(store_cpfs)]
    df_vendedor_filtro = df_nome_vendedor.merge(df_vendedor_filtro, on="cpf_vendedor_inteiro").drop(
        columns="cpf_vendedor_inteiro"
    )

    if is_daily:
        total_label, goal_label = "Acumulado no Dia", "Meta do Dia"
        tickets_label, tickets_format = "Tickets", "{:,.0f}"
    else:
        total_label, goal_label = "Acumulado no Mês", "Meta do Mês"
        tickets_label, tickets_format = "Tickets/Dia", "{:,.1f}"

    # Insertion order defines the column order of the report.
    rename_dict = {"vendedor": "Vendedor"}
    if not is_daily:
        rename_dict["days_of_work"] = "Dias Trabalhados"
    rename_dict.update(
        {
            "netValue": total_label,
            "normalized_net_value": "Homogeneidade",
            "meta": goal_label,
            "meta_percentual": "Meta %",
            "mkp": "Markup",
            "tkts_per_day": tickets_label,
            "PA": "Peças por Atendimento",
            "tkt_medio": "Ticket Médio",
            "discount_percentual": "Desconto Médio %",
        }
    )

    format_dict = {
        total_label: format_money,
        "Homogeneidade": format_percentage,
        goal_label: format_money,
        "Meta %": format_percentage,
        "Markup": "{:,.2f}",
        tickets_label: tickets_format,
        "Peças por Atendimento": "{:,.1f}",
        "Ticket Médio": format_money,
        "Desconto Médio %": format_percentage,
    }

    df_ordenado = df_vendedor_filtro[list(rename_dict)].sort_values("normalized_net_value", ascending=False)
    if not is_daily:
        # Only the monthly view pre-rounds to two decimals; the daily view
        # is left as it was so its output does not change.
        df_ordenado = df_ordenado.round(2)
    df_vendedor_formatado = df_ordenado.rename(columns=rename_dict)

    for column, template in format_dict.items():
        rendered = df_vendedor_formatado[column].map(template.format)
        if template == format_money:
            # Swap the thousands separator from "," to ".".
            rendered = rendered.astype(str).str.replace(",", ".")
        df_vendedor_formatado[column] = rendered

    # Transposing around reset_index moves the labels into the first data row.
    df_vendedor_formatado = df_vendedor_formatado.reset_index(drop=True).T.reset_index().T

    return df_vendedor_formatado.set_index(0)

# ----------------------------------------
# ----------------------------------------

# Sales frame shared by every KPI builder below.
df_vendas_pdv = build_df_vendas()

# One KPI frame per period, for stores and for sellers.
kpis_loja_dict_ = {}
kpis_vendedor_dict_ = {}
for kpi_period_ in ("Dia", "Mês", "Ano"):
    period_suffix = kpi_period_.lower()
    kpis_loja_dict_["kpis_loja_" + period_suffix] = build_kpis_dataframe_loja(
        kpi_period_=kpi_period_,
        df_vendas_pdv_=df_vendas_pdv,
    )
    kpis_vendedor_dict_["kpis_vendedor_" + period_suffix] = build_kpis_dataframe_vendedor(
        kpi_period_=kpi_period_,
        df_vendas_pdv_=df_vendas_pdv,
    )

# Stack the periods into one frame per level.
df_kpis_loja_list = pd.concat([*kpis_loja_dict_.values()])
df_kpis_vendedor_list = pd.concat([*kpis_vendedor_dict_.values()])

# Keep only the store selected for this report.
maskFiltroLoja = df_kpis_loja_list['distributorId'].eq(lojaAtual)
maskFiltroVendedor = df_kpis_vendedor_list['distributorId'].eq(lojaAtual)

df_kpi_loja_final = df_kpis_loja_list.loc[maskFiltroLoja].reset_index(drop=True)
df_kpi_vendedor_final = df_kpis_vendedor_list.loc[maskFiltroVendedor].reset_index(drop=True)

# Display-ready tables: store summary, sellers per day, sellers per month.
loja = format_kpi_loja(lojaAtual, df_kpi_loja_final)
dia, mes = (
    format_kpi_vendedor(lojaAtual, df_kpi_vendedor_final, df_vendedor_loja, periodo_relatorio)
    for periodo_relatorio in ('Dia', 'Mês')
)


dia escolhido  2025-01-20 00:00:00
loja 10H1


## Criando html

In [24]:
# Report date as shown in the e-mail (day-month-year).
relatorioDiaStr = yesterday_date.strftime("%d-%m-%Y")

# Section templates: {0} = table HTML, {1} = store code, {2} = date label.
LOJA_SECTION_TEMPLATE = """
<html>
  <head></head>
  <body>
    <h1>Relatorio loja - {1}</h1>
    <h2>Data: {2}</h2>
    {0}
  </body>
  <br/><br/>
</html>
"""

VENDEDOR_DIA_SECTION_TEMPLATE = """
<html>
  <head></head>
  <body>
  <h1>Relatorio vendedor - Visão diária</h1>
  <p>Loja:{1} | Data: {2}</p>
    {0}
  </body>
  <br/><br/>
</html>
"""

VENDEDOR_MES_SECTION_TEMPLATE = """
<html>
  <head></head>
  <body>
  <h1>Relatorio vendedor - Visão mensal</h1>
  <p>Loja:{1} | Data: {2}</p>
    {0}
  </body>
  <br/><br/>
</html>
"""

# Store summary, daily seller view, then monthly seller view.
emailSections = [
    LOJA_SECTION_TEMPLATE.format(loja.to_html(), lojaAtual, relatorioDiaStr),
    VENDEDOR_DIA_SECTION_TEMPLATE.format(dia.to_html(), lojaAtual, relatorioDiaStr),
    VENDEDOR_MES_SECTION_TEMPLATE.format(mes.to_html(), lojaAtual, yesterday_date.strftime('%m-%Y')),
]

# `htmlValue` keeps holding the last rendered section, as before.
htmlValue = emailSections[-1]
emailBody = "".join(emailSections)
emailBody

'\n<html>\n  <head></head>\n  <body>\n    <h1>Relatorio loja - 10H1</h1>\n    <h2>Data: 20-01-2025</h2>\n    <table border="1" class="dataframe">\n  <thead>\n    <tr style="text-align: right;">\n      <th></th>\n      <th>1</th>\n      <th>2</th>\n      <th>0</th>\n    </tr>\n  </thead>\n  <tbody>\n    <tr>\n      <th>Período</th>\n      <td>Dia</td>\n      <td>Mês</td>\n      <td>Ano</td>\n    </tr>\n    <tr>\n      <th>Venda Loja (PDV)</th>\n      <td>R$ 1.182</td>\n      <td>R$ 103.364</td>\n      <td>R$ 1.641.805</td>\n    </tr>\n    <tr>\n      <th>Meta Loja (PDV)</th>\n      <td>R$ 0</td>\n      <td>R$ 139.893</td>\n      <td>R$ 1.665.970</td>\n    </tr>\n    <tr>\n      <th>Meta % (PDV)</th>\n      <td>0%</td>\n      <td>74%</td>\n      <td>99%</td>\n    </tr>\n    <tr>\n      <th>Markup</th>\n      <td>inf</td>\n      <td>3.90</td>\n      <td>3.68</td>\n    </tr>\n    <tr>\n      <th>Tickets/Dia</th>\n      <td>3.0</td>\n      <td>12.2</td>\n      <td>154.8</td>\n    </tr>\n   

## Enviando relatório por email

In [25]:
# Addresses that will receive the report e-mail.
emailRecipients = ['joao.garcia@ammovarejo.com.br']
print('recipientes: ', emailRecipients)

recipientes:  ['joao.garcia@ammovarejo.com.br']


In [26]:
import os
import smtplib
from dataclasses import dataclass
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, Callable, Dict, List, Optional, Union
from commom.logger import logger
from commom.base_classes.base_sender import BaseMessenger
from innovation_messenger.config import config

@dataclass
class IEmailProperties:
    """Parameters of one outgoing e-mail, as consumed by ``send_email_html``.

    NOTE(review): a name ``IEmailProperties`` is also imported from
    ``innovation_messenger`` earlier in this file; this definition rebinds
    it — confirm that is intended.
    """
    # Subject line of the message.
    subject: str
    # Message body; ``send_email_html`` attaches it as HTML.
    body: str
    # A single address or a list of addresses.
    recipient: Union[str, List[str]]
    # Optional file name; not read by ``send_email_html`` in this file.
    file_name: Optional[str] = None


def send_email_html(email_properties: IEmailProperties) -> bool:
    """Send an HTML e-mail through Gmail's SMTP-over-SSL endpoint.

    Credentials are read from the environment instead of being hard-coded:

    * ``EMAIL_SENDER_PASSWORD`` (required): password of the sender account.
    * ``EMAIL_SENDER_USER`` (optional): sender address; defaults to
      ``inovacao@ammovarejo.com.br``.

    SECURITY: the password that used to be embedded in this function has
    been exposed in source control/notebook exports and should be rotated.

    Connection errors are now handled like login/send errors (logged,
    ``False`` returned); previously they escaped because the connection was
    opened outside the ``try`` block.

    Args:
        email_properties: Subject, HTML body and recipient(s) of the message.

    Returns:
        ``True`` when the message was handed to the SMTP server, ``False``
        when the password is missing or any step of the delivery failed.
    """
    sender = os.environ.get("EMAIL_SENDER_USER", 'inovacao@ammovarejo.com.br')
    password = os.environ.get("EMAIL_SENDER_PASSWORD")
    if not password:
        logger.error("EMAIL_SENDER_PASSWORD is not set; e-mail was not sent")
        return False

    # Normalise to a list so the header and the SMTP envelope always agree.
    if isinstance(email_properties.recipient, list):
        recipients = list(email_properties.recipient)
    else:
        recipients = [email_properties.recipient]

    message = MIMEMultipart()
    message["Subject"] = email_properties.subject
    message["From"] = sender
    message["To"] = ", ".join(recipients)
    message.attach(MIMEText(email_properties.body, 'html'))

    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(user=sender, password=password)
            server.sendmail(sender, recipients, message.as_string())
    except Exception as e:
        # Boundary of the delivery step: callers rely on a bool result, so
        # every failure is logged and reported as False rather than raised.
        logger.error(e)
        return False

    return True


# Assemble the message from the values prepared in the previous cells
# (store code, formatted report date, recipients and HTML body).
email_properties = IEmailProperties(
    subject=f'Teste - Base nova - Loja {lojaAtual} - Dia {relatorioDiaStr}',
    recipient=emailRecipients,
    body=emailBody,
)

# Send it; the returned bool is displayed as the cell output.
send_email_html(
  email_properties=email_properties,
)


True