
Nous allons réaliser une exploration des données, comprenant des analyses univariées et bivariées ainsi que des tests statistiques


###  I. Importing Data and Cleaning Outliers

In [0]:
train_data = spark.read.csv("/mnt/data/train.csv", header=True, inferSchema=True)
test_data = spark.read.csv("/mnt/data/test.csv", header=True, inferSchema=True)

# Combining train and test datasets
# We combine the two datasets to address the undersampling of label 'A'. The union operation merges the rows of both datasets.
data = train_data.union(test_data)

In [0]:
display(data.select("Hauteur_sous-plafond"))

In [0]:
from pyspark.sql import functions as F

# Removing outliers from the dataset
data = data.filter(F.col("Surface_habitable_logement") <= 100000)
data = data.filter(F.col("Conso_5_usages/m²_é_finale") <= 100000)
data = data.filter(F.col("Hauteur_sous-plafond") <= 10)

In [0]:
display(data)


## II. Data Processing

In [0]:
from pyspark.sql.functions import col
# Splitting the dataset into numerical and categorical variables

# Selecting numerical variables
# We iterate through the DataFrame's columns and select those with data types indicating numerical values.
numerical_data = data.select([col(c) for c, t in data.dtypes if t in ['int', 'bigint', 'float', 'double']])
#numerical_vars

# Selecting categorical (qualitative) variables
# Similarly, we select columns where the data type is 'string', indicating categorical variables.
categorical_data = data.select([col(c) for c, t in data.dtypes if t == 'string'])
#categorical_vars



### 1. Processing Quantitative Values

Removing variables/columns who have more than 10% NA

In [0]:
# Importing necessary functions from PySpark
from pyspark.sql.functions import col, mean, count, when

# Calculating the percentage of missing (null) values for each numerical column
missing_value_stats = (numerical_data.select(*[
    (count(when(col(column).isNull(), column)) / count("*")).alias(column) 
    for column in numerical_data.columns
]).toPandas().transpose() * 100).round(4)

# Displaying the percentage of missing values for each column
print(missing_value_stats.sort_values(by=0))

# Columns with more than 10% missing values are identified for removal.
columns_to_remove = missing_value_stats.index[missing_value_stats[0] > 10].tolist()

data = data.drop(*columns_to_remove)
numerical_data = numerical_data.drop(*columns_to_remove)


In [0]:
# These columns are considered irrelevant or redundant, such as duplicate postal code columns.
columns_to_discard = ["_c0", "Code_postal_(BAN)", "Conso_5_usages_é_finale", "Code_postal_(brut)", "Emission_GES_éclairage"]

# Dropping specified columns from the dataset
# Both the main dataset 'data' and the subset 'numerical_data' are updated by removing these columns.
data = data.drop(*columns_to_discard)
numerical_data = numerical_data.drop(*columns_to_discard)

In [0]:
from pyspark.sql.functions import avg, col, when

# Replacing missing values with the mean for each column in numerical_data
# We iterate over each column and calculate its mean, then replace null values with this mean.
for col_name in numerical_data.columns:
    mean_val = data.agg(avg(col(col_name)).alias('mean')).first()['mean']
    data = data.withColumn(col_name, when(col(col_name).isNull(), mean_val).otherwise(col(col_name)))


### 2. Processing Qualitative Values

Removing variables who have more than 10% NA

In [0]:
from pyspark.sql.functions import col, count, when

# Calculating the percentage of missing (null) values for each categorical column
missing_value_stats_cat = (categorical_data.select(*[
    (count(when(col(column).isNull(), column)) / count("*")).alias(column) 
    for column in categorical_data.columns
]).toPandas().transpose() * 100).round(4) 

# Displaying the percentage of missing values for each column
print(missing_value_stats_cat.sort_values(by=0))

# Identifying columns with more than 10% missing values to be removed
# Columns exceeding the threshold are marked for removal.
columns_to_remove_cat = missing_value_stats_cat.index[missing_value_stats_cat[0] > 10].tolist()

data = data.drop(*columns_to_remove_cat)
categorical_data = categorical_data.drop(*columns_to_remove_cat)

In [0]:
# These columns are deemed unnecessary, either because they contain unique values or are not informative.
columns_to_discard_cat = ["N°DPE", "Code_INSEE_(BAN)", "Qualité_isolation_plancher_bas", "Nom__commune_(Brut)"]

# The operation is applied to both 'data' and 'categorical_data' DataFrames.
data = data.drop(*columns_to_discard_cat)
categorical_data = categorical_data.drop(*columns_to_discard_cat)


In [0]:
from pyspark.sql.functions import col, when

# Replacing missing values with 0 in categorical columns
for col_name in categorical_data.columns:
    data = data.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))




### 3. Converting Quantitative Variables into Categorical Variables


Surface habitable logement

In [0]:
from pyspark.sql.functions import when

# Defining the number of categories (e.g., 4 for quartiles)
num_categories = 4
# Calculating the quantiles for the variable
quantile_values = data.approxQuantile("Surface_habitable_logement", [0.25, 0.5, 0.75], 0.05)

# Assigning categories based on quantile thresholds
data = data.withColumn("Surface_habitable_logement_cat",
                       when(data["Surface_habitable_logement"] <= quantile_values[0], "Q1")
                       .when(data["Surface_habitable_logement"] <= quantile_values[1], "Q2")
                       .when(data["Surface_habitable_logement"] <= quantile_values[2], "Q3")
                       .otherwise("Q4"))

# Displaying the first few rows to check the result
data.select("Surface_habitable_logement", "Surface_habitable_logement_cat").show()



In [0]:
from pyspark.sql.functions import count

# Counting the occurrences of each category in the 'Surface_habitable_logement_cat' column
category_counts = data.groupBy("Surface_habitable_logement_cat").agg(count("*").alias("count"))

# This gives an overview of the distribution of data across the defined categories.
category_counts.show()


In [0]:
from pyspark.sql.functions import count

# Counting and sorting the occurrences of each unique value in 'Surface_habitable_logement'
sorted_value_counts = (data.groupBy("Surface_habitable_logement")
                       .agg(count("*").alias("count"))
                       .orderBy("Surface_habitable_logement", ascending=False))

# This will show the frequency of each unique value in the 'Surface_habitable_logement' column, sorted from the highest to lowest.
sorted_value_counts.show()


Consommation 5 usages / m²

In [0]:
import matplotlib.pyplot as plt

histogram_data = data.select("Conso_5_usages/m²_é_finale").rdd.flatMap(lambda x: x).histogram(20)
bins = histogram_data[0]
freqs = histogram_data[1]
plt.hist(bins[:-1], bins=bins, weights=freqs)
plt.show()

# on voit bien qu'il y a encore des varibales abérantes

In [0]:
from pyspark.sql.functions import when, col

# Defining the number of categories, such as 4 for quartiles
num_quartiles = 4

# Calculating the quartiles for the specific column
# The 'approxQuantile' function is used to find the quartile thresholds with a relative error of 5%.
quartile_thresholds = data.approxQuantile("Conso_5_usages/m²_é_finale", [0.25, 0.5, 0.75], 0.05)

# Categorizing the data based on the quartile thresholds
data = data.withColumn("Conso_5_usages_cat",
                       when(col("Conso_5_usages/m²_é_finale") <= quartile_thresholds[0], "Q1")
                       .when(col("Conso_5_usages/m²_é_finale") <= quartile_thresholds[1], "Q2")
                       .when(col("Conso_5_usages/m²_é_finale") <= quartile_thresholds[2], "Q3")
                       .otherwise("Q4"))

# This step helps in checking the categorization based on the computed quartiles.
data.select("Conso_5_usages/m²_é_finale", "Conso_5_usages_cat").show()


In [0]:
from pyspark.sql.functions import count

# Counting the occurrences of each category in 'Conso_5_usages_cat' column
category_counts = data.groupBy("Conso_5_usages_cat").agg(count("*").alias("category_count"))

# This provides an overview of how many entries fall into each category.
category_counts.show()



Hauteur sous-plafond

In [0]:
import matplotlib.pyplot as plt

histogram_data = data.select("Hauteur_sous-plafond").rdd.flatMap(lambda x: x).histogram(20)
bins = histogram_data[0]
freqs = histogram_data[1]
plt.hist(bins[:-1], bins=bins, weights=freqs)
plt.show()


In [0]:
from pyspark.sql.functions import when

# Manually specifying the bin ranges
bins = [0, 2.2, 2.4, 2.5, 3, 6, 10]  
labels = ['0-2.2', '2.2-2.4', '2.4-2.5', '2.5-3', '3-6', '6-10']  

# Creating a new column 'Hauteur_category' with categorized values
# The 'when' function is used to assign each value to a category based on the specified bins.
data = data.withColumn('Hauteur_category', 
                       when((data['Hauteur_sous-plafond'] > bins[0]) & (data['Hauteur_sous-plafond'] <= bins[1]), labels[0])
                       .when((data['Hauteur_sous-plafond'] > bins[1]) & (data['Hauteur_sous-plafond'] <= bins[2]), labels[1])
                       .when((data['Hauteur_sous-plafond'] > bins[2]) & (data['Hauteur_sous-plafond'] <= bins[3]), labels[2])
                       .when((data['Hauteur_sous-plafond'] > bins[3]) & (data['Hauteur_sous-plafond'] <= bins[4]), labels[3])
                       .when((data['Hauteur_sous-plafond'] > bins[4]) & (data['Hauteur_sous-plafond'] <= bins[5]), labels[4])
                       .when(data['Hauteur_sous-plafond'] > bins[5], labels[5])
                       .otherwise("Other"))

# This step is useful for checking if the categorization has been applied correctly.
data.select('Hauteur_sous-plafond', 'Hauteur_category').show()


In [0]:
from pyspark.sql.functions import count

# Counting the occurrences of each category in 'Hauteur_categorie' column
category_value_counts = data.groupBy("Hauteur_category").agg(count("*").alias("count"))

# This shows how many records are in each category of 'Hauteur_categorie'.
category_value_counts.show()




## IV. Univariate plot

In [0]:
categorical_data.display()

In [0]:
from pyspark.sql.functions import col, lit

top_n = 10

for var in categorical_data.columns:
    top_modalities = data.groupBy(var).count().orderBy('count', ascending=False).limit(top_n).select(var).rdd.flatMap(lambda x: x).collect()
    data = data.withColumn(var + "_top_n", when(col(var).isin(top_modalities), col(var)).otherwise(lit('Autres')))


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Configure the number of rows and columns based on the number of categorical variables
n_vars = len(categorical_data.columns)
n_cols = 2
n_rows = (n_vars + n_cols - 1) // n_cols  # Ceiling division to get enough rows

# Adjust the figure size dynamically based on the number of rows
plt.figure(figsize=(20, 5 * n_rows))

for i, var in enumerate(categorical_data.columns):
    ax = plt.subplot(n_rows, n_cols, i + 1)
    data_pd = data.groupBy(var + "_top_n").count().orderBy('count', ascending=False).toPandas()
    
    sns.barplot(x='count', y=var + "_top_n", data=data_pd, palette='viridis')
    plt.title(f'Distribution of {var}', fontsize=14)
    
    ax.set_yticklabels([label.get_text()[:30] + '...' if len(label.get_text()) > 30 else label.get_text() for label in ax.get_yticklabels()], fontsize=10)
    
    # Formatting the count labels to include thousand separator and avoid overlap by adjusting text position
    for p in ax.patches:
        width = p.get_width()
        plt.text(width, p.get_y() + p.get_height() / 2, f'{int(width):,}', ha='left', va='center', fontsize=10)

# Adjust the layout
plt.subplots_adjust(hspace=0.5, wspace=0.3)
plt.tight_layout()
plt.show()



In [0]:
n_vars = len(numerical_data.columns)
n_cols = 2
n_rows = (n_vars + n_cols - 1) // n_cols  

fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 4 * n_vars))


axes_flat = axes.flatten()

for i, var in enumerate(numerical_data.columns):
    bins, counts = data.select(var).rdd.flatMap(lambda x: [x[0]] if x[0] is not None else []).histogram(20)
    ax = axes_flat[i]
    ax.hist(bins[:-1], bins=bins, weights=counts, edgecolor='black', alpha=0.7, log=True)
    for count, bin in zip(counts, bins[:-1]):
        if count > 0:
            ax.text(bin + (bins[1] - bins[0])/2, count, f'{int(count)}',
                    va='bottom', ha='center', fontsize=8)

    ax.set_title(f'Distribution of {var}')
    ax.set_xlabel(var)
    ax.set_ylabel('Frequency')
    ax.grid(True)

plt.tight_layout()
plt.show()


## V. Bivariate Plots

In [0]:
from pyspark.sql.functions import col, when, lit, expr
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

def plot_stacked_bar(data, variable_analyse, variable_cible, top_n=10):
    top_modalities = data.groupBy(variable_analyse).count().orderBy('count', ascending=False).limit(top_n).select(variable_analyse).rdd.flatMap(lambda x: x).collect()
    data_top_n = data.withColumn(variable_analyse, when(col(variable_analyse).isin(top_modalities), col(variable_analyse)).otherwise(lit('Autres')))
    pivot_data = data_top_n.groupBy(variable_analyse).pivot(variable_cible).count()
    pivot_data = pivot_data.fillna(0)

    columns_to_sum = [col(c) for c in pivot_data.columns if c != variable_analyse]

    pivot_data = pivot_data.select("*", expr("A + B + C + D + E + F + G as total"))

    for c in pivot_data.columns:
        if c != variable_analyse and c != 'total':
            pivot_data = pivot_data.withColumn(c, (col(c) / col('total')) * 100)       
    pivot_percent_pd = pivot_data.toPandas().set_index(variable_analyse)
    ax = pivot_percent_pd.drop('total', axis=1).plot(kind='bar', stacked=True, figsize=(10, 6))
    
    plt.title(f'Distribution de {variable_analyse} par {variable_cible} (en %)')
    plt.xlabel(f'{variable_analyse}')
    plt.ylabel('Pourcentage')
    plt.xticks(rotation=0)
    plt.legend(title=f'{variable_cible}', bbox_to_anchor=(1.05, 1), loc='upper left', fontsize='small')

    for p in ax.patches:
        width = p.get_width()
        height = p.get_height()
        x, y = p.get_xy() 
        if height > 3:
            ax.text(x + width/2, y + height/2, '{:1.1f}%'.format(height), ha='center', va='center', fontsize=8)

    plt.show()


 Etiquette GES

In [0]:
plot_stacked_bar(data, "Etiquette_GES", "Etiquette_DPE", top_n=10)


 Classe Altitude

In [0]:
plot_stacked_bar(data, "Classe_altitude", "Etiquette_DPE", top_n=10)


 Département

In [0]:
plot_stacked_bar(data, "N°_département_(BAN)", "Etiquette_DPE", top_n=10)

 Qualité isolation enveloppe

In [0]:
plot_stacked_bar(data, "Qualité_isolation_enveloppe", "Etiquette_DPE", top_n=10)

 Qualité isolation murs

In [0]:
plot_stacked_bar(data, "Qualité_isolation_murs", "Etiquette_DPE", top_n=10)

 Qualité isolation menuiseries

In [0]:
plot_stacked_bar(data, "Qualité_isolation_menuiseries", "Etiquette_DPE", top_n=10)

 Type bâtiment

In [0]:
plot_stacked_bar(data, "Type_bâtiment", "Etiquette_DPE", top_n=10)

 Surface logement

In [0]:
plot_stacked_bar(data, "Surface_habitable_logement_cat", "Etiquette_DPE", top_n=10)

 Conso 5 usages

In [0]:
plot_stacked_bar(data, "Conso_5_usages_cat", "Etiquette_DPE", top_n=10)

## 6. Tests statistiques

In [0]:
from pyspark.sql import functions as F
from scipy.stats import chi2_contingency
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

variable_cible = "Etiquette_DPE"

def compute_chi2_contingency(data, var, variable_cible):
    # Créer un tableau de contingence
    contingency_table = data.crosstab(var, variable_cible)
    
    # Convertir en format approprié pour chi2_contingency
    contingency_table_pd = contingency_table.toPandas()
    contingency_table_pd = contingency_table_pd.set_index(contingency_table_pd.columns[0])
    contingency_table_values = contingency_table_pd.values

    # Calculer le test du chi-deux
    chi2, p, dof, expected = chi2_contingency(contingency_table_values)
    return chi2, p, dof, expected

def cramers_v(chi2, n, k1, k2):
    """Calcul du V de Cramer."""
    return np.sqrt(chi2 / (n * min(k1 - 1, k2 - 1)))

# Initialisation des listes pour stocker les résultats
variables = []
cramers_v_values = []

# Calculer le V de Cramer pour chaque variable catégorielle par rapport à la variable cible
for var in categorical_data.columns:
    chi2, p, dof, expected = compute_chi2_contingency(data, var, variable_cible)
    n = np.sum(expected)  # Taille totale de l'échantillon
    k1, k2 = expected.shape
    cramers_v_value = cramers_v(chi2, n, k1, k2)
    print(f"{var}, p-value = {p}")
    variables.append(var)
    cramers_v_values.append(cramers_v_value)

# Créer un DataFrame pour les résultats
cramers_v_df = pd.DataFrame({'Variable': variables, 'Cramers_V': cramers_v_values})

# Trier les données pour une meilleure visualisation
cramers_v_df = cramers_v_df.sort_values('Cramers_V', ascending=True)

# Créer un graphique à barres horizontales
plt.figure(figsize=(10, len(variables) * 0.5))  # Ajuster la taille en fonction du nombre de variables
plt.barh(cramers_v_df['Variable'], cramers_v_df['Cramers_V'], color='skyblue')
plt.xlabel('Valeur du V de Cramer')
plt.title('Force de l\'association (V de Cramer) par rapport à ' + variable_cible)

# Ajouter des étiquettes pour chaque barre
for index, value in enumerate(cramers_v_df['Cramers_V']):
    plt.text(value, index, f'{value:.2f}')

plt.show()
