**Import et préparation des données**

In [0]:
# Start (or reuse) the Spark session and load the taxi trip data.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYC_Taxi_Full")
    .getOrCreate()
)

# Directory holding the parquet files; it is read here as a single dataset.
pop_path = "/Workspace/dataTaxi/"
df_pop = spark.read.parquet(pop_path)

# Print the inferred schema, then leave the row count as the cell output.
df_pop.printSchema()
df_pop.count()

In [0]:
# Read every file under pop_path separately and merge the results by column
# name, so files whose schemas differ can still be combined
# (allowMissingColumns=True fills columns absent from a file with nulls).
from functools import reduce

files = dbutils.fs.ls(pop_path)

# Skip hidden/marker entries (names starting with "_" or "."), e.g. job
# markers, which are not data files.
data_files = [f for f in files if not f.name.startswith(("_", "."))]
if not data_files:
    # Fail with an explicit message instead of an IndexError further down.
    raise ValueError(f"No data files found under {pop_path}")

df_list = [spark.read.parquet(f.path) for f in data_files]

# Fold the per-file DataFrames into a single one.
df_pop = reduce(
    lambda merged, nxt: merged.unionByName(nxt, allowMissingColumns=True),
    df_list,
)

df_pop.count()

**EDA**

In [0]:
from pyspark.sql.functions import count, when, col

# One aggregation row holding, for every column, the number of null values:
# when() without otherwise() yields null for non-null cells, and count()
# ignores nulls, so only the null cells of each column are counted.
null_summary = df_pop.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_pop.columns
])
# Collect once: this aggregation scans the whole dataset.
null_counts = null_summary.first().asDict()
cols_with_nulls = [c for c, v in null_counts.items() if v > 0]

print("Colonnes avec des valeurs nulles :", cols_with_nulls)
# Build the display from the counts already collected instead of running the
# full aggregation a second time; nothing to show when no column has nulls.
if cols_with_nulls:
    spark.createDataFrame(
        [tuple(null_counts[c] for c in cols_with_nulls)],
        cols_with_nulls,
    ).show()

In [0]:
# List only the columns whose Spark type is exactly LongType.
from pyspark.sql.types import LongType

# isinstance on the data type avoids depending on the type's string form,
# which differs across Spark versions and would also match nested types that
# merely contain a long (e.g. an array of longs).
long_columns = [
    field.name
    for field in df_pop.schema.fields
    if isinstance(field.dataType, LongType)
]
print("Colonnes de type Long à convertir :")
print(long_columns)

In [0]:
# Duplicate check: compare the total row count with the count of distinct rows.
total_count = df_pop.count()
distinct_count = (
    df_pop
    .distinct()
    .count()
)
duplicate_count = total_count - distinct_count

print(f"Nombre total de lignes : {total_count:,}")
print(f"Nombre de lignes en double : {duplicate_count:,}")

# When duplicates exist, show a few fully identical rows with their multiplicity.
if duplicate_count > 0:
    duplicated_rows = (
        df_pop
        .groupBy(df_pop.columns)
        .count()
        .filter("count > 1")
    )
    duplicated_rows.show(5)

**Nettoyage des valeurs manquantes et des doublons**

In [0]:
from pyspark.sql.functions import col

# Cast every long column to double in a single projection. Calling withColumn
# in a loop adds one projection per column and can produce very large plans;
# a single select keeps the plan flat and preserves the column order.
long_cols = set(long_columns)
df_pop = df_pop.select([
    col(c).cast("double").alias(c) if c in long_cols else col(c)
    for c in df_pop.columns
])

# Drop exact duplicate rows, then replace missing values with defaults.
df_pop = df_pop.dropDuplicates().fillna({
    "passenger_count": 1.0,
    "RatecodeID": 1.0,
    "congestion_surcharge": 0.0,
    "airport_fee": 0.0,
    "tip_amount": 0.0,
    "tolls_amount": 0.0,
    "store_and_fwd_flag": "N"
})

# Keep only trips whose fare, distance and passenger count fall inside the
# chosen ranges (between() is inclusive on both bounds).
df_final_clean = df_pop.filter(
    (col("fare_amount").between(2.5, 200)) &
    (col("trip_distance").between(0.1, 50)) &
    (col("passenger_count").between(1, 4))
)

print(f"Nettoyage terminé ! Nombre de lignes finales : {df_final_clean.count():,}")
df_final_clean.select("fare_amount", "trip_distance", "passenger_count").summary().show()

In [0]:
# Preview the first 10 rows of the cleaned dataset.
preview_rows = df_final_clean.limit(10)
display(preview_rows)

**Indicateurs Principaux**

In [0]:
# Descriptive statistics for the main amount and distance columns.
indicator_columns = ["fare_amount", "trip_distance", "total_amount", "tip_amount"]
stats_summary = (
    df_final_clean
    .select(*indicator_columns)
    .summary()
)
stats_summary.show()

In [0]:
# Average fare
stats_pop = (
    df_final_clean
    .select("fare_amount")
    .summary("mean")
)
stats_pop.show()

In [0]:
# Average trip distance
distance_pop = (
    df_final_clean
    .select("trip_distance")
    .summary("mean")
)
distance_pop.show()

In [0]:
# Average trip duration, in minutes
from pyspark.sql.functions import unix_timestamp, avg

# Duration = dropoff - pickup, converted from seconds to minutes.
df_duration = df_final_clean.withColumn(
    "duration_min",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60,
)
# Alias the aggregated column, not the DataFrame: aliasing the DataFrame left
# the output column named "avg(duration_min)" instead of "duree_moyenne".
avg_duration_result = df_duration.select(avg("duration_min").alias("duree_moyenne"))
avg_duration_result.show()

In [0]:
# 95% confidence interval for the mean fare
import scipy.stats as st
import math
from pyspark.sql.functions import avg, count, stddev

# Derive mean, sample standard deviation and sample size from the cleaned
# data in one aggregation, instead of hard-coding numbers copied from an
# earlier run (those silently go stale whenever the data or filters change).
fare_stats = df_final_clean.agg(
    avg("fare_amount").alias("mean_fare"),
    stddev("fare_amount").alias("stddev_fare"),
    count("fare_amount").alias("n"),
).first()

n = fare_stats["n"]
if n < 2:
    # The sample standard deviation is undefined for fewer than two values.
    raise ValueError("At least two fare values are required to compute a confidence interval")
mean_fare = fare_stats["mean_fare"]
stddev_fare = fare_stats["stddev_fare"]

# Standard error of the mean
standard_error = stddev_fare / math.sqrt(n)

# Normal-approximation confidence interval at the 95% level
confidence_interval = st.norm.interval(confidence=0.95, loc=mean_fare, scale=standard_error)

print(f"L'intervalle de confiance à 95% du prix moyen est : {confidence_interval}")

In [0]:
# Share of trips with a tip > 0
from pyspark.sql.functions import col, count, when

# Compute both counts in a single pass over the data: count("*") counts every
# row, while count(when(...)) only counts rows where the condition holds
# (when() without otherwise() yields null, which count() ignores).
tip_counts = df_final_clean.agg(
    count("*").alias("total_trips"),
    count(when(col("tip_amount") > 0, True)).alias("trips_with_tip"),
).first()
total_trips = tip_counts["total_trips"]
trips_with_tip = tip_counts["trips_with_tip"]

# Guard against an empty dataset to avoid a division by zero.
proportion_tip = (trips_with_tip / total_trips) * 100 if total_trips else 0.0

print(f"Nombre total de trajets : {total_trips}")
print(f"Trajets avec pourboire : {trips_with_tip}")
print(f"Proportion réelle : {proportion_tip:.2f}%")

In [0]:
from pyspark.sql.functions import hour, dayofweek, weekofyear, count


def _trip_counts_by(extract_part, label):
    """Count trips per time part of the pickup timestamp.

    Args:
        extract_part: function (hour, dayofweek, weekofyear, ...) applied to
            the "tpep_pickup_datetime" column to derive the grouping key.
        label: name given to the grouping column in the result.

    Returns:
        DataFrame with columns `label` and "Total_Courses", sorted by `label`.
    """
    return (
        df_final_clean
        .groupBy(extract_part("tpep_pickup_datetime").alias(label))
        .agg(count("*").alias("Total_Courses"))
        .orderBy(label)
    )


# Hour of day
dist_heure = _trip_counts_by(hour, "Heure")

# Day of week (1 = Sunday, 2 = Monday, ..., 7 = Saturday)
dist_jour = _trip_counts_by(dayofweek, "Jour")

# Week of year
dist_semaine = _trip_counts_by(weekofyear, "Semaine")

print("--- Distribution par Heure ---")
dist_heure.show(24)
print("--- Distribution par Jour (1=Dim, 7=Sam) ---")
dist_jour.show()
print("--- Distribution par Semaine de l'année ---")
# show() defaults to 20 rows, which would truncate the weekly distribution;
# a year has at most 53 week numbers.
dist_semaine.show(53)

In [0]:
from pyspark.sql.functions import countDistinct

# Number of distinct pickup zones present in the cleaned data.
zone_count_row = df_final_clean.select(
    countDistinct("PULocationID").alias("nb_zones")
).first()
diversity_stats = zone_count_row[0]

# Average fare per pickup zone, most expensive zones first (top 10 shown below).
fares_by_zone = (
    df_final_clean
    .groupBy("PULocationID")
    .agg(avg("fare_amount").alias("avg_fare"))
    .sort("avg_fare", ascending=False)
)

print(f"Nombre total de zones géographiques couvertes : {diversity_stats}")
fares_by_zone.show(10)

In [0]:
# Minimum, maximum and upper percentiles of fare and distance.
# Computed on the cleaned data, so min/max are bounded by the cleaning filters.
outliers_stats = df_final_clean.select("fare_amount", "trip_distance").summary("min", "max", "50%", "95%", "99%")
outliers_stats.show()

# Count trips costing more than 200 dollars as an example of outliers.
# This must be counted on df_pop (before the range filter): df_final_clean
# only keeps fares between 2.5 and 200, so the count there is always 0.
expensive_trips = df_pop.filter("fare_amount > 200").count()
print(f"Le nombre de voyages qui coûtent plus de 200 $ est : {expensive_trips}")

In [0]:
from pyspark.sql.functions import col, avg, when

# Mean tip-to-fare ratio per payment type, restricted to payment types 1 and 2
# and to strictly positive fares (avoids dividing by zero).
tip_ratio = col("tip_amount") / col("fare_amount")
tip_analysis = (
    df_final_clean
    .filter("fare_amount > 0 AND payment_type IN (1, 2)")
    .groupBy("payment_type")
    .agg(avg(tip_ratio).alias("tip_ratio_moyen"))
)

# Human-readable label: 1 -> "Carte Bancaire", otherwise (here only 2) -> "Cash".
payment_label = when(col("payment_type") == 1, "Carte Bancaire").otherwise("Cash")
tip_analysis_named = tip_analysis.withColumn("payment_method", payment_label)

tip_analysis_named.select("payment_method", "tip_ratio_moyen").show()