In [1]:
"""
Q.1. En utilisant la fonction urlretrieve du module urllib.request, 
écrire une fonction download_file permettant de télécharger un fichier 
filename depuis l'adresse globale précédente. Appliquer cette fonction 
aux fichiers que nous voulons télécharger.
"""


from urllib.request import urlretrieve

# URL de base pour télécharger les fichiers
base_url = "https://assets-datascientest.s3.eu-west-1.amazonaws.com/"

# Fonction pour télécharger un fichier donné
def download_file(filename):
    url = base_url + filename
    urlretrieve(url, filename)
    print(f"Téléchargement effectué : {filename}")

# Application de la fonction aux deux fichiers nécessaires
download_file("gps_app.csv")
download_file("gps_user.csv")


Téléchargement effectué : gps_app.csv
Téléchargement effectué : gps_user.csv


In [2]:
from pyspark.sql import SparkSession

# Création de la SparkSession
spark = SparkSession.builder.appName("ETL_GPS").getOrCreate()

# Lecture du fichier gps_app.csv
raw_app = spark.read.option("header", True)\
                    .option("inferSchema", True)\
                    .option("escape", "\"")\
                    .csv("gps_app.csv")

# Lecture du fichier gps_user.csv
raw_user = spark.read.option("header", True)\
                     .option("inferSchema", True)\
                     .option("escape", "\"")\
                     .csv("gps_user.csv")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [3]:
# Q.2. Dans un premier prétraitement, renommer toutes les colonnes en remplaçant les espaces par des soulignements et les majuscules par des minuscules.

# Pour raw_app
raw_app = raw_app.toDF(*[col.lower().replace(" ", "_") for col in raw_app.columns])

# Pour raw_user
raw_user = raw_user.toDF(*[col.lower().replace(" ", "_") for col in raw_user.columns])


In [4]:
# Q.3.1 Remplacer les valeurs manquantes dans la colonne rating par la moyenne ou la médiane. Justifier le choix.
"""
Pourquoi la moyenne et non la médiane ?
La moyenne est simple à calculer en Spark avec avg().
La distribution n'a pas été indiquée comme fortement biaisée (pas mentionné de gros outliers dans le cours sur la colonne rating), 
donc la moyenne est acceptable.

"""

"\nPourquoi la moyenne et non la médiane ?\nLa moyenne est simple à calculer en Spark avec avg().\nLa distribution n'a pas été indiquée comme fortement biaisée (pas mentionné de gros outliers dans le cours sur la colonne rating), \ndonc la moyenne est acceptable.\n\n"

In [8]:
from pyspark.sql.functions import avg, col, isnan
from pyspark.sql.types import DoubleType

# 0. Recast la colonne rating pour être sûr qu'elle est numérique
raw_app = raw_app.withColumn("rating", col("rating").cast(DoubleType()))

# 1. Calculer la moyenne de la colonne "rating" en excluant les valeurs nulles
rating_average_df = raw_app.filter(col("rating").isNotNull()).select(avg("rating"))
rating_average = rating_average_df.first()[0]

# 2. Remplacer toutes les valeurs nulles ou nan de la colonne "rating" par cette moyenne
from pyspark.sql.functions import isnan, when

raw_app = raw_app.withColumn(
    "rating",
    when(isnan(col("rating")) | col("rating").isNull(), rating_average).otherwise(col("rating"))
)

# 3. Vérifier qu'il n'y a plus de valeurs manquantes dans "rating"
def getMissingValues(df):
    return df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c + "_null") for c in df.columns])

missing_values = getMissingValues(raw_app)
missing_values.show()


NameError: name 'count' is not defined

In [7]:
from pyspark.sql.functions import col, isnan, isnull, sum as _sum

# Fonction pour récupérer le nombre de NaN et NULL par colonne
def getMissingValues(df):
    return df.select([
        _sum(isnan(c).cast("int")).alias(c) for c in df.columns
    ] + [
        _sum(isnull(c).cast("int")).alias(c + "_null") for c in df.columns
    ])

# Fonction pour afficher proprement
def missingTable(missing_df):
    missing_df.show(truncate=False)

In [None]:
from pyspark.sql.functions import avg, col, isnan
from pyspark.sql.types import DoubleType

# 1. Recast la colonne rating pour être sûr qu'elle est numérique
raw_app = raw_app.withColumn("rating", col("rating").cast(DoubleType()))

# 2. Recalculer la moyenne sur les ratings non nulls
rating_average = raw_app.filter((col("rating").isNotNull()) & (~isnan("rating"))).select(avg(col("rating"))).first()[0]

# 3. Remplacer les valeurs nulles de rating par la moyenne
raw_app = raw_app.fillna({"rating": rating_average})

# 4. Vérification
getMissingValues(raw_app).show()


In [None]:
from pyspark.sql.functions import avg

rating_average_df = raw_app.filter(raw_app.rating.isNotNull()).select(avg("rating"))
rating_average = rating_average_df.first()[0]

raw_app = raw_app.fillna({"rating": rating_average})


In [None]:
# to delete
from pyspark.sql.functions import avg

rating_average_df = raw_app.filter(raw_app.rating.isNotNull()).select(avg("rating"))
rating_average = rating_average_df.first()[0]

raw_app = raw_app.fillna({"rating": rating_average})


In [None]:
# Q.3.2 Remplacer la valeur manquante de la colonne type par la valeur la plus logique. Justifier le choix.

"""
dans ce dataset, l'immense majorité des applications sont gratuites.
Donc, par logique statistique, on suppose qu'une application avec un type manquant est très probablement gratuite ("Free")
"""

In [None]:
from pyspark.sql.functions import col, isnan, when, count

# 1. Nettoyage : remplacer toutes les valeurs nulles ou NaN de la colonne type par "Free"
raw_app = raw_app.withColumn(
    "type",
    when(isnan(col("type")) | col("type").isNull(), "Free")\
    .otherwise(col("type"))
)

# 2. Fonctions de vérification
def getMissingValues(df):
    """Renvoie le nombre de valeurs nulles ou NaN pour chaque colonne."""
    return df.select([
        count(when(isnan(c) | col(c).isNull(), c)).alias(c + "_null")
        for c in df.columns
    ])

def missingTable(df_missing):
    """Affiche le tableau des valeurs manquantes."""
    df_missing.show()

# 3. Vérification : affichage des valeurs nulles restantes
missing_values_df = getMissingValues(raw_app)
missingTable(missing_values_df)

# 4. Vérification additionnelle : voir les valeurs distinctes dans la colonne "type"
raw_app.select("type").distinct().show()


In [None]:
# Q.3.3 Afficher les valeurs uniques prisent par la colonne type.
"""
Que remarquez-vous ? 
Supprimer le problème. Cela réglera aussi la valeur manquante de la colonne content_rating.
"""

In [None]:
raw_app.select("type").distinct().show()


In [None]:
# D’après ce qu’on vient de voir dans l'affichage (distinct()), 
# il existe bien "0" comme modalité de "type" (ce qui n'à aucun sens), mais on ne sait pas encore combien de lignes exactement.

In [None]:
raw_app.filter(col("type") == "0").show(truncate=False)
raw_app.filter(col("type") == "0").count()


# Nous venons de l'afficher et nous constatons qu'il s'agit d'une seule ligne
# Cette ligne a "0" en type et content "NULL" en content_rating.
"""
En fait, cette ligne est totalement déréglée :
"1.9" est dans category ➔ ce n'est pas normal.
"19.0" est dans rating ➔ totalement incohérent.
"3.0M" dans reviews ➔ incohérent.
"Free" dans installs ➔ totalement faux.
"0" dans type ➔ faux.
"""

In [None]:
# on va donc la supprimer :
raw_app = raw_app.filter(col("type") != "0")

In [None]:
raw_app.select("type").distinct().show()
# on constate que le problème est définitivement résolu

In [None]:
# Q.3.4 Remplacer le reste des valeurs manquantes pour la colonne current_ver et la colonne android_ver par leur modalité respective.


In [None]:
#  Trouver la modalité majoritaire pour chaque colonne
# Pour current_ver
mode_current_ver = raw_app.groupBy("current_ver").count().orderBy("count", ascending=False).first()["current_ver"]
# Pour android_ver
mode_android_ver = raw_app.groupBy("android_ver").count().orderBy("count", ascending=False).first()["android_ver"]

print(mode_current_ver)
print(mode_android_ver)

In [None]:
from pyspark.sql.functions import when, isnan, isnull

# Remplacer les NULL et NaN dans current_ver
raw_app = raw_app.withColumn(
    "current_ver",
    when(isnull("current_ver") | isnan("current_ver"), mode_current_ver).otherwise(col("current_ver"))
)
# Remplacer les NULL et NaN dans android_ver
raw_app = raw_app.withColumn(
    "android_ver",
    when(isnull("android_ver") | isnan("android_ver"), mode_android_ver).otherwise(col("android_ver"))
)

In [None]:
# Q.3.5 Vérifier qu'il ne reste plus de valeurs manquantes grâce à l'application de la commande

In [None]:
# 4. Vérification
from pyspark.sql.functions import col, isnan, isnull, sum as _sum

# Fonction pour récupérer le nombre de NaN et NULL par colonne
def getMissingValues(df):
    return df.select([
        _sum(isnan(c).cast("int")).alias(c) for c in df.columns
    ] + [
        _sum(isnull(c).cast("int")).alias(c + "_null") for c in df.columns
    ])

# Fonction pour afficher proprement
def missingTable(missing_df):
    missing_df.show(truncate=False)

getMissingValues(raw_app).show()

In [None]:
from pyspark.sql.functions import avg, col, isnan
from pyspark.sql.types import DoubleType

# 1. Recast la colonne rating pour être sûr qu'elle est numérique
raw_app = raw_app.withColumn("rating", col("rating").cast(DoubleType()))

# 2. Recalculer la moyenne sur les ratings non nulls
rating_average = raw_app.filter((col("rating").isNotNull()) & (~isnan("rating"))).select(avg(col("rating"))).first()[0]

# 3. Remplacer les valeurs nulles de rating par la moyenne
raw_app = raw_app.fillna({"rating": rating_average})

# 4. Vérification
getMissingValues(raw_app).show()


In [None]:
from pyspark.sql.functions import when, col, count, isnan

# Correction de la colonne "type" pour remplacer "0" par "Free"
raw_app = raw_app.withColumn(
    "type",
    when(col("type") == "0", "Free").otherwise(col("type"))
)

# Vérification que 'type' est maintenant propre
def getMissingValues(df):
    return df.select([
        count(when(isnan(c) | col(c).isNull(), c)).alias(c + "_null")
        for c in df.columns
    ])

# Afficher la table des valeurs manquantes
missing_values = getMissingValues(raw_app)
missing_values.show()
