In [2]:
import os
import pandas as pd 
import numpy as np
from dotenv import load_dotenv

#Spark
import pyspark.sql.functions as F 
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType , BooleanType

# Charger les variables d'environnement
load_dotenv()

url = os.getenv("DB_URL")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")


# Création d'une session Spark
spark = SparkSession.builder \
    .appName("Hackaton17") \
    .config("spark.jars", "postgresql-42.6.0.jar") \
    .getOrCreate()
    


properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver"
}

In [3]:
file_path = 'olympic_hosts.csv'

schema_hosts = StructType([
    StructField("_id", IntegerType(), True),
    StructField("game_slug", StringType(), True),
    StructField("game_end_date", TimestampType(), True),
    StructField("game_start_date", TimestampType(), True),
    StructField("game_location", StringType(), True),
    StructField("game_name", StringType(), True),
    StructField("game_season", StringType(), True),
    StructField("game_year", IntegerType(), True)
])

# Chargement des données dans un DataFrame
df = spark.read \
    .csv(file_path, header=True, schema=schema_hosts) \


# Affichage du DataFrame
df.show()

# Écriture du DataFrame dans PostgreSQL
df.write \
    .jdbc(url=url, table="olympic_hosts", mode="overwrite", properties=properties)

+---+-------------------+-------------------+-------------------+------------------+-------------------+-----------+---------+
|_id|          game_slug|      game_end_date|    game_start_date|     game_location|          game_name|game_season|game_year|
+---+-------------------+-------------------+-------------------+------------------+-------------------+-----------+---------+
|  0|       beijing-2022|2022-02-20 13:00:00|2022-02-04 16:00:00|             China|       Beijing 2022|     Winter|     2022|
|  1|         tokyo-2020|2021-08-08 16:00:00|2021-07-23 13:00:00|             Japan|         Tokyo 2020|     Summer|     2020|
|  2|   pyeongchang-2018|2018-02-25 09:00:00|2018-02-09 00:00:00| Republic of Korea|   PyeongChang 2018|     Winter|     2018|
|  3|           rio-2016|2016-08-21 23:00:00|2016-08-05 14:00:00|            Brazil|           Rio 2016|     Summer|     2016|
|  4|         sochi-2014|2014-02-23 17:00:00|2014-02-07 05:00:00|Russian Federation|         Sochi 2014|     Wi

In [25]:
file_path = 'olympic_medals.csv'

schema_medals = StructType([
    StructField("_id", IntegerType(), True),
    StructField("discipline_title", StringType(), True),
    StructField("slug_game", StringType(), True),
    StructField("event_title", StringType(), True),
    StructField("event_gender", StringType(), True),
    StructField("medal_type", StringType(), True),
    StructField("participant_type", StringType(), True),
    StructField("participant_title", StringType(), True),
    StructField("athlete_url", StringType(), True),
    StructField("athlete_full_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("country_3_letter_code", StringType(), True)
])

# Chargement des données dans un DataFrame
df_medals = spark.read \
    .csv(file_path, header=True, schema=schema_medals)

# Affichage du DataFrame
df_medals.show()

# Écriture du DataFrame dans PostgreSQL
df_medals.write \
    .jdbc(url=url, table="olympic_medals", mode="overwrite", properties=properties)

+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+--------------------+------------+---------------------+
|_id|discipline_title|   slug_game|         event_title|event_gender|medal_type|participant_type|participant_title|         athlete_url|   athlete_full_name|        country_name|country_code|country_3_letter_code|
+---+----------------+------------+--------------------+------------+----------+----------------+-----------------+--------------------+--------------------+--------------------+------------+---------------------+
|  0|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|https://olympics....|Stefania CONSTANTINI|               Italy|          IT|                  ITA|
|  1|         Curling|beijing-2022|       Mixed Doubles|       Mixed|      GOLD|        GameTeam|            Italy|https://olympics....|        

In [26]:
file_path = 'olympic_results.csv'

schema_results = StructType([
    StructField("_id", IntegerType(), True),
    StructField("discipline_title", StringType(), True),
    StructField("event_title", StringType(), True),
    StructField("slug_game", StringType(), True),
    StructField("participant_type", StringType(), True),
    StructField("medal_type", StringType(), True),
    StructField("athletes", StringType(), True), # à revoir
    StructField("rank_equal", BooleanType(), True), 
    StructField("rank_position", IntegerType(), True),
    StructField("country_name", StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("country_3_letter_code", StringType(), True),
    StructField("athlete_url", StringType(), True),
    StructField("athlete_full_name", StringType(), True),
    StructField("value_unit", StringType(), True),
    StructField("value_type", StringType(), True)
])

# Chargement des données dans un DataFrame
df_results = spark.read \
    .csv(file_path, header=True, schema=schema_results)


# Affichage du DataFrame
df_results.show()

# Écriture du DataFrame dans PostgreSQL
df_results.write \
    .jdbc(url=url, table="olympic_results", mode="overwrite", properties=properties)

+---+----------------+-------------+------------+----------------+----------+--------------------+----------+-------------+--------------------+------------+---------------------+-----------+-----------------+----------+----------+
|_id|discipline_title|  event_title|   slug_game|participant_type|medal_type|            athletes|rank_equal|rank_position|        country_name|country_code|country_3_letter_code|athlete_url|athlete_full_name|value_unit|value_type|
+---+----------------+-------------+------------+----------------+----------+--------------------+----------+-------------+--------------------+------------+---------------------+-----------+-----------------+----------+----------+
|  0|         Curling|Mixed Doubles|beijing-2022|        GameTeam|      GOLD|[('Stefania CONST...|     false|            1|               Italy|          IT|                  ITA|       NULL|             NULL|      NULL|      NULL|
|  1|         Curling|Mixed Doubles|beijing-2022|        GameTeam|    SI

In [30]:
file_path = 'olympic_athletes.csv'

schema_athletes = StructType([
    StructField("_id", IntegerType(), True),
    StructField("athlete_url", StringType(), True),
    StructField("athlete_full_name", StringType(), True),
    StructField("games_participations", IntegerType(), True),
    StructField("first_game", StringType(), True),
    StructField("athlete_year_birth", IntegerType(), True), 
    StructField("athlete_medals", StringType(), True),      
    StructField("bio", StringType(), True)
])

# Chargement des données dans un DataFrame
df = spark.read \
    .csv(file_path, header=True, schema=schema_athletes) \


# Affichage du DataFrame
df.show()

# Écriture du DataFrame dans PostgreSQL
df.write \
    .jdbc(url=url, table="olympic_athletes", mode="overwrite", properties=properties)

+---+--------------------+--------------------+--------------------+----------------+------------------+--------------+--------------------+
|_id|         athlete_url|   athlete_full_name|games_participations|      first_game|athlete_year_birth|athlete_medals|                 bio|
+---+--------------------+--------------------+--------------------+----------------+------------------+--------------+--------------------+
|  0|https://olympics....|Cooper WOODS-TOPA...|                   1|    Beijing 2022|              NULL|          NULL|                NULL|
|  1|https://olympics....|      Felix ELOFSSON|                   2|PyeongChang 2018|              NULL|          NULL|                NULL|
|  2|https://olympics....|       Dylan WALCZYK|                   1|    Beijing 2022|              NULL|          NULL|                NULL|
|  3|https://olympics....|       Olli PENTTALA|                   1|    Beijing 2022|              NULL|          NULL|                NULL|
|  4|https://

In [14]:
file_path = 'olympic_hosts.csv'
olympics_athletes = pd.read_csv(file_path)

print(olympics_athletes.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 53 entries, 0 to 52
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   game_slug        53 non-null     object
 1   game_end_date    53 non-null     object
 2   game_start_date  53 non-null     object
 3   game_location    53 non-null     object
 4   game_name        53 non-null     object
 5   game_season      53 non-null     object
 6   game_year        53 non-null     int64 
dtypes: int64(1), object(6)
memory usage: 3.0+ KB
None


In [5]:
print("="*40)
print("Olymics Athletes Info")
print("="*40)
print(olympics_athletes.head())
print(olympics_athletes.info())
print(olympics_athletes.isnull().sum())

Olymics Athletes Info
                                         athlete_url       athlete_full_name   
0  https://olympics.com/en/athletes/cooper-woods-...  Cooper WOODS-TOPALOVIC  \
1          https://olympics.com/en/athletes/elofsson          Felix ELOFSSON   
2     https://olympics.com/en/athletes/dylan-walczyk           Dylan WALCZYK   
3     https://olympics.com/en/athletes/olli-penttala           Olli PENTTALA   
4          https://olympics.com/en/athletes/reikherd        Dmitriy REIKHERD   

   games_participations        first_game  athlete_year_birth athlete_medals   
0                     1      Beijing 2022              2000.0            NaN  \
1                     2  PyeongChang 2018              1995.0            NaN   
2                     1      Beijing 2022              1993.0            NaN   
3                     1      Beijing 2022              1995.0            NaN   
4                     1      Beijing 2022              1989.0            NaN   

   bio  
0  NaN 

In [5]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Query") \
    .config("spark.jars", "postgresql-42.6.0.jar") \
    .getOrCreate()

#Se connecter à la
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgresql-mia17-hackaton.alwaysdata.net:5432/mia17-hackaton_psql") \
    .option("dbtable", "public.olympic_hosts") \
    .option("user", "mia17-hackaton") \
    .option("password", "hackaton-mia17") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# Créer une vue temporaire pour le DataFrame
df.createOrReplaceTempView("olympic_hosts")

# Exécuter une requête SQL
result = spark.sql("SELECT * FROM olympic_hosts WHERE game_location like 'Italy'")

# Afficher les résultats
result.show()

# Fermer la session Spark
spark.stop()

+---+--------------------+-------------------+-------------------+-------------+--------------------+-----------+---------+
|_id|           game_slug|      game_end_date|    game_start_date|game_location|           game_name|game_season|game_year|
+---+--------------------+-------------------+-------------------+-------------+--------------------+-----------+---------+
|  8|          turin-2006|2006-02-26 20:00:00|2006-02-10 08:00:00|        Italy|          Turin 2006|     Winter|     2006|
| 31|           rome-1960|1960-09-11 20:00:00|1960-08-25 08:00:00|        Italy|           Rome 1960|     Summer|     1960|
| 34|cortina-d-ampezzo...|1956-02-05 20:00:00|1956-01-26 08:00:00|        Italy|Cortina d'Ampezzo...|     Winter|     1956|
+---+--------------------+-------------------+-------------------+-------------+--------------------+-----------+---------+



In [6]:
# Fermer la session Spark
spark.stop()