In [3]:
## Spark conf
from pyspark import SparkConf
from pyspark.sql import  SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [4]:
sparkConf = SparkConf()
sparkConf.setAll(
    [
        ("spark.app.name", "olmypics spark app"),
        ("spark.master", "local[3]")
    ]
)

<pyspark.conf.SparkConf at 0x27a5a7a3ef0>

In [5]:
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

## Loading data

In [6]:
athletesSchema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("sex", StringType(), False),
        StructField("height", IntegerType(), True),
        StructField("weight", IntegerType(), True),
        StructField("team", StringType(), False)
    ]
)
# id,name,sex,height,weight,team

In [7]:
athletes_df = spark.read.format("csv").option("header", True).schema(athletesSchema).option("enforceSchema", True).option("path", r"D:\SQL telegram\Project 2 olympic history\athletes.csv").load()

In [8]:
athletes_df.show(5, truncate=False)

+---+------------------------+---+------+------+--------------+
|id |name                    |sex|height|weight|team          |
+---+------------------------+---+------+------+--------------+
|1  |A Dijiang               |M  |180   |80    |China         |
|2  |A Lamusi                |M  |170   |60    |China         |
|3  |Gunnar Nielsen Aaby     |M  |NULL  |NULL  |Denmark       |
|4  |Edgar Lindenau Aabye    |M  |NULL  |NULL  |Denmark/Sweden|
|5  |Christine Jacoba Aaftink|F  |185   |82    |Netherlands   |
+---+------------------------+---+------+------+--------------+
only showing top 5 rows



In [9]:
athletes_processed_df = athletes_df.filter("team is not null").cache()

In [10]:
print(athletes_processed_df.printSchema())
athletes_processed_df.columns

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- team: string (nullable = true)

None


['id', 'name', 'sex', 'height', 'weight', 'team']

In [11]:
athletes_processed_df.show(5, truncate=False)

+---+------------------------+---+------+------+--------------+
|id |name                    |sex|height|weight|team          |
+---+------------------------+---+------+------+--------------+
|1  |A Dijiang               |M  |180   |80    |China         |
|2  |A Lamusi                |M  |170   |60    |China         |
|3  |Gunnar Nielsen Aaby     |M  |NULL  |NULL  |Denmark       |
|4  |Edgar Lindenau Aabye    |M  |NULL  |NULL  |Denmark/Sweden|
|5  |Christine Jacoba Aaftink|F  |185   |82    |Netherlands   |
+---+------------------------+---+------+------+--------------+
only showing top 5 rows



In [12]:
athletes_processed_df.columns

['id', 'name', 'sex', 'height', 'weight', 'team']

In [13]:
athletes_processed_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(f"{c}_null_count") for c in athletes_processed_df.columns]).show()

+-------------+---------------+--------------+-----------------+-----------------+---------------+
|id_null_count|name_null_count|sex_null_count|height_null_count|weight_null_count|team_null_count|
+-------------+---------------+--------------+-----------------+-----------------+---------------+
|            0|              0|             0|            34082|            35191|              0|
+-------------+---------------+--------------+-----------------+-----------------+---------------+



In [14]:
athlete_events_df = spark.read.format("csv").option("header", True).option("inferSchema", True).option("path", r"D:\SQL telegram\Project 2 olympic history\athlete_events.csv").load()
athlete_events_processed_df = athlete_events_df.drop_duplicates().cache()

In [15]:
athlete_events_processed_df.printSchema()

root
 |-- athlete_id: integer (nullable = true)
 |-- games: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- city: string (nullable = true)
 |-- sport: string (nullable = true)
 |-- event: string (nullable = true)
 |-- medal: string (nullable = true)



In [16]:
athlete_events_processed_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(f"{c}_null_count") for c in athlete_events_processed_df.columns]).show()

+---------------------+----------------+---------------+-----------------+---------------+----------------+----------------+----------------+
|athlete_id_null_count|games_null_count|year_null_count|season_null_count|city_null_count|sport_null_count|event_null_count|medal_null_count|
+---------------------+----------------+---------------+-----------------+---------------+----------------+----------------+----------------+
|                    0|               0|              0|                0|              0|               0|               0|               0|
+---------------------+----------------+---------------+-----------------+---------------+----------------+----------------+----------------+



## Processing

#### which team has won the maximum gold medals over the years.

In [17]:
joined_athletes_and_events_df = athlete_events_processed_df.join(athletes_processed_df, athletes_processed_df.id==athlete_events_df.athlete_id, "right")

In [18]:
joined_athletes_and_events_df.cache()

DataFrame[athlete_id: int, games: string, year: int, season: string, city: string, sport: string, event: string, medal: string, id: int, name: string, sex: string, height: int, weight: int, team: string]

In [19]:
joined_athletes_and_events_df.select("team", "medal").filter(F.col("medal") != "NA").groupBy("team").agg(F.count("medal").alias("medal_count")).orderBy(F.col("medal_count").desc()).limit(5).show()

+-------------+-----------+
|         team|medal_count|
+-------------+-----------+
|United States|       5131|
| Soviet Union|       2604|
|      Germany|       1887|
|Great Britain|       1698|
|       France|       1558|
+-------------+-----------+



In [20]:
print(athlete_events_processed_df.rdd.getNumPartitions())
print(athletes_processed_df.rdd.getNumPartitions())
print(joined_athletes_and_events_df.rdd.getNumPartitions())

200
2
200


#### for each team print total silver medals and year in which they won maximum silver medal..output 3 columns
#### team,total_silver_medals, year_of_max_silver

In [21]:
year_of_max_silver_wind = Window.partitionBy("team").orderBy(F.desc("silver_medal_count"))

In [22]:
team_wise_silver_df = joined_athletes_and_events_df.select("team", "year", "medal")\
    .filter(F.col("medal") == "Silver")\
    .groupBy("team", "year")\
    .agg(F.count("medal").alias("silver_medal_count"))\
    .withColumn("year_of_max_silver", F.first_value("year").over(year_of_max_silver_wind))\
    .groupBy("team")\
    .agg(F.sum("silver_medal_count").alias("total_silver_medals"), F.min("year_of_max_silver").alias("year_of_max_silver"))

In [23]:
team_wise_silver_df.orderBy(F.desc("total_silver_medals")).show(truncate=False)

+--------------+-------------------+------------------+
|team          |total_silver_medals|year_of_max_silver|
+--------------+-------------------+------------------+
|United States |1507               |1984              |
|Soviet Union  |766                |1980              |
|Germany       |603                |1936              |
|Great Britain |590                |1908              |
|France        |524                |1920              |
|Italy         |507                |2016              |
|Sweden        |479                |1912              |
|Australia     |453                |2004              |
|Canada        |405                |1984              |
|China         |332                |1996              |
|Norway        |330                |1920              |
|Hungary       |329                |1972              |
|East Germany  |325                |1980              |
|Netherlands   |319                |2004              |
|Japan         |307                |2012        

#### which player has won maximum gold medals  amongst the players 
#### which have won only gold medal (never won silver or bronze) over the years

In [24]:
athlete_with_silver_or_bronze = athlete_events_processed_df.where(F.col("medal").isin("Silver", "Bronze")).select("athlete_id")

In [25]:
result = athlete_with_silver_or_bronze.collect()
temp = set([result[i].asDict()["athlete_id"] for i in range(len(result))])

In [26]:
athlete_with_gold_only = athlete_events_processed_df.select("athlete_id", "medal")\
    .where((F.col("medal") == "Gold") & (~F.col("athlete_id").isin(temp)))

In [27]:
athlete_with_gold_only.cache()

DataFrame[athlete_id: int, medal: string]

In [28]:
athlete_with_max_gold_over_years = athlete_with_gold_only.groupBy("athlete_id").agg(F.count("medal").alias("gold_count")).orderBy(F.desc("gold_count")).limit(3)

In [29]:
athlete_with_max_gold_over_years.show()

+----------+----------+
|athlete_id|gold_count|
+----------+----------+
|     33557|        10|
|     13029|         8|
|     84026|         6|
+----------+----------+



#### in each year which player has won maximum gold medal. Write a query to print year,player name 
#### and no of golds won in that year . In case of a tie print comma separated player names.

In [30]:
gold_ranker_wind = Window.partitionBy("year").orderBy(F.desc("gold_medal_count"))

In [31]:
player_with_max_gold = joined_athletes_and_events_df.where(F.col("medal") == "Gold").select("athlete_id", "name", "year", "medal")\
    .groupBy("year", "athlete_id", "name")\
    .agg(F.count("medal").alias("gold_medal_count"))\
    .withColumn("gold_ranker", F.dense_rank().over(gold_ranker_wind))\
    .where(F.col("gold_ranker") == 1)\
    .groupBy("year", "gold_medal_count").agg(F.concat_ws(", ", F.collect_list(F.col("name"))).alias("player_name"))\
    .select("year","player_name", "gold_medal_count")

In [32]:
player_with_max_gold.show(truncate=False)

+----+-----------------------------------------------------------------------------------------------------------------------------------+----------------+
|year|player_name                                                                                                                        |gold_medal_count|
+----+-----------------------------------------------------------------------------------------------------------------------------------+----------------+
|1896|Carl Schuhmann                                                                                                                     |4               |
|1900|"Alvin Christian ""Al"" Kraenzlein"                                                                                                |4               |
|1904|Anton Heida                                                                                                                        |5               |
|1906|Francesco Verri, Emilio Fontanella, Giorgio Cesana, "Maxim

#### in which event and year India has won its first gold medal,first silver medal and first bronze medal
#### print 3 columns medal,year,sport

In [33]:
joined_athletes_and_events_df.where((F.col("team") == "India") & (F.col("medal") != "NA"))\
    .withColumn("medal_year", F.dense_rank().over(Window.partitionBy("medal").orderBy("year")))\
    .where(F.col("medal_year") == 1)\
    .select("team", "medal", "year", "sport").distinct().show()

+-----+------+----+---------+
| team| medal|year|    sport|
+-----+------+----+---------+
|India|Bronze|1952|Wrestling|
|India|  Gold|1924| Alpinism|
|India|Silver|1900|Athletics|
+-----+------+----+---------+



#### find players who won gold medal in summer and winter olympics both.

In [34]:
athlete_events_processed_df.where(F.col("medal") == "Gold")\
    .select("athlete_id", "season")\
    .groupBy("athlete_id")\
    .agg(F.count("season").alias("season_count_for_gold"))\
    .where(F.col("season_count_for_gold") == 2).select("athlete_id").orderBy("athlete_id").show()

+----------+
|athlete_id|
+----------+
|        73|
|       107|
|       108|
|       404|
|       583|
|       705|
|       832|
|       846|
|       977|
|       980|
|      1169|
|      1173|
|      1380|
|      1483|
|      1554|
|      2469|
|      2486|
|      2511|
|      2723|
|      2785|
+----------+
only showing top 20 rows



#### find players who won gold, silver and bronze medal in a single olympics. print player name along with year.

In [35]:
joined_athletes_and_events_df.where(F.col("medal") != "NA")\
    .select("athlete_id", "name", "games", "medal")\
    .groupBy("athlete_id", "name", "games")\
    .agg(F.count("medal").alias("medal_count"))\
    .where(F.col("medal_count") == 3)\
    .withColumn("year", F.substring(F.col("games"), 1, 4))\
    .select("name", "year").distinct().show()

+--------------------+----+
|                name|year|
+--------------------+----+
|         Karin Seick|1984|
|        Sandra Vlker|1996|
|"Jennifer Elisabe...|1996|
| Frank Sherman Henry|1948|
|       Li Xiaoshuang|1996|
|Carly Rae Patters...|2004|
|        Enrico Bruna|1906|
|Renate Stecher (M...|1972|
|Elaine Tanner (-W...|1968|
|"Adrie ""Ard"" Sc...|1972|
|Erika Zuchold (Ba...|1972|
|            Yang Wei|2008|
|Paul Louis Eugne ...|1920|
|Marcel Carlos Pau...|1906|
|      Julius Lenhart|1904|
|Dara Grace Torres...|2008|
|Nikolay Yefimovic...|1972|
|        Jon C. Olsen|1992|
|Atje Keulen-Deelstra|1972|
|"John ""Johnny"" ...|2010|
+--------------------+----+
only showing top 20 rows



#### find players who have won gold medals in consecutive 3 summer olympics in the same event.
###### Consider only olympics 2000 onwards. Assume summer olympics happens every 4 year starting 2000

In [36]:
cons_year = 3
a = 4 * cons_year
lead_year = cons_year - 1
athlete_events_processed_df.where((F.col("year") >= 2000) & (F.col("year") % 4 == 0) & (F.col("season") == "Summer") & (F.col("medal") == "Gold"))\
    .withColumn("3rd_oly_year", F.lead("year", cons_year).over(Window.partitionBy("athlete_id", "event").orderBy("year")))\
    .where(F.col("3rd_oly_year") == F.col("year") + a)\
    .select("athlete_id", "event").distinct().show(truncate=False)

+----------+---------------------------------------------+
|athlete_id|event                                        |
+----------+---------------------------------------------+
|11671     |Basketball Women's Basketball                |
|19044     |Basketball Women's Basketball                |
|70965     |Swimming Men's 4 x 200 metres Freestyle Relay|
|94406     |Swimming Men's 200 metres Individual Medley  |
|94406     |Swimming Men's 4 x 100 metres Medley Relay   |
|94406     |Swimming Men's 4 x 200 metres Freestyle Relay|
|118778    |Basketball Women's Basketball                |
|131805    |Diving Women's Synchronized Springboard      |
+----------+---------------------------------------------+



In [37]:
spark.stop()