In [1]:
import config.settings as settings
import utils.system as system


In [6]:
import os
import glob
import time
import sqlite3
import pandas as pd
from config import settings
from utils import system


In [2]:
def load_db(db_file, columns=settings.cols_b3, cols=settings.cols_order):
    """
    Load data from an SQLite database into a DataFrame.
    
    This function connects to the specified SQLite database, reads the data from the 
    'finsheet' table into a DataFrame, converts the 'quarter' column to datetime format 
    and the 'version' column to numeric format. It also removes rows with empty values 
    in specified columns.

    Parameters:
    - db_file (str): Path to the SQLite database file.
    - cols (list): List of columns to check for empty values. Defaults to ['quarter', 'conta', 'valor', 'version'].

    Returns:
    - DataFrame: A DataFrame containing the data from the 'finsheet' table.
    """
    try:
        # Establish a connection to the SQLite database file
        conn = sqlite3.connect(db_file)
        
        # Read data from the 'finsheet' table into a DataFrame
        df = pd.read_sql_query("SELECT * FROM finsheet", conn)

        # Close the database connection
        conn.close()
        
        # Convert the 'quarter' column to datetime format, handling errors by setting invalid parsing to NaT
        df['quarter'] = pd.to_datetime(df['quarter'], errors='coerce')
        
        # Convert the 'version' column to numeric format, filling invalid parsing with -1 and converting to integer
        df['version'] = pd.to_numeric(df['version'], errors='coerce').fillna(-1).astype(int)

        # Remove rows with empty values in specified columns
        for col in cols:
            try:
                # Strip any whitespace from the column values and filter out empty strings
                df = df[df[col].str.strip() != ""]
            except Exception as e:
                # If an exception occurs (e.g., column not found), it is ignored and processing continues
                pass

        # Remove rows with empty 'company_name' values after converting to string and stripping whitespace
        df = df[df['company_name'].astype(str).str.strip() != ""]

        # Return the cleaned DataFrame
        return df
    except Exception as e:
        # Log any exceptions that occur during the process
        # system.log_error(e)
        return pd.DataFrame(columns=columns)


In [4]:
db_file = 'd:\\Fausto Stangler\\Documentos\\Python\\ASW\\backend\\data\\b3 .db'

In [8]:
df_math = load_db(db_file.replace('.db', ' math.db'))


In [30]:
company_name = 'EVORA SA'
tipo = 'DFs Consolidadas'
conta = '3.01'
quarter = '2014-12-31'
version = 2
valor = 2e+09 / 1000

filter = (df_math['company_name'] == company_name)
filter &= (df_math['tipo'] == tipo)
filter &= (df_math['conta'] == conta)
filter &= (df_math['quarter'] == quarter)

df_math.loc[filter, 'version'] = version
df_math.loc[filter, 'valor'] = valor


In [31]:
df_math[filter]

Unnamed: 0,nsd,tipo,setor,subsetor,segmento,company_name,quadro,quarter,conta,descricao,valor,version
26083,44223,DFs Consolidadas,,,,EVORA SA,Demonstração do Resultado,2014-12-31,3.01,Receita de Venda de Bens e/ou Serviços,2000000.0,2


In [19]:
df_math[filter]['tipo'].unique()

array(['DFs Consolidadas', 'DFs Individuais', 'Dados da Empresa'],
      dtype=object)

In [4]:
def b3_math(df, conta_prefix):
    """
    Apply B3 math calculations to the DataFrame.

    Parameters:
    - df (DataFrame): DataFrame to apply calculations on.
    - conta_prefix (str): Prefix of 'conta' to determine calculation logic.

    Returns:
    - DataFrame: DataFrame with updated values based on B3 math calculations.
    """
    try:
        # Initialize dictionaries to store indices and values for each quarter
        indices = {'March': None, 'June': None, 'September': None, 'December': None}
        values = {'March': 0, 'June': 0, 'September': 0, 'December': 0}

        # Define mapping of month to quarter names
        month_to_quarter = {3: 'March', 6: 'June', 9: 'September', 12: 'December'}

        # Iterate through each quarter to find max value and index
        for month, quarter_name in month_to_quarter.items():
            try:
                df_quarter = df[df['quarter'].dt.month == month]
                if not df_quarter.empty:
                    indices[quarter_name] = df_quarter.index[0]
                    values[quarter_name] = df_quarter['valor'].max()
            except Exception as e:
                system.log_error(e)

        # Extract indices and values
        i3, v3 = indices['March'], values['March']
        i6, v6 = indices['June'], values['June']
        i9, v9 = indices['September'], values['September']
        i12, v12 = indices['December'], values['December']

        # Apply B3 math logic based on quarter identifiers
        def apply_b3_math_logic(v3, v6, v9, v12, conta_prefix, settings, df):
            try:
                if conta_prefix in settings.last_quarters:
                    v12 -= (v9 + v6 + v3)
                elif conta_prefix in settings.all_quarters:
                    v6 -= v3
                    v9 -= (v6 + v3)
                    v12 -= (v9 + v6 + v3)
            except Exception as e:
                system.log_error(e)
            return v3, v6, v9, v12

        # Apply the logic and update values
        v3, v6, v9, v12 = apply_b3_math_logic(v3, v6, v9, v12, conta_prefix, settings, df)
        def update_dataframe(df, indices, values):
            for quarter, idx in indices.items():
                if idx is not None:
                    df.loc[idx, 'valor'] = values[quarter]

        # Update DataFrame with new values
        indices = {'March': i3, 'June': i6, 'September': i9, 'December': i12}
        values = {'March': v3, 'June': v6, 'September': v9, 'December': v12}
        update_dataframe(df, indices, values)

        return df
    except Exception as e:
        system.log_error(e)
        return df

def save_db(df, db_file):
    """
    Save the transformed DataFrame back to the database with a 'math' suffix.

    Parameters:
    - df (DataFrame): The transformed DataFrame.
    - db_file (str): Path to the original database file.
    """
    try:
        # Create a new database file name with a 'math' suffix
        math_db_file = db_file.replace('.db', ' math.db')
        conn = sqlite3.connect(math_db_file)
        # Save the DataFrame to the new database file
        df.to_sql('finsheet', conn, if_exists='replace', index=False)
        conn.close()
        print(f"Saved transformed data to {math_db_file}")
    except Exception as e:
        system.log_error(e)

## plots

In [None]:
import config.settings as settings
import utils.system as system

import sqlite3
import pandas as pd
import time
import plotly.express as px


In [None]:

db_name = 'b3 .db'
db_name = 'b3 COMUNICACOES.db'
db_path = 'd:\\Fausto Stangler\\Documentos\\Python\\ASW\\backend\\data\\' + db_name
db_path = 'd:\\Fausto Stangler\\Documentos\\Python\\ASW\\backend\\data\\b3  math.db'
# Connect to the SQLite database
conn = sqlite3.connect(db_path)

# Read the finsheet table into a DataFrame
query = "SELECT * FROM finsheet"
df = pd.read_sql_query(query, conn)

# Close the database connection
conn.close()

df['quarter'] = pd.to_datetime(df['quarter'], errors='coerce')
df = df.dropna(subset=['quarter'])
df = df[df['quarter'] > pd.to_datetime('2010-12-31')]

df.sort_values(by=['setor', 'subsetor', 'segmento', 'company_name', 'quarter', 'conta'], inplace=True)
df['valor'] = df['valor']/10**6
df.head(5)

In [None]:
df['company_name'].unique()

In [None]:
df['company_name'].unique()
company = 'CREMER SA'
conta = '3.01'
tipo = 'DFs Consolidadas'

filter = (
    (df['company_name'] == company) & 
    (df['conta'] == conta) & 
    (df['tipo'] == tipo)
    )
df_filtered = df[filter][['quarter', 'conta', 'valor']]
# Create the Plotly plot
fig = px.line(df_filtered, x='quarter', y='valor', title=f'Quarterly Values for {company} - Conta {conta}')

# Show the plot
fig.show()

df_filtered
