In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [9]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [10]:
spark = SparkSession.builder \
    .appName("Business Logic") \
    .enableHiveSupport() \
    .getOrCreate()

In [11]:
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

DataFrame[]

In [12]:
spark.sql("USE gold")

DataFrame[]

In [13]:
path ="hdfs:///data/silver/Fact_Table_parquet"
df_fact = spark.read.parquet(path)

                                                                                

In [14]:
df_fact.printSchema()

root
 |-- Edition ID: integer (nullable = true)
 |-- Country NOC: string (nullable = true)
 |-- Result ID: integer (nullable = true)
 |-- Athlete ID: integer (nullable = true)
 |-- Position: string (nullable = true)
 |-- Medal Athlete: string (nullable = true)
 |-- Gold Medals: integer (nullable = true)
 |-- Silver Medals: integer (nullable = true)
 |-- Bronze Medals: integer (nullable = true)
 |-- Total Medals: integer (nullable = true)



In [15]:
df_fact.show(5)

                                                                                

+----------+-----------+---------+----------+--------+-------------+-----------+-------------+-------------+------------+
|Edition ID|Country NOC|Result ID|Athlete ID|Position|Medal Athlete|Gold Medals|Silver Medals|Bronze Medals|Total Medals|
+----------+-----------+---------+----------+--------+-------------+-----------+-------------+-------------+------------+
|        23|        KOR|    40745|      1971|       2|       Silver|         12|            5|           12|          29|
|        23|        KOR|    40745|      1972|      15|     No Medal|         12|            5|           12|          29|
|        23|        KOR|    40745|      1977|      20|     No Medal|         12|            5|           12|          29|
|        23|        KOR|    40788|      1972|       5|     No Medal|         12|            5|           12|          29|
|        23|        KOR|    40788|      1971|       5|     No Medal|         12|            5|           12|          29|
+----------+-----------+

In [16]:
df_unique_results = df_fact.dropDuplicates(["Country NOC", "Edition ID"])

In [17]:
df_medals = df_unique_results.groupBy("Country NOC", "Edition ID").agg(
    sum("Gold Medals").alias("total_gold"),
    sum("Silver Medals").alias("total_silver"),
    sum("Bronze Medals").alias("total_bronze")
)
df_medals = df_medals.withColumn("total_medals", expr("total_gold + total_silver + total_bronze"))

In [18]:
df_medals.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+-----------+----------+----------+------------+------------+------------+
|Country NOC|Edition ID|total_gold|total_silver|total_bronze|total_medals|
+-----------+----------+----------+------------+------------+------------+
|        NED|        40|         1|           2|           3|           6|
|        FRA|        41|         0|           0|           1|           1|
|        URS|        17|        29|          32|          30|          91|
|        ROU|        11|         0|           1|           0|           1|
|        GBR|        29|         1|           1|           2|           4|
+-----------+----------+----------+------------+------------+------------+
only showing top 5 rows



                                                                                

In [19]:
df_medals.count()

1785

In [20]:
df_medals.printSchema()

root
 |-- Country NOC: string (nullable = true)
 |-- Edition ID: integer (nullable = true)
 |-- total_gold: long (nullable = true)
 |-- total_silver: long (nullable = true)
 |-- total_bronze: long (nullable = true)
 |-- total_medals: long (nullable = true)



In [21]:
path ="hdfs:///data/silver/Athlete_parquet"
df_athlete = spark.read.parquet(path)

In [22]:
df_athlete.printSchema()

root
 |-- Athlete ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Born: date (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Nationality: string (nullable = true)



In [23]:
df_athlete = df_athlete.select(
    col("Athlete ID").alias("bio_athlete_id"),
    "Name",
    "Sex",
    "Born",
    "Height",
    "Weight",
    "Nationality"
)

In [24]:
df_athletes_with_medals = df_fact.join(df_athlete, df_fact["Athlete ID"] == df_athlete["bio_athlete_id"], "left_outer") 

In [25]:
df_athletes_with_medals.show(2)

                                                                                

+----------+-----------+---------+----------+--------+-------------+-----------+-------------+-------------+------------+--------------+--------------+---+----------+------+------+------------------+
|Edition ID|Country NOC|Result ID|Athlete ID|Position|Medal Athlete|Gold Medals|Silver Medals|Bronze Medals|Total Medals|bio_athlete_id|          Name|Sex|      Born|Height|Weight|       Nationality|
+----------+-----------+---------+----------+--------+-------------+-----------+-------------+-------------+------------+--------------+--------------+---+----------+------+------+------------------+
|        23|        KOR|    40745|      1971|       2|       Silver|         12|            5|           12|          29|          1971|Jeong Jae-Heon|  1|1974-06-01|   176|    71| Republic of Korea|
|        23|        KOR|    40745|      1972|      15|     No Medal|         12|            5|           12|          29|          1972| Han Seung-Hun|  1|1973-06-11|   171|    62| Republic of Korea|


In [26]:
df_stats =df_athletes_with_medals.groupBy("Country NOC", "Edition ID").agg(
    count("Athlete ID").alias("total_athletes"), 
    avg("Height").alias("avg_height"), 
    avg("Weight").alias("avg_weight"),  
    sum(when(df_athlete["Sex"] == 1, 1).otherwise(0)).alias("male_athletes"), 
    sum(when(df_athlete["Sex"] == 0, 1).otherwise(0)).alias("female_athletes")
)
df_stats.show(5)



+-----------+----------+--------------+------------------+-----------------+-------------+---------------+
|Country NOC|Edition ID|total_athletes|        avg_height|       avg_weight|male_athletes|female_athletes|
+-----------+----------+--------------+------------------+-----------------+-------------+---------------+
|        NED|        40|            26|178.26923076923077|73.15384615384616|           13|             13|
|        FRA|        41|            44|171.52272727272728|64.95454545454545|           34|             10|
|        URS|        17|           501| 175.3692614770459|72.00998003992017|          368|            133|
|        ROU|        11|           128|         176.03125|           71.125|          126|              2|
|        GBR|        29|            87|176.73684210526315|71.63157894736842|           72|              4|
+-----------+----------+--------------+------------------+-----------------+-------------+---------------+
only showing top 5 rows



                                                                                

In [27]:
df_stats = df_stats.select(
    col("Country NOC").alias("NOC"),
    col("Edition ID").alias("edition_id"),
    "total_athletes",
    "avg_height",
    "avg_weight",
    "male_athletes",
    "female_athletes"
)

In [28]:
df_stats_with_medals = df_stats.join(df_medals, (df_stats["edition_id"] == df_medals["Edition ID"]) & (df_stats["NOC"] == df_medals["Country NOC"])) 

In [29]:
df_stats_with_medals.printSchema()

root
 |-- NOC: string (nullable = true)
 |-- edition_id: integer (nullable = true)
 |-- total_athletes: long (nullable = false)
 |-- avg_height: double (nullable = true)
 |-- avg_weight: double (nullable = true)
 |-- male_athletes: long (nullable = true)
 |-- female_athletes: long (nullable = true)
 |-- Country NOC: string (nullable = true)
 |-- Edition ID: integer (nullable = true)
 |-- total_gold: long (nullable = true)
 |-- total_silver: long (nullable = true)
 |-- total_bronze: long (nullable = true)
 |-- total_medals: long (nullable = true)



In [30]:
path ="hdfs:///data/silver/OlympicsCountry_parquet"
df_country = spark.read.parquet(path)

In [31]:
df_country.columns

['noc', 'country']

In [32]:
df_stats_with_medals_participating_cnt = df_stats_with_medals.join(df_country, (df_stats_with_medals["NOC"] == df_country["noc"])) 

In [33]:
df_stats_with_medals_participating_cnt.columns

['NOC',
 'edition_id',
 'total_athletes',
 'avg_height',
 'avg_weight',
 'male_athletes',
 'female_athletes',
 'Country NOC',
 'Edition ID',
 'total_gold',
 'total_silver',
 'total_bronze',
 'total_medals',
 'noc',
 'country']

In [34]:
path ="hdfs:///data/silver/OlympicsGames_parquet"
df_games = spark.read.parquet(path)

In [35]:
df_games.printSchema()

root
 |-- Edition ID: integer (nullable = true)
 |-- Edition Name: string (nullable = true)
 |-- Edition URL: string (nullable = true)
 |-- Flag URL: string (nullable = true)
 |-- Host City: string (nullable = true)
 |-- Host Country: string (nullable = true)



In [36]:
df_games = df_games.withColumnRenamed("Edition ID", "id")

In [37]:
df_final = df_stats_with_medals_participating_cnt.join(df_games, (df_stats_with_medals_participating_cnt["edition_id"] == df_games["id"])) 

In [38]:
df_final.printSchema()

root
 |-- NOC: string (nullable = true)
 |-- edition_id: integer (nullable = true)
 |-- total_athletes: long (nullable = false)
 |-- avg_height: double (nullable = true)
 |-- avg_weight: double (nullable = true)
 |-- male_athletes: long (nullable = true)
 |-- female_athletes: long (nullable = true)
 |-- Country NOC: string (nullable = true)
 |-- Edition ID: integer (nullable = true)
 |-- total_gold: long (nullable = true)
 |-- total_silver: long (nullable = true)
 |-- total_bronze: long (nullable = true)
 |-- total_medals: long (nullable = true)
 |-- noc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- Edition Name: string (nullable = true)
 |-- Edition URL: string (nullable = true)
 |-- Flag URL: string (nullable = true)
 |-- Host City: string (nullable = true)
 |-- Host Country: string (nullable = true)



In [39]:
path ="hdfs:///data/silver/Date_parquet"
df_date = spark.read.parquet(path)

In [40]:
df_date.printSchema()

root
 |-- Date_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- start_day_date: integer (nullable = true)
 |-- start_month_date: integer (nullable = true)
 |-- end_day_date: integer (nullable = true)
 |-- end_month_date: integer (nullable = true)



In [41]:
df_final_with_date = df_final.join(df_date, (df_final["edition_id"] == df_date["Date_id"])) 

In [42]:
df_final_with_date = df_final_with_date.select(
    col("edition_id").alias("EditionID"),
    col("Edition Name").alias("EditionName"),
    col("Flag URL").alias("FlagURL"),
    col("Host Country").alias("HostCountry"),
    col("Host City").alias("HostCity"),
    col("country").alias("ParticipateCountry"),
    col("year").alias("Year"),
    col("avg_height").alias("AverageHeight"),
    col("avg_weight").alias("AverageWeight"),
    col("male_athletes").alias("MaleAthletes"),
    col("female_athletes").alias("FemaleAthletes"),
    col("total_athletes").alias("TotalAthletes"),
    col("total_gold").alias("TotalGold"),
    col("total_silver").alias("TotalSilver"),
    col("total_bronze").alias("TotalBronze"),
    col("total_medals").alias("TotalMedals")
)

In [43]:
df_final_with_date.show(5)

                                                                                

+---------+--------------------+--------------------+-------------+----------------+------------------+----+------------------+-----------------+------------+--------------+-------------+---------+-----------+-----------+-----------+
|EditionID|         EditionName|             FlagURL|  HostCountry|        HostCity|ParticipateCountry|Year|     AverageHeight|    AverageWeight|MaleAthletes|FemaleAthletes|TotalAthletes|TotalGold|TotalSilver|TotalBronze|TotalMedals|
+---------+--------------------+--------------------+-------------+----------------+------------------+----+------------------+-----------------+------------+--------------+-------------+---------+-----------+-----------+-----------+
|       40|1976 Winter Olympics|https://olympedia...|      Austria|       Innsbruck|       Netherlands|1976|178.26923076923077|73.15384615384616|          13|            13|           26|        1|          2|          3|          6|
|       41|1980 Winter Olympics|https://olympedia...|United Stat

In [45]:
df_final_with_date.write.mode("overwrite").parquet("hdfs:///data/gold/BusinessLogic_Parquet")

                                                                                

In [46]:
path ="hdfs:///data/gold/BusinessLogic_Parquet"
df_fact = spark.read.parquet(path)

In [47]:
df_fact.show(5)

+---------+--------------------+--------------------+-------------+----------------+------------------+----+------------------+-----------------+------------+--------------+-------------+---------+-----------+-----------+-----------+
|EditionID|         EditionName|             FlagURL|  HostCountry|        HostCity|ParticipateCountry|Year|     AverageHeight|    AverageWeight|MaleAthletes|FemaleAthletes|TotalAthletes|TotalGold|TotalSilver|TotalBronze|TotalMedals|
+---------+--------------------+--------------------+-------------+----------------+------------------+----+------------------+-----------------+------------+--------------+-------------+---------+-----------+-----------+-----------+
|       40|1976 Winter Olympics|https://olympedia...|      Austria|       Innsbruck|       Netherlands|1976|178.26923076923077|73.15384615384616|          13|            13|           26|        1|          2|          3|          6|
|       41|1980 Winter Olympics|https://olympedia...|United Stat

In [None]:
# df_final.write.format("hive").mode("overwrite").saveAsTable("gold.BusinessLogic")

In [48]:
spark.sql("drop table gold.BusinessLogic;")

DataFrame[]

In [49]:
spark.sql("""
CREATE TABLE gold.BusinessLogic (
  EditionID INT,
  EditionName STRING,
  FlagURL STRING,
  HostCountry STRING,
  HostCity STRING,
  ParticipateCountry STRING,
  Year INT,
  AverageHeight DOUBLE,
  AverageWeight DOUBLE,
  MaleAthletes INT,
  FemaleAthletes INT,
  TotalAthletes INT,
  TotalGold INT,
  TotalSilver INT,
  TotalBronze INT,
  TotalMedals INT
)
STORED AS PARQUET
LOCATION 'hdfs:///data/gold/BusinessLogic_Parquet';
""")


25/04/09 01:08:51 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]