# 1 - Data transformation

In [0]:
import pandas as pd
from pyspark.sql import Row, Column
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [0]:
%fs ls

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/local_disk0/,local_disk0/,0,0


## File 1 - OI Results 1896-2016
ID - id of athlete
<br>
Name - name of athlete
<br>
Gender - gender of athlete
<br>
Age - age of athlete, can be NULL
<br>
Height - height of athlete, can be NULL
<br>
Weight - weight of athlete, can be NULL
<br>
Team - Country team the athlete represents
<br>
NOC - National Olympic Committee
<br>
Games - Summer or Winter season and year held
<br>
Year - when Olympics took place
<br>
Season - Summer or Winter
<br>
City - where Olympics took place
<br>
Sport - Athlete's sport
<br>
Event - Athlete's discipline
<br>
Medal - Medal athlete got on event
<br><br>
Link: https://www.kaggle.com/datasets/heesoo37/120-years-of-olympic-history-athletes-and-results

In [0]:
file_location = "dbfs:/FileStore/tables/athletes-1.csv"

#schema 
schema_def = StructType([StructField('ID', IntegerType(), True),
                     StructField('Name', StringType(), True),
                     StructField('Gender', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Height', IntegerType(), True),
                     StructField('Weight', IntegerType(), True),
                     StructField('Team', StringType(), True),
                     StructField('NOC', StringType(), True),
                     StructField('Games', StringType(), True),
                     StructField('Year', IntegerType(), True),
                     StructField('Season', StringType(), True),
                     StructField('City', StringType(), True),
                     StructField('Sport', StringType(), True),
                     StructField('Event', StringType(), True),
                     StructField('Medal', StringType(), True),
])

In [0]:
#load data from csv file
games = spark.read.csv(file_location, encoding="UTF-8", header=True, schema=schema_def) 
games.cache()

#replace null with -1 for age, height and weight
games = games.fillna(-1, 'Age')
games = games.withColumn('Medal', regexp_replace('Medal','NA', 'NO'))

#select columns
games_df = games.select(
    "ID",
    "Name",
    "Age",
    "NOC",
    "Games",
    "Sport",
    "Event",
    "Medal",
    "Team",
    "Gender"
)

games_df.cache()
# dropping ALL duplicate values
games_no_duplicates_df = games_df.drop_duplicates(subset=['Name','Games'])
# sorting by id and name
games_no_duplicates_df = games_no_duplicates_df.sort(['ID','Name'])

athl_df = games_no_duplicates_df.select("Age", "Medal").where((col("Medal") == "Gold") | (col("Medal") == "NO") & (col("Age") != -1))
# save in DBFS
athl_df.write.mode("overwrite").csv(path='dbfs:/FileStore/tables/athletes_results.csv', header=True)
athl_df.show()
display(games_no_duplicates_df)


games_no_duplicates = games_no_duplicates_df.rdd
games_list = games_no_duplicates.map(list).collect()

ID,Name,Age,NOC,Games,Sport,Event,Medal,Team,Gender
1,A Dijiang,24,CHN,1992 Summer,Basketball,Basketball Men's Basketball,NO,China,M
2,A Lamusi,23,CHN,2012 Summer,Judo,Judo Men's Extra-Lightweight,NO,China,M
3,Gunnar Nielsen Aaby,24,DEN,1920 Summer,Football,Football Men's Football,NO,Denmark,M
4,Edgar Lindenau Aabye,34,DEN,1900 Summer,Tug-Of-War,Tug-Of-War Men's Tug-Of-War,Gold,Denmark/Sweden,M
5,Christine Jacoba Aaftink,21,NED,1988 Winter,Speed Skating,Speed Skating Women's 500 metres,NO,Netherlands,F
5,Christine Jacoba Aaftink,27,NED,1994 Winter,Speed Skating,Speed Skating Women's 500 metres,NO,Netherlands,F
5,Christine Jacoba Aaftink,25,NED,1992 Winter,Speed Skating,Speed Skating Women's 500 metres,NO,Netherlands,F
6,Per Knut Aaland,33,USA,1994 Winter,Cross Country Skiing,Cross Country Skiing Men's 10 kilometres,NO,United States,M
6,Per Knut Aaland,31,USA,1992 Winter,Cross Country Skiing,Cross Country Skiing Men's 10 kilometres,NO,United States,M
7,John Aalberg,31,USA,1992 Winter,Cross Country Skiing,Cross Country Skiing Men's 10 kilometres,NO,United States,M


### RDD map and reduce transformations

In [0]:
# athlete who participated in games
games_list_filter = [el for el in games_list if len(el[4]) > 0]
athletes_rdd = sc.parallelize(games_list)
athletes = athletes_rdd.map(lambda row: row[1]).cache()

athletes_count = athletes.map(lambda athlete: (athlete, 1)).reduceByKey(lambda c1,c2: c1+c2).sortBy(lambda gc: gc[1], ascending=False).cache()
athletes_map = athletes_count.take(5)

#Top 5 athletes who participated in most of the Olimpycs
for athlete in athletes_map:
    print(athlete)


('Ian Millar', 10)
('Hubert Raudaschl', 9)
('Afanasijs Kuzmins', 9)
('Paul Bert Elvstrm', 8)
('Aleksandr Vladimirovich Popov', 8)


In [0]:
# teams who had most participants in game
games_list_filter = [el for el in games_list if len(el[4]) > 0]
teams_rdd = sc.parallelize(games_list)
teams = teams_rdd.map(lambda row: row[3]+"_"+row[4]).cache()

teams_count = teams.map(lambda team: (team, 1)).reduceByKey(lambda c1,c2: c1+c2).sortBy(lambda tc: tc[1], ascending=False).cache()
teams_map = teams_count.take(5)

#Top 5 teams who had most participants in Olimpycs in one year
for team in teams_map:
    print(team)

('GBR_1908 Summer', 735)
('FRA_1900 Summer', 716)
('USA_1996 Summer', 648)
('AUS_2000 Summer', 617)
('USA_2008 Summer', 588)


In [0]:
teams_rdd = sc.parallelize(games_list)
teams = teams_rdd.map(lambda row: row[3]+"_"+row[4]).cache()

teams_count = teams.map(lambda team: (team, 1)).reduceByKey(lambda c1,c2: c1+c2).sortBy(lambda tc: tc[1], ascending=False).cache()
teams_map = teams_count.take(5)

#Top 5 teams who had most participants in Olimpycs in one year
for team in teams_map:
    print(team)

('GBR_1908 Summer', 735)
('FRA_1900 Summer', 716)
('USA_1996 Summer', 648)
('AUS_2000 Summer', 617)
('USA_2008 Summer', 588)


In [0]:
def delete_team_duplicates(list):
    list_filter = []
    for elem in list:
        new_elem = elem[3]+"_"+elem[4]+"_"+elem[6]
        if (elem[8] == 'Team') and new_elem not in list_filter:
            list.remove(elem)

    return list

In [0]:
#country with most won medals in games
games_list_filter = games_df.drop_duplicates(subset=['NOC', 'Games', 'Event', 'Medal']).collect()

#filter medals only
games_list_filter = [el for el in games_list_filter if el[7] != 'NO']

medals_rdd = sc.parallelize(games_list_filter)

medals = medals_rdd.map(lambda row: row[3]+"_"+row[4]).cache()

medals_count = medals.map(lambda medal: (medal, 1)).reduceByKey(lambda c1,c2: c1+c2).sortBy(lambda mc: mc[1], ascending=False).cache()
medals_map = medals_count.take(5)

#Top 5 teams who had most medals in Olimpycs in one year
for team in medals_map:
    print(team)
    

('USA_1904 Summer', 230)
('URS_1980 Summer', 195)
('USA_1984 Summer', 173)
('GBR_1908 Summer', 143)
('URS_1988 Summer', 131)


In [0]:
games_no_duplicates_athlete = games_df.drop_duplicates(subset=['Name','Games','Event'])
games_no_duplicates_athlete = games_no_duplicates_athlete.rdd
games_list_athlete = games_no_duplicates_athlete.map(list).collect()
games_list_filter = [el for el in games_list_athlete if el[7] != 'NO']

athletes_medals_rdd = sc.parallelize(games_list_filter)

athletes_medals = athletes_medals_rdd.map(lambda row: row[1]).cache()

athletes_medals_count = athletes_medals.map(lambda athlete: (athlete, 1)).reduceByKey(lambda c1,c2: c1+c2).sortBy(lambda amc: amc[1], ascending=False).cache()
athletes_medals_map = athletes_medals_count.take(5)

for athlete in athletes_medals_map:
    print(athlete)

('Michael Fred Phelps. II', 28)
('Larysa Semenivna Latynina (Diriy-)', 18)
('Nikolay Yefimovich Andrianov', 15)
('Borys Anfiyanovych Shakhlin', 13)
('Ole Einar Bjrndalen', 13)


### Transformation for OI data 1986-2016
We want to get number of gold, silver and bronze medals for each Game and each NOC who won medals on OI Games each year

In [0]:
games_df_new = games_df.drop_duplicates(subset=['NOC', 'Games', 'Event', 'Medal'])

# GOLD MEDALS FOR NOC AND GAME
gold_df = games_df_new.select("*").where((col("Medal") == "Gold")).groupBy(["NOC", "Games"]).agg(count("*").alias("Gold"))
# SILVER MEDALS FOR NOC AND GAME
silver_df = games_df_new.select("*").where(col("Medal") == "Silver").groupBy(["NOC", "Games"]).agg(count("*").alias("Silver"))
#rename columns
silver_df = silver_df.withColumnRenamed("NOC","NOC 1").withColumnRenamed("Games","Games 1")

# BRONZE MEDALS FOR NOC AND GAME
bronze_df = games_df_new.select("*").where(col("Medal") == "Bronze").groupBy(["NOC", "Games"]).agg(count("*").alias("Bronze"))
#rename columns
bronze_df = bronze_df.withColumnRenamed("NOC","NOC 2").withColumnRenamed("Games","Games 2")

# JOIN ALL TABLES 
first_join = gold_df.join(silver_df, (gold_df["NOC"] == silver_df["NOC 1"]) & (gold_df["Games"] == silver_df["Games 1"]), "outer")
first_join = first_join.withColumn('NOC',when(col('NOC').isNotNull(),col('NOC')).otherwise(col('NOC 1')))
first_join = first_join.withColumn('Games',when(col('Games').isNotNull(),col('Games')).otherwise(col('Games 1')))

second_join = first_join.join(bronze_df, (first_join["NOC"] == bronze_df["NOC 2"]) & (first_join["Games"] == bronze_df["Games 2"]), "outer")
second_join = second_join.withColumn('NOC',when(col('NOC').isNotNull(),col('NOC')).otherwise(col('NOC 2')))
second_join = second_join.withColumn('Games',when(col('Games').isNotNull(),col('Games')).otherwise(col('Games 2')))

# Remove columns
final_df = second_join.drop("NOC 1", "NOC 2", "Games 1", "Games 2")

final_df = final_df.fillna(0, ['Gold', 'Silver', 'Bronze'])

# Total
final_df = final_df.withColumn("Total",  expr("Gold + Silver + Bronze"))
# Sort
final_df = final_df.sort(desc("Total"))
#final_df = final_df.sort(asc("Games"), asc("NOC"))

display(final_df)

NOC,Games,Gold,Silver,Bronze,Total
USA,1904 Summer,76,79,75,230
URS,1980 Summer,80,69,46,195
USA,1984 Summer,82,61,30,173
GBR,1908 Summer,56,50,37,143
URS,1988 Summer,54,31,46,131
GDR,1980 Summer,47,37,42,126
URS,1976 Summer,49,41,35,125
USA,2016 Summer,46,37,38,121
EUN,1992 Summer,45,38,29,112
USA,1932 Summer,44,36,30,110


In [0]:
gm = games.filter(col("Games").contains("Summer"))
gm = gm.select("Event").distinct().orderBy("Event")
display(gm)

Event
Aeronautics Mixed Aeronautics
Alpinism Mixed Alpinism
"Archery Men's Au Chapelet, 33 metres"
"Archery Men's Au Chapelet, 50 metres"
"Archery Men's Au Cordon Dore, 33 metres"
"Archery Men's Au Cordon Dore, 50 metres"
Archery Men's Championnat Du Monde
Archery Men's Continental Style
Archery Men's Double American Round
Archery Men's Double York Round


## File 2 - OI Results 2020
ID - id of athlete
<br>
Team - Olympic team name
<br>
Gold - number of team gold medals 
<br>
Silver - number of team silver medals 
<br>
Bronze - number of team bronze medals 
<br>
Total - sum of number of team medals 
<br>
Rank - overall team placement
<br><br>
Link: https://www.kaggle.com/datasets/berkayalan/2021-olympics-medals-in-tokyo

In [0]:
# File 2 -> Results from 2020 Olympics
file_location_2 = "dbfs:/FileStore/tables/oi_summer_2020-2.csv"

#schema 
schema_def_2 = StructType([StructField('ID', IntegerType(), True),
                     StructField('Team', StringType(), True),
                     StructField('Gold', IntegerType(), True),
                     StructField('Silver', IntegerType(), True),
                     StructField('Bronze', IntegerType(), True),
                     StructField('Total', IntegerType(), True),
                     StructField('Rank', IntegerType(), True),
])

### Transformation for OI data 2020
We want to make column NOC by joining with existing dataframe we made with transforming oi data 1986-2016

In [0]:
oi_2020 = spark.read.csv(file_location_2, encoding="UTF-8", header=True, schema=schema_def_2)
display(oi_2020)

# Add column NOC
oi_2020 = oi_2020.withColumn("Games 2", lit("2020 Summer"))
oi_2020 = oi_2020.join(games_df, (oi_2020["Team"] == games_df["Team"]), "outer").cache()
oi_2020 = oi_2020.select("NOC", "Games 2", "Gold", "Silver", "Bronze", "Total").where(col("Games 2") == "2020 Summer").drop_duplicates(subset=["NOC", "Games 2"])
oi_2020 = oi_2020.fillna('RUS', 'NOC')
display(oi_2020)

ID,Team,Gold,Silver,Bronze,Total,Rank
1,United States,39,41,33,113,1
2,China,38,32,18,88,2
3,Japan,27,14,17,58,5
4,Great Britain,22,21,22,65,4
5,ROC,20,28,23,71,3
6,Australia,17,7,22,46,6
7,Netherlands,10,12,14,36,9
8,France,10,12,11,33,10
9,Germany,10,11,16,37,8
10,Italy,10,10,20,40,7


NOC,Games 2,Gold,Silver,Bronze,Total
RUS,2020 Summer,3,2,2,7
TPE,2020 Summer,2,4,6,12
SWE,2020 Summer,3,6,0,9
PHI,2020 Summer,1,2,1,4
MAS,2020 Summer,0,1,1,2
FIJ,2020 Summer,1,0,1,2
TUR,2020 Summer,2,2,9,13
GER,2020 Summer,10,11,16,37
JOR,2020 Summer,0,1,1,2
FRA,2020 Summer,10,12,11,33


Union dataframe we got with transforming oi data 1986-2016 and results from oi 2020 and adding dense ranking for NOC for each OI Games

In [0]:
# Add Olympics results from 2020 to all
final_result = final_df.union(oi_2020).cache()
final_result = final_result.sort(desc("Total"))

display(final_result)

NOC,Games,Gold,Silver,Bronze,Total
USA,1904 Summer,76,79,75,230
URS,1980 Summer,80,69,46,195
USA,1984 Summer,82,61,30,173
GBR,1908 Summer,56,50,37,143
URS,1988 Summer,54,31,46,131
GDR,1980 Summer,47,37,42,126
URS,1976 Summer,49,41,35,125
USA,2016 Summer,46,37,38,121
USA,2020 Summer,39,41,33,113
EUN,1992 Summer,45,38,29,112


In [0]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy(final_result["Games"]).orderBy(desc("Total"))
final_result = final_result.withColumn("Rank", dense_rank().over(windowSpec))
display(final_result)

NOC,Games,Gold,Silver,Bronze,Total,Rank
GRE,1896 Summer,10,17,17,44,1
USA,1896 Summer,11,6,2,19,2
GER,1896 Summer,7,5,2,14,3
FRA,1896 Summer,5,4,2,11,4
GBR,1896 Summer,3,3,3,9,5
HUN,1896 Summer,2,1,3,6,6
DEN,1896 Summer,1,2,3,6,6
AUT,1896 Summer,2,1,2,5,7
AUS,1896 Summer,2,0,1,3,8
SUI,1896 Summer,1,2,0,3,8


#### Saving transformed data in DBFS for later analysis

In [0]:
final_result.write.mode("overwrite").csv(path='dbfs:/FileStore/tables/oi_results-2.csv', header=True)

## File 3 - RENT
Date - when advertisment was posted
<br>
Price - anual price for house
<br>
Bedrooms - number of bedrooms
<br>
Bathrooms - nummber of bathrooms
<br>
Sqft living - living area in square foot
<br>
Sqft LOT - square footage or area 
<br>
Floors - number of floors
<br>
Waterfront - if house has waterfront or not
<br>
View - number of house views
<br>
Condition - number of conditions
<br>
Sqft above - all living square feet in a home that is above the ground
<br>
Sqft basement - living square feet in a home that is bellothe ground
<br>
Year built - when house is built
<br>
Year renovated - when house is renovated, or 0 if not
<br>
Street - in which street house is located
<br>
City - where house is located
<br><br>
Link: https://www.kaggle.com/datasets/shree1992/housedata

In [0]:
# File 3 -> House renting
file_location_3 = "dbfs:/FileStore/tables/data.csv"

#schema 
schema_def_3 = StructType([StructField('Date', DateType(), True),
                     StructField('Price', DoubleType(), True),
                     StructField('Bedrooms', DoubleType(), True),
                     StructField('Bathrooms', DoubleType(), True),
                     StructField('Sqft Living', IntegerType(), True),
                     StructField('Sqft LOT', IntegerType(), True),
                     StructField('Floors', DoubleType(), True),
                     StructField('Waterfront', IntegerType(), True),
                     StructField('View', IntegerType(), True),
                     StructField('Condition', IntegerType(), True),
                     StructField('Sqft above', IntegerType(), True),
                     StructField('Sqft basement', DoubleType(), True),
                     StructField('Year built', IntegerType(), True),
                     StructField('Year renovated', IntegerType(), True),
                     StructField('Street', StringType(), True),
                     StructField('City', StringType(), True),
])

Transformation for RENT data
We want to make column Old depending on year when house was built

In [0]:
rent_df = spark.read.csv(file_location_3, encoding="UTF-8", header=True, schema=schema_def_3).cache()
rent_df = rent_df.select("Price", "Bedrooms", "Bathrooms", "Sqft Living", "Floors", "Year built").where(col("Price") > 0).cache()
rent_df = rent_df.withColumn("Old", when(((col("Year built") >= 1900) & (col("Year built") <= 1950)), lit("20th c - first half")).when((col("Year built") >= 1951) & (col("Year built") <= 2000), lit("20th c - second half")).when(col("Year built") > 2000, lit("21th")))

steps = rent_df.select("Bedrooms").distinct().collect()
for step in steps[:]:
    print(step[0])
          #step[0])
#    _df = rent_df.select("*").where(col("Bedrooms") == step[0])
#    _df.coalesce(1).write.mode("append").option("header", "true").csv('dbfs:/FileStore/stream')

8.0
0.0
7.0
1.0
4.0
3.0
2.0
6.0
5.0
9.0


In [0]:
# save in DBFS
rent_df.write.mode("overwrite").csv(path='dbfs:/FileStore/tables/rent_results-2.csv', header=True)

In [0]:
rent_df.groupBy("Old").agg(avg("Price"), count("*")).show()

+--------------------+-----------------+--------+
|                 Old|       avg(Price)|count(1)|
+--------------------+-----------------+--------+
| 20th c - first half|565139.0624241706|    1124|
|20th c - second half|536592.2376613059|    2473|
|                21th|604634.0011274718|     954|
+--------------------+-----------------+--------+



## File 4 - Diet
<br>
Gender - male or female
<br>
Diet - type of diet
<br>
Weigth - weight before diet
<br>
Weigth After - number of bathrooms
<br><br>
Link: https://www.kaggle.com/datasets/zaranadoshi/anova-diet

In [0]:
# File 4 -> Diet plan
file_location_4 = "dbfs:/FileStore/tables/Diet.csv"

#schema 
schema_def_4 = StructType([StructField('Gender', StringType(), True),
                     StructField('Diet', StringType(), True),
                     StructField('Weight', DoubleType(), True),
                     StructField('Weight After', DoubleType(), True),
])

Transformation for Diet data
We want to make column Difference that is made in weight after six weeks diet

In [0]:
diet_df = spark.read.csv(file_location_4, encoding="UTF-8", header=True, schema=schema_def_4)
diet_df = diet_df.withColumn("Difference", round(diet_df["Weight"]-diet_df["Weight After"], 2)).cache()
display(diet_df)
# save in DBFS
diet_df.write.mode("overwrite").csv(path='dbfs:/FileStore/tables/diet_results.csv', header=True)

Gender,Diet,Weight,Weight After,Difference
M,B,60.0,60.0,0.0
M,B,103.0,103.0,0.0
F,A,58.0,54.2,3.8
F,A,60.0,54.0,6.0
F,A,64.0,63.3,0.7
F,A,64.0,61.1,2.9
F,A,65.0,62.2,2.8
F,A,66.0,64.0,2.0
F,A,67.0,65.0,2.0
F,A,69.0,60.5,8.5
