In [88]:
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder.getOrCreate()

In [89]:
# Directory containing the CSV files
csv_directory = "data/"

csv_files = []

# List all CSV files in the directory
for f in os.listdir(csv_directory):
    if f.endswith(".csv"):
        csv_files.append(f)
csv_files

['Athletes.csv', 'Coaches.csv', 'EntriesGender.csv', 'Medals.csv', 'Teams.csv']

In [90]:
# Loop through the CSV files and read them into DataFrames
dataframes = {}
for csv_file in csv_files:
    # Extract the DataFrame name from the CSV file name (removing the ".csv" extension)
    dataframe_name = os.path.splitext(csv_file)[0]
    
    # Read the CSV file into a DataFrame
    dataframe = spark.read.csv(os.path.join(csv_directory, csv_file), header=True, inferSchema=True)
    
    # Store the DataFrame in the dictionary with the appropriate name
    dataframes[dataframe_name] = dataframe
dataframes

{'Athletes': DataFrame[PersonName: string, Country: string, Discipline: string],
 'Coaches': DataFrame[Name: string, Country: string, Discipline: string, Event: string],
 'EntriesGender': DataFrame[Discipline: string, Female: int, Male: int, Total: int],
 'Medals': DataFrame[Rank: int, Team_Country: string, Gold: int, Silver: int, Bronze: int, Total: int, Rank by Total: int],
 'Teams': DataFrame[TeamName: string, Discipline: string, Country: string, Event: string]}

In [91]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, asc

In [92]:
w = Window().orderBy(lit('A')) 
dim_discipline = dataframes['Athletes'].select('Discipline').union(dataframes['Coaches'].select('Discipline')) \
                                        .union(dataframes['EntriesGender'].select('Discipline')) \
                                        .union(dataframes['Teams'].select('Discipline')).distinct() \
                                        .orderBy(asc("Discipline")).withColumn('Id', row_number().over(w)) \
                                        .join(dataframes['EntriesGender'],'Discipline','left') \
                                        .select('Id','Discipline','Female','Male','Total')

dim_discipline.show(10)                                                     
#dim_discipline.show(50, truncate=False)
dim_discipline.printSchema                                                     

+---+-------------------+------+----+-----+
| Id|         Discipline|Female|Male|Total|
+---+-------------------+------+----+-----+
|  1|     3x3 Basketball|    32|  32|   64|
|  2|            Archery|    64|  64|  128|
|  3|Artistic Gymnastics|    98|  98|  196|
|  4|  Artistic Swimming|   105|   0|  105|
|  5|          Athletics|   969|1072| 2041|
|  6|          Badminton|    86|  87|  173|
|  7|  Baseball/Softball|    90| 144|  234|
|  8|         Basketball|   144| 144|  288|
|  9|   Beach Volleyball|    48|  48|   96|
| 10|             Boxing|   102| 187|  289|
+---+-------------------+------+----+-----+
only showing top 10 rows



<bound method DataFrame.printSchema of DataFrame[Id: int, Discipline: string, Female: int, Male: int, Total: int]>

In [93]:
dim_country = dataframes['Athletes'].select('Country').union(dataframes['Coaches'].select('Country')) \
                                    .union(dataframes['Teams'].select('Country')).union(dataframes['Medals'].select('Team_Country')) \
                                    .distinct().dropDuplicates().orderBy(asc("Country")).withColumn('Id', row_number().over(w)) \
                                    .select('Id', 'Country')
dim_country.show(5)

+---+--------------+
| Id|       Country|
+---+--------------+
|  1|   Afghanistan|
|  2|       Albania|
|  3|       Algeria|
|  4|American Samoa|
|  5|       Andorra|
+---+--------------+
only showing top 5 rows



In [94]:
fact_medal = dataframes['Medals'].join(dim_country,[dim_country.Country == dataframes['Medals'].Team_Country],'left') \
                                 .withColumnRenamed('id', 'Country_Id').dropDuplicates().orderBy(asc("Rank")).withColumn('Id', row_number().over(w)) \
                                 .withColumnRenamed('Rank by Total', 'Rank_by_Total') \
                                 .select('Id','Country_Id','Gold','Silver','Bronze','Total','Rank','Rank_by_Total')

fact_medal.show(5)

+---+----------+----+------+------+-----+----+-------------+
| Id|Country_Id|Gold|Silver|Bronze|Total|Rank|Rank_by_Total|
+---+----------+----+------+------+-----+----+-------------+
|  1|       196|  39|    41|    33|  113|   1|            1|
|  2|       145|  38|    32|    18|   88|   2|            2|
|  3|        95|  27|    14|    17|   58|   3|            5|
|  4|        74|  22|    21|    22|   65|   4|            4|
|  5|       152|  20|    28|    23|   71|   5|            3|
+---+----------+----+------+------+-----+----+-------------+
only showing top 5 rows



In [95]:
dim_coach = dataframes['Coaches'].join(dim_country, 'Country', 'left').withColumnRenamed('Id', 'Country_Id').dropDuplicates().orderBy(asc("Name")).withColumn('Id', row_number().over(w)) \
                                 .select('Id','Name','Country_Id', 'Discipline')

dim_coach.show(5)

+---+---------------+----------+----------+
| Id|           Name|Country_Id|Discipline|
+---+---------------+----------+----------+
|  1|ABDELMAGID Wael|        58|  Football|
|  2|      ABE Junya|        95|Volleyball|
|  3|  ABE Katsuhiko|        95|Basketball|
|  4|   ADAMA Cherif|        50|  Football|
|  5|     AGEBA Yuya|        95|Volleyball|
+---+---------------+----------+----------+
only showing top 5 rows



In [96]:
dim_athlete = dataframes['Athletes'].join(dim_country, 'Country', 'left').withColumnRenamed('Id', 'Country_Id').withColumnRenamed('PersonName','Name').dropDuplicates().orderBy(asc("Name")).withColumn('Id', row_number().over(w)) \
                                 .select('Id','Name','Country_Id', 'Discipline')

dim_athlete.show(5)

+---+-----------------+----------+-------------------+
| Id|             Name|Country_Id|         Discipline|
+---+-----------------+----------+-------------------+
|  1|  AALERUD Katrine|       137|       Cycling Road|
|  2|      ABAD Nestor|       175|Artistic Gymnastics|
|  3|ABAGNALE Giovanni|        93|             Rowing|
|  4|   ABALDE Alberto|       175|         Basketball|
|  5|    ABALDE Tamara|       175|         Basketball|
+---+-----------------+----------+-------------------+
only showing top 5 rows



In [97]:
fact_sport = dim_discipline.withColumnRenamed('Id', 'Discipline_Id').join(dim_coach, 'Discipline', 'left').withColumnRenamed('Id', 'Coach_Id') \
                              .join(dim_athlete, 'Discipline', 'left').withColumnRenamed('Id', 'Athlete_Id') \
                              .dropDuplicates().orderBy(asc("Discipline")).withColumn('Id', row_number().over(w)) \
                              .select('Id','Discipline_Id','Coach_Id','Athlete_Id')

fact_sport.show(5)

+---+-------------+--------+----------+
| Id|Discipline_Id|Coach_Id|Athlete_Id|
+---+-------------+--------+----------+
|  1|            1|    null|     11071|
|  2|            1|    null|     10892|
|  3|            1|    null|     10841|
|  4|            1|    null|     10755|
|  5|            1|    null|     10701|
+---+-------------+--------+----------+
only showing top 5 rows



In [98]:
dim_coach = dim_coach.select('Id','Name','Country_Id')
dim_athlete = dim_athlete.select('Id','Name','Country_Id')

In [99]:
df_dic = {"dim_discipline":dim_discipline,"dim_country":dim_country,"fact_medal":fact_medal,"dim_athlete":dim_athlete,"dim_team":dim_team,"dim_coach":dim_coach,"fact_sport":fact_sport}

for name,df in df_dic.items():
    df.write \
      .format("jdbc") \
      .mode("overwrite") \
      .option("url", "jdbc:mysql://localhost:3306/olympics_tokyo") \
      .option("dbtable", name) \
      .option("user", "evan") \
      .option("password", "Wrwr1234") \
      .save()

In [100]:
spark.stop()