In [None]:
import pandas as pd
import os
import pysubgroup as ps
import numpy as np
from bcb import sgs

In [None]:
# Junta todos os arquivos da série histórica que estão dentro de uma mesma pasta
path = "Dados/"
all_entries = os.listdir(path)
file_names = [entry for entry in all_entries if os.path.isfile(os.path.join(path, entry))]

itbi = pd.DataFrame({})
for file_name in file_names:
    temp_data = pd.read_csv(path+file_name, delimiter=";", low_memory=False)
    itbi = pd.concat([itbi, temp_data], axis=0)

itbi['Data Quitacao Transacao Formatada'] = pd.to_datetime(itbi['Data Quitacao Transacao'], format='%d/%m/%Y')
itbi['Ano Avaliacao'] = itbi['Data Quitacao Transacao Formatada'].dt.year
itbi['Mes Avaliacao'] = itbi['Data Quitacao Transacao Formatada'].dt.month
itbi.sort_values(by='Data Quitacao Transacao Formatada', ascending=True, inplace=True)

In [None]:
itbi = itbi.rename(columns={'Area Construida Adquirida': 'area',
                            'Padrao Acabamento Unidade': 'padrao',
                            'Valor Base Calculo': 'preco'})

# Padronização dos nomes
itbi['Bairro'] = itbi['Bairro'].str.strip().str.upper()
itbi['Descricao Tipo Ocupacao Unidade'] = itbi['Descricao Tipo Ocupacao Unidade'].str.strip().str.upper()

itbi.head()

## Pré-processamento dos dados

In [None]:
# Cria as novas variáveis e filtra os dados

itbi['preco'] = itbi['preco'].str.replace(".", "", regex=False)
itbi['preco'] = itbi['preco'].str.replace(",", ".")
itbi['preco'] = itbi['preco'].astype(float)

itbi['area'] = itbi['area'].str.replace(".", "", regex=False)
itbi['area'] = itbi['area'].str.replace(",", ".")
itbi['area'] = itbi['area'].astype(float)

itbi = itbi.query("area >= 50")
itbi = itbi.query("preco >= 100000")
itbi = itbi.query("`Descricao Tipo Ocupacao Unidade` == 'RESIDENCIAL'")

itbi['idade'] = itbi['Ano Avaliacao'] - itbi['Ano de Construcao Unidade']
itbi['valor_m2'] = itbi['preco'] / itbi['area']

itbi = itbi.query("idade >= 0 and idade <= 100")

In [None]:
# Vamos trazer os preços dos imóveis para valor presente, pois precisamos comparar seus valores em anos diferentes

# Busca os dados do IGP-M pela biblioteca do banco central e cria o deflator
print("Buscando dados do IGP-M no Banco Central...")
igpm = sgs.get({'IGP-M': 189}, start='2008-01-01').rename(columns={'IGP-M': 'igpm_mensal'})
igpm['igpm_mensal'] = igpm['igpm_mensal'] / 100

data_referencia = igpm.index.max()
print(f"Todos os valores serão corrigidos para a data de referência: {data_referencia.strftime('%B de %Y')}")

valor_ref_acumulado = (1 + igpm['igpm_mensal']).cumprod().loc[data_referencia]
igpm['fator_acumulado'] = (1 + igpm['igpm_mensal']).cumprod()
igpm['fator_correcao'] = valor_ref_acumulado / igpm['fator_acumulado']
igpm['mes_ano'] = igpm.index.to_period('M')

# Aplicação do deflator pelo mês e ano de avaliação
itbi['mes_ano'] = itbi['Data Quitacao Transacao Formatada'].dt.to_period('M')

# Junta o fator de correção ao seu DataFrame principal
print("Juntando o fator de correção aos dados dos imóveis...")
itbi = pd.merge(itbi, igpm[['mes_ano', 'fator_correcao']], on='mes_ano', how='left')


# Aplica a correção e criar as novas colunas
print("Calculando os valores corrigidos...")
itbi['preco_corrigido'] = itbi['preco'] * itbi['fator_correcao']
itbi['valor_m2_corrigido'] = itbi['preco_corrigido'] / itbi['area']

print("\nCorreção inflacionária aplicada com sucesso usando o mês exato!")
itbi[["Ano Avaliacao", "Mes Avaliacao", 'preco', 'preco_corrigido', 'valor_m2', 'valor_m2_corrigido']].head()

In [None]:
# Vamos realizar mais um refinamento nos dados: iremos eliminar os dados categóricos que aparecem poucas vezes
print(f"Tamanho original do dataset: {len(itbi):,}")

# Passo 1: Filtrar pelos tipos de imóvel de interesse
tipos_de_interesse = ['AP', 'CA']
itbi = itbi[itbi['Tipo Construtivo Preponderante'].isin(tipos_de_interesse)].copy()

print(f"Tamanho após filtrar por tipo de imóvel: {len(itbi):,}")

# Passo 2: Filtrar bairros com poucos imóveis
# Primeiro, contamos quantos imóveis cada bairro tem
contagem_bairros = itbi['Bairro'].value_counts()

# Definimos um limite mínimo. Podemos experimentar outros valores
limite_minimo_imoveis = 10 

# Pegamos a lista de bairros que ATENDEM ao critério
bairros_validos = contagem_bairros[contagem_bairros >= limite_minimo_imoveis].index

# Filtramos o DataFrame para manter apenas esses bairros
itbi = itbi[itbi['Bairro'].isin(bairros_validos)].copy()

print(f"Tamanho após remover bairros com menos de {limite_minimo_imoveis} imóveis: {len(itbi):,}")

In [None]:
print(itbi.shape)
itbi.describe()

In [None]:
# Iremos utilizar o ano de avaliação como variável categórica para termos acesso mais fácil à série histórica
print(f"Tipo original da coluna 'Ano Avaliacao': {itbi['Ano Avaliacao'].dtype}")

# CONVERTER O ANO PARA UM TIPO CATEGÓRICO (string é a forma mais simples)
itbi['Ano Avaliacao'] = itbi['Ano Avaliacao'].astype(str)

print(f"Novo tipo da coluna 'Ano Avaliacao': {itbi['Ano Avaliacao'].dtype}")

## Teste 0: padrões apenas com os bairros

In [None]:
target = ps.NumericTarget('valor_m2')

include = ['Bairro']
ignore = list(set(list(itbi)) - set(include))
search_space = ps.create_selectors(itbi, ignore=ignore)

# Usaremos a StandardQF, que mede a diferença da média do subgrupo em desvios padrão
# O parâmetro 'a' ajuda a ponderar o tamanho do subgrupo.
quality_function = ps.StandardQFNumeric(a=0.5)

In [None]:
# Criando a tarefa de descoberta de subgrupos
task = ps.SubgroupDiscoveryTask(
    itbi,
    target,
    search_space,
    result_set_size=20,  # Queremos os 10 melhores subgrupos
    depth=1,             # Profundidade máxima da descrição (ex: Bairro='SAVASSI' AND Tipo_Imovel='APARTAMENTO')
    qf=quality_function
)

# Executando a busca
result = ps.BeamSearch().execute(task)

In [None]:
media_global_nominal = itbi['valor_m2'].mean()
media_global_real = itbi['valor_m2_corrigido'].mean()
df_results = result.to_dataframe()
medias_reais_sg = []

for index, row in df_results.iterrows():
    subgrupo_obj = row['subgroup']
    dados_do_subgrupo = subgrupo_obj.covers(itbi)
    media_real = itbi[dados_do_subgrupo]['valor_m2_corrigido'].mean()
    medias_reais_sg.append(media_real)
df_results['mean_sg_corrigido'] = medias_reais_sg

print("="*60)
print("Análise Detalhada dos Subgrupos de Maior Impacto Nominal")
print("="*60)
print(f"Média Global Nominal: R$ {media_global_nominal:,.2f}")
print(f"Média Global Real (corrigida): R$ {media_global_real:,.2f}\n")


for index, row in df_results.iterrows():
    # Coletando os valores da linha
    qualidade = row['quality']
    descricao = row['subgroup']
    tamanho_subgrupo = row['size_sg']
    media_nominal_sg = row['mean_sg']
    media_real_sg = row['mean_sg_corrigido']

    # Calculando os dois impactos percentuais
    impacto_nominal = ((media_nominal_sg / media_global_nominal) - 1) * 100
    impacto_real = ((media_real_sg / media_global_real) - 1) * 100

    # Imprimindo o "card" de resultado para cada subgrupo
    print(f"Qualidade: {qualidade:.3f} | Subgrupo: {descricao}")
    print(f"   > Tamanho: {tamanho_subgrupo:,.0f} imóveis")
    print(f"   > Média Nominal: R$ {media_nominal_sg:,.2f}  (Impacto: {impacto_nominal:+.2f}%)")
    print(f"   > Média Real   : R$ {media_real_sg:,.2f}  (Impacto: {impacto_real:+.2f}%)")
    print("-" * 40)