In [None]:
#  import main packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from pathlib import Path

# define variables
immo_vis_dir = "../../data/immo_vis/"
parquet_extension = ".parquet"
metropole_appartement_file = "ventes-metropole-appartement" + parquet_extension
metropole_maison_file = "ventes-metropole-maison" + parquet_extension

dpe_ranking     = ['0','A','B','C','D','E','F','F/G','G','NS','Unknown']
dpe_ranking_num = [10,1,2,3,4,5,6,6.5,7,9,10]
LargePlotActive = False
MediumPlotActive = False

In [None]:
# functions 
def print_numerical_isna(df) :
    columns = df.select_dtypes(include='number').columns.tolist()
    print (df[columns].isna().sum())

def print_categorial_isna(df) :
    columns = df.select_dtypes(include='category').columns.tolist()
    print (df[columns].isna().sum())     

def load_appartement_file () :
    start_path = Path(immo_vis_dir)
    final_path = start_path / metropole_appartement_file
    final_path.as_posix
    return pd.read_parquet(final_path.as_posix())


In [None]:
# import appartement file

start_path = Path(immo_vis_dir)
final_path = start_path / metropole_appartement_file
final_path.as_posix
df = load_appartement_file()
nb_rows= df.shape[0]
nb_cols= df.shape[1]

In [None]:
#  print main infos on appartmeent file


print ( f" {metropole_appartement_file} rows {nb_rows} columns {nb_cols}")
print (df.shape)
print (df.columns)
print (df.info())
print (df.describe())



In [None]:
#  print columns modalities
for i in df.columns:
    print(f'modalities  {i} are : ',(df[i].nunique()))

In [None]:
#  print comlmns nan values
for i in df.columns:
    print(f' {i} nan values {df[i].isna().sum()}  {(100*df[i].isna().sum()/len(df)):.2f}%')

In [None]:
# analyze type de bien
print (df['typedebien'].value_counts()) 
print (df['logement_neuf'].value_counts()) 
# print (df['typedebien'].value_counts(normalize=True)*100)
appartement_anciens = df.loc[df["typedebien"] == "a"]
appartement_neufs = df.loc[df["typedebien"] == "an"]

logement_anciens = df.loc[df["typedebien"] == "o"]
logement_neufs = df.loc[df["typedebien"] == "n"]

print ("appartement_neufs", appartement_neufs["logement_neuf"].value_counts())
print ("appartement_anciens", appartement_anciens["logement_neuf"].value_counts())


# select apppr
print ("appartements anciens , logement ancien ", appartement_anciens[appartement_anciens["logement_neuf"]== "o"].shape[0])
print ("appartements anciens , logement neuf ", appartement_anciens[appartement_anciens["logement_neuf"]== "n"].shape[0])

print ("appartements neufs , logement neuf ", appartement_neufs[appartement_neufs["logement_neuf"]== "n"].shape[0])
print ("appartements neufs , logement ancien ", appartement_neufs[appartement_neufs["logement_neuf"]== "o"].shape[0])

print (f" prix vente m2 appartements anciens  {appartement_anciens['prix_m2_vente'].median()}" )
print (f" prix vente m2 appartement_neufs {appartement_neufs['prix_m2_vente'].median()}")

print (f" prix vente m2 logements anciens  {logement_anciens['prix_m2_vente'].median()}" )
print (f" prix vente m2 logements neufs {logement_neufs['prix_m2_vente'].median()}")

print (f" dpeL appartements anciens  {appartement_anciens['dpeL'].mode()[0]}" )
print (f" dpeL  appartement_neufs {appartement_neufs['dpeL'].mode()[0]}")

print ("logement_neuf is not a relaible criteria")

In [None]:
#  modify logement_neuf according to type de bien
mask = (df["typedebien"] == "an") & (df["logement_neuf"]== "o")
df.loc[mask,'logement_neuf'] ="n"

mask = (df["typedebien"] == "a") & (df["logement_neuf"]== "n")
df.loc[mask,'logement_neuf'] ="o"

# replace an by a
df.loc[:,'typedebien'] ="a"

# process logement neuf as integer
df['logement_neuf'] = df['logement_neuf'].replace(['o','n'],[1,0])

print (df.typedebien.value_counts())
print (df.logement_neuf.value_counts())



## Drop columns that are not relevant

In [None]:
#rendre index=idannonce pour retrouver la ligne initiale
df.set_index('idannonce',drop=True,inplace=True)

#supprimer les colonnes non pertinentes pour les maisons
df.drop(columns=['type_annonceur', 'typedebien', 'typedetransaction','typedebien_lite',
       'prix_maison', 'prix_terrain', 'mensualiteFinance',"surface_terrain",
    #    'dpeC', #pas d'information supplémentaire sur dpeC par rapport à dpeL
    #    'date','eau',
       'categorie_annonceur',  'date', 'INSEE_COM', 'IRIS', 'duree_int',
       'TYP_IRIS_x', 'TYP_IRIS_y', 'GRD_QUART', 'UU2010', 'REG', 
       'loyer_m2_median_n6', 'nb_log_n6', 'taux_rendement_n6',
       'loyer_m2_median_n7', 'nb_log_n7', 'taux_rendement_n7'], 
       inplace=True)

## Process prix_m2_vente

In [None]:
# pre-analyse prix de vente au m2

if MediumPlotActive :
    plt.figure(figsize=(22,6))
    plt.subplot(151)
    sns.boxplot(df['prix_m2_vente'])
    plt.subplot(152)
    sns.histplot(df['prix_m2_vente'])
    plt.subplot(153)
    sns.scatterplot (df,x="surface",y="prix_m2_vente")
    plt.show()


In [None]:


# remove outliers

colonnes_num_prim = ['surface',  'prix_m2_vente']

# Création du masque en utilisant apply
mask = df[colonnes_num_prim].apply(lambda x: (x >= x.quantile(0.005)) & (x <= x.quantile(0.995))| x.isna()).all(axis=1)
#ne garder que les lignes qui sont entre q=0.005 et q=0.995
df=df[mask]
#Vérification des résultats
df[colonnes_num_prim].describe()

(df['prix_m2_vente']*df['surface']-df['prix_bien']).describe()

if MediumPlotActive :
    plt.figure(figsize=(22,6))
    plt.subplot(151)
    sns.boxplot(df['prix_m2_vente'])
    plt.subplot(152)
    sns.histplot(df['prix_m2_vente'])
    plt.subplot(153)
    sns.scatterplot (df,x="surface",y="prix_m2_vente")
    plt.show()

# 

## Traitement des colonnes  

In [None]:
# analyse NA values
import matplotlib.ticker as mtick

missing_values = df.isna().sum()
missing_values = missing_values[missing_values > 0].sort_values(ascending=False)
# full_columns = df.columns[ df.isna().any() == False ]
print (f" nb_rows {nb_rows} nb_cols {nb_cols}")
# print(f"columns without Nan values {full_columns.size} / {nb_cols}" )
# print (f"columns without Nans {full_columns}")
# print (missing_values)
if MediumPlotActive :
    plt.figure(figsize=(20,6))
    ax = plt.subplot(121)
    plt.plot (missing_values.index,missing_values.values)
    plt.axhline(y=int(nb_rows/2), color='r', linestyle='--', label='50%')
    plt.axhline(y=int(nb_rows*0.9), color='b', linestyle='--', label='90%')
    plt.legend()

    plt.xticks(rotation=80)
    plt.title(f"Missing values  nb-rows = {nb_rows}")
    ax2 = plt.subplot(122)
    missing_values_percent = (missing_values/nb_rows)*100.0
    plt.plot (missing_values_percent.index,missing_values_percent.values)
    plt.axhline(y=50.0, color='r', linestyle='--', label='50%')
    plt.axhline(y=90.0, color='b', linestyle='--', label='90%')
    plt.xticks(rotation=80)
    ax2.yaxis.set_major_formatter(mtick.PercentFormatter(100)) 
    plt.title(f"Missing values  percentage")
    plt.legend()
    plt.show()


## Process columns with too many NANs

In [None]:
# detect  columns with too many NANs (90%)
print (df.shape)
threshold = .9
nan_cols = []
for name,_ in df.items() :
    if df[name].isna().sum()/nb_rows> threshold :
        nan_cols.append(name) 
print (f"empty cols {nan_cols}")

In [None]:
# remove nan cols > threshold
df = df.drop(columns=nan_cols)

## Process numerical columns

In [None]:
# scatter plot of numerical variables / prix_m2_vente 

if LargePlotActive :
    columns = df.select_dtypes(include='number').columns.tolist()
    df.sort_values(by="prix_m2_vente",ascending=True)
    nrows = int(len(columns)/4) +1
    fig , axes = plt.subplots(nrows = nrows, ncols = 4, figsize = (30,nrows*4))
    index = 0
    for column in columns :
        if column not in ["idannonce","date"] :
            sns.scatterplot(ax=axes[int(index/4),index%4],y=df["prix_m2_vente"],x=df[column])
            index = index +1


In [None]:
# remove outliers .99 % except nb_logements_copro, charges_copro

#  etage contains negative values
df.loc[:,'etage'] = df['etage'].abs() 

columns = ['bain', 'nb_toilettes', 'nb_pieces','eau','bain','nb_pieces','nb_terraces','balcon','places_parking','nb_etages','etage']
#limiter les colonnes à q=0.99 pour éliminer les outliers
threshold = 0.99
for column in columns :
    df.loc[:,column] = df[column].clip(upper=df[column].quantile(threshold))


In [None]:
# display 
if LargePlotActive :
    nrows = int(len(columns)/4) +1
    fig , axes = plt.subplots(nrows = nrows, ncols = 4, figsize = (30,15*nrows))
    index = 0
    for column in columns :
        sns.scatterplot(ax=axes[int(index/4),index%4],y=df["prix_m2_vente"],x=df[column])
        index = index +1


In [None]:
#  remove Nan values for numerical columns
columns = df.select_dtypes(include='number').columns.tolist()
print (df[columns].isna().sum())


In [None]:

#  place de parking
df["places_parking"] = df["places_parking"].fillna(0) 
# nombres de toilettes ( can not be guessed)
df["nb_toilettes"] = df["nb_toilettes"].fillna(1).astype(int)
# logement neuf
df["logement_neuf"] = df["logement_neuf"].fillna(0)
#  nombre de terraces
df["nb_terraces"] = df["nb_terraces"].fillna(0)
df.loc[:,"nb_terraces"] = 0

print (df[columns].isna().sum())



In [None]:
# nb etages
print (df.nb_etages.value_counts(dropna=False))
fig = plt.figure(figsize=(16,8))
if MediumPlotActive :
    sns.countplot(data=df,x=df.nb_etages)
df["nb_etages"]= df["nb_etages"].fillna(df["nb_etages"].median())

In [None]:
# annee de construction
print (df.nb_logements_copro.value_counts(dropna=False,normalize=True))
if MediumPlotActive :
    sns.scatterplot(y=df["prix_m2_vente"],x=df["annee_construction"])
df["annee_construction"]= df["annee_construction"].fillna(0)


In [None]:
# nb logmeent copro
threshold=0.95
column="nb_logements_copro"
print (df[column].value_counts(dropna=False,normalize=True))
if MediumPlotActive :
    sns.scatterplot(y=df["prix_m2_vente"],x=df[column])
    plt.show()
df.loc[:,column] = df[column].clip(upper=df[column].quantile(threshold))
if MediumPlotActive :
    sns.scatterplot(y=df["prix_m2_vente"],x=df[column])
    plt.show()
print (df[column].median())
df[column]= df[column].fillna(0)



In [None]:
# charges copro
threshold=0.95
column="charges_copro"
print (df[column].value_counts(dropna=False,normalize=True))
if MediumPlotActive :
    sns.scatterplot(y=df["prix_m2_vente"],x=df[column])
    plt.show()
df.loc[:,column] = df[column].clip(upper=df[column].quantile(threshold))
if MediumPlotActive :
    sns.scatterplot(y=df["prix_m2_vente"],x=df[column])
    plt.show()
print (df[column].median())
df[column]= df[column].fillna(0)



In [None]:
#  remove Nan values for numerical columns
columns = df.select_dtypes(include='number').columns.tolist()
print (df[columns].isna().sum())

In [None]:
column="dpeC"
threshold=.99
print (df[column].value_counts(dropna=False,normalize=True))
df.loc[:,column] = df[column].clip(upper=df[column].quantile(threshold))
fig = plt.figure(figsize=(16,8))
if MediumPlotActive :
    sns.scatterplot(y=df[column],x=df["dpeL"])
    plt.show()
result = df.groupby("dpeL")[column].mean()
print (result)
fig = plt.figure(figsize=(16,8))
sns.scatterplot(y=result.values,x=result.index)
plt.show()

In [None]:
#  drop dpeC : colinearity with dpeL
column="dpeC"
df = df.drop(columns=[column])

In [None]:
#  print numercial Nan
print_numerical_isna(df)

## Process categorial columns

In [None]:
# display unique values for categorial columns for analysis

columns_categorial = df.select_dtypes(include="object").columns.tolist()
for column in columns_categorial:
    print(f"{column}: {df[column].unique()}")

In [None]:
#  Process dpel
#  NS means non significative
# how to take into account dpe zero ?
print(df["dpeL"].unique())
print(df["dpeL"].isna().sum())
print(df["dpeL"].value_counts())

df["dpeL"] = df["dpeL"].fillna("Unknown")

df["dpeL"] = df["dpeL"].replace(to_replace=["D - 231 kWh/m².year","D-218 kWh/m².year","Blank",'0 kWh/m².year'],value=["D","D","Unknown","0"])
df["dpeL"] = df["dpeL"].replace(to_replace=["E kWh/m².year","D kWh/m².year","C kWh/m².year","A kWh/m².year","856","'"],value=["E","D","C","A","Unknown","Unknown"])

df["dpeL"] = df["dpeL"].replace(to_replace=["VI"],value=["F/G"])
print(df["dpeL"].value_counts())


df['dpeL_num'] = df['dpeL'].replace(to_replace=dpe_ranking,value = dpe_ranking_num)

df['dpeL'] = pd.Categorical(df['dpeL'], dpe_ranking)
print(df["dpeL"].value_counts())

if MediumPlotActive :
    sns.histplot(df["dpeL"])
    plt.show()
    plt.figure(figsize=(10,8))
    sns.histplot(df["dpeL_num"])

In [None]:
# process categorial ges_class : gaz a effet de serre : set to unknown
print(df["ges_class"].value_counts())

df["ges_class"] = df["ges_class"].fillna("Unknown")

df["ges_class"] = df["ges_class"].replace(to_replace=["D - 49 kg CO2/m².year","E-41 kg CO2/m².year","Blank","0 kg CO2/m².year"],value=["D","E","Unknown","Unknown"])
df["ges_class"] = df["ges_class"].replace(to_replace=["VI"],value=["F/G"])
df["ges_class"] = df["ges_class"].replace(to_replace=["A kg CO2/m².year","E kg CO2/m².year","B kg CO2/m².year","C kg CO2/m².year","D kg CO2/m².year","F kg CO2/m².year","NS"],value=["A","E","B","C","D","F","Unknown"])
df['ges_class'] = pd.Categorical(df['ges_class'], ['0','A','B','C','D','E','F','F/G','G','Unknown'])

df['ges_class_num'] = df['ges_class'].replace(to_replace=dpe_ranking,value = dpe_ranking_num)

print(df["ges_class"].value_counts())

if MediumPlotActive :
    sns.histplot(df["ges_class"])
    plt.show()
    plt.figure(figsize=(10,8))
    sns.histplot(df["ges_class_num"])


In [None]:
# annon exclusive
print(df["annonce_exclusive"].value_counts())
df["annonce_exclusive"] = df["annonce_exclusive"].replace(to_replace=['Oui','Non','0'],value=[1,0,2]).astype(int)


In [None]:
print(df["cave"].value_counts())
df["cave"] = df["cave"].replace(to_replace=[np.nan,False,True],value=[0,0,1])

sns.histplot(df["cave"])


In [None]:
print(df["chauffage_energie"].value_counts())
df["chauffage_energie"] = df["chauffage_energie"].fillna("unknown")


df['chauffage_energie'] = df['chauffage_energie'].str.lower()
df['chauffage_energie'] = df['chauffage_energie'].replace(to_replace=[", "," ,",","],value=[",",",",","],regex=True)
df["chauffage_energie"] = df["chauffage_energie"].replace(to_replace=["électrique"],value=["elec"],regex=True)
print(df["chauffage_energie"].unique())

#  create sub-categories
energies = ["gaz","elec","bois","fioul"]
for energy in energies :
    target = "chauf-" + energy
    print(target)
    df[target] = df["chauffage_energie"].apply(lambda x : 1 if energy in x else 0)
    print (df[target].value_counts())

for energy in energies :
#  simplify chauffage-energie to first value
    df["chauffage_energie"] = df["chauffage_energie"].apply(lambda x : energy if x.startswith(energy) else x)

# df['chauffage_energie'] = pd.Categorical(df['chauffage_energie'], energies)

if MediumPlotActive :
    plt.figure(figsize=(14,8))
    sns.histplot(df["chauffage_energie"])

plt.figure(figsize=(14,6))
plt.subplot(141)
sns.histplot(df["chauf-fioul"],discrete=True)
plt.subplot(142)
sns.histplot(df["chauf-bois"],discrete=True)
plt.subplot(143)
sns.histplot(df["chauf-gaz"],discrete=True)
plt.subplot(144)
sns.histplot(df["chauf-elec"],discrete=True)
plt.show()


In [None]:
#  chauffage system
print(df["chauffage_systeme"].value_counts())

df["chauffage_systeme"] = df["chauffage_systeme"].fillna("unknown")

df['chauffage_systeme'] = df['chauffage_systeme'].str.lower()
df['chauffage_systeme'] = df['chauffage_systeme'].replace(to_replace=[", "," ,",","],value=[",",",",","],regex=True)

changes = {"fluide caloporteur" :"pompe à chaleur","panneau rayonnant" : "convecteur" }
for  old_name, new_name in changes.item() :
    df["chauffage_systeme"] = df["chauffage_systeme"].apply(lambda x : new_name if old_name in x else x)

#  create sub-categories
energies = ["climatisation","pompe à chaleur","convecteur","radiateur","chaudière","sol","poêle-bois"]
outers = ["climatisation","pompe-chaleur","convecteur","radiateur","chaudiere","sol","poele-bois"]

for energy,outer in zip(energies,outers) :
    target = "chauf_sys_" + outer
    print(target)
    df[target] = df["chauffage_systeme"].apply(lambda x : 1 if energy in x else 0)
    print (df[target].value_counts())
# sort by decreasing importance


# df['chauffage_systeme'] = pd.Categorical(df['chauffage_systeme'], energies)
MediumPlotActive = True
if MediumPlotActive :
    energies = ["climatisation révérsible","pompe à chaleur","sol","convecteur","radiateur","chaudière","poêle à bois"]
    for name in energies : 
        df["chauffage_systeme"] = df["chauffage_systeme"].apply(lambda x : name if name in x else x)
    plt.figure(figsize=(8,6))
    sns.histplot(df["chauffage_systeme"])
    plt.xticks(rotation=70)
    plt.show()

plt.figure(figsize=(20,6))

plt.subplot(171)
sns.histplot(df["chauf_sys_climatisation"],discrete=True)

plt.subplot(172)
sns.histplot(df["chauf_sys_pompe-chaleur"],discrete=True)

plt.subplot(173)
sns.histplot(df["chauf_sys_convecteur"],discrete=True)

plt.subplot(174)
sns.histplot(df["chauf_sys_radiateur"],discrete=True)

plt.subplot(175)
sns.histplot(df["chauf_sys_chaudière"],discrete=True)
plt.subplot(176)
sns.histplot(df["chauf_sys_sol"],discrete=True)
plt.subplot(177)
sns.histplot(df["chauf_sys_poele-bois"],discrete=True)

In [None]:
#porcess categorial  chauffage mode, 50% isna
print(df["chauffage_mode"].value_counts())
print(df["chauffage_mode"].isna().sum())
df["chauffage_mode"] = df["chauffage_mode"].fillna("unknown")
df["chauffage_mode"] = df["chauffage_mode"].str.lower()

print(df["chauffage_mode"].value_counts())

outers = ["individuel","collectif","central"]
for energy,outer in zip(energies,outers) :
    target = "chauffage_mode_" + outer
    print(target)
    df[target] = df["chauffage_systeme"].apply(lambda x : int(1) if energy in x else int(0))
    print (df[target].value_counts())


# df["chauffage_mode"] = df["chauffage_mode"].apply(lambda x : "Individuel"  if "Individuel" in x else x)
# df["chauffage_mode"] = df["chauffage_mode"].apply(lambda x : "Collectif"  if "Central" in x else x)
print(df["chauffage_mode"].value_counts())

if MediumPlotActive :
    plt.figure(figsize=(8,6))
    sns.histplot(df["chauffage_mode"])
    plt.xticks(rotation=70)
    plt.show()

plt.figure(figsize=(20,6))

plt.subplot(131)
sns.countplot(df["chauffage_mode_individuel"],discrete=True)

plt.subplot(132)
sns.countplot(df["chauffage_mode_collectif"],discrete=True)

plt.subplot(133)
sns.countplot(df["chauffage_mode_central"],discrete=True)



In [None]:
# print (df.exposition.unique())
import re

def encode_exposition_advanced(df):
    """
    Encodage avancé de la colonne exposition pour modèles ML
    """
    # Nettoyage initial 
    df['exposition_clean'] = df['exposition'].astype(str).str.lower()
    df['exposition_clean'] = df['exposition_clean'].str.replace(r'[^\w\s/-]', '', regex=True)
    
    # 1. Variables binaires par direction (ordre important pour éviter les conflits)
    def detect_directions(expo_text):
        """Détection intelligente des directions avec gestion des conflits"""
        if pd.isna(expo_text) or expo_text == 'nan':
            return {'nord': 0, 'sud': 0, 'est': 0, 'ouest': 0}
        
        text = str(expo_text).lower()
        directions = {'nord': 0, 'sud': 0, 'est': 0, 'ouest': 0}

       #Traitement des expositions en mélangeant Nord-Est avec les deux directions nord ET est 
        if re.search(r'\bnord\b', text) :
            directions['nord'] = 1
        if re.search(r'\bsud\b', text) :
            directions['sud'] = 1
        if re.search(r'\best\b', text) :
            directions['est'] = 1
        if re.search(r'\bouest\b', text) :
            directions['ouest'] = 1

        # Gestion des abréviations isolées (plus prudente)
        words = re.sub(r'[-+/]', ' ', text).split()
        for word in words:
            if word == 'n' :
                directions['nord'] = 1
            elif word == 's' :
                directions['sud'] = 1
            elif word == 'e' :
                directions['est'] = 1
            elif word == 'o' :
                directions['ouest'] = 1
        
        return directions
    
    # Application de la détection
    direction_results = df['exposition_clean'].apply(detect_directions)
    for direction in ['nord', 'sud', 'est', 'ouest']:
        df[f'expo_has_{direction}'] = [result[direction] for result in direction_results]

    return 

encode_exposition_advanced(df)

plt.figure(figsize=(20,6))

plt.subplot(141)
sns.histplot(df["expo_has_nord"],discrete=True)

plt.subplot(142)
sns.histplot(df["expo_has_est"],discrete=True)

plt.subplot(143)
sns.histplot(df["expo_has_ouest"],discrete=True)

plt.subplot(144)
sns.histplot(df["expo_has_sud"],discrete=True)


print(f" after processing {df['exposition'].unique()}")


In [None]:
#  drop categorial columns thzt are no more relevant
columns = ["exposition","chauffage_mode","chauffage_systeme","chauffage_energie","dpeL","ges_class"]
df.drop(columns=columns,inplace=True)

In [None]:
print_categorial_isna(df)
print_numerical_isna(df)