# Load communes

In [None]:
import json
import pandas as pd
import unicodedata

def load_data():
    data = []
    with open('data/communes.json') as f:
        for line in f:
            data.append(json.loads(line))
    return data

def format_url_ville_data(feature, kind="log"):

    # Récupération du nom et du code de la commune
    name, code = feature['name'], feature['id']
    
    # Suppression des accents
    name = ''.join((c for c in unicodedata.normalize('NFD', name) if unicodedata.category(c) != 'Mn'))

    # Remplacement des espaces et autres caractères non alphanumériques par des tirets
    name = name.replace(" ", "-")
    name = name.replace("'", "-")
    name = name.replace("œ", "oe")
    name = ''.join(e for e in name if e.isalnum() or e == '-')

    # Statistique type
    kinds = {
        "log": "logement",
        "pop": "nombre-d-habitants"
    }

    return f"https://ville-data.com/{kinds[kind]}/{name}-33-{code}"



communes = pd.DataFrame(load_data())
communes['url_log'] = communes.apply(lambda x: format_url_ville_data(x, "log"), axis=1)
communes['url_pop'] = communes.apply(lambda x: format_url_ville_data(x, "pop"), axis=1)
communes

# Scrap housing and population data from the web

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
import time

# Open the browser
driver = webdriver.Chrome()
driver.get("https://ville-data.com/")

time.sleep(2)

# Consent ville-data.com cookies
query = '//button[@class="fc-button fc-cta-consent fc-primary-button"]'
buttons = driver.find_element(By.XPATH, query)
buttons.click()

### Scrap population

In [None]:
from selenium.webdriver.common.by import By
import re 
import time

def extract_pop():
    # Find number of housing
    try:
        query = '//div[contains(@id, "Population d")]'
        text = driver.find_element(By.XPATH, query).find_element(By.TAG_NAME, 'p').text
        regex = r'Il y a (\d+(?:\s*\d+)*) habitants'
        match = re.search(regex, text)
        population = int(match.group(1).replace(' ', ''))
    except:
        population = None

    return {
        'population': population
    }

def update_pop(row):
    driver.get(row['url_pop'])
    time.sleep(1)
    values = extract_pop()
    for key, value in values.items():
        row[key] = value
    return row


# Update values with selenium
communes = communes.apply(update_pop, axis=1)
communes.head()

### Scrap housing

In [None]:
from selenium.webdriver.common.by import By
import re 
import time

def extract_log():
    # Find number of housing
    try:
        query = '//div[contains(@id, "Nombre de logements à")]'
        text = driver.find_element(By.XPATH, query).find_element(By.TAG_NAME, 'p').text
        regex = r'(\d+(?:\s*\d+)*)\s*logements'
        match = re.search(regex, text)
        number_of_housing = int(match.group(1).replace(' ', ''))
    except:
        number_of_housing = None

    # Find number of house
    try:
        query = '//div[contains(@id, "Nombre de maisons à")]'
        text = driver.find_element(By.XPATH, query).find_element(By.TAG_NAME, 'p').text
        regex = r'(\d+(?:\s*\d+)*)\s*maisons'
        match = re.search(regex, text)
        number_of_house = int(match.group(1).replace(' ', ''))
    except:
        number_of_house = None

    # Find number of apartment
    try:
        query = '//div[contains(@id, "Nombre d\'appartements à")]'
        text = driver.find_element(By.XPATH, query).find_element(By.TAG_NAME, 'p').text
        regex = r'(\d+(?:\s*\d+)*)\s*appartements'
        match = re.search(regex, text)
        number_of_apartment = int(match.group(1).replace(' ', ''))
    except:
        number_of_apartment = None

    # Logement quality
    try:
        query = '//div[contains(@id, "Qualité des logements à")]'
        text = driver.find_element(By.XPATH, query).find_element(By.TAG_NAME, 'p').text

        match_total = re.search(r'(\d+(?:\s*\d+)*)\s*logements.*?résidence principale', text)
        match_t1 = re.search(r'(\d+(?:\s*\d+)*)\s*logements de 1 pièce', text)
        match_t2 = re.search(r'(\d+(?:\s*\d+)*)\s*logements de 2 pièces', text)
        match_t3 = re.search(r'(\d+(?:\s*\d+)*)\s*résidences principales de 3 pièces', text)
        match_t4 = re.search(r'(\d+(?:\s*\d+)*)\s*logements de 4 pièces', text)
        match_t5_plus = re.search(r'(\d+(?:\s*\d+)*)\s*logements de 5 pièces ou plus', text)

        total_logements = int(match_total.group(1).replace(" ", "")) if match_total else None
        t1_logements = int(match_t1.group(1).replace(" ", "")) if match_t1 else None
        t2_logements = int(match_t2.group(1).replace(" ", "")) if match_t2 else None
        t3_logements = int(match_t3.group(1).replace(" ", "")) if match_t3 else None
        t4_logements = int(match_t4.group(1).replace(" ", "")) if match_t4 else None
        t5_plus_logements = int(match_t5_plus.group(1).replace(" ", "")) if match_t5_plus else None
    except:
        total_logements = None
        t1_logements = None
        t2_logements = None
        t3_logements = None
        t4_logements = None
        t5_plus_logements = None

    return {
        'housing': number_of_housing,
        'house': number_of_house,
        'apartment': number_of_apartment,
        'principal': total_logements,
        't1': t1_logements,
        't2': t2_logements,
        't3': t3_logements,
        't4': t4_logements,
        't5+': t5_plus_logements
    }

def update_log(row):
    driver.get(row['url_log'])
    time.sleep(1)
    values = extract_log()
    for key, value in values.items():
        row[key] = value
    return row


# Update values with selenium
communes = communes.apply(update_log, axis=1)
communes.head()

### Clean data

In [None]:
# Close the driver, not needed anymore
driver.close()

# Fill no apartment and house with 0
communes.apartment.fillna(0, inplace=True)
communes.house.fillna(0, inplace=True)

# Fill no t1, t2, t3, t4, t5+ with 0
columns = ['t1', 't2', 't3', 't4', 't5+']
communes[columns] = communes[columns].fillna(0)

# Fill no housing with sum of apartment and house
index = communes[communes.housing.isna()].index
communes.loc[index, 'housing'] = communes.loc[index, 'apartment'] + communes.loc[index, 'house']

# If no house, and no apartment, fill with estimate mean
index = communes[(communes.house == 0) & (communes.apartment == 0)].index
percent_house = communes.house.sum() / communes.housing.sum()
communes.loc[index, 'house'] = communes.loc[index, 'housing'] * percent_house
communes.loc[index, 'apartment'] = communes.loc[index, 'housing'] * (1 - percent_house)

# Convert columns to int
columns = ['id', 'population', 'housing', 'house', 'apartment', 'principal', 't1', 't2', 't3', 't4', 't5+']
communes[columns] = communes[columns].astype(int)

# house & apartment in the website are count only for principal residence
# Adjust number of house & apartment to consider even non principal residence (estimate)
columns = ['t1', 't2', 't3', 't4', 't5+']
for col in columns:
    communes[col] = (communes[col] * communes['housing'] / communes['principal']).round().astype(int)

# Drop url column
communes.drop(columns=['url_log'], inplace=True)
communes.drop(columns=['url_pop'], inplace=True)

communes.head()

In [None]:
communes.to_json('data/communes-housing.json', orient='records', lines=True)

# Load DVF

In [None]:
import pandas as pd

columns = [
    'Nature mutation', 
    'Valeur fonciere',
    'Code postal',
    'Commune', 
    'Code departement', 
    'Code commune',
    'Section', 
    'No plan', 
    'Type local',
    'Surface reelle bati', 
    'Nombre pieces principales'
]

# Load dvfs
dvf = pd.concat([
    pd.read_csv('data/dvf2022.txt', sep='|', low_memory=False),
    pd.read_csv('data/dvf2021.txt', sep='|', low_memory=False),
    pd.read_csv('data/dvf2020.txt', sep='|', low_memory=False),
    pd.read_csv('data/dvf2019.txt', sep='|', low_memory=False),
    pd.read_csv('data/dvf2018.txt', sep='|', low_memory=False),
])


# Open DF and clean it
dvf = dvf[dvf['Type local'].isin(['Appartement', 'Maison'])]
dvf = dvf[dvf['Surface reelle bati'].isna() == False]
dvf = dvf.reset_index(drop=True)
dvf = dvf[columns]

# Keep only selected communes in departement
CODE_DEP = 33
code_communes = communes['id'].apply(lambda x: x - CODE_DEP*1000)
dvf = dvf[(dvf['Code departement'] == str(CODE_DEP).zfill(2)) & (dvf['Code commune'].isin(code_communes))]
dvf = dvf.reset_index(drop=True)

# Convert to int
columns = ['Code commune', 'Nombre pieces principales', 'Surface reelle bati']
dvf[columns] = dvf[columns].astype(int)

# Set nombre pieces principales to 5 for all properties with more than 5 rooms
index = dvf[dvf['Nombre pieces principales'] >= 5].index
dvf.loc[index, 'Nombre pieces principales'] = 5

# Remove Nombre de pieces principales = 0
dvf = dvf[dvf['Nombre pieces principales'] > 0]

dvf

# Compute the average size of houses and apartments according to their number of rooms

In [None]:
MINIMUM_DATA = 5

# Get the average surface for each type of property
def get_mean_surface_by_type(df):
    # Mean for each combination of 'Nombre pieces principales' and 'Type local'
    general_mean = dvf.groupby(['Nombre pieces principales', 'Type local'])['Surface reelle bati'].mean()

    # Mean for each combination of 'Code commune', 'Nombre pieces principales' and 'Type local'
    mean_by_commune = dvf.groupby(['Code commune', 'Nombre pieces principales', 'Type local'])['Surface reelle bati'].mean()

    # Number of data for each combination of 'Code commune', 'Nombre pieces principales' and 'Type local'
    count_by_commune = dvf.groupby(['Code commune', 'Nombre pieces principales', 'Type local']).size()

    # For combinations where the number of data is less than MINIMUM_DATA, replace with the general mean
    for index, count in count_by_commune.items():
        if count < MINIMUM_DATA:
            commune, n_pieces, local_type = index
            mean_by_commune[commune, n_pieces, local_type] = general_mean[n_pieces, local_type]

    return mean_by_commune

# Get the average surface for each type of property
dfs = get_mean_surface_by_type(dvf)
dfs = dfs.unstack(level=[1, 2])
dfs.columns = [f"{'H' if col[1][0] == 'M' else col[1][0]}T{col[0]}" for col in dfs.columns]
dfs = dfs.reset_index()

# Fill nan with mean values of the column
columns = ['AT1', 'HT1', 'AT2', 'HT2', 'AT3', 'HT3', 'AT4', 'HT4', 'AT5', 'HT5']
for column in columns:
    dfs[column].fillna(dfs[column].mean(), inplace=True)
dfs.head()

# Add the department code to the city code
dfs['Code commune'] = dfs['Code commune'] + CODE_DEP * 1000
dfs.head()

In [None]:
# Merge it to communes to get final df
communes = pd.read_json('data/communes-housing.json', orient='records', lines=True)
df = pd.merge(left=communes, right=dfs, left_on='id', right_on='Code commune')
df = df.drop('Code commune', axis=1)
df.head()

In [None]:
df.to_json('data/communes-surface.json', orient='records', lines=True)

# Add the price per square meter for each city

In [None]:
import pandas as pd

# Load data
dfc = pd.read_json('data/communes-surface.json', orient='records', lines=True)
dfp = pd.read_json('data/price_square_meter.json', lines=True, orient='records')

# Merge data
df = pd.concat([dfc, dfp.drop('name', axis=1)], axis=1)
df.to_json('data/communes-ready.json', orient='records', lines=True)
df.head()

# Compute the price of each city

In [None]:
import pandas as pd

# Load data
df = pd.read_json('data/communes-ready.json', orient='records', lines=True)

# Count price of one TX for one city
def count_price_tx(row, x):
    x = str(x) 
    tx = row['t' + x if int(x) < 5 else 't5+']
    a = row['apartment']
    h = row['house']
    p = row['principal']
    ATX = row['AT' + x]
    pa = row['price_apart']
    HTX = row['HT' + x]
    ph = row['price_house']
    return tx * ((a/p) * ATX * pa + (h/p) * HTX * ph)

# Count price of all TX for one city
def count_price(row):
    sum = 0
    for i in range(1, 6):
        sum += count_price_tx(row, i)
    return sum

# Count price for all cities
df['city_price'] = df.apply(count_price, axis=1)
df.to_json('data/communes-price.json', orient='records', lines=True)
df

# Add GeoJSON cadastre to each city 

In [None]:
import geopandas as gpd

# Load GeoJSON file with geopandas
gdf = gpd.read_file('data/cadastre-33-communes.json')
gdf.drop(['nom', 'created', 'updated'], axis=1, inplace=True)
gdf['id'] = gdf['id'].astype('int')

# Load JSON file with pandas
df = pd.read_json('data/communes-price.json', orient='records', lines=True)

# Merge GeoJSON and JSON files
gdf = gdf.merge(df, on='id', how='right')
gdf.head()

In [None]:
import folium
import numpy as np

MIN_PRICE = np.log(min(gdf['city_price']))
MAX_PRICE = np.log(max(gdf['city_price']))

# Define the style of each location
def style_function(feature):

    # Définir la couleur de remplissage en fonction du log du prix 
    min_opacity = 0.05
    max_opacity = 0.5
    opacity = min_opacity + (max_opacity - min_opacity) * (np.log(feature['properties']['city_price']) - MIN_PRICE) / (MAX_PRICE - MIN_PRICE)


    return {
        'fillColor': '#ff0000',   # couleur de remplissage
        'color': f'rgba(255, 0, 0, {opacity})',      # couleur de la ligne
        'weight': 1,             # épaisseur de la ligne
        'fillOpacity': opacity,       # opacité du remplissage
        'clickable': False,       # si True, la zone réagit au clic

    }

# Rewrite price to readable format
def format_price(price):
    d = {
        1000000000: 'B€',
        1000000: 'M€',
        1000: 'K€',
    }
    for k in d:
        if price > k:
            return f'{round(price / k, 1)}{d[k]}'
    return price


# Define a custom function to create the tooltip (hover popup)
tooltip = folium.GeoJsonTooltip(
    fields=['name'], 
    sticky=False
)

# Créer une carte centrée sur les coordonnées moyennes du GeoDataFrame
m = folium.Map(
    location=[gdf.geometry.unary_union.centroid.y, gdf.geometry.unary_union.centroid.x],
    zoom_start=10,
    tiles='https://cartodb-basemaps-{s}.global.ssl.fastly.net/light_nolabels/{z}/{x}/{y}.png',
    attr='&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>'
)

# Add the data to the map with folium
folium.GeoJson(
    gdf, 
    style_function=style_function,
    tooltip=tooltip
).add_to(m)

# 2. Add names centered on each location
for _, row in gdf.iterrows():
    location = [row['geometry'].centroid.y, row['geometry'].centroid.x]
    folium.Marker(location, icon=folium.DivIcon(
        html=f"""
            <div style="font-family: 'Arial', sans-serif; transform: translate(-50%, -50%);">
                {row['name']}
                {format_price(row['city_price'])}
            </div>
        """
    )).add_to(m)

# Afficher la carte
m

In [None]:
MAX_PRICE

In [None]:
gdf[gdf['apartment'] + gdf['house'] != gdf['housing']]