In [5]:
import pathlib
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

class AmesDataProcessor:
    def __init__(self, data_dir):
        self.data_dir = pathlib.Path(data_dir)
        pd.set_option('display.max_rows', 500)
        
    def load_data(self):
        """Load the initial processed data from pickle file."""
        processed_file_path = self.data_dir / 'processed' / 'ames_with_correct_types.pkl'
        with open(processed_file_path, 'rb') as file:
            (self.data,
             self.continuous_variables,
             self.discrete_variables,
             self.ordinal_variables,
             self.categorical_variables) = pickle.load(file)
    
    @staticmethod
    def remap_categories(series: pd.Series, old_categories: tuple[str], new_category: str) -> pd.Series:
        """Remap multiple categories to a single new category."""
        series = series.cat.add_categories(new_category)
        remapped_items = series.isin(old_categories)
        series.loc[remapped_items] = new_category
        return series.cat.remove_unused_categories()
    
    def process_zoning(self):
        """Process MS.Zoning column."""
        selection = ~(self.data['MS.Zoning'].isin(['A (agr)', 'C (all)', 'I (all)']))
        self.data = self.data[selection]
        self.data['MS.Zoning'] = self.data['MS.Zoning'].cat.remove_unused_categories()
    
    def process_sale_type(self):
        """Process Sale.Type column."""
        self.data['Sale.Type'] = self.remap_categories(
            series=self.data['Sale.Type'],
            old_categories=('WD ', 'CWD', 'VWD'),
            new_category='GroupedWD'
        )
        self.data['Sale.Type'] = self.remap_categories(
            series=self.data['Sale.Type'],
            old_categories=('COD', 'ConLI', 'Con', 'ConLD', 'Oth', 'ConLw'),
            new_category='Other'
        )
    
    def process_conditions(self):
        """Process Condition columns."""
        for col in ('Condition.1', 'Condition.2'):
            self.data[col] = self.remap_categories(
                series=self.data[col],
                old_categories=('RRAn', 'RRAe', 'RRNn', 'RRNe'),
                new_category='Railroad'
            )
            self.data[col] = self.remap_categories(
                series=self.data[col],
                old_categories=('Feedr', 'Artery'),
                new_category='Roads'
            )
            self.data[col] = self.remap_categories(
                series=self.data[col],
                old_categories=('PosA', 'PosN'),
                new_category='Positive'
            )
        
        self.create_combined_condition()
        self.data = self.data.drop(columns=['Condition.1', 'Condition.2'])
    
    def create_combined_condition(self):
        """Create combined Condition column from Condition.1 and Condition.2."""
        self.data['Condition'] = pd.Series(
            index=self.data.index,
            dtype=pd.CategoricalDtype(categories=(
                'Norm', 'Railroad', 'Roads', 'Positive', 'RoadsAndRailroad'
            ))
        )
        
        # Set conditions based on rules
        norm_items = self.data['Condition.1'] == 'Norm'
        self.data['Condition'][norm_items] = 'Norm'
        
        railroad_items = (self.data['Condition.1'] == 'Railroad') & (self.data['Condition.2'] == 'Norm')
        self.data['Condition'][railroad_items] = 'Railroad'
        
        roads_items = (self.data['Condition.1'] == 'Roads') & (self.data['Condition.2'] != 'Railroad')
        self.data['Condition'][roads_items] = 'Roads'
        
        positive_items = self.data['Condition.1'] == 'Positive'
        self.data['Condition'][positive_items] = 'Positive'
        
        roads_and_railroad_items = (
            (self.data['Condition.1'] == 'Railroad') & (self.data['Condition.2'] == 'Roads')
        ) | (
            (self.data['Condition.1'] == 'Roads') & (self.data['Condition.2'] == 'Railroad')
        )
        self.data['Condition'][roads_and_railroad_items] = 'RoadsAndRailroad'
    
    def process_features(self):
        """Process miscellaneous features."""
        # Create HasShed feature
        self.data['HasShed'] = self.data['Misc.Feature'] == 'Shed'
        self.data = self.data.drop(columns='Misc.Feature')
        
        # Create HasAlley feature
        self.data['HasAlley'] = ~self.data['Alley'].isna()
        self.data = self.data.drop(columns='Alley')
    
    def process_exterior(self):
        """Process exterior-related columns."""
        # Fix inconsistencies in Exterior.2nd
        self.data['Exterior.2nd'] = self.remap_categories(
            series=self.data['Exterior.2nd'],
            old_categories=('Brk Cmn',),
            new_category='BrkComm'
        )
        self.data['Exterior.2nd'] = self.remap_categories(
            series=self.data['Exterior.2nd'],
            old_categories=('CmentBd',),
            new_category='CemntBd'
        )
        self.data['Exterior.2nd'] = self.remap_categories(
            series=self.data['Exterior.2nd'],
            old_categories=('Wd Shng',),
            new_category='WdShing'
        )
        
        # Sort categories
        for col in ('Exterior.1st', 'Exterior.2nd'):
            categories = self.data[col].cat.categories
            self.data[col] = self.data[col].cat.reorder_categories(sorted(categories))
        
        # Group rare materials
        mat_count = self.data['Exterior.1st'].value_counts()
        rare_materials = list(mat_count[mat_count < 40].index)
        self.data['Exterior'] = self.remap_categories(
            series=self.data['Exterior.1st'],
            old_categories=rare_materials,
            new_category='Other'
        )
        self.data = self.data.drop(columns=['Exterior.1st', 'Exterior.2nd'])
    
    def process_numerical_features(self):
        """Process numerical features."""
        # Transform SalePrice
        self.data['SalePrice'] = self.data['SalePrice'].apply(np.log10)
        
        # Handle Lot.Frontage
        self.data['Lot.Frontage'] = self.data['Lot.Frontage'].fillna(self.data['Lot.Frontage'].median())
        
        # Process garage age
        self.process_garage_age()
        
        # Process house and remodeling age
        self.process_house_ages()
        
        # Handle masonry veneer area
        self.data.loc[self.data['Mas.Vnr.Area'].isna(), 'Mas.Vnr.Area'] = 0.0
    
    def process_garage_age(self):
        """Process garage age-related features."""
        garage_age = self.data['Yr.Sold'] - self.data['Garage.Yr.Blt']
        garage_age[garage_age < 0.0] = 0.0
        self.data = self.data.drop(columns='Garage.Yr.Blt')
        self.data['Garage.Age'] = garage_age
        self.data['Garage.Age'] = self.data['Garage.Age'].fillna(self.data['Garage.Age'].median())
    
    def process_house_ages(self):
        """Process house age-related features."""
        remod_age = self.data['Yr.Sold'] - self.data['Year.Remod.Add']
        remod_age[remod_age < 0.0] = 0.0
        
        house_age = self.data['Yr.Sold'] - self.data['Year.Built']
        house_age[house_age < 0.0] = 0.0
        
        self.data = self.data.drop(columns=['Year.Remod.Add', 'Year.Built'])
        self.data['Remod.Age'] = remod_age
        self.data['House.Age'] = house_age
    
    def clean_data(self):
        """Perform final cleaning steps."""
        # Drop unnecessary columns
        columns_to_drop = ['Street', 'Utilities', 'Pool.QC', 'Fireplace.Qu',
                          'Garage.Cond', 'Garage.Qual', 'Heating']
        self.data = self.data.drop(columns=columns_to_drop)
        
        # Handle missing values
        self.data = self.data.dropna(axis=0)
        
        # Clean up categories
        for col in self.data.select_dtypes('category').columns:
            self.data[col] = self.data[col].cat.remove_unused_categories()
    
    def save_clean_data(self):
        """Save the cleaned data to a pickle file."""
        clean_data_path = self.data_dir / 'processed' / 'ames_clean.pkl'
        with open(clean_data_path, 'wb') as file:
            pickle.dump(self.data, file)
    
    def process_data(self):
        """Execute all data processing steps."""
        self.load_data()
        self.process_zoning()
        self.process_sale_type()
        self.process_conditions()
        self.process_features()
        self.process_exterior()
        self.process_numerical_features()
        self.clean_data()
        self.save_clean_data()
        return self.data

def main(path):
    data_dir = path
    processor = AmesDataProcessor(data_dir)
    processed_data = processor.process_data()
    print("Data processing completed successfully.")
    return processed_data



In [6]:
# Bibliotecas base
import numpy as np
import pandas as pd
import pathlib

# Scikit-learn - Preprocessamento
from sklearn.preprocessing import (
    StandardScaler,
    PolynomialFeatures,
    OneHotEncoder
)

# Scikit-learn - Modelos
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor

# Scikit-learn - Pipeline e Composição
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

# Scikit-learn - Divisão de dados e Validação
from sklearn.model_selection import (
    train_test_split,
    GridSearchCV
)

# Scikit-learn - Métricas
from sklearn.metrics import mean_squared_error

In [7]:

# Função que carrega os dados de um arquivo CSV usando uma função customizada chamada `main`
# (Assume-se que a função `main()` já esteja definida em outro ponto do código)
data = main(pathlib.Path.cwd().parent / 'data')

# Realiza a codificação das variáveis categóricas com a técnica One-Hot Encoding
# O parâmetro `drop_first=True` é utilizado para evitar a multicolinearidade,
# removendo a primeira coluna codificada (efeito "dummy variable trap")
data = pd.get_dummies(data, drop_first=True)

# Converter todas as colunas que possuem dados booleanos para o tipo float
# Isso é necessário porque alguns modelos esperam que os dados estejam em formato numérico
data = data.astype({col: 'float' for col in data.select_dtypes(include=['bool']).columns})

# Separar o DataFrame em duas partes:
# `X` contém todas as colunas que representam as features (variáveis independentes),
# enquanto `y` é a coluna alvo (variável dependente) que estamos tentando prever, neste caso 'SalePrice'
X = data.drop(columns='SalePrice')  # Remover a coluna 'SalePrice' para criar as features
y = data['SalePrice']  # Selecionar 'SalePrice' como a variável alvo

# Até esse ponto:
# - O conjunto de dados foi carregado e preparado.
# - Todas as variáveis categóricas foram convertidas em variáveis numéricas utilizando One-Hot Encoding.
# - As colunas booleanas foram convertidas para o tipo float para garantir compatibilidade com os modelos.
# - `X` e `y` representam, respectivamente, as features e o target para modelagem preditiva.



Data processing completed successfully.


In [8]:
# Divisão dos dados em conjuntos de treino e teste
# X: Features (variáveis independentes)
# y: Target (variável dependente)
X_train, X_test, y_train, y_test = train_test_split(
    X,                  # Matrix de features
    y,                  # Vetor target
    test_size=0.2,      # 20% dos dados para teste
    random_state=42     # Semente para reprodutibilidade
)

# Criação do pipeline de preprocessamento e modelo
ridge_pipeline = Pipeline([
    ('scaler', StandardScaler()),     # Padronização das features (média 0, variância 1)
    ('poly', PolynomialFeatures(include_bias=False)),  # Geração de features polinomiais
    ('model', Ridge())                # Modelo de regressão Ridge
])

# Grade de hiperparâmetros para busca
Ridge_param_grid = {
    'poly__degree': [1, 2, 3],        # Graus polinomiais a serem testados
    'model__alpha': np.logspace(-3, 3, 7)  # Valores de regularização (10^-3 a 10^3)
}

# Configuração da busca em grade com validação cruzada
Ridge_grid_search = GridSearchCV(
    ridge_pipeline,         # Pipeline a ser otimizado
    Ridge_param_grid,       # Grade de parâmetros
    cv=5,                   # 5-fold cross-validation
    scoring='neg_mean_squared_error',  # Métrica de avaliação
    n_jobs=-1              # Usa todas as CPUs disponíveis
)

# Treina o modelo com todos os parâmetros da grade
Ridge_grid_search.fit(X_train, y_train)

# Retorna os melhores parâmetros e score
best_params = Ridge_grid_search.best_params_  # Melhores hiperparâmetros encontrados
best_score = Ridge_grid_search.best_score_    # Melhor score na validação cruzada
best_params, best_score

({'model__alpha': 100.0, 'poly__degree': 1}, -0.0030260112118220064)

In [9]:

# Criação do pipeline com Random Forest
rf_pipeline = Pipeline([
    ('model', RandomForestRegressor(n_jobs=-1))  # n_jobs=-1 utiliza todas as CPUs disponíveis
])

# Define a grade de hiperparâmetros para otimização
rf_param_grid = {
    'model__n_estimators': [50, 100],  # Número de árvores na floresta
    'model__max_depth': [5, 10]        # Profundidade máxima de cada árvore
}

# Configuração da busca em grade com validação cruzada
rf_grid = GridSearchCV(
    rf_pipeline,           # Pipeline a ser otimizado
    rf_param_grid,         # Grade de parâmetros
    cv=5,                  # 5-fold cross-validation
    scoring='neg_mean_squared_error',  # Métrica de avaliação
    n_jobs=-1             # Usa todas as CPUs disponíveis
)

# Treina o modelo com todos os parâmetros da grade
rf_grid.fit(X_train, y_train)

# Obtém os melhores parâmetros e score
best_params = rf_grid.best_params_    # Melhores hiperparâmetros encontrados
best_score = rf_grid.best_score_      # Melhor score na validação cruzada
best_params, best_score

({'model__max_depth': 10, 'model__n_estimators': 100}, -0.0035533534493707316)

In [11]:


# Obtém o melhor modelo da busca em grade
best_model = Ridge_grid_search.best_estimator_  # Extrai o modelo com melhores parâmetros
best_model_name = "Ridge"                      # Nome do modelo para referência

# Treina o melhor modelo com os dados de treino
best_model.fit(X_train, y_train)

# Faz previsões no conjunto de teste
test_predictions = best_model.predict(X_test)

# Calcula o RMSE (Root Mean Squared Error)
test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))

# Imprime o resultado do RMSE
print(f"Best model ({best_model_name}) test RMSE: {test_rmse:.4f}")

# Calcula o erro percentual médio
# Converte RMSE para escala original (desfaz a transformação logarítmica)
erro = 10**test_rmse - 1  # Subtrai 1 para obter a proporção do erro
erro_percentual = erro * 100  # Converte para percentual
print("Erro percentual: {:.2f}%".format(erro_percentual))

Best model (Ridge) test RMSE: 0.0432
Erro percentual: 10.46%


Os valores da coluna SalePrice estão em escala logarítmica de base 10. Portanto, para calcular o erro em termos absolutos, é necessário aplicar a função inversa, ou seja, 10^x. O mesmo se aplica ao preço das casas: por exemplo, um valor de 5 corresponde a um preço real de 10^5. O RMSE (erro quadrático médio) do nosso modelo de previsão de preços foi de 0.0432 na escala logarítmica, que corresponde a 10^0.0432 ≈ 1.106. Assim, o erro médio do modelo é de aproximadamente 10.6%.

A margem de erro de aproximadamente 10% apresentada pelo nosso modelo de previsão de preços imobiliários pode ser considerada bastante satisfatória quando analisamos o contexto completo do mercado. A precificação de imóveis é reconhecidamente um dos desafios mais complexos no campo de modelagem preditiva, pois envolve uma multiplicidade de variáveis que influenciam o valor final de uma propriedade.

Os fatores que afetam o preço de um imóvel vão desde aspectos quantitativos facilmente mensuráveis, como localização geográfica, metragem e número de cômodos, até elementos qualitativos mais subjetivos, como estado de conservação, qualidade dos acabamentos e apelo estético. Além disso, existem variáveis macroeconômicas que impactam diretamente o mercado, como taxas de juros, tendências locais e ciclos econômicos.

Modelos tradicionais costumam apresentar erros na faixa de 15-20%, nossa solução mantém a margem de erro em torno de 10%. É importante notar que mesmo especialistas humanos frequentemente divergem em suas avaliações, demonstrando a complexidade inerente à tarefa de precificação imobiliária.

Em suma, considerando a natureza complexa do mercado imobiliário, a presença de variáveis subjetivas difíceis de quantificar e a volatilidade inerente ao setor, uma margem de erro de 10% demonstra um desempenho adequado para aplicações práticas.