# Configurações Globais

In [12]:
import pandas as pd
import duckdb
from sklearn.preprocessing import MinMaxScaler

files_to_analyze = [
    'data/receipts-000000000000.csv',
    'data/items-000000000000.csv',
    'data/payments-000000000000.csv',
    'data/discounts-000000000000.csv',
    'data/torque.csv'
]

# Análise Exploratória de Dados

In [14]:
try:
    con = duckdb.connect(database=':memory:', read_only=False)
    print("Conexão com DuckDB ok!")

    for file_name in files_to_analyze:
        print(f"\n\n{'='*20} Análise do Arquivo: {file_name} {'='*20}")

        if 'items-000000000000.csv' in file_name:
            read_command = f"read_csv_auto('{file_name}', quote='\"')"
        else:
            read_command = f"read_csv_auto('{file_name}')"

        try:
            print("\n[1] Head dos Dados:")
            print(con.execute(f"SELECT * FROM {read_command} LIMIT 3;").fetchdf())

            print("\n[2] Estrutura da Tabela:")
            df_info = con.execute(f"DESCRIBE SELECT * FROM {read_command};").fetchdf()
            print(df_info)

            print("\n[3] Quantidade de Valores Nulos por Coluna:")
            null_counts_sql = [f'COUNT(*) - COUNT("{col}") AS "{col}_nulls"' for col in df_info['column_name']]
            query_nulls = f"SELECT {', '.join(null_counts_sql)} FROM {read_command};"
            print(con.execute(query_nulls).fetchdf())

        except Exception as e:
            print(f"Erro ao processar o arquivo {file_name}: {e}")
            continue

    print("\n[4a] Quantidade de Recibos por Loja:")
    query_receipts_count = """
        SELECT
            shop_id,
            COUNT(identifier) as total_recibos
        FROM read_csv_auto('data/receipts-000000000000.csv')
        GROUP BY shop_id
        ORDER BY total_recibos DESC;
    """
    print(con.execute(query_receipts_count).fetchdf())

except Exception as e:
    print(f"Erro geral na célula de Análise Exploratória: {e}")

Conexão com DuckDB ok!



[1] Head dos Dados:
                             identifier             shop_id  \
0  576f1b6e-e971-11ee-9aa3-0242ac1c000c  highway-praca-ekin   
1  576f0ba6-e971-11ee-9aa3-0242ac1c000c  highway-praca-ekin   
2  576ed2e4-e971-11ee-9aa3-0242ac1c000c  highway-praca-ekin   

            timestamp  receipt_number  total_value  delivery  canceled  staff  \
0 2024-01-01 22:05:11               8         37.5     False     False  False   
1 2024-01-01 23:35:35              13         49.5     False     False  False   
2 2024-01-01 21:43:00               6         23.0     False     False  False   

  operation_date  pdv  fee  change           date_time  totem  
0     2024-01-01    1    0     0.0 2024-01-01 19:05:11  False  
1     2024-01-01    1    0     0.0 2024-01-01 20:35:35  False  
2     2024-01-01    1    0     0.0 2024-01-01 18:43:00  False  

[2] Estrutura da Tabela:
       column_name column_type null   key default extra
0       identifier     VARCHAR  YES  N

# Transformação dos Dados e Construção do ISF (Índice de Saúde Financeira)

In [15]:
con = duckdb.connect(database=':memory:', read_only=False)

try:
    # Agrega as métricas principais por dia e por loja
    daily_metrics_query = f"""
    WITH receipts_daily AS (
        -- Agrega a receita líquida e a contagem de transações por dia
        SELECT
            shop_id,
            operation_date,
            SUM(total_value + fee) AS net_revenue,
            COUNT(identifier) AS transaction_count
        FROM read_csv_auto('data/receipts-000000000000.csv')
        WHERE canceled = false
        GROUP BY shop_id, operation_date
    ),
    discounts_daily AS (
        -- Agrega os descontos com a tabela de receita para obter shop_id e operation_date
        SELECT
            r.shop_id,
            r.operation_date,
            SUM(d.total) AS total_discount
        FROM read_csv_auto('data/discounts-000000000000.csv', quote='\"') AS d
        JOIN read_csv_auto('data/receipts-000000000000.csv') AS r ON d.fk_receipt_identifier = r.identifier
        WHERE d.canceled = false AND r.canceled = false
        GROUP BY r.shop_id, r.operation_date
    )
    -- Junta todas as métricas diárias
    SELECT
        rd.shop_id,
        rd.operation_date,
        rd.net_revenue,
        rd.transaction_count,
        COALESCE(dd.total_discount, 0) AS total_discount, -- Se não tiver desconto no dia, o valor é 0
        t.net_torque
    FROM receipts_daily AS rd
    LEFT JOIN discounts_daily AS dd ON rd.shop_id = dd.shop_id AND rd.operation_date = dd.operation_date
    JOIN read_csv_auto('data/torque.csv') AS t ON rd.shop_id = t.shop_id AND rd.operation_date = t.operation_date
    ORDER BY rd.shop_id, rd.operation_date;
    """
    df_daily_health = con.execute(daily_metrics_query).fetchdf()

    # Calcular o percentual de desconto diário
    df_daily_health['discount_perc'] = (df_daily_health['total_discount'] / (df_daily_health['net_revenue'] + 1)) * 100 # o "+ 1" é pra não ter divisão por zero, se a receita seja 0 em algum dia

    # Normaliza as métricas usando MinMaxScaler
    scaler = MinMaxScaler()
    metrics_to_normalize = ['net_revenue', 'net_torque', 'discount_perc']

    metrics_normalized = df_daily_health[metrics_to_normalize].copy()
    metrics_normalized.loc[:, metrics_to_normalize] = scaler.fit_transform(metrics_normalized[metrics_to_normalize])

    metrics_normalized.loc[:, 'discount_perc'] = 1 - metrics_normalized['discount_perc'] # Inverte o percentual de desconto

    # Pesos de cada métrica
    weights = {'net_revenue': 0.45, 'net_torque': 0.35, 'discount_perc': 0.20}

    # Cálcula o ISF e adiciona no DF
    df_daily_health['isf'] = (
        metrics_normalized['net_revenue'] * weights['net_revenue'] +
        metrics_normalized['net_torque'] * weights['net_torque'] +
        metrics_normalized['discount_perc'] * weights['discount_perc']
    ) * 100

    # DF final para exportar
    df_looker_export = df_daily_health[[
        'operation_date',
        'shop_id',
        'isf',
        'net_revenue',
        'net_torque',
        'discount_perc'
    ]].copy()

    # Garante que a data está no formato correto
    df_looker_export['operation_date'] = pd.to_datetime(df_looker_export['operation_date']).dt.date

    print("\nPré-visualização dos dados a serem exportados:")
    print(df_looker_export.head())

except Exception as e:
    print(f"Erro: {e}")
finally:
    con.close()


Pré-visualização dos dados a serem exportados:
  operation_date                  shop_id        isf  net_revenue  net_torque  \
0     2024-02-23  highway-adidas-shopping  54.047074      5877.50   52.013274   
1     2024-02-24  highway-adidas-shopping  56.243288      6026.50   52.864035   
2     2024-02-25  highway-adidas-shopping  51.351134      5083.04   51.604467   
3     2024-02-27  highway-adidas-shopping  46.621581      3389.00   50.582090   
4     2024-02-28  highway-adidas-shopping  46.300752      3736.50   48.843137   

   discount_perc  
0       1.760653  
1       0.738283  
2       1.662064  
3       0.471976  
4       0.735786  


# Ranking

In [16]:
df_looker_export['operation_date'] = pd.to_datetime(df_looker_export['operation_date'])

# Filtra os dados para apenas o mês de Junho de 2025
df_junho_2025 = df_looker_export[(df_looker_export['operation_date'].dt.month == 6) & (df_looker_export['operation_date'].dt.year == 2025)].copy()

if df_junho_2025.empty:
    print("\nNão foram encontrados dados para o mês de Junho de 2025.")
    print("Por favor, verifique o intervalo de datas dos seus arquivos de dados.")
else:
    # Agrupar por loja e calcular a média do ISF e de outras métricas para o mês
    ranking_junho = df_junho_2025.groupby('shop_id').agg(
        isf_medio=('isf', 'mean'),
        receita_total_mes=('net_revenue', 'sum'),
        torque_medio=('net_torque', 'mean'),
        desconto_perc_medio=('discount_perc', 'mean')
    ).reset_index()

    # Ordenar o ranking pelo ISF médio, do maior para o menor
    ranking_junho_final = ranking_junho.sort_values(by='isf_medio', ascending=False).reset_index(drop=True)

    print(f"\n{'='*20} Ranking de Saúde Financeira (ISF Médio) - Junho/2025 {'='*20}")
    print(ranking_junho_final)


                        shop_id  isf_medio  receita_total_mes  torque_medio  \
0  highway-avenida-nova-balanca  56.921065          103113.67     68.858865   
1       highway-adidas-shopping  55.216086          127783.70     61.064379   
2     highway-rua-bens-perdidos  54.584016          100043.93     65.876105   
3            highway-praca-ekin  50.299590           76473.06     60.255156   

   desconto_perc_medio  
0             2.009104  
1             1.362024  
2             2.068642  
3             1.842762  


In [17]:
df_looker_export['operation_date'] = pd.to_datetime(df_looker_export['operation_date'])

# Filtra os dados para apenas o ano de 2025
df_2025 = df_looker_export[(df_looker_export['operation_date'].dt.year == 2025)].copy()

if df_2025.empty:
    print("\nNão foram encontrados dados para o ano de 2025.")
    print("Por favor, verifique o intervalo de datas dos seus arquivos de dados.")
else:
    # Agrupar por loja e calcular a média do ISF e de outras métricas para o mês
    ranking_2025 = df_2025.groupby('shop_id').agg(
        isf_medio=('isf', 'mean'),
        receita_total_mes=('net_revenue', 'sum'),
        torque_medio=('net_torque', 'mean'),
        desconto_perc_medio=('discount_perc', 'mean')
    ).reset_index()

    # Ordenar o ranking pelo ISF médio, do maior para o menor
    ranking_2025_final = ranking_2025.sort_values(by='isf_medio', ascending=False).reset_index(drop=True)

    print(f"\n{'='*20} Ranking de Saúde Financeira (ISF Médio) - 2025 {'='*20}")
    print(ranking_2025_final)


                        shop_id  isf_medio  receita_total_mes  torque_medio  \
0       highway-adidas-shopping  54.755490          733218.32     60.993903   
1     highway-rua-bens-perdidos  52.762191          597368.16     62.707770   
2  highway-avenida-nova-balanca  52.287645          459278.54     65.319818   
3            highway-praca-ekin  48.965403          535562.81     58.446373   

   desconto_perc_medio  
0             1.203669  
1             2.024049  
2             1.647298  
3             1.935428  


# Exportação para o Looker

In [18]:
output_filename = 'daily_financial_health.csv'
df_looker_export.to_csv(output_filename, index=False)