### Data preprocessing pipeline

In [None]:
class NNTransformer(TransformerMixin):
    """Build a "price per square metre of the neighbours" feature.

    During ``fit`` a nearest-neighbour index is built on ``columns`` and the
    price/size ratio of every training row is stored.  ``transform`` returns,
    for each row, the NaN-aware mean of the stored ratios of its neighbours.

    Parameters
    ----------
    n_neighbors : int
        Number of neighbours requested from ``NearestNeighbors``.
    columns : list of str
        Columns of ``X`` used to measure the distance between rows.
    """

    def __init__(self, n_neighbors, columns):
        self.n_neighbors = n_neighbors
        self.columns = columns
        self.median = 0

    def fit(self, X, y):
        """Index the training rows and memorise their price per square metre.

        Parameters
        ----------
        X : pandas.DataFrame
            Must contain ``self.columns`` and a ``'size'`` column.
        y : array-like
            Target values; divided element-wise by ``X['size']``.

        Returns
        -------
        self
        """
        self.nn = NearestNeighbors(n_neighbors=self.n_neighbors)
        self.nn.fit(X[self.columns], y)
        # A zero `size` produces +/-inf (or NaN for 0/0): treat all of them as missing.
        self.price_sqm = (y / X['size']).replace([np.inf, -np.inf], np.nan).values
        # nanmedian, not median: a single missing ratio would otherwise turn the
        # fallback value into NaN and defeat the imputation done in `transform`.
        self.median = np.nanmedian(self.price_sqm)
        return self

    def transform(self, X):
        """Return the mean neighbour price per square metre for every row of ``X``.

        Rows whose neighbours all have a missing ratio receive the median ratio
        learned during ``fit``.
        """
        nn_id = self.nn.kneighbors(X[self.columns])[1]
        # The closest neighbour (column 0) is left out -- presumably because, for
        # training rows, it is the row itself.  NOTE(review): this also discards
        # the closest neighbour of unseen rows.
        nn_price = np.nanmean(self.price_sqm[nn_id[:, 1:]], axis=1)
        nn_price = np.where(np.isnan(nn_price), self.median, nn_price)
        return nn_price

In [None]:
class DataPreprocess(BaseEstimator, TransformerMixin):
    """Clean, impute and enrich the raw listings table.

    Parameters
    ----------
    feat_to_impute : list of str
        Columns whose remaining gaps are filled with a value learned on the
        whole training set (median, or the category matching the median).
    feat_by_prop_type : list of str
        Columns whose gaps are first filled with the median of the same
        ``property_type``.
    upper_threshold : dict
        ``{column: cap}``; values above the cap are replaced by the cap.
    feat_to_drop : list of str
        Columns removed at the end of ``transform``.
    departments : pandas.DataFrame
        Merged on ``'department_num'``; must provide a ``'capital'`` column.
    city_population : pandas.DataFrame
        Merged on ``'city'``; must provide a ``'city_population'`` column.
    n_neighbors : int
        Forwarded to :class:`NNTransformer`.
    """

    def __init__(self, feat_to_impute, feat_by_prop_type, upper_threshold, feat_to_drop, departments,
                 city_population, n_neighbors):
        self.medians = {}
        self.medians_by_prop_type = {}
        self.feat_to_impute = feat_to_impute
        self.feat_by_prop_type = feat_by_prop_type
        self.upper_threshold = upper_threshold
        self.feat_to_drop = feat_to_drop
        self.departments = departments
        self.city_population = city_population
        self.n_neighbors = n_neighbors
        self.nn_transformer = NNTransformer(self.n_neighbors, ['approximate_latitude', 'approximate_longitude'])

    def fit(self, X, y):
        """Learn the imputation values and fit the neighbour-price transformer."""
        X = self._impute_missing_values(X)
        self._get_medians(X)
        self.nn_transformer.fit(X, y)
        return self

    def transform(self, X, y=None):
        """Return a cleaned copy of ``X`` with the engineered features added."""
        df = self._impute_missing_values(X)
        df = self._impute_median(df)
        df = self._feature_engineering(df)
        df = self._trimming_outliers(df)
        df = self._features_selection(df)
        df['nearest_price_sqm'] = self.nn_transformer.transform(df)
        return df

    def _impute_missing_values(self, X):
        """Fill the gaps that can be deduced from the property type alone."""
        # Work on a copy so the caller's frame is never modified in place.
        X = X.copy()

        no_land = X.property_type.isin(['appartement', 'loft', 'chambre', 'duplex', 'gîte', 'péniche', 'atelier'])
        no_building = X.property_type.isin(['terrain', 'terrain à bâtir', 'parking'])

        X.loc[no_land & X.land_size.isna(), 'land_size'] = 0

        # For plots and parkings a `size` equal to `land_size` is reset to 0.
        X.loc[no_building & (X['size'] == X.land_size), 'size'] = 0

        # ...and a `size` given without `land_size` is moved to `land_size`.
        rows = no_building & X.land_size.isna() & ~X['size'].isna()
        X.loc[rows, 'land_size'] = X.loc[rows, 'size']
        X.loc[rows, 'size'] = 0

        X.loc[no_building & X['size'].isna(), 'size'] = 0

        not_applicable = {'energy_performance_value': 0,
                          'energy_performance_category': 'Not applicable',
                          'ghg_value': 0,
                          'ghg_category': 'Not applicable'}
        for col, filler in not_applicable.items():
            X.loc[no_building & X[col].isna(), col] = filler

        X['floor'] = X['floor'].fillna(0)

        for col in ('nb_rooms', 'nb_bedrooms', 'nb_bathrooms'):
            X.loc[no_building & X[col].isna(), col] = 0

        X.loc[X['property_type'] == 'chambre', 'nb_bedrooms'] = 1
        return X

    def _get_medians(self, X):
        """Store the global and per-property-type imputation values."""
        for col in self.feat_to_impute:
            if pd.api.types.is_numeric_dtype(X[col]):
                # Covers integer columns too, not only float ones.
                self.medians[col] = X[col].median()
            else:
                # Categorical companion of a numeric column ('..._category' for
                # '..._value'): keep the category of the row whose value is the
                # closest to the median.  An exact-equality lookup breaks when the
                # median is not an observed value (e.g. mean of two middle values).
                numerical_col = col.replace('category', 'value')
                labelled = X[X[col].notna() & X[numerical_col].notna()]
                distance = (labelled[numerical_col] - X[numerical_col].median()).abs()
                self.medians[col] = labelled[col].iloc[distance.values.argmin()]
        # Use the instance's own column list, not a module-level global.
        self.medians_by_prop_type = X.groupby('property_type')[self.feat_by_prop_type].median().to_dict()

    def _impute_median(self, X):
        """Fill gaps with the per-property-type median, then with the global value."""
        for col in self.feat_by_prop_type:
            X[col] = X[col].fillna(X['property_type'].map(self.medians_by_prop_type[col]))
        for col in self.feat_to_impute:
            X[col] = X[col].fillna(self.medians[col])
        return X

    def _trimming_outliers(self, X):
        """Cap extreme values and rescale implausibly large flat sizes."""
        for col, cap in self.upper_threshold.items():
            X[col] = X[col].clip(upper=cap)
        X.loc[X['size'].between(1, 10), 'size'] = 10

        # Flat-like properties above 500 are divided by 10 or 100, depending on
        # the size per room that the division by 10 would give.
        flat_like = X.property_type.isin(['appartement', 'loft', 'chambre', 'duplex'])
        X.loc[flat_like & (X['size'] > 500) & (X['size'] / (10 * X['nb_rooms']) <= 50), 'size'] = X['size'] / 10
        X.loc[flat_like & (X['size'] > 500) & (X['size'] / (10 * X['nb_rooms']) > 50), 'size'] = X['size'] / 100
        return X

    def _feature_engineering(self, X):
        """Add city type, city population bucket and ratio features."""
        # `nb_bathrooms` becomes a 0/1 flag (renamed in `_features_selection`).
        X.loc[X['nb_bathrooms'] > 1, 'nb_bathrooms'] = 1
        X.loc[X['nb_bedrooms'] > X['nb_rooms'], 'nb_bedrooms'] = X['nb_rooms']
        X.loc[X['nb_bathrooms'] > X['nb_rooms'], 'nb_bathrooms'] = X['nb_rooms']

        # Department number = postal code without its last three characters.
        X['department_num'] = X['postal_code'].astype(str).str[:-3].astype(int)
        X = pd.merge(X, self.departments, on='department_num', how='left')
        X['city_type'] = np.where(X['capital'] == X['city'], 'admin', 'minor')
        X = X.drop(['capital', 'department_num'], axis=1)
        # Parentheses are required: `|` binds tighter than `==`, so without them
        # the expression compares `(mask | city)` with 'bastia'.
        X.loc[X.city.str.startswith(('lyon', 'marseille')) | (X.city == 'bastia'), 'city_type'] = 'admin'
        X.loc[X.city.str.startswith('paris'), 'city_type'] = 'primary'

        X = pd.merge(X, self.city_population, on='city', how='left')
        X['city_population'] = X['city_population'].fillna(0)
        X['city_population'] = np.where(X['city_population'] > 100000, '>100K',
                                        np.where(X['city_population'] < 50000, '<50K', '50-100K'))
        X.loc[X.city.str.startswith(('lyon', 'marseille', 'paris')), 'city_population'] = '>100K'

        # Ratios: a zero denominator yields 0 instead of inf/NaN.
        X['bedrooms_over_rooms'] = (X['nb_bedrooms'] / (X['nb_rooms'].replace(0, np.nan))).replace(np.nan, 0)
        X['size_over_rooms'] = (X['size'] / (X['nb_rooms'].replace(0, np.nan))).replace(np.nan, 0)
        X['size_over_landsize'] = (X['size'] / (X['land_size'].replace(0, np.nan))).replace(np.nan, 0)
        return X

    def _features_selection(self, X):
        """Rename the count columns to ``has_a_*`` and drop the unused columns."""
        X = X.rename(columns={"nb_bathrooms": "has_a_bathroom", "nb_parking_places": "has_a_parking_place",
                              "nb_boxes": "has_a_box", "nb_terraces": "has_a_terrace"})
        X = X.drop(self.feat_to_drop, axis=1)
        return X

In [None]:
class CategEncoder(BaseEstimator, TransformerMixin):
    """Encode the ordinal text columns and cast the nominal ones to ``category``.

    ``city_type`` and ``city_population`` are mapped to integers with fixed
    dictionaries; ``department_name`` and ``property_type`` are converted to
    the pandas ``category`` dtype.
    """

    def __init__(self):
        self.mapping_city_type = {'minor': 0, 'admin': 1, 'primary': 2}
        self.mapping_population = {'<50K': 0, '50-100K': 1, '>100K': 2}

    def fit(self, X, y=None):
        """Nothing to learn; present for scikit-learn API compatibility."""
        return self

    def transform(self, X, y=None):
        """Return an encoded copy of ``X``.

        Values absent from the mapping dictionaries become NaN.
        """
        # Work on a copy: encoding in place would make a second call on the same
        # frame map the already-encoded integers to NaN.
        X = X.copy()
        X['city_type'] = X['city_type'].map(self.mapping_city_type)
        X['city_population'] = X['city_population'].map(self.mapping_population)
        for col in ('department_name', 'property_type'):
            X[col] = X[col].astype("category")
        return X

In [None]:
# Columns passed to DataPreprocess as `feat_by_prop_type`
# (gaps filled with the median of the same property type).
feat_by_prop_type = [
    'size',
    'land_size',
    'nb_rooms',
    'nb_bedrooms',
    'nb_bathrooms',
]

# Columns passed to DataPreprocess as `feat_to_impute`: the energy / GHG
# columns followed by every per-property-type column.
feat_to_impute = [
    'energy_performance_value',
    'energy_performance_category',
    'ghg_value',
    'ghg_category',
    *feat_by_prop_type,
]

# Columns passed to DataPreprocess as `feat_to_drop`.
feat_to_drop = [
    'exposition',
    'city',
    'postal_code',
    'energy_performance_category',
    'ghg_category',
    'id_annonce',
]

In [None]:
# Per-column caps passed to DataPreprocess as `upper_threshold`.
upper_threshold = dict(
    size=2000,
    land_size=20000,
    energy_performance_value=1000,
    ghg_value=400,
    nb_rooms=30,
    floor=22,
)

In [None]:
# Columns scaled by the RobustScaler step of the tuning pipelines.
numerical = [
    # location
    'approximate_latitude', 'approximate_longitude',
    # surfaces
    'size', 'land_size',
    # energy / emissions
    'energy_performance_value', 'ghg_value',
    # layout and listing details
    'floor', 'nb_rooms', 'nb_bedrooms', 'nb_photos',
    # price features ('nearest_price_sqm' is added by DataPreprocess;
    # NOTE(review): 'price_pred' is not created in the cells shown here)
    'nearest_price_sqm', 'price_pred',
]

### Performance evaluation metrics

In [None]:
def get_metrics(y_true, y_pred):
    """Summarise regression quality in a one-row DataFrame.

    Parameters
    ----------
    y_true, y_pred : array-like
        Observed and predicted values, compared position by position.
        Series, lists and NumPy arrays are all accepted.

    Returns
    -------
    pandas.DataFrame
        One row with the columns ``mae``, ``mdae``, ``mape``, ``mdape``,
        ``rmse`` and ``r_squared``.  ``mape``, ``mdape`` and ``r_squared``
        are strings formatted as percentages.
    """
    mae = round(mean_absolute_error(y_true, y_pred), 1)
    mdae = round(median_absolute_error(y_true, y_pred), 1)
    rmse = round(np.sqrt(mean_squared_error(y_true, y_pred)), 2)
    mape = '{:.2%}'.format(mean_absolute_percentage_error(y_true, y_pred))

    # Median absolute percentage error, computed on plain arrays so that it is
    # positional like the other metrics (no pandas index alignment) and also
    # works when both inputs are NumPy arrays.
    actual = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()
    with np.errstate(divide='ignore', invalid='ignore'):
        relative_error = np.abs((actual - predicted) / actual)
    # nanmedian mirrors pandas' default of skipping missing values.
    mdape = '{:.2%}'.format(np.nanmedian(relative_error))

    r_squared = '{:.2%}'.format(r2_score(y_true, y_pred))
    metrics = pd.DataFrame({'mae': mae,
                            'mdae': mdae,
                            'mape': mape,
                            'mdape': mdape,
                            'rmse': rmse,
                            'r_squared': r_squared},
                           index=[0])
    return metrics

### Embeddings

In [None]:
# EfficientNet-B1 loaded with the default weights shipped by torchvision.
effnet = efficientnet_b1(weights=EfficientNet_B1_Weights.DEFAULT)

# Embedding network: every child module of `effnet` except the last one
# (presumably the classification head -- confirm against torchvision).
img_embedder = nn.Sequential(*list(effnet.children())[:-1])
# Switch to inference mode.  `img_embedder` wraps the same child module objects,
# so they are switched too.
effnet.eval()

In [None]:
def preprocess_image(image_path):
    """Load an image file and turn it into a normalised batch of one tensor.

    Parameters
    ----------
    image_path : str or path-like
        Location of the image on disk.

    Returns
    -------
    torch.Tensor
        The transformed image with a leading batch dimension added.
    """
    preprocess = transforms.Compose([transforms.Resize(256),
                                     transforms.CenterCrop(224),
                                     transforms.ToTensor(),
                                     transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                          std=[0.229, 0.224, 0.225]),
                                    ])
    # `Image.open` is lazy and keeps the file open: the context manager closes it.
    with Image.open(image_path) as img:
        # Force three channels: greyscale, palette or RGBA files (common for
        # .png) would not match the 3-channel normalisation above.
        return preprocess(img.convert('RGB')).unsqueeze(0)

In [None]:
def embedd(image_path):
    """Return the squeezed `img_embedder` output for one image file.

    The image is prepared by `preprocess_image`; the forward pass runs with
    gradient tracking disabled.
    """
    batch = preprocess_image(image_path)
    with torch.no_grad():
        return img_embedder(batch).squeeze()

In [None]:
def get_embeddings(df, img_dir, saving_path):
    """Compute one averaged image embedding per listing and save them as CSV.

    For every value of ``df.id_annonce`` the images found in
    ``<img_dir>/ann_<id>`` (``.jpg`` / ``.png`` files) are embedded with
    ``embedd`` and averaged.  The result is written to ``saving_path`` with
    one row per listing, indexed by listing id.

    Listings whose folder is missing or holds no usable image are skipped and
    reported in a single warning, instead of aborting the whole run.

    Parameters
    ----------
    df : pandas.DataFrame
        Must expose an ``id_annonce`` column.
    img_dir : str
        Directory containing one ``ann_<id>`` sub-folder per listing.
    saving_path : str
        Destination passed to ``DataFrame.to_csv``.
    """
    import warnings

    df_embed = {}
    skipped = []
    for id_ann in tqdm(df.id_annonce):
        ann_path = os.path.join(img_dir, 'ann_' + str(id_ann))
        if not os.path.isdir(ann_path):
            skipped.append(id_ann)
            continue

        # Sorted so the averaging order does not depend on the filesystem.
        image_embeddings = [embedd(os.path.join(ann_path, img_name))
                            for img_name in sorted(os.listdir(ann_path))
                            if img_name.endswith(('.jpg', '.png'))]
        if not image_embeddings:
            # `torch.stack` raises on an empty list.
            skipped.append(id_ann)
            continue

        df_embed[id_ann] = torch.stack(image_embeddings).mean(dim=0).numpy()

    if skipped:
        warnings.warn('No image found for {} listing(s); they are absent from the output: {}'
                      .format(len(skipped), skipped[:10]))

    df_embed = pd.DataFrame.from_dict(df_embed, orient='index')
    # NOTE(review): the frame built above has no 'Unnamed: 0' column, so this
    # rename has no effect here; kept so the written file is unchanged.
    df_embed.rename(columns={'Unnamed: 0': 'id_annonce'}, inplace=True)
    df_embed.to_csv(saving_path)

In [None]:
# Embed the images of the training listings and write them to `emb_dir`
# (used as the CSV destination by `get_embeddings`).
get_embeddings(df=X_train, img_dir=img_dir, saving_path=emb_dir)

### Hyperparameter tuning with Optuna

In [None]:
def callback(study, trial):
    """Copy the trial's fitted pipeline to the study when the trial is the best so far.

    The objective functions store their pipeline in the trial user attribute
    ``'best_booster'``; this callback copies it to the study-level attribute
    of the same name whenever the finished trial is the study's best trial.
    """
    if trial.number != study.best_trial.number:
        return
    study.set_user_attr(key='best_booster', value=trial.user_attrs['best_booster'])

#### LGBM

In [None]:
def objective_lgbm(trial):
    """Optuna objective for LightGBM: mean 10-fold MAPE on the training set.

    The trial samples the booster hyper-parameters and the number of
    neighbours used by ``DataPreprocess``.  The pipeline is fitted once on
    the whole training set and stored in the trial user attribute
    ``'best_booster'``, then scored with 10-fold cross-validation.

    Returns
    -------
    float
        Mean absolute percentage error averaged over the folds (lower is better).
    """
    lgbm_kwargs = dict(
        objective='regression',
        verbosity=-1,
        random_state=42,
        num_leaves=trial.suggest_categorical('num_leaves', [32, 64, 128, 256, 512]),
        max_depth=trial.suggest_int('max_depth', 3, 20),
        n_estimators=2000,
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.2, 1.0, step=0.05),
        subsample=trial.suggest_float('subsample', 0.05, 1.0, step=0.05),
        subsample_freq=1,
        min_child_samples=trial.suggest_int('min_child_samples', 5, 100, step=5),
    )
    n_neighbors = trial.suggest_int('n_neighbors', 5, 50)

    # Feature side: cleaning -> categorical encoding -> robust scaling.
    pipe_prep = Pipeline([
        ('preprocess', DataPreprocess(feat_to_impute, feat_by_prop_type, upper_threshold, feat_to_drop,
                                      departments, city_population, n_neighbors=n_neighbors)),
        ('encode', CategEncoder()),
        ('transform', ColumnTransformer([('numerical_trans', RobustScaler(), numerical)],
                                        remainder='passthrough',
                                        verbose_feature_names_out=False)),
    ])

    # Target side: the regressor is trained on log1p(y), predictions go through expm1.
    log_target_model = TransformedTargetRegressor(
        regressor=LGBMRegressor(**lgbm_kwargs),
        func=np.log1p,
        inverse_func=np.expm1,
    )
    pipe = make_pipeline(pipe_prep, log_target_model, verbose=0)

    pipe.fit(X_train, y_train)
    # The scorer returns negated MAPE; flip the sign to get an error to minimise.
    fold_mape = -cross_val_score(pipe, X_train, y_train, scoring='neg_mean_absolute_percentage_error', cv=10)

    trial.set_user_attr(key='best_booster', value=pipe)

    return np.mean(fold_mape)

In [None]:
with warnings.catch_warnings():
    # Silence FutureWarnings and the 'Mean of empty slice' message for the
    # duration of the search only.
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.filterwarnings(action='ignore', message='Mean of empty slice')
    lgbm_study = optuna.create_study(direction='minimize', study_name='finetuning_lgbm')
    lgbm_study.optimize(objective_lgbm, callbacks=[callback], n_trials=150)

#### XGBoost

In [None]:
def objective_xgboost(trial):
    """Optuna objective for XGBoost: mean 10-fold MAPE on the training set.

    The trial samples the booster hyper-parameters and the number of
    neighbours used by ``DataPreprocess``.  The pipeline is fitted once on
    the whole training set and stored in the trial user attribute
    ``'best_booster'``, then scored with 10-fold cross-validation.

    Returns
    -------
    float
        Mean absolute percentage error averaged over the folds (lower is better).
    """
    xgb_kwargs = dict(
        objective='reg:squarederror',
        enable_categorical=True,
        n_estimators=2000,
        verbosity=0,
        random_state=42,
        max_depth=trial.suggest_int('max_depth', 1, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.2, 1.0, step=0.05),
        subsample=trial.suggest_float('subsample', 0.05, 1.0, step=0.05),
        min_child_weight=trial.suggest_int('min_child_weight', 1, 20),
    )
    n_neighbors = trial.suggest_int('n_neighbors', 5, 50)

    # Feature side: cleaning -> categorical encoding -> robust scaling.
    pipe_prep = Pipeline([
        ('preprocess', DataPreprocess(feat_to_impute, feat_by_prop_type, upper_threshold, feat_to_drop,
                                      departments, city_population, n_neighbors=n_neighbors)),
        ('encode', CategEncoder()),
        ('transform', ColumnTransformer([('numerical_trans', RobustScaler(), numerical)],
                                        remainder='passthrough',
                                        verbose_feature_names_out=False)),
    ])

    # Target side: the regressor is trained on log1p(y), predictions go through expm1.
    log_target_model = TransformedTargetRegressor(
        regressor=XGBRegressor(**xgb_kwargs),
        func=np.log1p,
        inverse_func=np.expm1,
    )
    pipe = make_pipeline(pipe_prep, log_target_model, verbose=0)

    pipe.fit(X_train, y_train)
    # The scorer returns negated MAPE; flip the sign to get an error to minimise.
    fold_mape = -cross_val_score(pipe, X_train, y_train, scoring='neg_mean_absolute_percentage_error', cv=10)

    trial.set_user_attr(key='best_booster', value=pipe)

    return np.mean(fold_mape)

In [None]:
with warnings.catch_warnings():
    # Silence FutureWarnings and the 'Mean of empty slice' message for the
    # duration of the search only.
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.filterwarnings(action='ignore', message='Mean of empty slice')
    xgb_study = optuna.create_study(direction='minimize', study_name='finetuning_xgboost')
    xgb_study.optimize(objective_xgboost, callbacks=[callback], n_trials=150)

#### CatBoost

In [None]:
def objective_catboost(trial):
    """Optuna objective for CatBoost: mean 10-fold MAPE on the training set.

    The trial samples the booster hyper-parameters and the number of
    neighbours used by ``DataPreprocess``.  The pipeline is fitted once on
    the whole training set and stored in the trial user attribute
    ``'best_booster'``, then scored with 10-fold cross-validation.

    Returns
    -------
    float
        Mean absolute percentage error averaged over the folds (lower is better).
    """
    catboost_kwargs = dict(
        cat_features=['property_type', 'department_name'],
        iterations=2000,
        verbose=False,
        random_state=42,
        # The tree depth is sampled under the trial parameter name 'max_depth'.
        depth=trial.suggest_int('max_depth', 1, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        colsample_bylevel=trial.suggest_float('colsample_bylevel', 0.2, 1.0, step=0.05),
        subsample=trial.suggest_float('subsample', 0.05, 1.0, step=0.05),
        min_data_in_leaf=trial.suggest_int('min_data_in_leaf', 5, 100, step=5),
    )
    n_neighbors = trial.suggest_int('n_neighbors', 5, 50)

    # Feature side: cleaning -> categorical encoding -> robust scaling.
    pipe_prep = Pipeline([
        ('preprocess', DataPreprocess(feat_to_impute, feat_by_prop_type, upper_threshold, feat_to_drop,
                                      departments, city_population, n_neighbors=n_neighbors)),
        ('encode', CategEncoder()),
        ('transform', ColumnTransformer([('numerical_trans', RobustScaler(), numerical)],
                                        remainder='passthrough',
                                        verbose_feature_names_out=False)),
    ])

    # Target side: the regressor is trained on log1p(y), predictions go through expm1.
    log_target_model = TransformedTargetRegressor(
        regressor=CatBoostRegressor(**catboost_kwargs),
        func=np.log1p,
        inverse_func=np.expm1,
    )
    # Unlike the other two objectives, this pipeline is built with verbose=1.
    pipe = make_pipeline(pipe_prep, log_target_model, verbose=1)

    pipe.fit(X_train, y_train)
    # The scorer returns negated MAPE; flip the sign to get an error to minimise.
    fold_mape = -cross_val_score(pipe, X_train, y_train, scoring='neg_mean_absolute_percentage_error', cv=10)

    trial.set_user_attr(key='best_booster', value=pipe)

    return np.mean(fold_mape)

In [None]:
with warnings.catch_warnings():
    # Silence FutureWarnings and the 'Mean of empty slice' message for the
    # duration of the search only.
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.filterwarnings(action='ignore', message='Mean of empty slice')
    cat_study = optuna.create_study(direction='minimize', study_name='finetuning_catboost')
    cat_study.optimize(objective_catboost, callbacks=[callback], n_trials=150)