<a href="https://colab.research.google.com/github/MatheusMGirardi/py-spark-olympic-athlete-data/blob/main/PySpark_big_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import shutil
import os

In [2]:
!pip install -q findspark
!pip install pyspark



In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("TDE4").master("local").getOrCreate()

#### 1. CSV loading

In [4]:
df = spark.read.csv("/content/athletes.csv")
rdd = df.rdd

In [5]:
for row in rdd.take(2):
    print(row)

Row(_c0='ID', _c1='Name', _c2='Sex', _c3='Age', _c4='Height', _c5='Weight', _c6='Team', _c7='NOC', _c8='Games', _c9='Year', _c10='Season', _c11='City', _c12='Sport', _c13='Event', _c14='Medal')
Row(_c0='1', _c1='A Dijiang', _c2='M', _c3='24', _c4='180', _c5='80', _c6='China', _c7='CHN', _c8='1992 Summer', _c9='1992', _c10='Summer', _c11='Barcelona', _c12='Basketball', _c13="Basketball Men's Basketball", _c14='NA')


In [6]:
# remove o header
rdd = rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).map(lambda row_index: row_index[0])

#### 1.1 Contagem de Atletas por Ano

In [7]:
consulta1_1m = rdd.map(lambda row: ((row._c9), 1))
consulta1_1r = consulta1_1m.reduceByKey(lambda a,b: a + b)

In [8]:
results = consulta1_1r.take(10)
for result in results:
    print(result)

('1992', 10083)
('2012', 7958)
('1920', 2655)
('1900', 1299)
('1988', 9120)
('1994', 1852)
('1932', 2150)
('2002', 2409)
('1952', 5727)
('1980', 5431)


In [23]:
output_dir = "out/consulta1_1"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

In [24]:
consulta1_1r.saveAsTextFile(output_dir)

### 1.2 Contagem de Atletas por esporte

In [11]:
consulta1_2m = rdd.map(lambda row: ((row._c12), 1))
consulta1_2r = consulta1_2m.reduceByKey(lambda a,b: a + b) #soma os val

In [12]:
results = consulta1_2r.take(10)
for result in results:
    print(result)

('Basketball', 2843)
('Judo', 2347)
('Football', 4249)
('Tug-Of-War', 104)
('Speed Skating', 3378)
('Cross Country Skiing', 5416)
('Athletics', 23978)
('Ice Hockey', 3236)
('Swimming', 14148)
('Badminton', 865)


In [25]:
output_dir = "out/consulta1_2"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

In [26]:
consulta1_2r.saveAsTextFile(output_dir)

### 1.3 Média de Idade dos Atletas por Gênero

In [27]:
filtered_rdd = rdd.filter(lambda row: row._c3 != 'NA' and row._c3 is not None)

consulta1_3m = filtered_rdd.map(lambda row: (row._c2, (int(row._c3), 1)))

consulta1_3r = consulta1_3m.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

consulta1_3_avg = consulta1_3r.mapValues(lambda x: x[0] / x[1])

results = consulta1_3_avg.collect()

for gender, avg_age in results:
    print(f"Gender: {gender}, Average Age: {avg_age}")

Gender: M, Average Age: 26.26023675862484
Gender: F, Average Age: 23.739007700819766


In [17]:
output_dir = "out/consulta1_3"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta1_3_avg.saveAsTextFile(output_dir)

### 1.4 Contagem de medalhas por país

In [18]:
consulta1_4m = rdd.filter(lambda row: row._c14 in ['Gold', 'Silver', 'Bronze']).map(lambda row: (row._c7, 1))

consulta1_4r = consulta1_4m.reduceByKey(lambda a, b: a + b)

results = consulta1_4r.take(10)

for country, total_medals in results:
    print(f"Country: {country}, Total Medals: {total_medals}")

Country: DEN, Total Medals: 385
Country: FIN, Total Medals: 461
Country: NOR, Total Medals: 654
Country: NED, Total Medals: 516
Country: FRA, Total Medals: 1298
Country: ITA, Total Medals: 1026
Country: ESP, Total Medals: 329
Country: AZE, Total Medals: 24
Country: RUS, Total Medals: 658
Country: BLR, Total Medals: 82


In [19]:
output_dir = "out/consulta1_4"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta1_4r.saveAsTextFile(output_dir)

### 2.1 Contagem de Medalhas por País e Ano

In [28]:
# Mapper: Emit (country, year) and 1 for each medal won (Gold, Silver, Bronze)
consulta2_1m = rdd.filter(lambda row: row._c14 in ['Gold', 'Silver', 'Bronze']).map(lambda row: ((row._c7, row._c9), 1))

consulta2_1r = consulta2_1m.reduceByKey(lambda a, b: a + b)

results = consulta2_1r.take(10)

# Display the results
for (country, year), total_medals in results:
    print(f"Country: {country}, Year: {year}, Total Medals: {total_medals}")

Country: DEN, Year: 1900, Total Medals: 3
Country: FIN, Year: 1920, Total Medals: 23
Country: FIN, Year: 2014, Total Medals: 18
Country: FIN, Year: 1948, Total Medals: 24
Country: FIN, Year: 1952, Total Medals: 30
Country: NOR, Year: 1992, Total Medals: 25
Country: NOR, Year: 1994, Total Medals: 18
Country: NOR, Year: 2002, Total Medals: 24
Country: NOR, Year: 2006, Total Medals: 19
Country: NOR, Year: 2008, Total Medals: 11


In [29]:
output_dir = "out/consulta2_1"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta2_1r.saveAsTextFile(output_dir)

### 2.2. Top 3 Atletas com Mais Medalhas

In [30]:
# Mapper: Emit (atleta, 1) for each medal won (Gold, Silver, Bronze)
consulta2_2m = rdd.filter(lambda row: row._c14 in ['Gold', 'Silver', 'Bronze']).map(lambda row: (row._c1, 1))

consulta2_2r = consulta2_2m.reduceByKey(lambda a, b: a + b)

# Get the top 3 athletes with the most medals
top_3_atletas = consulta2_2r.takeOrdered(3, key=lambda x: -x[1])

for atleta, total_medals in top_3_atletas:
    print(f"Atleta: {atleta}, Total Medals: {total_medals}")

Atleta: Larysa Semenivna Latynina (Diriy-), Total Medals: 18
Atleta: Nikolay Yefimovich Andrianov, Total Medals: 15
Atleta: Ole Einar Bjrndalen, Total Medals: 13


In [31]:
output_dir = "out/consulta2_2"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)


top_3_atletas_rdd = spark.sparkContext.parallelize(top_3_atletas)
top_3_atletas_rdd.saveAsTextFile(output_dir)

### 2.3. Contagem de Atletas por Faixa Etária e Esporte

In [32]:
def categorize_age(age):
    if 20 <= age <= 30:
        return '20-30'
    elif 31 <= age <= 40:
        return '31-40'
    else:
        return 'Other'

filtered_rdd = rdd.filter(lambda row: row._c3.isdigit())

consulta2_3m = filtered_rdd.map(lambda row: ((categorize_age(int(row._c3)), row._c12), 1))

consulta2_3r = consulta2_3m.reduceByKey(lambda a, b: a + b)

results = consulta2_3r.take(10)

for (faixa_etaria, esporte), total_athletes in results:
    print(f"Faixa Etária: {faixa_etaria}, Esporte: {esporte}, Total Atletas: {total_athletes}")

Faixa Etária: 20-30, Esporte: Basketball, Total Atletas: 2314
Faixa Etária: 20-30, Esporte: Judo, Total Atletas: 1980
Faixa Etária: 20-30, Esporte: Football, Total Atletas: 3487
Faixa Etária: 31-40, Esporte: Tug-Of-War, Total Atletas: 24
Faixa Etária: 20-30, Esporte: Speed Skating, Total Atletas: 2717
Faixa Etária: 31-40, Esporte: Cross Country Skiing, Total Atletas: 785
Faixa Etária: Other, Esporte: Athletics, Total Atletas: 1703
Faixa Etária: 20-30, Esporte: Ice Hockey, Total Atletas: 2569
Faixa Etária: 20-30, Esporte: Swimming, Total Atletas: 7712
Faixa Etária: 20-30, Esporte: Cross Country Skiing, Total Atletas: 4381


In [33]:
output_dir = "out/consulta2_3"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta2_3r.saveAsTextFile(output_dir)

### 2.4. Proporção de Atletas Homens e Mulheres por Ano

In [34]:
consulta2_4m = rdd.map(lambda row: (row._c9, (1, 0)) if row._c2 == 'M' else (row._c9, (0, 1)))

# Combine sum of (males, females) by year
consulta2_4c = consulta2_4m.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Reducer:  sum of (males, females) by year and calculate the proportion
consulta2_4r = consulta2_4c.mapValues(lambda counts: (counts[0], counts[1], counts[0] / (counts[0] + counts[1]), counts[1] / (counts[0] + counts[1])))

results = consulta2_4r.take(10)

for year, (males, females, male_ratio, female_ratio) in results:
    print(f"Ano: {year}, Homens: {males}, Mulheres: {females}, Proporção Homens: {male_ratio:.2f}, Proporção Mulheres: {female_ratio:.2f}")

Ano: 1992, Homens: 7008, Mulheres: 3075, Proporção Homens: 0.70, Proporção Mulheres: 0.30
Ano: 2012, Homens: 4441, Mulheres: 3517, Proporção Homens: 0.56, Proporção Mulheres: 0.44
Ano: 1920, Homens: 2562, Mulheres: 93, Proporção Homens: 0.96, Proporção Mulheres: 0.04
Ano: 1900, Homens: 1275, Mulheres: 24, Proporção Homens: 0.98, Proporção Mulheres: 0.02
Ano: 1988, Homens: 6523, Mulheres: 2597, Proporção Homens: 0.72, Proporção Mulheres: 0.28
Ano: 1994, Homens: 1231, Mulheres: 621, Proporção Homens: 0.66, Proporção Mulheres: 0.34
Ano: 1932, Homens: 1910, Mulheres: 240, Proporção Homens: 0.89, Proporção Mulheres: 0.11
Ano: 2002, Homens: 1565, Mulheres: 844, Proporção Homens: 0.65, Proporção Mulheres: 0.35
Ano: 1952, Homens: 4743, Mulheres: 984, Proporção Homens: 0.83, Proporção Mulheres: 0.17
Ano: 1980, Homens: 4123, Mulheres: 1308, Proporção Homens: 0.76, Proporção Mulheres: 0.24


In [35]:
output_dir = "out/consulta2_4"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta2_4r.saveAsTextFile(output_dir)

### 3.1. Evolução da Idade Média dos Atletas por País

In [43]:
filtered_rdd = rdd.filter(lambda row: row._c3.isdigit())

# Mapper: Emit ((country, year), (age, 1)) for each row
consulta3_1m = filtered_rdd.map(lambda row: ((row._c6, row._c9), (float(row._c3), 1)))

# Reducer: Sum ages and counts
consulta3_1c = consulta3_1m.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# average age
consulta3_1r = consulta3_1c.mapValues(lambda x: x[0] / x[1])

# Mapper: Emit (country, (year, average_age)) for each entry
consulta3_1m2 = consulta3_1r.map(lambda x: (x[0][0], (x[0][1], x[1])))

results = consulta3_1m2.take(10)

for country, (year, avg_age) in results:
    print(f"Country: {country}, Year: {year}, Average Age: {avg_age:.2f}")

Country: China, Year: 1992, Average Age: 21.40
Country: China, Year: 2012, Average Age: 24.00
Country: Denmark, Year: 1920, Average Age: 29.11
Country: Denmark/Sweden, Year: 1900, Average Age: 34.00
Country: Netherlands, Year: 1988, Average Age: 24.46
Country: Netherlands, Year: 1992, Average Age: 24.45
Country: Netherlands, Year: 1994, Average Age: 22.60
Country: United States, Year: 1992, Average Age: 25.69
Country: United States, Year: 1994, Average Age: 25.73
Country: Netherlands, Year: 1932, Average Age: 34.74


In [54]:
output_dir = "out/consulta3_1"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta3_1m2.saveAsTextFile(output_dir)

### 3.2 Contagem de Medalhas por Tipo (Ouro, Prata, Bronze) por País e Ano

In [51]:
medal_counts_map = rdd.filter(lambda row: row._c14.lower() in ["gold", "silver", "bronze"]).map(lambda row: ((row._c7, row._c9, row._c14.lower()), 1))

medal_counts_reduce = medal_counts_map.reduceByKey(lambda a, b: a + b)

medal_counts_by_type = medal_counts_reduce.map(lambda x: (x[0][0], x[0][1], x[0][2], x[1]))

total_medals_map = medal_counts_by_type.map(lambda x: ((x[0], x[1]), (x[2], x[3])))

total_medals_reduce = total_medals_map.groupByKey().mapValues(lambda medals: {
    "total": sum(count for _, count in medals),
    "gold_percent": sum(count for medal, count in medals if medal == "gold") / sum(count for _, count in medals),
    "silver_percent": sum(count for medal, count in medals if medal == "silver") / sum(count for _, count in medals),
    "bronze_percent": sum(count for medal, count in medals if medal == "bronze") / sum(count for _, count in medals)
})

# Formatted result
results = total_medals_reduce.take(10)
for (country, year), data in results:
    print(f"País: {country}, Ano: {year}, Total: {data['total']}, Percentual Ouro: {data['gold_percent']:.2%}, Percentual Prata: {data['silver_percent']:.2%}, Percentual Bronze: {data['bronze_percent']:.2%}")


País: DEN, Ano: 1900, Total: 3, Percentual Ouro: 66.67%, Percentual Prata: 0.00%, Percentual Bronze: 33.33%
País: FIN, Ano: 1920, Total: 23, Percentual Ouro: 47.83%, Percentual Prata: 17.39%, Percentual Bronze: 34.78%
País: FIN, Ano: 2014, Total: 18, Percentual Ouro: 5.56%, Percentual Prata: 11.11%, Percentual Bronze: 83.33%
País: FIN, Ano: 1948, Total: 24, Percentual Ouro: 37.50%, Percentual Prata: 29.17%, Percentual Bronze: 33.33%
País: FIN, Ano: 1952, Total: 30, Percentual Ouro: 30.00%, Percentual Prata: 13.33%, Percentual Bronze: 56.67%
País: NOR, Ano: 1992, Total: 25, Percentual Ouro: 24.00%, Percentual Prata: 56.00%, Percentual Bronze: 20.00%
País: NOR, Ano: 1994, Total: 18, Percentual Ouro: 44.44%, Percentual Prata: 44.44%, Percentual Bronze: 11.11%
País: NOR, Ano: 2002, Total: 24, Percentual Ouro: 66.67%, Percentual Prata: 20.83%, Percentual Bronze: 12.50%
País: NOR, Ano: 2006, Total: 19, Percentual Ouro: 10.53%, Percentual Prata: 36.84%, Percentual Bronze: 52.63%
País: NOR, An

In [53]:
output_dir = "out/consulta3_2"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

consulta3_1m2.saveAsTextFile(output_dir)