<a href="https://colab.research.google.com/github/ayoubouafdi/BigDataLabs/blob/main/Lab_Spark_SQL_FIFA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Téléchargement du fichier depuis une source brute
!wget https://raw.githubusercontent.com/martj42/international_results/master/results.csv

--2026-01-27 17:41:13--  https://raw.githubusercontent.com/martj42/international_results/master/results.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3686208 (3.5M) [text/plain]
Saving to: ‘results.csv.1’


2026-01-27 17:41:13 (50.6 MB/s) - ‘results.csv.1’ saved [3686208/3686208]



In [None]:
# Installation de PySpark
!pip install pyspark

# Initialisation de la session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Chargement du fichier téléchargé
df = spark.read.csv("results.csv", header=True, inferSchema=True)





In [None]:
from pyspark.sql.functions import col, year, desc

# 1. Nombre de matchs [cite: 7]
print(f"Nombre de matchs présents : {df.count()}")

# 2. Première et dernière année [cite: 8]
df.select(year(col("date")).alias("year")).selectExpr("min(year) as Debut", "max(year) as Fin").show()

# 3. Les 10 tournois les plus fréquents [cite: 9]
print("Top 10 des tournois :")
df.groupBy("tournament").count().orderBy(desc("count")).show(10)

# 4. Matchs sur terrain neutre [cite: 10]
count_neutral = df.filter(col("neutral") == True).count()
print(f"Nombre de matchs sur terrain neutre : {count_neutral}")

# 5. Top 10 des pays ayant accueilli le plus de matchs [cite: 11]
print("Top 10 des pays hôtes :")
df.groupBy("country").count().orderBy(desc("count")).show(10)

# 6. Combien de matchs ont terminé sur un score nul?
# On compare si home_score est égal à away_score
matchs_nuls = df.filter(col("home_score") == col("away_score")).count()
print(f"Nombre de matchs nuls : {matchs_nuls}")

# 7. Afficher les matchs où le score total (home + away) est supérieur à 6
# On crée une colonne temporaire 'total_score' pour le filtre
print("Matchs avec un score total > 6 :")
df.withColumn("total_score", col("home_score") + col("away_score")).filter("total_score > 6").select("date", "home_team", "away_team", "home_score", "away_score", "total_score").show()

Nombre de matchs présents : 48943
+-----+----+
|Debut| Fin|
+-----+----+
| 1872|2026|
+-----+----+

Top 10 des tournois :
+--------------------+-----+
|          tournament|count|
+--------------------+-----+
|            Friendly|18171|
|FIFA World Cup qu...| 8688|
|UEFA Euro qualifi...| 2824|
|African Cup of Na...| 2278|
|      FIFA World Cup|  964|
|        Copa América|  869|
|African Cup of Na...|  845|
|AFC Asian Cup qua...|  829|
| UEFA Nations League|  658|
|          CECAFA Cup|  620|
+--------------------+-----+
only showing top 10 rows
Nombre de matchs sur terrain neutre : 12902
Top 10 des pays hôtes :
+--------------------+-----+
|             country|count|
+--------------------+-----+
|       United States| 1472|
|              France|  912|
|            Malaysia|  830|
|             England|  761|
|               Qatar|  760|
|            Thailand|  722|
|              Sweden|  686|
|             Germany|  683|
|United Arab Emirates|  601|
|        South Africa|  599|
+-

In [None]:
from pyspark.sql.functions import col, desc, sum

# 8. Nombre total de matchs joués par chaque équipe (domicile + extérieur)
home_counts = df.groupBy("home_team").count().withColumnRenamed("home_team", "team")
away_counts = df.groupBy("away_team").count().withColumnRenamed("away_team", "team")

# Union des deux et agrégation finale
total_matches = home_counts.union(away_counts).groupBy("team").agg(sum(col("count")).alias("total_played")).orderBy(desc("total_played"))

print("Nombre total de matchs par équipe :")
total_matches.show(10)

# 9. Top 10 des équipes ayant marqué le plus de buts
# On calcule les buts marqués à domicile et à l'extérieur séparément
home_goals = df.groupBy("home_team").agg(sum(col("home_score")).alias("goals")).withColumnRenamed("home_team", "team")
away_goals = df.groupBy("away_team").agg(sum(col("away_score")).alias("goals")).withColumnRenamed("away_team", "team")

top_scorers = home_goals.union(away_goals).groupBy("team").agg(sum(col("goals")).alias("total_goals_scored")).orderBy(desc("total_goals_scored"))

print("Top 10 des équipes les plus prolifiques :")
top_scorers.show(10)


Nombre total de matchs par équipe :
+-----------+------------+
|       team|total_played|
+-----------+------------+
|     Sweden|        1097|
|    England|        1086|
|  Argentina|        1062|
|     Brazil|        1055|
|    Germany|        1027|
|South Korea|        1003|
|    Hungary|        1002|
|     Mexico|         996|
|    Uruguay|         964|
|     France|         931|
+-----------+------------+
only showing top 10 rows
Top 10 des équipes les plus prolifiques :
+-----------+------------------+
|       team|total_goals_scored|
+-----------+------------------+
|    England|              2376|
|    Germany|              2308|
|     Brazil|              2293|
|     Sweden|              2162|
|  Argentina|              2009|
|    Hungary|              2005|
|Netherlands|              1836|
|South Korea|              1786|
|     Mexico|              1753|
|     France|              1706|
+-----------+------------------+
only showing top 10 rows


In [None]:
from pyspark.sql.functions import floor, avg, col, year

#10. On extrait l'année, on divise par 10, on arrondit à l'inférieur et on multiplie par 10
df_with_decade = df.withColumn("decade", (floor(year(col("date")) / 10) * 10))

stats_decade = df_with_decade.groupBy("decade").agg(avg(col("home_score") + col("away_score")).alias("moyenne_buts")).orderBy("decade")

print("Moyenne de buts par match par décennie :")
stats_decade.show()

Moyenne de buts par match par décennie :
+------+------------------+
|decade|      moyenne_buts|
+------+------------------+
|  1870| 4.538461538461538|
|  1880| 5.581818181818182|
|  1890|5.1525423728813555|
|  1900| 4.182481751824818|
|  1910| 4.218181818181818|
|  1920|3.8840579710144927|
|  1930| 4.318813716404078|
|  1940|  4.34453781512605|
|  1950| 4.004239854633555|
|  1960| 3.478129205921938|
|  1970|2.9750726040658275|
|  1980|  2.52687101910828|
|  1990|2.7790580440731674|
|  2000| 2.801259842519685|
|  2010|2.7321575061525842|
|  2020|2.7028520499108732|
+------+------------------+



In [None]:
from pyspark.sql.functions import col, year, desc, when, avg, abs

#11. Groupement par deux colonnes et comptage
print("11. Statistiques du nombre de matchs organisés par tournoi et par année :")
df.groupBy("tournament", year(col("date")).alias("year")).count().show()

#12. Filtre sur les victoires à domicile (score domicile > score extérieur)
print("12. Classement des équipes les plus performantes à domicile (nombre de victoires) :")
df.filter(col("home_score") > col("away_score")).groupBy("home_team").count().orderBy(desc("count")).show()

#13. On définit le résultat pour chaque match (W, L, D) puis on regroupe
print("13. Bilan complet par équipe (Nombre de victoires, défaites et matchs nuls) :")
results = df.select(col("home_team").alias("t"), when(col("home_score") > col("away_score"), "W").when(col("home_score") < col("away_score"), "L").otherwise("D").alias("res")).union(df.select(col("away_team").alias("t"), when(col("away_score") > col("home_score"), "W").when(col("away_score") < col("home_score"), "L").otherwise("D").alias("res")))
results.groupBy("t", "res").count().show()

#14. Comparaison de la moyenne de la somme des scores selon la colonne 'neutral'
print("14. Comparaison du score moyen : Matchs sur terrain neutre vs Terrain non-neutre :")
df.groupBy("neutral").agg(avg(col("home_score") + col("away_score")).alias("moyenne_buts")).show()

#15. Utilisation de la fonction abs() pour calculer la différence absolue
print("15. Analyse des records : Top 5 des matchs ayant le plus grand écart de buts :")
df.withColumn("ecart", abs(col("home_score") - col("away_score"))).orderBy(desc("ecart")).select("date", "home_team", "away_team", "ecart").show(5)

11. Statistiques du nombre de matchs organisés par tournoi et par année :
+--------------------+----+-----+
|          tournament|year|count|
+--------------------+----+-----+
|     Copa Rio Branco|1946|    1|
|Copa Bernardo O'H...|1959|    2|
|   Copa Félix Bogado|1983|    2|
|        Copa Artigas|1985|    2|
|        Muratti Vase|1986|    2|
|           Kirin Cup|1993|    3|
|        Copa América|2001|   26|
|   AFC Challenge Cup|2014|   16|
|           Kirin Cup|2016|    4|
|            Friendly|2018|  431|
|AFF Championship ...|2018|    2|
| Kirin Challenge Cup|2021|    1|
|    AFF Championship|2021|   25|
|Copa Chevallier B...|1926|    2|
|Copa Bernardo O'H...|1966|    2|
| Nordic Championship|1972|    4|
|  Merdeka Tournament|1974|   10|
|British Home Cham...|1977|    6|
|          King's Cup|1980|    3|
|            Friendly|1984|  243|
+--------------------+----+-----+
only showing top 20 rows
12. Classement des équipes les plus performantes à domicile (nombre de victoires) :
+

In [None]:
from pyspark.sql.functions import col, desc, sum, when, year, floor, avg, count, row_number, max
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# On combine les buts marqués/encaissés à domicile et à l'extérieur
print("16. Calcul du Goal Average (Buts marqués - Buts encaissés) pour chaque sélection :")
stats_m = df.select(col("home_team").alias("t"), col("home_score").alias("bp"), col("away_score").alias("bc")).union(df.select(col("away_team").alias("t"), col("away_score").alias("bp"), col("home_score").alias("bc")))
stats_m.groupBy("t").agg((sum("bp") - sum("bc")).alias("goal_average")).orderBy(desc("goal_average")).show()


# Définition de la fenêtre de partitionnement par année
print("17. Classement annuel des meilleures équipes (basé sur le nombre de victoires) :")
window_year = Window.partitionBy("year").orderBy(desc("wins"))
wins_df = df.withColumn("winner", when(col("home_score") > col("away_score"), col("home_team")).when(col("away_score") > col("home_score"), col("away_team"))).filter(col("winner").isNotNull()).groupBy(year("date").alias("year"), "winner").agg(count("*").alias("wins"))

wins_df.withColumn("rank",row_number().over(window_year)).filter("rank <= 3").show()


# Calcul de la décennie et agrégation
print("18. Analyse historique : Évolution du volume de matchs joués par décennie :")
df.withColumn("decade", floor(year("date") / 10) * 10).groupBy("decade").count().orderBy("decade").show()


# On identifie d'abord toutes les équipes ayant perdu au moins un match
print("19. Liste des sélections nationales restées invaincues sur une année civile :")
all_t = df.select(year("date").alias("y"), col("home_team").alias("t")).union(df.select(year("date").alias("y"), col("away_team").alias("t"))).distinct()
losers = df.select(year("date").alias("y"), when(col("home_score") < col("away_score"), col("home_team")).otherwise(col("away_team")).alias("t")).filter(col("home_score") != col("away_score")).distinct()
# On utilise right_anti pour garder celles qui ne sont pas dans la liste des perdants
all_t.join(losers, ["y", "t"], "left_anti").orderBy("y").show()




print("20. Version  : Plus longue série de victoires consécutives :")
# To calculate the longest winning streak for each team, we need to transform the data
# such that each row represents a team's result in a match.

# 1. Create a DataFrame with each team's perspective for each match
team_matches_df = df.select(
    col("date"),
    col("home_team").alias("team"),
    when(col("home_score") > col("away_score"), "W")
    .when(col("home_score") < col("away_score"), "L")
    .otherwise("D").alias("result")
).unionAll(
    df.select(
        col("date"),
        col("away_team").alias("team"),
        when(col("away_score") > col("home_score"), "W")
        .when(col("away_score") < col("home_score"), "L")
        .otherwise("D").alias("result")
    )
)

# 2. Order the results by team and date
ordered_team_matches = team_matches_df.orderBy(col("team"), col("date"))

# 3. Identify if it's a win and mark streak breaks
window_spec = Window.partitionBy("team").orderBy("date")

# 'is_winning' is 1 if win, 0 otherwise (draw or loss)
# 'prev_is_winning' checks if the previous match was a win for the same team
# 'streak_breaker' marks where a new streak begins (i.e., not a win or previous was not a win)
streak_df = ordered_team_matches.withColumn(
    "is_winning", when(col("result") == "W", 1).otherwise(0)
).withColumn(
    "prev_is_winning", F.lag(col("is_winning"), 1, 0).over(window_spec)
).withColumn(
    "streak_breaker", when(col("is_winning") == 0, 1).when(col("prev_is_winning") == 0, 1).otherwise(0)
)

# 4. Create a streak group ID using a cumulative sum of streak_breaker
streak_df_with_group = streak_df.withColumn(
    "streak_group", F.sum("streak_breaker").over(window_spec)
)

# 5. Filter for actual wins within each streak group and count them
longest_streaks_df = streak_df_with_group.filter(col("is_winning") == 1)\
    .groupBy("team", "streak_group")\
    .agg(F.count("*").alias("current_streak_length"))\
    .groupBy("team")\
    .agg(F.max("current_streak_length").alias("max_streak"))\
    .orderBy(F.desc("max_streak"))

longest_streaks_df.show(10)





# Détermination du vainqueur par match et ranking par tournoi
print("21. Les rois de la compétition : Équipe la plus victorieuse par tournoi :")
winners = df.withColumn("win", when(col("home_score") > col("away_score"), col("home_team")).otherwise(col("away_team"))).groupBy("tournament", "win").count()
winners.withColumn("rn", row_number().over(Window.partitionBy("tournament").orderBy(desc("count")))).filter("rn = 1").show()


# Jointure des moyennes de buts marqués selon le lieu
print("22. Analyse comparative des performances : Efficacité à domicile vs à l'extérieur :")
h_avg = df.groupBy("home_team").agg(avg("home_score").alias("avg_home"))
a_avg = df.groupBy("away_team").agg(avg("away_score").alias("avg_away"))
h_avg.join(a_avg, h_avg.home_team == a_avg.away_team).select(col("home_team").alias("team"), "avg_home", "avg_away").show()

16. Calcul du Goal Average (Buts marqués - Buts encaissés) pour chaque sélection :
+-----------+------------+
|          t|goal_average|
+-----------+------------+
|     Brazil|        1342|
|    England|        1337|
|    Germany|        1115|
|  Argentina|         937|
|      Spain|         894|
|South Korea|         877|
|Netherlands|         764|
|     Sweden|         749|
|     Mexico|         701|
|      Italy|         685|
|       Iran|         664|
|     Russia|         560|
|  Australia|         544|
|      Japan|         530|
|    Hungary|         524|
|   China PR|         516|
|     France|         502|
|      Egypt|         455|
|   Portugal|         445|
|    Denmark|         409|
+-----------+------------+
only showing top 20 rows
17. Classement annuel des meilleures équipes (basé sur le nombre de victoires) :
+----+--------+----+----+
|year|  winner|wins|rank|
+----+--------+----+----+
|1873| England|   1|   1|
|1874|Scotland|   1|   1|
|1876|Scotland|   2|   1|
|1877|S