Q1

In [None]:
# Spark initialisation (session + context)
from pyspark.sql import SparkSession
from urllib.request import urlretrieve

# Build (or reuse) a local Spark session named "GPS_App_Analysis";
# the "local[*]" master is passed through to Spark unchanged.
spark = SparkSession.builder.master("local[*]").appName("GPS_App_Analysis").getOrCreate()

# Keep a handle on the SparkContext that backs the session.
sc = spark.sparkContext

# Download a dataset file (skipped when a usable local copy already exists)
def download_file(filename: str, force: bool = False) -> None:
    """Fetch `filename` from the course S3 bucket into the working directory.

    The download is skipped when a non-empty local file with that name already
    exists, so re-running the notebook does not hit the network again.

    Args:
        filename: Name of the remote object; also used as the local path.
        force: When True, download even if a local copy exists.

    Raises:
        urllib.error.URLError / OSError: propagated from the download or the
            final rename; no partially written file is left behind.
    """
    import os

    if not force and os.path.isfile(filename) and os.path.getsize(filename) > 0:
        return

    base_url = "https://assets-datascientest.s3.eu-west-1.amazonaws.com/"

    # Download to a side file first so an interrupted transfer can never be
    # mistaken for a complete dataset on the next run.
    partial_path = filename + ".part"
    try:
        urlretrieve(base_url + filename, partial_path)
        os.replace(partial_path, filename)
    finally:
        if os.path.exists(partial_path):
            os.remove(partial_path)

download_file("gps_app.csv")

# Read the CSV into a Spark DataFrame: first line is the header, column types
# are inferred, and a double quote is used as the escape character.
raw_app = spark.read.csv(
    "gps_app.csv",
    header=True,
    inferSchema=True,
    escape="\"",
)

# Quick look at the first rows and at the inferred schema.
raw_app.show(5)
raw_app.printSchema()


Q2

In [None]:
# Rename every column: lower-case the name and turn spaces into underscores.
normalized_names = [name.lower().replace(" ", "_") for name in raw_app.columns]
raw_app = raw_app.toDF(*normalized_names)

# Check the result.
raw_app.printSchema()


Q3.1

In [None]:
from pyspark.sql.functions import col, when, isnan, count, expr, percentile_approx

# Median of rating, computed on valid values only.
# percentile_approx skips nulls but not NaN, and Spark orders NaN above every
# other number, so leaving the NaN rows in would bias the median.
valid_ratings = raw_app.filter(col("rating").isNotNull() & ~isnan(col("rating")))
median_rating = valid_ratings.select(percentile_approx("rating", 0.5)).first()[0]

# Replace the missing ratings with the median.
# Guard: median_rating is None when the column holds no valid value at all,
# and fillna does not accept None as a replacement.
if median_rating is not None:
    raw_app = raw_app.fillna({"rating": median_rating})

# Check the result.
raw_app.select("rating").summary("count", "mean", "min", "max").show()


In [None]:
from pyspark.sql.functions import col, count, mean, min, max, percentile_approx

# 1. Count present / missing values in rating.
#    A rating is treated as missing when it is null OR NaN: a plain
#    count("rating") only skips nulls, so leftover NaN would go unnoticed.
print(" Statistiques de présence :")
raw_app.selectExpr(
    "count(CASE WHEN rating IS NOT NULL AND NOT isnan(rating) THEN 1 END) AS non_nulls",
    "count(*) AS total",
).withColumn("missing", col("total") - col("non_nulls")).show()

# 2. Usual descriptive statistics.
print(" Statistiques descriptives de rating :")
raw_app.select("rating").summary("count", "mean", "min", "25%", "50%", "75%", "max").show()

# 3. Frequency table of the rating values.
print(" Distribution des valeurs de rating (fréquences) :")
raw_app.groupBy("rating").count().orderBy("rating").show(50)


Q3.2

In [None]:
from pyspark.ml.feature import Imputer

# Median imputation with Spark ML; input and output column are both "rating",
# so the imputed values are written back under the same column name.
imputer = Imputer(strategy="median", inputCols=["rating"], outputCols=["rating"])
imputer_model = imputer.fit(raw_app)
raw_app = imputer_model.transform(raw_app)



In [None]:
from pyspark.sql.functions import col, when

# 1. Show the distinct values of the "type" column.
raw_app.select("type").distinct().show()

# 2. Count the occurrences of each type to spot the most frequent one.
raw_app.groupBy("type").count().orderBy(col("count").desc()).show()

# 3. Show the rows where "type" is missing (null or empty string).
type_is_missing = col("type").isNull() | (col("type") == "")
raw_app.filter(type_is_missing).show()

# 4. Compute the mode among the PRESENT values only: if the missing marker
#    itself were the most frequent value, imputing with it would be a no-op.
#    The secondary sort key makes the choice deterministic when counts tie.
mode_row = (
    raw_app.filter(~type_is_missing)
    .groupBy("type")
    .count()
    .orderBy(col("count").desc(), col("type").asc())
    .first()
)
mode_type = mode_row[0] if mode_row is not None else None

# Impute only when a mode exists (first() returns None on an empty result).
if mode_type is not None:
    raw_app = raw_app.withColumn(
        "type",
        when(type_is_missing, mode_type).otherwise(col("type"))
    )


Q3.3

In [None]:
from pyspark.sql.functions import col

# 1. List the distinct values of "type".
raw_app.select("type").distinct().show()

# 2. Author's observation: besides "Free" and "Paid" there is, for example,
#    an empty ("") or None value, and that same record also has
#    content_rating = null.

# 3. Keep only the rows whose type is a valid one ("Free" or "Paid").
raw_app = raw_app.where(col("type").isin(["Free", "Paid"]))

# 4. Check that the issue is gone for "type" AND for "content_rating".
raw_app.select("type").distinct().show()
raw_app.filter(col("content_rating").isNull()).select("content_rating").show()


In [None]:
from pyspark.sql.functions import desc

# 1. Show the distinct values of content_rating.
raw_app.select("content_rating").distinct().show()

# 2. Show how many rows fall in each category, most frequent first.
(
    raw_app
    .groupBy("content_rating")
    .count()
    .orderBy("count", ascending=False)
    .show()
)


Q3.4

In [None]:
from pyspark.sql.functions import col, when, desc

def _most_frequent_value(df, column):
    """Return the most frequent non-null value of `column`, or None if none exists.

    Nulls are excluded so that the missing marker can never be picked as the
    replacement value; ties on the count are broken by the value itself so the
    result is deterministic.
    """
    top_row = (
        df.filter(col(column).isNotNull())
          .groupBy(column)
          .count()
          .orderBy(desc("count"), col(column).asc())
          .first()
    )
    return top_row[0] if top_row is not None else None


# 1. Mode (most frequent value) of current_ver
mode_current = _most_frequent_value(raw_app, "current_ver")

# 2. Mode of android_ver
mode_android = _most_frequent_value(raw_app, "android_ver")

# 3. Fill the null values of current_ver and android_ver with their mode.
#    A column is left untouched when it has no non-null value to impute with.
for column_name, mode_value in (
    ("current_ver", mode_current),
    ("android_ver", mode_android),
):
    if mode_value is not None:
        raw_app = raw_app.withColumn(
            column_name,
            when(col(column_name).isNull(), mode_value)
            .otherwise(col(column_name))
        )




Q3.5

In [None]:

from pyspark.sql.functions import col, sum as _sum, when, isnan


# Définitions des fonctions
def getMissingValues(df):
    """Count the missing values of every column of `df`.

    A value is missing when it is null or, for float/double/string columns,
    when isnan() is true for it.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A Spark DataFrame with one row per column of `df` and two fields:
        "column" (string) and "missingCount" (long). Built with the global
        `spark` session.
    """
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    # isnan() is rejected by Spark for types such as boolean, date or timestamp,
    # so it is only applied where the original code could run. String columns
    # are kept in the list to preserve the previous counts.
    # NOTE(review): on string columns isnan() relies on Spark's implicit
    # string -> double cast; confirm this is acceptable if ANSI mode is enabled.
    nan_capable_types = ("float", "double", "string")

    exprs = []
    for name, dtype in df.dtypes:
        is_missing = col(name).isNull()
        if dtype in nan_capable_types:
            is_missing = is_missing | isnan(col(name))
        exprs.append(_sum(when(is_missing, 1).otherwise(0)).alias(name))

    # A global aggregate returns exactly one row; its sums are None when `df`
    # has no rows, which is reported as 0 missing values.
    missing_dict = df.select(*exprs).first().asDict() if exprs else {}
    rows = [(name, int(total or 0)) for name, total in missing_dict.items()]

    # Explicit schema: type inference fails when `rows` is empty.
    schema = StructType([
        StructField("column", StringType(), True),
        StructField("missingCount", LongType(), True),
    ])
    return spark.createDataFrame(rows, schema)

def missingTable(missing_df):
    """Display, without truncation, the rows of `missing_df` whose missingCount is above 0."""
    has_missing_values = col("missingCount") > 0
    missing_df.where(has_missing_values).show(truncate=False)

# Then run the report: count the missing values of raw_app and display
# only the columns that still have some.
missing_counts = getMissingValues(raw_app)
missingTable(missing_counts)

Q4.1

In [None]:
# Fetch the credit-card dataset into the working directory.
download_file(filename="creditcard.csv")

Q4.2

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


# Load the CSV (header row, inferred column types).
raw_cc = spark.read.csv("creditcard.csv", header=True, inferSchema=True)

# 3. Cleaning: drop every row that contains a missing value.
cc = raw_cc.na.drop()

# 4. Train/test split (80 % / 20 %, fixed seed).
train, test = cc.randomSplit(weights=[0.8, 0.2], seed=42)

# 5. Feature assembly: every column except "Amount" and "Class" is a feature;
#    the assembled vector is then standardised into "features".
feature_cols = [name for name in cc.columns if name not in ("Amount", "Class")]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled")
scaler = StandardScaler(inputCol="assembled", outputCol="features")

# 6. Regression model: linear regression with "Amount" as the label.
lr = LinearRegression(featuresCol="features", labelCol="Amount")

# 7. Pipeline chaining assembly, scaling and the regression.
pipeline = Pipeline().setStages([assembler, scaler, lr])

# 8. Hyper-parameter grid explored by the cross-validation
#    (2 regParam values x 3 elasticNetParam values = 6 candidates).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 9. Evaluator: RMSE between "Amount" and "prediction".
evaluator = RegressionEvaluator(labelCol="Amount", predictionCol="prediction", metricName="rmse")

# 10. CrossValidator: 5 folds, 2 models fitted in parallel.
#     The seed fixes the fold assignment so that the selected model is
#     reproducible from one run to the next.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=2,
    seed=42
)

# 11. Training: run the cross-validation on the training set.
cvModel = cv.fit(train)

# 12. Predict on the test set, then score with RMSE and R2.
predictions = cvModel.transform(test)
rmse = evaluator.evaluate(predictions)

r2_evaluator = RegressionEvaluator(
    labelCol="Amount",
    predictionCol="prediction",
    metricName="r2",
)
r2 = r2_evaluator.evaluate(predictions)

print(f"RMSE sur le test : {rmse:.4f}")
print(f"R2 sur le test  : {r2:.4f}")