# Jeux olympiques d'été et d'hiver, 1896-2022

# CONTEXT :

* Nous souhaitons menez une Analyse sur cet ensemble de données historiques sur les Jeux olympiques, comprenant tous les Jeux d'Athènes 1896 à Pékin 2022. Le jeu de données comprend les résultats, les médailles, les athlètes et les hôtes.

In [47]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
from pyspark.sql.functions import split, regexp_replace, upper, col, when, median

* Créer une session Spark avec la configuration du connector permettant d'intégrer Spark avec MongoDB

In [48]:
spark = SparkSession.builder.master("local[*]").config("spark.jars", "/postgresql-42.7.1.jar")\
     .config("spark.driver.memory", "10g")\
     .config("spark.driver.extraClassPath", "/postgresql-42.7.1.jar")\
     .config("spark.executor.extraClassPath", "/postgresql-42.7.1.jar")\
     .config("spark.repl.local.jars", "/postgresql-42.7.1.jar")\
     .getOrCreate()

* Charger les données du fichier olympic__hosts.csv et olympic__medals.csv depuis Hdfs avec Spark

In [49]:
hosts_df = spark.read.csv("hdfs://namenode:9000/data/staging/olympic__hosts.csv", header=True, inferSchema=True)

In [50]:
medals_df = spark.read.csv("hdfs://namenode:9000/data/staging/olympic__medals.csv", header=True, inferSchema=True)

In [51]:
athletes_df = spark.read.csv("hdfs://namenode:9000/data/staging/olympic_athletes.csv", header=True, inferSchema=True)

* Transfert de hosts_df et medals_df propres dans Hdfs archive

In [52]:
# Enregistrement des données traitées dans HDFS
hosts_df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/archive/cleaned_hosts")

In [53]:
# Enregistrement des données traitées dans HDFS
medals_df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/archive/cleaned_medals")

In [54]:
# Enregistrement des données traitées dans HDFS
athletes_df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/archive/cleaned_athletes")

In [55]:
jdbc_url = "jdbc:postgresql://postgresql-olympicgames.alwaysdata.net:5432/olympicgames_db"
connection_properties = {
    "user": "olympicgames",
    "password": "ipssi2024",
    "driver": "org.postgresql.Driver"
}

* Transfert des autres données propres 'olympic_athletes.csv', 'olympic_medals.csv', 'olympic_hosts.csv' vers le Service Mongodb pour la partie Monitoring avec Streamlit

In [56]:
hosts_df.write.jdbc(
        url=jdbc_url,
        table="hosts",
        mode="overwrite", 
        properties=connection_properties
)

In [57]:
medals_df.write.jdbc(
        url=jdbc_url,
        table="medals",
        mode="overwrite", 
        properties=connection_properties
)

In [58]:
medals_df.write.jdbc(
        url=jdbc_url,
        table="athletes",
        mode="overwrite", 
        properties=connection_properties
)