In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract,count,when,avg,lag,format_number,concat, lit,row_number,concat_ws,collect_list,expr
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType


In [2]:
# 创建SparkSession
spark = SparkSession.builder \
    .appName("Athlete Events Data Import") \
    .getOrCreate()

# 定义模式
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", DoubleType(), True),
    StructField("Height", DoubleType(), True),
    StructField("Weight", DoubleType(), True),
    StructField("Team", StringType(), True),
    StructField("NOC", StringType(), True),
    StructField("Games", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Season", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Sport", StringType(), True),
    StructField("Event", StringType(), True),
    StructField("Medal", StringType(), True)
])

# 文件路径
csv_file_path = "E:/hku/cloud cluster/ex4/dataset/athlete_events.csv"

# 读取CSV文件
spark_df = spark.read.csv(csv_file_path, schema=schema, header=True)
spark_df = spark_df.filter((spark_df.Games.contains('Summer')) & (spark_df.Year >= 1956))

# 打印模式
spark_df.printSchema()

# 显示前五行数据
spark_df.show(5)


root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)

+---+------------------+---+----+------+------+-------+---+-----------+----+------+---------+----------+--------------------+-----+
| ID|              Name|Sex| Age|Height|Weight|   Team|NOC|      Games|Year|Season|     City|     Sport|               Event|Medal|
+---+------------------+---+----+------+------+-------+---+-----------+----+------+---------+----------+--------------------+-----+
|  1|         A Dijiang|  M|24.0| 180.0|  80.0|  Ch

In [3]:
# 打印 Schema
spark_df.printSchema()


# 获取数值列的统计信息aa
spark_df.describe().show()

# 计算总行数
total_rows = spark_df.count()
print(f"Total Rows: {total_rows}")

# 打印 DataFrame 的列名和数据类型
print("DataFrame 的列名和数据类型:")
print(spark_df.dtypes)

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)

+-------+-----------------+--------+------+------------------+------------------+-----------------+---------+------+-----------+------------------+------+------+---------+--------------------+------+
|summary|               ID|    Name|   Sex|               Age|            Height|           Weight|     Team|   NOC|      Games|              Year|Season|  City|    Sport|               Event| Medal|
+-------+-----------------+--------+------+----

# 第一个eda问题：Distribution and development trends of sports participation between male and female athletes

In [4]:
# 1. 按性别和年份分组，计算每组的运动员人数
gender_trend_df = spark_df.groupBy("Sex", "Year").agg(count("ID").alias("Participant_Count"))

# 2. 对结果进行排序
sorted_gender_trend_df = gender_trend_df.orderBy("Year", "Sex")

# 查看前21行
sorted_gender_trend_df.show(21)


+---+----+-----------------+
|Sex|Year|Participant_Count|
+---+----+-----------------+
|  F|1956|              891|
|  M|1956|             4208|
|  F|1960|             1422|
|  M|1960|             6660|
|  F|1964|             1336|
|  M|1964|             6326|
|  F|1968|             1767|
|  M|1968|             6786|
|  F|1972|             2179|
|  M|1972|             8090|
|  F|1976|             2164|
|  M|1976|             6457|
|  F|1980|             1755|
|  M|1980|             5435|
|  F|1984|             2442|
|  M|1984|             6984|
|  F|1988|             3535|
|  M|1988|             8473|
|  F|1992|             4114|
|  M|1992|             8832|
|  F|1996|             4998|
+---+----+-----------------+
only showing top 21 rows



In [5]:
# 1. 按性别和年份分组，计算每组的运动员人数
gender_trend_df = spark_df.groupBy("Sex", "Year").agg(
    count("ID").alias("Participant_Count")
)

# 2. 计算年增长率和参与比例
window_spec = Window.partitionBy("Sex").orderBy("Year")
gender_trend_df = gender_trend_df.withColumn(
    "Prev_Year_Participant", lag("Participant_Count").over(window_spec)
).withColumn(
    "Growth_Rate", 
    ((col("Participant_Count") - col("Prev_Year_Participant")) / col("Prev_Year_Participant") * 100).cast("decimal(10,3)")
).withColumn(
    "Growth_Rate", concat(col("Growth_Rate"), lit("%"))
)

# 3. 计算每年的总参与人数和参与比例
total_participants_df = spark_df.groupBy("Year").agg(count("ID").alias("Total_Participants"))
gender_ratio_df = gender_trend_df.join(total_participants_df, on="Year").withColumn(
    "Participation_Ratio", (col("Participant_Count") / col("Total_Participants") * 100).cast("decimal(10,3)")
).withColumn(
    "Participation_Ratio", concat(col("Participation_Ratio"), lit("%"))
)


In [6]:
# 捕获物理计划
gender_ratio_df.explain()

# 4. 展示所有数据
gender_ratio_df.orderBy("Year", "Sex").show(n=200000, truncate=False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Year#9, Sex#2, Participant_Count#996L, Prev_Year_Participant#1000L, Growth_Rate#1011, Total_Participants#1033L, concat(cast(cast(((cast(Participant_Count#996L as double) / cast(Total_Participants#1033L as double)) * 100.0) as decimal(10,3)) as string), %) AS Participation_Ratio#1066]
   +- BroadcastHashJoin [Year#9], [Year#1045], Inner, BuildRight, false
      :- Project [Sex#2, Year#9, Participant_Count#996L, Prev_Year_Participant#1000L, concat(cast(cast(((cast((Participant_Count#996L - Prev_Year_Participant#1000L) as double) / cast(Prev_Year_Participant#1000L as double)) * 100.0) as decimal(10,3)) as string), %) AS Growth_Rate#1011]
      :  +- Window [lag(Participant_Count#996L, -1, null) windowspecdefinition(Sex#2, Year#9 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS Prev_Year_Participant#1000L], [Sex#2], [Year#9 ASC NULLS FIRST]
      :     +- Sort [Sex#2 ASC NULLS FIRST, Year#9 ASC NULLS FIRST], fal



Inference: Even though the overall trend of both the graphs is on the rise. However, after the 1996 we see that there was a slight dip in the number of male participants.


# Inference:

# 第二个EDA问题：dentification of the sports infrastructure and traditional strong events of various countries

In [7]:
medal_counts = spark_df.groupBy("Year", "NOC").agg(count(when(col("Medal").isNotNull(), 1)).alias("MedalCount"))

# 使用窗口函数找出每年奖牌总数排名前十的国

windowSpec = Window.partitionBy("Year").orderBy(col("MedalCount").desc())
top_10_countries = medal_counts.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 10).drop("rank")

# 将结果显示为每年奖牌总数排名前十的国家和对应的奖牌数
result = top_10_countries.groupBy("Year").agg(concat_ws(", ", collect_list(concat_ws("-", col("NOC"), col("MedalCount")))).alias("Top10Countries"))

# 展示结果
result.show(20, truncate=False)

+----+----------------------------------------------------------------------------------------+
|Year|Top10Countries                                                                          |
+----+----------------------------------------------------------------------------------------+
|1956|USA-421, AUS-412, URS-411, GBR-283, GER-261, FRA-226, JPN-222, ITA-207, HUN-197, SWE-184|
|1960|GER-424, URS-414, USA-405, ITA-402, GBR-383, FRA-361, HUN-312, POL-307, JPN-278, AUS-277|
|1964|GER-478, URS-463, JPN-456, USA-455, AUS-391, HUN-319, GBR-300, ITA-256, KOR-240, POL-240|
|1968|USA-496, URS-471, MEX-446, FRG-431, GDR-346, GBR-328, FRA-311, HUN-303, POL-296, CAN-291|
|1972|FRG-584, USA-551, URS-531, GDR-454, GBR-433, POL-418, HUN-367, FRA-356, ITA-345, CAN-344|
|1976|URS-574, USA-538, CAN-531, FRG-450, GDR-407, GBR-351, JPN-328, ITA-312, FRA-302, HUN-288|
|1980|URS-660, GDR-495, POL-418, BUL-397, HUN-392, ROU-339, GBR-328, TCH-317, CUB-288, ESP-211|
|1984|USA-671, FRG-571, CAN-569, GBR-509

In [8]:
sport_leaders = spark_df.groupBy("NOC", "Sport").agg(count(when(col("Medal").isNotNull(), 1)).alias("MedalCount"))

# 找出每项运动的主导国家
windowSpec = Window.partitionBy("Sport").orderBy(col("MedalCount").desc())
sport_leaders = sport_leaders.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") == 1).drop("rank")

# 合并同一个国家的所有主导项目
from pyspark.sql.functions import collect_list

merged_sport_leaders = sport_leaders.groupBy("NOC").agg(concat_ws(", ", collect_list(concat_ws("-", col("Sport"), col("MedalCount")))).alias("DominantSports"))

# 展示结果
merged_sport_leaders.show(40, truncate=False)

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NOC|DominantSports                                                                                                                                                                                                |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|POL|Weightlifting-113                                                                                                                                                                                             |
|BRA|Football-295, Volleyball-285                                                                                                                   

# 第三个EDA问题：The Relationship Between Height Vs Weight Vs Age of Participants Across Sports

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 创建通用区间函数
def create_bin(column, bins, labels):
    bin_expr = F.when((column >= bins[0]) & (column < bins[1]), labels[0])
    for i in range(1, len(bins) - 1):
        bin_expr = bin_expr.when((column >= bins[i]) & (column < bins[i+1]), labels[i])
    return bin_expr

# 定义区间和标签
age_bins = [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80]
age_labels = ["10-15", "15-20", "20-25", "25-30", "30-35", "35-40", "40-45", "45-50", "50-55", "55-60", "60-65", "65-70", "70-75", "75-80"]

height_bins = [120, 125, 130, 135, 140, 145, 150, 155, 160, 165, 170, 175, 180, 185, 190, 195, 200, 205, 210, 215, 220]
height_labels = ["120-125", "125-130", "130-135", "135-140", "140-145", "145-150", "150-155", "155-160", "160-165", "165-170", "170-175", "175-180", "180-185", "185-190", "190-195", "195-200", "200-205", "205-210", "210-215", "215-220"]

weight_bins = [30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]
weight_labels = ["30-40", "40-50", "50-60", "60-70", "70-80", "80-90", "90-100", "100-110", "110-120", "120-130", "130-140", "140-150"]

# 添加区间列
spark_df = spark_df.withColumn("Age_Bin", create_bin(F.col("Age"), age_bins, age_labels))
spark_df = spark_df.withColumn("Height_Bin", create_bin(F.col("Height"), height_bins, height_labels))
spark_df = spark_df.withColumn("Weight_Bin", create_bin(F.col("Weight"), weight_bins, weight_labels))

# 过滤出金牌选手
gold_medalists = spark_df.filter(spark_df["Medal"] == "Gold")

# 分组统计不同运动的最多夺冠年龄区间、身高区间和体重区间
age_mode = gold_medalists.groupBy("Sport", "Age_Bin").count().withColumnRenamed("count", "Age_Count")
height_mode = gold_medalists.groupBy("Sport", "Height_Bin").count().withColumnRenamed("count", "Height_Count")
weight_mode = gold_medalists.groupBy("Sport", "Weight_Bin").count().withColumnRenamed("count", "Weight_Count")

# 找到每个运动最多夺冠的年龄区间
age_mode = age_mode.withColumn("Row_Number", F.row_number().over(Window.partitionBy("Sport").orderBy(F.desc("Age_Count"))))
age_mode = age_mode.filter(age_mode["Row_Number"] == 1).drop("Row_Number")

# 找到每个运动最多夺冠的身高区间
height_mode = height_mode.withColumn("Row_Number", F.row_number().over(Window.partitionBy("Sport").orderBy(F.desc("Height_Count"))))
height_mode = height_mode.filter(height_mode["Row_Number"] == 1).drop("Row_Number")

# 找到每个运动最多夺冠的体重区间
weight_mode = weight_mode.withColumn("Row_Number", F.row_number().over(Window.partitionBy("Sport").orderBy(F.desc("Weight_Count"))))
weight_mode = weight_mode.filter(weight_mode["Row_Number"] == 1).drop("Row_Number")

# 合并结果
result = age_mode.join(height_mode, on="Sport").join(weight_mode, on="Sport")

# 选择需要的列并重命名
result = result.select("Sport", "Age_Bin", "Height_Bin", "Weight_Bin")



result.show()

+-------------------+-------+----------+----------+
|              Sport|Age_Bin|Height_Bin|Weight_Bin|
+-------------------+-------+----------+----------+
|            Archery|  20-25|   165-170|     70-80|
|          Athletics|  20-25|   180-185|     60-70|
|          Badminton|  25-30|   175-180|     60-70|
|           Baseball|  25-30|   185-190|     80-90|
|         Basketball|  20-25|   190-195|    90-100|
|   Beach Volleyball|  30-35|   190-195|     70-80|
|             Boxing|  20-25|   165-170|     60-70|
|           Canoeing|  20-25|   180-185|     80-90|
|            Cycling|  20-25|   180-185|     70-80|
|             Diving|  20-25|   160-165|     50-60|
|      Equestrianism|  30-35|   170-175|     60-70|
|            Fencing|  25-30|   175-180|     70-80|
|           Football|  20-25|   170-175|     70-80|
|               Golf|  25-30|   165-170|     60-70|
|         Gymnastics|  20-25|   160-165|     50-60|
|           Handball|  25-30|   180-185|     70-80|
|           

# 第四个EDA问题：Do host countries win significantly more medals compared to the Olympics before and after they host?

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, greatest

# 假设你已经创建了SparkSession，并导入了spark_df
# spark = SparkSession.builder.appName("Olympic Analysis").getOrCreate()

# 筛选出有奖牌的记录
medals_df = spark_df.filter(col("Medal") != 'NA')

# 初始化主办国家字典
host_countries = {
    1956: 'AUS', 1960: 'ITA', 1964: 'JPN', 1968: 'MEX', 1972: 'FRG', 1976: 'CAN',
    1980: 'URS', 1984: 'USA', 1988: 'KOR', 1992: 'ESP', 1996: 'USA', 2000: 'AUS',
    2004: 'GRE', 2008: 'CHN', 2012: 'GBR', 2016: 'BRA'
}

# 创建一个临时视图以便使用SQL查询
medals_df.createOrReplaceTempView("medals")

# 结果列表
results = []

# 计算每一届主办国的奖牌数和前后两届非主办时期的奖牌数
for year, country in host_countries.items():
    query = f"""
        SELECT
            {year} as Year,
            '{country}' as Country,
            COUNT(CASE WHEN NOC = '{country}' AND Year = {year} THEN Event END) as Medals_host,
            COUNT(CASE WHEN NOC = '{country}' AND Year = {year-4} THEN Event END) as Medals_nothost1,
            COUNT(CASE WHEN NOC = '{country}' AND Year = {year-8} THEN Event END) as Medals_nothost2,
            COUNT(CASE WHEN NOC = '{country}' AND Year = {year+4} THEN Event END) as Medals_nothost3,
            COUNT(CASE WHEN NOC = '{country}' AND Year = {year+8} THEN Event END) as Medals_nothost4
        FROM medals
    """
    result = spark.sql(query)
    results.append(result)

# 将所有结果合并到一个DataFrame中
final_df = results[0]
for df in results[1:]:
    final_df = final_df.union(df)

# 新增一列，计算主办时期的奖牌数比非主办时期的最大值多百分之多少
final_df = final_df.withColumn(
    "Medals_bigger%",
    (col("Medals_host") - greatest("Medals_nothost1", "Medals_nothost2", "Medals_nothost3", "Medals_nothost4"))
    / col("Medals_host") * 100
)

final_df.show()


+----+-------+-----------+---------------+---------------+---------------+---------------+-------------------+
|Year|Country|Medals_host|Medals_nothost1|Medals_nothost2|Medals_nothost3|Medals_nothost4|     Medals_bigger%|
+----+-------+-----------+---------------+---------------+---------------+---------------+-------------------+
|1956|    AUS|         67|              0|              0|             46|             44| 31.343283582089555|
|1960|    ITA|         88|             47|              0|             51|             33|  42.04545454545455|
|1964|    JPN|         62|             31|             24|             63|             56|-1.6129032258064515|
|1968|    MEX|          9|              1|              1|              1|              2|  77.77777777777779|
|1972|    FRG|        102|             51|              0|             77|              0| 24.509803921568626|
|1976|    CAN|         23|             11|             10|              0|             85| -269.5652173913044|
|