### analyse de Big Data

In [0]:
from pyspark.sql.functions import (
    col, count, mean, stddev, sum as spark_sum, min as spark_min, max as spark_max,
    hour, dayofweek, dayofmonth, month, year, weekofyear,
    unix_timestamp, percentile_approx, when, lit
)
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Imports Python scientifique
import pandas as pd
import numpy as np
from scipy import stats
from scipy.stats import t, norm

# Imports visualisation
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
# Plot defaults: seaborn "whitegrid" theme and a 12x6 default figure size.
sns.set_style(style="whitegrid")
plt.rcParams.update({'figure.figsize': (12, 6)})

# Confirmation message once imports and plot settings are in place.
print("‚úÖ Configuration et imports termin√©s")

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, LongType

# Data paths.
# Directory holding the population parquet files; the loading cell below
# assigns the same value again before listing the directory.
PATH_POPULATION = "/Volumes/workspace/trips/population/"

In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 📂 Chargement des Données – Population (Big Data)

# COMMAND ----------

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Data path: directory holding the population parquet files.
PATH_POPULATION = "/Volumes/workspace/trips/population/"

# ============================================
# POPULATION LOAD - file-by-file approach
# ============================================
# Every parquet file is read on its own and its numeric columns are cast to
# double before the per-file DataFrames are merged (presumably because the
# column types differ between files - confirm against the raw data).
print("üîÑ Chargement de la population compl√®te...")

# List every parquet file found directly under the population directory.
files = [f.path for f in dbutils.fs.ls(PATH_POPULATION) if f.path.endswith(".parquet")]
print(f"üìÇ {len(files)} fichiers trouv√©s")

# Columns cast to double in every file. Defined once, outside the loop,
# because the list does not depend on the file being read.
columns_to_convert = [
    "passenger_count", "trip_distance", "RatecodeID",
    "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge", "airport_fee"
]

# Per-file DataFrames that were read and normalised successfully.
dfs = []

for i, file_path in enumerate(files, 1):
    try:
        df_temp = spark.read.parquet(file_path)

        # Cast all the listed columns that exist in this file with a single
        # projection: chaining withColumn() in a loop adds one projection per
        # call and inflates the query plan.
        present_columns = set(df_temp.columns)
        double_casts = {
            col_name: col(col_name).cast(DoubleType())
            for col_name in columns_to_convert
            if col_name in present_columns
        }
        if double_casts:
            df_temp = df_temp.withColumns(double_casts)

        dfs.append(df_temp)
        print(f"‚úÖ [{i}/{len(files)}] {file_path.split('/')[-1]}")

    except Exception as e:
        # Best effort: a file that cannot be read is reported and skipped so
        # the remaining files are still loaded.
        print(f"‚ö†Ô∏è Erreur sur {file_path.split('/')[-1]} : {str(e)}")

# Merge the per-file DataFrames into a single population DataFrame.
from functools import reduce
from pyspark.sql import DataFrame

# reduce() over an empty list raises an opaque TypeError; fail with an
# explicit message when no file was found or every file failed to load.
if not dfs:
    raise RuntimeError(
        f"Aucun fichier parquet n'a pu être chargé depuis {PATH_POPULATION}"
    )

print("\nüîÑ Fusion de tous les fichiers parquet...")
# unionByName matches columns by name; allowMissingColumns=True fills a column
# that is absent from one side with nulls instead of failing.
df_population = reduce(
    lambda merged, current: merged.unionByName(current, allowMissingColumns=True),
    dfs,
)

# Total row count, kept in nb_population for the percentage computations
# performed in later cells.
nb_population = df_population.count()
print(f"‚úÖ Population totale charg√©e : {nb_population:,} courses")

# Check the final schema.
print("\n=== SCH√âMA POPULATION ===")
df_population.printSchema()


In [0]:
# Print the schema of the merged population DataFrame.
print("=== SCH√âMA POPULATION ===")
df_population.printSchema()




In [0]:
# Descriptive statistics (count, moments, extrema and quartiles) for the main
# numeric columns of the population.
print("=== STATISTIQUES DESCRIPTIVES - POPULATION ===")
summary_columns = [
    "fare_amount", "trip_distance", "tip_amount",
    "tolls_amount", "total_amount", "passenger_count",
]
summary_statistics = ["count", "mean", "stddev", "min", "25%", "50%", "75%", "max"]
numeric_overview = df_population.select(*summary_columns)
numeric_overview.summary(*summary_statistics).show()

In [0]:
from pyspark.sql.functions import col, sum

def count_nulls_bigdata(df, dataset_name: str) -> None:
    """Print the null count and null percentage of every column of ``df``.

    Args:
        df: Spark DataFrame to inspect.
        dataset_name: Label inserted in the printed header.

    Returns:
        None. A table of null counts is shown, followed by a table of null
        percentages (one ``<column>_pct`` column per input column). The
        percentage table is skipped when ``df`` has no rows.
    """
    print(f"\n=== VALEURS MANQUANTES - {dataset_name} ===")
    total_rows = df.count()

    # spark_sum is the Spark aggregate imported at the top of the notebook;
    # using the alias avoids depending on a shadowed builtin ``sum``.
    null_counts = df.select(
        [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    )
    null_counts.show(truncate=False)

    if total_rows == 0:
        # A percentage is undefined without rows, and dividing by zero either
        # yields nulls or fails depending on the session's ANSI setting.
        print("Aucune ligne : pourcentages non calculables")
        return

    # Percentage of nulls per column.
    null_pct = null_counts.select(
        [(col(c) / total_rows * 100).alias(c + "_pct") for c in null_counts.columns]
    )
    null_pct.show(truncate=False)

# Report the missing values of the full population DataFrame.
count_nulls_bigdata(df_population, "POPULATION")


### EDA

In [0]:
# Notebook output: list of the population column names.
df_population.columns

In [0]:
from pyspark.sql.functions import col, sum

# Count the null values of every column.
# The column list is taken from df_population itself: the previous version
# iterated over `df.columns`, but no `df` variable exists in this notebook.
# spark_sum is the Spark aggregate imported in the first cell.
df_population.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_population.columns]
).show()

In [0]:
# Notebook output: number of rows in the population DataFrame.
df_population.count()

### Description des données

In [0]:
# Basic descriptive statistics for the main numeric trip columns.
described_columns = [
    "trip_distance",
    "fare_amount",
    "total_amount",
    "tip_amount",
    "passenger_count",
]
df_population.select(*described_columns).describe().show()

In [0]:
# Number of trips per payment_type, most frequent value first.
payment_type_counts = df_population.groupBy("payment_type").count()
payment_type_counts.orderBy(col("count").desc()).show()

In [0]:
# RatecodeID: type of rate applied. Number of trips per value, most frequent first.
ratecode_counts = df_population.groupBy("RatecodeID").count()
ratecode_counts.orderBy(col("count").desc()).show()

### les valeurs manquantes 

In [0]:
# Set a missing airport_fee to 0 for pickups outside zones 132 and 138
# (presumably the airport pickup zones - confirm against the zone lookup table).
# Every other row keeps its current airport_fee value.
fee_is_missing = col("airport_fee").isNull()
pickup_outside_listed_zones = ~col("PULocationID").isin([132, 138])
imputed_airport_fee = when(
    fee_is_missing & pickup_outside_listed_zones, lit(0)
).otherwise(col("airport_fee"))

df_population = df_population.withColumn("airport_fee", imputed_airport_fee)

In [0]:
# Step 1: median passenger_count per pickup zone.
zone_medians = df_population.groupBy("PULocationID").agg({"passenger_count": "median"})
median_passenger_by_zone = zone_medians.withColumnRenamed(
    "median(passenger_count)", "median_passenger"
)

# Step 2: attach the zone median to every trip, keep the existing
# passenger_count when it is present and fall back to the zone median
# otherwise, then drop the helper column.
with_zone_median = df_population.join(
    median_passenger_by_zone, on="PULocationID", how="left"
)
imputed_passenger_count = when(
    col("passenger_count").isNotNull(), col("passenger_count")
).otherwise(col("median_passenger"))

df_population = with_zone_median.withColumn(
    "passenger_count", imputed_passenger_count
).drop("median_passenger")

In [0]:
# √âtape 1: calcul du mode par zone
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Step 1: most frequent RatecodeID (mode) per pickup zone.
# Ranking is by descending frequency; ties are broken by the smallest
# RatecodeID so that the selected mode is deterministic across runs.
w = Window.partitionBy("PULocationID").orderBy(
    col("count").desc(), col("RatecodeID").asc()
)

ratecode_mode = (
    df_population
    # Null RatecodeID rows must not take part in the ranking: otherwise a zone
    # where the missing value is the most frequent one gets a null "mode" and
    # none of its rows is imputed from the zone.
    .filter(col("RatecodeID").isNotNull())
    .groupBy("PULocationID", "RatecodeID")
    .count()
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") == 1)
    .select("PULocationID", col("RatecodeID").alias("mode_ratecode"))
)

# Step 2: join the zone mode and use it wherever RatecodeID is missing.
df_population = (
    df_population
    .join(ratecode_mode, on="PULocationID", how="left")
    .withColumn(
        "RatecodeID",
        when(
            col("RatecodeID").isNull(),
            col("mode_ratecode")
        ).otherwise(col("RatecodeID"))
    )
    .drop("mode_ratecode")
)
# Rows that are still null (no usable mode for their zone) get the default value 1.
df_population = df_population.fillna({"RatecodeID": 1})

In [0]:
# Replace a missing store_and_fwd_flag with "N".
store_flag_default = {"store_and_fwd_flag": "N"}
df_population = df_population.fillna(store_flag_default)

In [0]:
# Surcharge/fee columns whose remaining nulls are replaced by 0.
money_cols = ["congestion_surcharge", "airport_fee"]

zero_by_column = dict.fromkeys(money_cols, 0)
df_population = df_population.fillna(zero_by_column)

In [0]:
from pyspark.sql.functions import sum

# Null count per column, to check the result of the imputations above.
# `sum` is the Spark aggregate imported just above in this cell.
null_count_exprs = []
for column_name in df_population.columns:
    is_null_flag = col(column_name).isNull().cast("int")
    null_count_exprs.append(sum(is_null_flag).alias(column_name))

df_population.select(null_count_exprs).show()

### statistiques 

In [0]:


# Quantiles of fare_amount.
# The third argument of approxQuantile bounds the rank error as a fraction of
# the row count. The former value (0.01) was as large as the 1% / 99% targets
# themselves, which made those two percentiles unreliable; 0.001 keeps the
# error an order of magnitude below the smallest requested probability.
quantile_probs = [0.01, 0.25, 0.50, 0.75, 0.99]
quantiles_fare = df_population.approxQuantile("fare_amount", quantile_probs, 0.001)

# approxQuantile returns an empty list when the column holds no non-null
# value; fail explicitly instead of raising an IndexError below.
if len(quantiles_fare) != len(quantile_probs):
    raise ValueError("fare_amount ne contient aucune valeur non nulle : quantiles indisponibles")

Q1, Q3 = quantiles_fare[1], quantiles_fare[3]
IQR = Q3 - Q1

print("=== ANALYSE FARE_AMOUNT ===")
print(f"Q1 (25%): ${Q1:.2f}")
print(f"M√©diane (50%): ${quantiles_fare[2]:.2f}")
print(f"Q3 (75%): ${Q3:.2f}")
print(f"IQR: ${IQR:.2f}")
print(f"1er percentile: ${quantiles_fare[0]:.2f}")
print(f"99e percentile: ${quantiles_fare[4]:.2f}")

# Outlier limits (IQR method): values farther than 1.5 * IQR from the quartiles.
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
print(f"\nLimites outliers (IQR ¬±1.5):")
print(f"Limite inf√©rieure: ${lower_bound:.2f}")
print(f"Limite sup√©rieure: ${upper_bound:.2f}")

# Count the outliers and express them as a share of the population size
# (nb_population is computed when the population is loaded).
nb_outliers = df_population.filter(
    (col("fare_amount") < lower_bound) | (col("fare_amount") > upper_bound)
).count()
pct_outliers = (nb_outliers / nb_population) * 100

print(f"\nüìä Outliers d√©tect√©s: {nb_outliers:,} ({pct_outliers:.2f}%)")

# COMMAND ----------

### Prix moyen d'une course


In [0]:
# Mean fare_amount over the whole population. The aggregation yields a
# single-row, single-column result, unpacked here into a scalar.
(mean_population,) = df_population.agg({"fare_amount": "mean"}).first()
mean_population


### Distance moyenne (trip_distance)

In [0]:
# Mean trip_distance over the whole population.
mean_distance_row = df_population.agg({"trip_distance": "mean"}).first()
mean_dist_pop = mean_distance_row[0]
mean_dist_pop


### Durée moyenne des courses

In [0]:
# Mean trip duration in minutes.
# No preceding cell of this notebook creates trip_duration_min, so it is
# derived here when the column is absent: difference between the drop-off and
# pickup timestamps, converted from seconds to minutes.
# NOTE(review): assumes the drop-off column is named tpep_dropoff_datetime
# (by analogy with tpep_pickup_datetime used further down) - confirm.
if "trip_duration_min" not in df_population.columns:
    df_population = df_population.withColumn(
        "trip_duration_min",
        (
            unix_timestamp(col("tpep_dropoff_datetime"))
            - unix_timestamp(col("tpep_pickup_datetime"))
        ) / 60,
    )

mean_trip_duration_min = df_population.agg({"trip_duration_min": "mean"}).first()[0]
mean_trip_duration_min

### Proportion des courses avec tip > 0

In [0]:
# Share of trips with a strictly positive tip.
# Numerator and denominator are both taken from df_population: the previous
# denominator used `df_clean`, which is not defined in this notebook and would
# not describe the same set of trips as the numerator.
tipped_trips = df_population.filter(col("tip_amount") > 0).count()
total_trips = df_population.count()
prop_pop = tipped_trips / total_trips
prop_pop

### Distribution par heure / jour / semaine

In [0]:
from pyspark.sql.functions import hour, dayofweek

# Add the pickup hour and the pickup day of week to the population.
df_population = df_population.withColumn("hour", hour("tpep_pickup_datetime"))
df_population = df_population.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))

# Number of trips per pickup hour. The grouping runs on df_population, the
# DataFrame that just received the "hour" column; the previous version grouped
# `df_clean`, which is not defined in this notebook.
nbr_trips_per_hour = df_population.groupBy("hour").count()
nbr_trips_per_hour.display()

In [0]:
# Mean fare_amount per pickup location (grouping key: PULocationID).
trips_by_pickup_location = df_population.groupBy("PULocationID")
fares_pickup_borough = trips_by_pickup_location.agg({"fare_amount": "mean"})
fares_pickup_borough.display()

In [0]:
# Show the trips whose fare_amount is exactly 0.
fare_values = df_population.select("fare_amount")
zero_fare_rows = fare_values.filter(col("fare_amount") == 0)
zero_fare_rows.show()