# Jeux olympiques d'été et d'hiver, 1896-2022

# CONTEXT :

* Nous souhaitons menez une Analyse sur cet ensemble de données historiques sur les Jeux olympiques, comprenant tous les Jeux d'Athènes 1896 à Pékin 2022. Le jeu de données comprend les résultats, les médailles, les athlètes et les hôtes.

In [1]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import json
from pyspark.sql.functions import split, regexp_replace, upper, col, when, median

* Créer une session Spark avec la configuration du connector permettant d'intégrer Spark avec MongoDB

In [2]:
spark = SparkSession.builder.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0") \
        .getOrCreate()

* Charger les données du fichier olympic_hosts.csv depuis Hdfs avec Spark

In [3]:
results_df = spark.read.csv("hdfs://namenode:9000/olympic_game/staging/olympic_results.csv", header=True, inferSchema=True)

In [107]:
# premier 5 lignes avant nettoyage
results_df.show(5, truncate=False)

+----------------+-------------+------------+----------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------+-------------+------------+---------------------+-----------+-----------------+----------+----------+
|discipline_title|event_title  |slug_game   |participant_type|medal_type|athletes                                                                                                                                                  |rank_equal|rank_position|country_name |country_code|country_3_letter_code|athlete_url|athlete_full_name|value_unit|value_type|
+----------------+-------------+------------+----------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------+-------------+------------+----------

In [4]:
missing_values = [results_df.where(col(c).isNull()).count() for c in results_df.columns]

# Créer un dict avec les colonnes et leurs nombres de valeurs nulles
missing_values_dict = dict(zip(results_df.columns, missing_values))

# Afficher le résultat
print("Valeurs manquantes :")
for col_name, missing_count in missing_values_dict.items():
    print(col_name, ":", missing_count)

Valeurs manquantes :
discipline_title : 0
event_title : 0
slug_game : 0
participant_type : 0
medal_type : 142598
athletes : 154828
rank_equal : 130249
rank_position : 3888
country_name : 0
country_code : 4981
country_3_letter_code : 2
athlete_url : 32783
athlete_full_name : 21138
value_unit : 84155
value_type : 72765


In [5]:
# Nettoyage des données
# Supprimer les colonnes inutiles
columns_to_drop = ["value_unit", "value_type", "athlete_url", "country_3_letter_code", "country_code"]
results_cleaned_df = results_df.drop(*columns_to_drop)

In [6]:
# Fractionner la colonne 'athletes' en deux colonnes distinctes en supprimant les URL
results_cleaned_df = results_cleaned_df.withColumn("athlete_1", split(split(col("athletes"), "https://olympics.com/en/athletes/")[1], "'")[0]) \
                                       .withColumn("athlete_2", split(split(col("athletes"), "https://olympics.com/en/athletes/")[2], "'")[0])

In [7]:
# Supprimer la colonne 'athletes' désormais inutile
results_cleaned_df = results_cleaned_df.drop("athletes")

In [8]:
# Afficher les données nettoyées
print("Cleaned data:")
results_cleaned_df.show(5, truncate=False)

Cleaned data:
+----------------+-------------+------------+----------------+----------+----------+-------------+-------------+-----------------+--------------------+------------------+
|discipline_title|event_title  |slug_game   |participant_type|medal_type|rank_equal|rank_position|country_name |athlete_full_name|athlete_1           |athlete_2         |
+----------------+-------------+------------+----------------+----------+----------+-------------+-------------+-----------------+--------------------+------------------+
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |GOLD      |False     |1            |Italy        |NULL             |stefania-constantini|amos-mosaner      |
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |SILVER    |False     |2            |Norway       |NULL             |kristin-skaslien    |magnus-nedregotten|
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |BRONZE    |False     |3            |Sweden       |NULL             |a

In [9]:
# Mettre les noms des athlètes en majuscules et supprimer les tirets
results_cleaned_df = results_cleaned_df.withColumn("athlete_1", upper(regexp_replace("athlete_1", "-", " "))) \
                                       .withColumn("athlete_2", upper(regexp_replace("athlete_2", "-", " ")))

In [10]:
# Définir les valeurs par défaut pour chaque type de colonne
default_values = {
    'medal_type': "UNDEFINED",
    'athlete_full_name': "UNDEFINED",
    'athlete_1': "UNDEFINED",
    'athlete_2': "UNDEFINED",
}

# Remplacer les valeurs manquantes dans chaque colonne avec les valeurs par défaut
for column, default_value in default_values.items():
    results_cleaned_df = results_cleaned_df.withColumn(column, when(col(column).isNull(), default_value).otherwise(col(column)))

In [11]:
# Remplacer les valeurs manquantes dans la colonne "rank_equal" par False
results_cleaned_df = results_cleaned_df.fillna({'rank_equal': False})

In [12]:
# Calculer la médiane pour la colonne "rank_position"
median_rank_position = results_cleaned_df.select(median('rank_position')).collect()[0][0]

# Remplacer les valeurs manquantes dans la colonne "rank_position" par la médiane
results_cleaned_df = results_cleaned_df.withColumn('rank_position', when(col('rank_position').isNull(), median_rank_position).otherwise(col('rank_position')))

In [13]:
# Afficher les données nettoyées
print("Cleaned data:")
results_cleaned_df.show(5, truncate=False)

Cleaned data:
+----------------+-------------+------------+----------------+----------+----------+-------------+-------------+-----------------+--------------------+------------------+
|discipline_title|event_title  |slug_game   |participant_type|medal_type|rank_equal|rank_position|country_name |athlete_full_name|athlete_1           |athlete_2         |
+----------------+-------------+------------+----------------+----------+----------+-------------+-------------+-----------------+--------------------+------------------+
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |GOLD      |False     |1            |Italy        |UNDEFINED        |STEFANIA CONSTANTINI|AMOS MOSANER      |
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |SILVER    |False     |2            |Norway       |UNDEFINED        |KRISTIN SKASLIEN    |MAGNUS NEDREGOTTEN|
|Curling         |Mixed Doubles|beijing-2022|GameTeam        |BRONZE    |False     |3            |Sweden       |UNDEFINED        |A

In [14]:
missing_values = [results_cleaned_df.where(col(c).isNull()).count() for c in results_cleaned_df.columns]

# Créer un dictionnaire avec les colonnes et leurs nombres de valeurs nulles
missing_values_dict = dict(zip(results_cleaned_df.columns, missing_values))

# Afficher le résultat
print("Valeurs manquantes :")
for col_name, missing_count in missing_values_dict.items():
    print(col_name, ":", missing_count)

Valeurs manquantes :
discipline_title : 0
event_title : 0
slug_game : 0
participant_type : 0
medal_type : 0
rank_equal : 0
rank_position : 0
country_name : 0
athlete_full_name : 0
athlete_1 : 0
athlete_2 : 0


* Transfert des autres données propres 'results_cleaned_df' de 'olympic_results.csv'  dans Hdfs archive

In [15]:
# Enregistrement des données traitées dans HDFS
results_cleaned_df.write.mode("overwrite").parquet("hdfs://namenode:9000/olympic_game/archive/cleaned_results")

* Transfert des données propres 'olympic_results.csv' vers le Service Mongodb pour la partie Monitoring avec Streamlit

In [120]:
# Écrire les données Spark dans MongoDB
results_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb://db:27017/olympic_game.olympic_results") \
    .mode("overwrite") \
    .save()

* Transfert des autres données propres 'olympic_results.csv' vers Mongodb Atlas pour la partie Monitoring avec PowerBI

In [None]:
results_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb+srv://samgilbella:h6JSEv4xx6z2bwqD@cluster0.mrg2znu.mongodb.net/olympic_game.olympic_results") \
    .mode("overwrite") \
    .save()

* Chargement des autres données propres 'olympic_athletes.csv', 'olympic_medals.csv', 'olympic_hosts.csv' depuis hdfs  

In [None]:
athletes_cleaned_df = spark.read.csv("hdfs://namenode:9000/olympic_game/archive/olympic_athletes.csv", header=True, inferSchema=True)

In [None]:
medals_cleaned_df = spark.read.csv("hdfs://namenode:9000/olympic_game/archive/olympic_medals.csv", header=True, inferSchema=True)

In [None]:
hosts_cleaned_df = spark.read.csv("hdfs://namenode:9000/olympic_game/archive/olympic_hosts.csv", header=True, inferSchema=True)

* Transfert des autres données propres 'olympic_athletes.csv', 'olympic_medals.csv', 'olympic_hosts.csv' vers le Service Mongodb pour la partie Monitoring avec Streamlit

In [121]:
# Écrire les données Spark dans MongoDB
hosts_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb://db:27017/olympic_game.olympic_hosts") \
    .mode("overwrite") \
    .save()

In [122]:
# Écrire les données Spark dans MongoDB
medals_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb://db:27017/olympic_game.olympic_medals") \
    .mode("overwrite") \
    .save()

In [123]:
# Écrire les données Spark dans MongoDB
athletes_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb://db:27017/olympic_game.olympic_athletes") \
    .mode("overwrite") \
    .save()

* Transfert des autres données propres 'olympic_athletes.csv', 'olympic_medals.csv', 'olympic_hosts.csv' vers Mongodb Atlas pour la partie Monitoring avec PowerBI

In [99]:
medals_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb+srv://samgilbella:h6JSEv4xx6z2bwqD@cluster0.mrg2znu.mongodb.net/olympic_game.olympic_medals") \
    .mode("overwrite") \
    .save()

In [100]:
athletes_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb+srv://samgilbella:h6JSEv4xx6z2bwqD@cluster0.mrg2znu.mongodb.net/olympic_game.olympic_athletes") \
    .mode("overwrite") \
    .save()

In [None]:
hosts_cleaned_df.write.format("mongo") \
    .option("uri", "mongodb+srv://samgilbella:h6JSEv4xx6z2bwqD@cluster0.mrg2znu.mongodb.net/olympic_game.olympic_hosts") \
    .mode("overwrite") \
    .save()