# Análise de Apólices e Sinistros — Notebook

Pipeline completo (reprodutível) com preparação, análises, integração e relatório executivo.

In [None]:

from pathlib import Path
import pandas as pd, numpy as np

INPUT_AP = Path('/mnt/data/apolices.csv')
INPUT_SI = Path('/mnt/data/sinistros_extract/sinistros.csv')
OUT_DIR = Path('/mnt/data/analise_apolices')
OUT_DIR.mkdir(exist_ok=True, parents=True)

df = pd.read_csv(INPUT_AP, low_memory=False)
df.columns = [c.strip() for c in df.columns]
df.head(3)


## Integração com Sinistros e Export de Tabelas

In [None]:

import pandas as pd, numpy as np

def to_date(s): return pd.to_datetime(s, dayfirst=True, errors='coerce')
def to_num_br(s): 
    return pd.to_numeric(s.astype(str).str.replace('.', '', regex=False).str.replace(',', '.', regex=False), errors='coerce')

# Detect (reuso simplificado)
cols = {'apolice': 'Apólice', 'endosso': 'Endosso', 'desc_endosso': 'Descrição do Endosso', 'status': 'Status da Apólice', 'motivo_cancel': 'Motivo do Cancelamento', 'produto_nome': 'Nome do Produto', 'ramo_grupo': 'ID do Grupo do Ramo', 'uf': 'UF', 'cidade': 'Cidade', 'dt_emissao': 'Data de Emissão', 'dt_inicio': 'Data de Início da Vigência', 'dt_fim': 'Data de Fim da Vigência', 'sexo': 'Sexo', 'tipo_pessoa': 'Tipo de Pessoa'}

# Numerics e datas básicas
for c in [c for c in df.columns if any(k in c.lower() for k in ['valor','vl_','prêmio','premio','iof','custo','fracionamento'])]:
    if (c + ' (num)') not in df.columns:
        df[c + ' (num)'] = to_num_br(df[c])
df['_dt_ini'] = to_date(df[cols['dt_inicio']]) if cols['dt_inicio'] else pd.NaT
df['_dt_fim'] = to_date(df[cols['dt_fim']]) if cols['dt_fim'] else pd.NaT

col_premio = next((c for c in df.columns if c.endswith(' (num)') and ('prêmio' in c.lower() or 'premio' in c.lower())), None)
col_custo = next((c for c in df.columns if c.endswith(' (num)') and ('custo' in c.lower())), None)

# Carregar sinistros (se existir)
si_path = Path('/mnt/data/sinistros_extract/sinistros.csv')
if si_path.exists():
    si = pd.read_csv(si_path, low_memory=False)
    si.columns = [c.strip() for c in si.columns]
    col_si_apolice = 'Apólice' if 'Apólice' in si.columns else next((c for c in si.columns if 'apolice' in c.lower()), None)
    col_si_valor = 'Valor do Evento de Sinistro' if 'Valor do Evento de Sinistro' in si.columns else next((c for c in si.columns if 'valor' in c.lower() and 'sinistro' in c.lower()), None)
    col_si_data_ocorr = 'Data de Ocorrência do Sinistro' if 'Data de Ocorrência do Sinistro' in si.columns else next((c for c in si.columns if 'ocorr' in c.lower()), None)
    col_si_num = 'Número do Sinistro' if 'Número do Sinistro' in si.columns else next((c for c in si.columns if 'sinistro' in c.lower() and 'número' in c.lower()), None)

    si['_dt_ocorr'] = to_date(si.get(col_si_data_ocorr))
    si['_valor_evento'] = to_num_br(si.get(col_si_valor))

    ap_base = (
        pd.concat([
            df.groupby(cols['apolice'])[col_premio].sum(min_count=1).rename('premio_apolice') if cols['apolice'] and col_premio else pd.Series(dtype='float64'),
            df.groupby(cols['apolice'])[col_custo].sum(min_count=1).rename('custo_apolice') if cols['apolice'] and col_custo else pd.Series(dtype='float64'),
            df.groupby(cols['apolice'])[cols['produto_nome']].agg(lambda x: x.dropna().iloc[0] if len(x.dropna()) else np.nan).rename('produto') if cols['apolice'] and cols['produto_nome'] else pd.Series(dtype='object'),
            df.groupby(cols['apolice'])[cols['uf']].agg(lambda x: x.dropna().iloc[0] if len(x.dropna()) else np.nan).rename('uf') if cols['apolice'] and cols['uf'] else pd.Series(dtype='object'),
            df.groupby(cols['apolice'])[cols['status']].agg(lambda x: x.dropna().iloc[0] if len(x.dropna()) else np.nan).rename('status') if cols['apolice'] and cols['status'] else pd.Series(dtype='object'),
            df.groupby(cols['apolice'])['_dt_ini'].min().rename('dt_inicio_min') if cols['apolice'] else pd.Series(dtype='datetime64[ns]'),
        ], axis=1).reset_index()
    )

    si_agg = (
        si.groupby(col_si_apolice)
          .agg(qtd_sinistros=(col_si_num, 'nunique'),
               primeiro_sinistro=('_dt_ocorr','min'),
               valor_sinistros_total=('_valor_evento','sum'))
          .reset_index()
    )

    join_ap = ap_base.merge(si_agg, how='left', left_on=cols['apolice'], right_on=col_si_apolice)
    join_ap['tem_sinistro'] = join_ap['qtd_sinistros'].fillna(0).gt(0)
    join_ap['sinistralidade'] = join_ap['valor_sinistros_total'] / join_ap['premio_apolice']
    join_ap['tempo_ate_primeiro_sinistro_dias'] = (join_ap['primeiro_sinistro'] - join_ap['dt_inicio_min']).dt.days

    join_ap.to_csv(OUT_DIR/'20_join_por_apolice.csv', index=False)

    def summarize_by(group_col, min_pol=100):
        g = (
            join_ap.groupby(group_col, dropna=False)
                   .agg(apolices=(join_ap.columns[0], 'nunique'),
                        apolices_com_sinistro=('tem_sinistro','sum'),
                        premio_total=('premio_apolice','sum'),
                        sinistros_total=('valor_sinistros_total','sum'))
                   .assign(incidencia_sinistro=lambda d: d['apolices_com_sinistro']/d['apolices'],
                           sinistralidade=lambda d: d['sinistros_total']/d['premio_total'])
                   .sort_values('sinistralidade', ascending=False)
        )
        return g[g['apolices']>=min_pol]

    by_produto = summarize_by('produto', 100) if 'produto' in join_ap.columns else None
    by_uf = summarize_by('uf', 100) if 'uf' in join_ap.columns else None
    by_status = summarize_by('status', 100) if 'status' in join_ap.columns else None

    if by_produto is not None: by_produto.to_csv(OUT_DIR/'21_sinistralidade_por_produto.csv')
    if by_uf is not None: by_uf.to_csv(OUT_DIR/'22_sinistralidade_por_uf.csv')
    if by_status is not None: by_status.to_csv(OUT_DIR/'23_sinistralidade_por_status.csv')

    valid = join_ap['tem_sinistro'] & join_ap['tempo_ate_primeiro_sinistro_dias'].notna() & (join_ap['tempo_ate_primeiro_sinistro_dias']>=0)
    tempo = join_ap.loc[valid, 'tempo_ate_primeiro_sinistro_dias']
    tempo_stats = None
    if len(tempo):
        tempo_stats = {
            'qtd_validas': int(valid.sum()),
            'mediana': float(tempo.median()),
            'media': float(tempo.mean()),
            'q1': float(tempo.quantile(0.25)),
            'q3': float(tempo.quantile(0.75)),
            'pct_ate_30d': float((tempo<=30).mean()),
            'pct_ate_90d': float((tempo<=90).mean())
        }
    tempo_stats


## Relatório Executivo

In [None]:

from IPython.display import Markdown, display
import pandas as pd
OUT = Path('/mnt/data/analise_apolices')

def read_csv(name):
    p = OUT/name
    return pd.read_csv(p, index_col=0) if p.exists() else None

join_path = OUT/'20_join_por_apolice.csv'
join_ap = pd.read_csv(join_path, low_memory=False) if join_path.exists() else None

lines = []
if join_ap is not None:
    apolices = join_ap.iloc[:,0].nunique()
    ap_sin = int(join_ap['tem_sinistro'].sum())
    premio = float(join_ap['premio_apolice'].sum(skipna=True))
    sin_total = float(join_ap['valor_sinistros_total'].sum(skipna=True))
    sin_glob = sin_total/premio if premio else float('nan')

    valid = join_ap['tem_sinistro'] & join_ap['tempo_ate_primeiro_sinistro_dias'].notna() & (join_ap['tempo_ate_primeiro_sinistro_dias']>=0)
    tempo = join_ap.loc[valid, 'tempo_ate_primeiro_sinistro_dias']

    lines.append(f"- **Apólices (únicas)**: {apolices:,}")
    lines.append(f"- **Apólices com sinistro**: {ap_sin:,}  ({ap_sin/apolices:.2%})")
    lines.append(f"- **Prêmio total**: R$ {premio:,.0f}".replace(",", "X").replace(".", ",").replace("X", "."))
    lines.append(f"- **Sinistros total**: R$ {sin_total:,.0f}".replace(",", "X").replace(".", ",").replace("X", "."))
    lines.append(f"- **Sinistralidade global**: {sin_glob:.2%}")

    if len(tempo):
        lines.append(f"- **Tempo até 1º sinistro (mediana)**: {tempo.median():.0f} dias (média {tempo.mean():.0f} | Q1–Q3 {tempo.quantile(0.25):.0f}–{tempo.quantile(0.75):.0f} | ≤30d {(tempo<=30).mean():.2%} | ≤90d {(tempo<=90).mean():.2%})")

# Cancelamento (se gerado)
cancel_uf = read_csv("08_cancel_por_uf.csv")
if cancel_uf is not None and not cancel_uf.empty:
    worst_uf = cancel_uf.sort_values("taxa_cancel", ascending=False).head(3)
    lines.append("\n**UFs com maior taxa de cancelamento**:")
    for idx, r in worst_uf.iterrows():
        lines.append(f"- {idx}: taxa {r['taxa_cancel']:.2%} (apólices {int(r['qtd'])}, canceladas {int(r['cancelados'])})")

cancel_prod = read_csv("06_cancel_por_produto.csv")
if cancel_prod is not None and not cancel_prod.empty:
    worst_prod = cancel_prod.sort_values("taxa_cancel", ascending=False).head(3)
    lines.append("\n**Produtos com maior taxa de cancelamento**:")
    for idx, r in worst_prod.iterrows():
        lines.append(f"- {idx}: taxa {r['taxa_cancel']:.2%} (apólices {int(r['qtd'])}, canceladas {int(r['cancelados'])})")

display(Markdown("\n".join(lines) if lines else "Sem dados para o relatório."))
