# Imports

In [None]:
import os
import pandas as pd
from datetime import datetime
import configparser
import warnings

# Desativar as mensagens de aviso
warnings.filterwarnings("ignore")

pd.set_option('display.max_rows', 300)
pd.set_option('display.max_columns', 50)

now = datetime.now()

# Create obj config and load paths
config = configparser.ConfigParser()
config.read('config.ini')
paths = config['paths']

# Load Dataframes

In [None]:
def join_dataframes_from_directory(directory):
    dfs = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".xlsx"):
                filepath = os.path.join(root, file)
                df = pd.read_excel(
                    filepath, 
                    engine="openpyxl"
                )
                print(f'Readed {file}')
                df = df.sort_index(ascending=False)
                dfs.append(df)
    if len(dfs) > 0:
        df_result = pd.concat(dfs, ignore_index=True)
        print('Count df:', df_result.shape[0])
        return df_result
    else:
        print('No files found in the directory')
        return None

# Load main files
df = join_dataframes_from_directory(paths["path_b3_reports"])
df_depara_ativos = pd.read_csv(paths["path_categorizer_investments"], sep=",")
df_darfs_emitidas = pd.read_csv(paths["path_darfs_issued"], sep=",")

# Treatment Columns

In [None]:
# Treatment b3 reports
mask = ['Código de Negociação', 'Data do Negócio', 'Tipo de Movimentação', 'Quantidade', 'Preço', 'Valor']
df = df[mask]

df = df.rename(columns={"Código de Negociação": "Ativo"})
df = df.rename(columns={"Data do Negócio": "Data Negociacao"})
df = df.rename(columns={"Tipo de Movimentação": "Tipo de Movimentacao"})
df = df.rename(columns={"Preço": "Preco"})

df["Data Negociacao"] = pd.to_datetime(df["Data Negociacao"], format="%d/%m/%Y")
df["Data Mensal"] = df["Data Negociacao"].dt.to_period("M")

# Fix "Desdobramento de Cotas"

In [None]:
# Ativo: KNSC11
cond = (df['Ativo'] == 'KNSC11') & (df['Data Negociacao'] <= pd.to_datetime('2023-11-03'))
df.loc[cond, 'Preco'] = df.loc[cond, 'Preco'] / 10
df.loc[cond, 'Quantidade'] = df.loc[cond, 'Quantidade'] * 10

# Desdobramento de cotas na proporção de 1:10
# Ativo: CPTS11
cond = (df['Ativo'] == 'CPTS11') & (df['Data Negociacao'] <= pd.to_datetime('2023-09-25'))
df.loc[cond, 'Preco'] = df.loc[cond, 'Preco'] / 10
df.loc[cond, 'Quantidade'] = df.loc[cond, 'Quantidade'] * 10

# Desdobramento de cotas na proporção de 1:10
# Ativo: BBAS3FV
cond = (df['Ativo'] == 'BBAS3F') & (df['Data Negociacao'] <= pd.to_datetime('2024-04-15'))
df.loc[cond, 'Preco'] = df.loc[cond, 'Preco'] / 2
df.loc[cond, 'Quantidade'] = df.loc[cond, 'Quantidade'] * 2


# Sort and create new index

In [None]:
# Crie uma nova coluna com o índice de ordenação por Ativo
df['Movimentacao do Ativo'] = df.groupby('Ativo').cumcount() + 1

# Feature "Tipo de estrategia"

In [None]:
def validador_estrategia_trade(group):
    """
    Função para auxiliar na separação de Estrategias Day Trade e Long Trade
    Necessita de ajuste manual após validação, 
    deve separar as quantidades que pertencem determinada estrategia
    Ex: quantidade venda maior que quantidade compra,
        copiar linha e dividir o valor pelas quantidades de cada estrategia.
        linha 1 priorizar a venda das quantidades Day Trade.
        linha 2 separar quantidades restantes como Long Trade.
    """
    tem_venda = (group['Tipo de Movimentacao'] == 'Venda').any()
    tem_compra = (group['Tipo de Movimentacao'] == 'Compra').any()
    
    group['Verificar Estrategia'] = tem_venda and tem_compra
    return group

# Crie cópias das colunas 'Tipo Estrategia' e 'Verificar Estrategia'
df['Tipo Estrategia'] = 'Long Trade'
df['Verificar Estrategia'] = False

# Aplique a função validador_estrategia_trade no DataFrame original
df = df.groupby(['Ativo', 'Data Negociacao']).apply(validador_estrategia_trade).reset_index(drop=True)

# Feature "Tipo de investimento"

In [None]:
# Check Categoria de Ativos
ativos_sem_categoria = set(df["Ativo"].unique()).difference(df_depara_ativos["Ativo"].unique())
if len(ativos_sem_categoria) > 0:
    print("Há ativos sem categorias!")
    print("Ativos:", ativos_sem_categoria)
else:
    print("Todos os ativos contém categoria!")
    # Merge para obter os tipos de ativos e possibilitar o calculo de taxa IR
    df = df.merge(df_depara_ativos, on="Ativo", how="left")

# Feature "Investimentos", "Quantidades" and "Preço medio"  

In [None]:
def calcular_valores_negociados(group):
    """
    Função para calcular o investimento, 
    quantidade e preço médio acumulado de cada grupo de ativos.
    """
    investimento_acumulado = 0
    quantidade_acumulada = 0
    preco_medio = 0

    for index, row in group.iterrows():
        if row['Tipo de Movimentacao'] == 'Compra':
            investimento_acumulado += row['Valor']
            quantidade_acumulada += row["Quantidade"]
            preco_medio = investimento_acumulado / quantidade_acumulada

        elif row['Tipo de Movimentacao'] == 'Venda':
            valor_investido = preco_medio * row['Quantidade']
            investimento_acumulado -= valor_investido
            quantidade_acumulada -= row["Quantidade"]
            if (investimento_acumulado == 0) & (quantidade_acumulada == 0):
                preco_medio = 0
            elif (investimento_acumulado > 0) & (quantidade_acumulada > 0):
                preco_medio = investimento_acumulado / quantidade_acumulada
            else:
                # preciso checkar cada grupo de ativo manualmente
                print(investimento_acumulado, quantidade_acumulada, row['Ativo'])

        group.at[index, 'Investimento Atual'] = investimento_acumulado
        group.at[index, 'Quantidade Atual'] = quantidade_acumulada
        group.at[index, 'Preco Medio Atual'] = preco_medio
    return group

# Agrupar por "Ativo" e aplicar a função de cálculo do preço médio acumulado em cada grupo
df = df.groupby('Ativo').apply(calcular_valores_negociados).reset_index(drop=True)

# Create new columns using shift value
df['Investimento Anterior'] = df.groupby('Ativo')['Investimento Atual'].shift()
df['Quantidade Anterior'] = df.groupby('Ativo')['Quantidade Atual'].shift()
df['Preco Medio Anterior'] = df.groupby('Ativo')['Preco Medio Atual'].shift()

df['Investimento Anterior'] = df['Investimento Anterior'].fillna(0)
df['Quantidade Anterior'] = df['Quantidade Anterior'].fillna(0)
df['Preco Medio Anterior'] = df['Preco Medio Anterior'].fillna(0)

# Fix typings
df['Investimento Atual'] = df['Investimento Atual'].astype('float').round(2)
df['Quantidade Atual'] = df['Quantidade Atual'].astype('int')
df['Preco Medio Atual'] = df['Preco Medio Atual'].astype('float').round(2)

df['Investimento Anterior'] = df['Investimento Anterior'].astype('float').round(2)
df['Quantidade Anterior'] = df['Quantidade Anterior'].astype('int')
df['Preco Medio Anterior'] = df['Preco Medio Anterior'].astype('float').round(2)

# Feature "Taxa IR" and "Dedo Duro"

In [None]:
def definir_taxa_imposto_renda_dedo_duro_por_ativo(row):
    # Calcula a taxa de IR (Imposto de Renda) considerando 15% para ações e 20% para FIIs.
    # Ações IR: Day Trade 20%, Swing Trade 15% e Long Trade 15%. 
    # Calcular Dedo Duro da mesma forma, 
    # Dedo Duro: Day Trade 1%, Swing Trade 0.005% e Long Trade 0.005%
    if row["Tipo de Movimentacao"] == "Venda":
        if row["Tipo Estrategia"] == "Day Trade":
            row["Dedo Duro"] = row["Valor"] * 0.01
            row["Taxa IR"] = 0.20
        elif row["Tipo Estrategia"] == "Long Trade":
            if row["Tipo Ativo"] == "Fii":
                row["Dedo Duro"] = row["Valor"] * 0.00005
                row["Taxa IR"] = 0.20
            elif row["Tipo Ativo"] == "Acao":
                row["Dedo Duro"] = row["Valor"] * 0.00005
                row["Taxa IR"] = 0.15
            else: 
                row["Dedo Duro"] = "Outras Categorias"
                row["Taxa IR"] = "Outras Categorias"
        else:
            row["Dedo Duro"] = "Outras Estrategias"
            row["Taxa IR"] = "Outras Estrategias"
    else:
        row["Dedo Duro"] = 0
        row["Taxa IR"] = 0
    return row

# Define a taxa IR e Taxa de Retenção na fonte para cada negociação de venda
df = df.apply(definir_taxa_imposto_renda_dedo_duro_por_ativo, axis=1)

# Fix typings
df['Dedo Duro'] = df['Dedo Duro'].astype('float').round(2)
df['Taxa IR'] = df['Taxa IR'].astype('float').round(2)

# Feature "lucro bruto" and "prejuizo"

In [None]:
def calcular_lucro_bruto_prejuizo(row):
    """
    Calcula o Lucro Bruto e Prejuizo em determinados momentos (Parcial ou inteiro de venda)
    """
    if row['Tipo de Movimentacao'] == 'Venda':
        lucro_prejuizo = -(row["Preco Medio Anterior"] * row["Quantidade"]) + row["Valor"]
        if lucro_prejuizo > 0:
            row["Lucro Bruto"] = lucro_prejuizo
            row["Prejuizo"] = 0
        elif lucro_prejuizo < 0:
            row["Lucro Bruto"] = 0
            row["Prejuizo"] = abs(lucro_prejuizo)
        elif lucro_prejuizo == 0:
            row["Lucro Bruto"] = 0
            row["Prejuizo"] = 0
    else:
        row["Lucro Bruto"] = 0
        row["Prejuizo"] = 0
    return row

# cálculo do lucro e prejuizo em cada negociação de venda
df = df.apply(calcular_lucro_bruto_prejuizo, axis=1)

df['Lucro Bruto'] = df['Lucro Bruto'].astype('float').round(2)
df['Prejuizo'] = df['Prejuizo'].astype('float').round(2)

# Feature "Imposto de Renda"

In [None]:
def calcular_imposto_renda_esperado(row):
    # Calcula o valor do IR para cada transação
    # e desconta o "dedo duro" já pago nas transaçoes
    if row["Lucro Bruto"] > 0:
        row["IR Esperado"] = (row["Lucro Bruto"] * row["Taxa IR"]) - row["Dedo Duro"]
    else:
        row["IR Esperado"] = 0
    return row

df = df.apply(calcular_imposto_renda_esperado, axis=1)

df['IR Esperado'] = df['IR Esperado'].astype('float').round(2)

In [None]:
def calcular_lucro_real(row):
    # Calcula o valor Real de Lucro descontando os prezuizos e impostos
    if row["Lucro Bruto"] > 0:
        row["Lucro Real"] = row["Lucro Bruto"] - row["IR Esperado"]
    else:
        row["Lucro Real"] = 0
    return row

# Calcula o valor Real de Lucro descontando os prezuizos e impostos
df = df.apply(calcular_lucro_real, axis=1)

df['Lucro Real'] = df['Lucro Real'].astype('float').round(2)

# Save Consolidated

In [None]:
df.to_csv(paths['path_treated_b3_report'], index=False)

In [None]:
df_compra = df[df['Tipo de Movimentacao'] == 'Compra']
df_venda = df[df['Tipo de Movimentacao'] == 'Venda']

df_venda_mensal = df_venda.groupby(["Tipo Ativo", "Data Mensal", "Taxa IR", "Tipo Estrategia"]).agg(
    {
        "Valor": "sum",
        "IR Esperado": "sum",
        "Dedo Duro": "sum",
        "Lucro Bruto": "sum",
        "Prejuizo": "sum",
        "Lucro Real": "sum"
    }
).reset_index()

In [None]:
df_darfs_emitidas['Tipo Ativo'] = df_darfs_emitidas['Tipo Ativo'].astype('str')
df_darfs_emitidas['Data'] = pd.to_datetime(df_darfs_emitidas['Data'], format="%d/%m/%Y")
df_darfs_emitidas['IR Pago'] = df_darfs_emitidas['IR Pago'].astype('float').round(2)
df_darfs_emitidas["Data Mensal"] = df_darfs_emitidas["Data"].dt.to_period("M")

df_venda_mensal = df_venda_mensal.merge(
    df_darfs_emitidas[['Tipo Ativo', 'Data Mensal', 'IR Pago']], 
    on=['Tipo Ativo', 'Data Mensal'],
    how="left"
)
df_venda_mensal['IR Pago'] = df_venda_mensal['IR Pago'].fillna(0)

In [None]:
def isencao_imposto_renda(row):
    # Para vendas de ações com montante menor ou igual 
    # a 20.000,00 em um periodo de um 1 mes, será isento de declarar imposto sobre essas ações, 
    # sendo excessões, qualquer tipo de Day Trade ou qualquer outra categoria.
    isencao_imposto_renda = False
    if (row["Tipo Estrategia"] == "Long Trade") \
            & (row['Tipo Ativo'] == "Acao") \
            & (row["Valor"] < 20000):
        isencao_imposto_renda = True
    row["Isento IR"] = isencao_imposto_renda
    return row

df_venda_mensal = df_venda_mensal.apply(isencao_imposto_renda, axis=1)

# Feature "Compensação de Impostos"

In [None]:
def calcular_compensacao_impostos(group):
    prejuizo_acumulado = 0
    imposto_acumulado = 0

    for index, row in group.iterrows():
        prejuizo_compensado = row['Lucro Bruto'] - row['Prejuizo']

        if prejuizo_compensado < 0:
            group.at[index, 'Lucro Bruto Compensado'] = prejuizo_compensado
            prejuizo_acumulado += prejuizo_compensado
        else:
            group.at[index, 'Lucro Bruto Compensado'] = prejuizo_compensado
            prejuizo_acumulado = 0
        group.at[index, 'Prejuizo Acumulado'] = prejuizo_acumulado

        if prejuizo_compensado > 0:
            imposto_compensado = (prejuizo_compensado * row['Taxa IR']) - row['Dedo Duro']
        elif prejuizo_compensado == 0:
            imposto_compensado = prejuizo_compensado - row['Dedo Duro']
        else:
            imposto_compensado = 0              
        group.at[index, 'IR Compensado'] = imposto_compensado

        imposto_acumulado += imposto_compensado
        group.at[index, 'IR Acumulado'] = imposto_acumulado

        if row['IR Pago'] > 0:
            imposto_pendente = imposto_acumulado - row['IR Pago']
            if imposto_pendente < 0:
                imposto_pendente = 0
                imposto_acumulado = 0
        else:
            imposto_pendente = imposto_acumulado
        group.at[index, 'IR Pendente'] = imposto_pendente
        
    return group

df_venda_mensal = df_venda_mensal.groupby(['Tipo Ativo', 'Tipo Estrategia']).apply(calcular_compensacao_impostos).reset_index(drop=True)

df_venda_mensal['IR Compensado'] = df_venda_mensal['IR Compensado'].astype('float').round(2)
df_venda_mensal['IR Acumulado'] = df_venda_mensal['IR Acumulado'].astype('float').round(2)
df_venda_mensal['IR Pendente'] = df_venda_mensal['IR Pendente'].astype('float').round(2)

# Conclusão

In [None]:
print("-"*30, "\nRelatorio completo:\n", "-"*30)

print(f"Periodo de Negociações listadas: {df['Data Negociacao'].min().date()} até {df['Data Negociacao'].max().date()}")
print("Quantidade de compras totais negociadas:", df_compra[df_compra["Tipo de Movimentacao"] == "Compra"].shape[0])
print("Quantidade de vendas totais negociadas:", df_venda[df_venda["Tipo de Movimentacao"] == "Venda"].shape[0])
print("Total de Transações com imposto a declarar:", df_venda[df_venda['Lucro Bruto'] > 0].shape[0])
print("Total de Transações com prejuizo a compensar no imposto:", df_venda[df_venda['Prejuizo'] < 0].shape[0])
print("Total de Prejuizo obtido:", round(df_venda_mensal['Prejuizo'].sum(), 2))
print("Total de Lucro Real obtido:", round(df_venda_mensal['Lucro Real'].sum(), 2))

list_imposto_final = list()
for categoria in df["Tipo Ativo"].unique():
    
    print(f"\nCategoria de {categoria}(s):")
    df_categoria = df_venda_mensal[df_venda_mensal["Tipo Ativo"] == categoria]
    df_sem_isenção = df_categoria[df_categoria["Isento IR"] == False]
    
    total_ir = df_categoria["IR Esperado"].sum()
    print(f"Total de Imposto de Renda: R$ {total_ir:.2f}")

    total_ir_sem_isentos = df_sem_isenção["IR Esperado"].sum()
    print(f"Total de Imposto de Renda em não isentos: R$ {total_ir_sem_isentos:.2f}")

    list_imposto_final.append(total_ir_sem_isentos)

print(f"\nTotal de Impostos a Pagar: R$ {sum(list_imposto_final):.2f}")

print("\nRelatorio de Agrupamento Mensal")
print("Quantidade de isenções de IR:", df_venda_mensal[df_venda_mensal['Isento IR'] == True].shape[0])
print("Quantidade de não isenções de IR:", df_venda_mensal[df_venda_mensal['Isento IR'] == False].shape[0])

In [None]:
previous_month = (pd.to_datetime('today') - pd.DateOffset(months=1)).strftime('%Y-%m')
df_filtro_mes = df_venda_mensal[df_venda_mensal["Data Mensal"] == previous_month]
print("-"*30, f"\nRelatorio Mensal - {previous_month}\n", "-"*30)
print("Total de Transações com imposto a declarar:", df_filtro_mes[df_filtro_mes['IR Esperado'] > 0].shape[0])
print("Total de Transações com prejuizo a compensar no imposto:", df_filtro_mes[df_filtro_mes['Prejuizo'] < 0].shape[0])
print("Total de Prejuizo obtido:", round(df_filtro_mes['Prejuizo'].sum(), 2))
print("Total de Lucro Real obtido:", round(df_filtro_mes['Lucro Real'].sum(), 2))

list_imposto_final = list()
for categoria in df["Tipo Ativo"].unique():
    print(f"\nCategoria de {categoria}(s):")
    df_categoria = df_filtro_mes[df_filtro_mes["Tipo Ativo"] == categoria]
    df_sem_isenção = df_categoria[df_categoria["Isento IR"] == False]
    
    total_ir = df_categoria["IR Esperado"].sum()
    print(f"Total de Imposto de Renda: R$ {total_ir:.2f}")

    total_ir_sem_isentos = df_sem_isenção["IR Esperado"].sum()
    print(f"Total de Imposto de Renda em não isentos: R$ {total_ir_sem_isentos:.2f}")

    list_imposto_final.append(total_ir_sem_isentos)

print(f"\nTotal de Impostos a Pagar: R$ {sum(list_imposto_final):.2f}")

print("\nRelatorio de Agrupamento Mensal")
print("Quantidade de isenções de IR:", df_filtro_mes[df_filtro_mes['Isento IR'] == True].shape[0])
print("Quantidade de não isenções de IR:", df_filtro_mes[df_filtro_mes['Isento IR'] == False].shape[0])

In [None]:
mask = ['Tipo Ativo', 'Data Mensal', 'Tipo Estrategia', 
        'Valor', 'Taxa IR', 'Dedo Duro', 'Isento IR', 
        'Lucro Real', 'Lucro Bruto', 'Prejuizo', 'Lucro Bruto Compensado',
        'Prejuizo Acumulado', 'IR Esperado', 'IR Compensado', 
        'IR Acumulado', 'IR Pago', 'IR Pendente']
df_venda_mensal = df_venda_mensal[mask].sort_values(['Data Mensal'], ascending=False)

In [None]:
df_venda_mensal.to_csv(paths['path_treated_b3_report_monthly'], index=False)