**ძირითადი ნაწილი (6 ქულა)**

ამ დავალების პირობა თქვენთვის კარგად ნაცნობია შუალედური გამოცდიდან. 😁😁 მოგიწევთ, რომ იდენტური ამოცანა ამოხსნათ, ოღონდ PySpark-ის გამოყენებით. არ გამოიყენოთ Pandas-ის ბიბლიოთეკა. წინააღმდეგ შემთხვევაში არ ჩაითვლება. 

შეგახსენებთ პირობას:


You are given dataset of Marvel superheroes (some DC heroes may have sneaked through as usual), your first task is to randomly generate teams of 5 heroes with following columns:
- team_name: team name should be generated using strongest hero name in team with following template "Team: {Strongest_Hero_Name}" to calculate stongest hero in team you'll need to calculate average of following attributes in charcters_stats file: Intelligence, Strength, Speed, Durability, Power, Combat
- leader: name of hero with highest Intelligence score in team
- tank: name of hero with highest sum value of Strength and Durability
- damage: hero with highest sum value of Speed, Power and Combat 
- the_other_guy: one of the hero who's not specified in above columns
- the_other_guy2: one of the hero who's not specified in above columns
- top_average_speed: average of top 3 highest speed entries in team
- top_average_height: average of top 3 tallest heroes in team
- flight_count: number of heroes who can fly in team (available in superheroes_power_matrix)  

**same hero should not end up in multiple teams.**

**all negative values should be converted to 0.**


In [1]:
!sudo apt-get update

!apt-get install openjdk-8-jre

!apt-get install scala
!pip install py4j
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [73.9 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:13 https://developer.download.nvi

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"


import findspark
findspark.init()


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Test Setup").getOrCreate()
sc = spark.sparkContext

In [3]:
df_stats = spark.read.option("header", True).csv("character_stats.csv").drop('Alignment', 'Total')
df_info = spark.read.option("header", True).csv("marvel_characters_info.csv").select('Name', 'Height')
df_power_matrix = spark.read.option("header", True).csv("superheroes_power_matrix.csv").select('Name', 'Flight')

In [4]:
df_tmp = df_stats.join(df_info,'Name', 'full')
df = df_tmp.join(df_power_matrix, 'Name', 'full')

del df_stats
del df_info
del df_power_matrix

In [5]:
from pyspark.sql.types import *

df = df.withColumn("Intelligence", df.Intelligence.cast(DoubleType()))\
  .withColumn("Strength", df.Strength.cast(DoubleType()))\
  .withColumn("Speed", df.Speed.cast(DoubleType()))\
  .withColumn("Durability", df.Durability.cast(DoubleType()))\
  .withColumn("Power", df.Power.cast(DoubleType()))\
  .withColumn("Combat", df.Combat.cast(DoubleType()))\
  .withColumn("Height", df.Height.cast(DoubleType()))\
  .withColumn("Flight", df.Flight.cast(BooleanType()))\


df = df.withColumn("Flight", df.Flight.cast(IntegerType()))

In [6]:
from pyspark.sql.functions import udf

@udf(returnType=DoubleType())
def filter_negative(stat):
     return None if stat == None else max(0.0, stat)

In [7]:
df = df.withColumn("Intelligence", filter_negative(df.Intelligence))\
  .withColumn("Strength", filter_negative(df.Strength))\
  .withColumn("Speed", filter_negative(df.Speed))\
  .withColumn("Durability", filter_negative(df.Durability))\
  .withColumn("Power", filter_negative(df.Power))\
  .withColumn("Combat", filter_negative(df.Combat))\
  .withColumn("Height", filter_negative(df.Height))

In [8]:
df = df.dropDuplicates(['Name'])

In [9]:
from pyspark.sql.functions import rand 

df_randomized = df.orderBy(rand())

In [10]:
count = df_randomized.count()
df = df_randomized.limit(count - count%5)
del df_randomized

In [11]:
from pyspark.sql.functions import array, lit
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

num_teams = df.count() // 5
teams = list(range(num_teams)) * 5
team_df = spark.createDataFrame(teams, IntegerType()).withColumnRenamed("value", "group_number")

# df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
# other idea: apply func to df which maps each row_idx to floor multiple of 5 (x - x%5)
a = df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = team_df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
team_df = team_df.limit(df.count() // 5)

df = a.join(b, a.row_idx == b.row_idx).\
             drop("row_idx")

del num_teams
del teams
del team_df
del a
del b

In [12]:
df = df.fillna(0)

In [13]:
from pyspark.sql.functions import lit, col

df = df.withColumn("avg_total", (col('Intelligence') + col('Strength') + col('Speed') + col('Durability') + col('Power') + col('Combat')) / lit(6))
df = df.withColumn("tankiness", col('Strength') + col('Durability'))
df = df.withColumn("damage", col('Speed') + col('Power') + col('Combat'))
df_cp = df

In [14]:
df_team_name = df.groupBy(df.group_number).max('avg_total').withColumnRenamed('max(avg_total)', 'avg_total')
df_result = df_team_name.join(df,['group_number', 'avg_total'], 'left').select('group_number' , 'Name').withColumnRenamed('Name', 'team_name').drop_duplicates(['group_number'])
del df_team_name

In [15]:
df_leader = df.groupBy(df.group_number).max('Intelligence').withColumnRenamed('max(Intelligence)', 'Intelligence')
df_tmp = df_leader.join(df,['group_number', 'Intelligence'], 'left').select('group_number' , 'Name').withColumnRenamed('Name', 'leader').drop_duplicates(['group_number'])
df_result = df_result.join(df_tmp, 'group_number')

df = df.join(df_tmp, df.Name == df_tmp.leader, 'leftanti')
del df_leader

In [16]:
df_tank = df.groupBy(df.group_number).max('tankiness').withColumnRenamed('max(tankiness)', 'tankiness')
df_tmp = df_tank.join(df,['group_number', 'tankiness'], 'left').select('group_number' , 'Name').withColumnRenamed('Name', 'tank').drop_duplicates(['group_number'])
df_result = df_result.join(df_tmp, 'group_number')

df = df.join(df_tmp, df.Name == df_tmp.tank, 'leftanti')
del df_tank

In [17]:
df_damage = df.groupBy(df.group_number).max('damage').withColumnRenamed('max(damage)', 'damage')
df_tmp = df_damage.join(df,['group_number', 'damage'], 'left').select('group_number' , 'Name').withColumnRenamed('Name', 'damage').drop_duplicates(['group_number'])
df_result = df_result.join(df_tmp, 'group_number')

df = df.join(df_tmp, df.Name == df_tmp.damage, 'leftanti')
del df_damage

In [18]:
df_other = df.groupBy(df.group_number).max('Height').withColumnRenamed('max(Height)', 'Height')
df_tmp = df_other.join(df,['group_number', 'Height'], 'left').select('group_number', 'Name').withColumnRenamed('Name', 'other_guy_1').drop_duplicates(['group_number'])
df_result = df_result.join(df_tmp, 'group_number')
df = df.join(df_tmp, df.Name == df_tmp.other_guy_1, 'leftanti')
del df_other

In [19]:
df_tmp = df.select('Name', 'group_number').withColumnRenamed('Name', 'other_guy_2')
df_result = df_result.join(df_tmp, 'group_number')

In [20]:
from pyspark.sql.functions import concat, col, lit

df_result = df_result.withColumn('team_name',  concat(lit("Team: {"), col("team_name"), lit("}")))
df = df_cp
del df_cp

In [21]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
window_speed = Window.partitionBy("group_number").orderBy(col("speed").desc())
window_height = Window.partitionBy("group_number").orderBy(col("height").desc())

df_tmp = df.withColumn("row",row_number().over(window_speed)).filter(col("row") <= 3)
top_average_speed_df = df_tmp.groupBy("group_number").avg("Speed").withColumnRenamed("avg(Speed)", "top_average_speed")
df_result = df_result.join(top_average_speed_df, 'group_number')


df_tmp = df.withColumn("row",row_number().over(window_height)).filter(col("row") <= 3)
top_average_height_df = df_tmp.groupBy("group_number").avg("Height").withColumnRenamed("avg(Height)", "top_average_height")
df_result = df_result.join(top_average_height_df, 'group_number')

del window_speed
del window_height
del df_tmp
del top_average_speed_df
del top_average_height_df

In [22]:
df_flight = df.groupBy(df.group_number).sum('Flight').withColumnRenamed('sum(Flight)', 'flight_count')
df_result = df_result.join(df_flight, 'group_number')
del df_flight

In [23]:
# df_result.show()

In [24]:
df_result.write.csv('result_1.csv', mode='overwrite', header=True)

---


**ბონუს ნაწილი (2 ქულა)**

ეს ნაწილიც თქვენთვის ნაცნობია და გუნდების ერთმანეთთან შეჯიბრებას, ბოლოს კი გამარჯვებულის გამოვლენას ეხება.
შეგახსენებთ პირობას:

After generating teams, obvious next task would be to perform basketball face off between teams. To determine winning teams we'll be using following logic:
* team scores 1 point if their top_average_speed is superior 
* team scores 1 point if their top_average_height is superior 
* team scores 1 point for each member inside team who can fly (available in superheroes_power_matrix)  
team with highest score wins!

Based on rules above you should perform competition: first you'll need to randomly divide teams (all of them) into 4 brackets and generate data frames for each of them, data frame should contain all possible combinations of teams,
for example if bracket consists of teams [A,B,C,D] data frame should contain pairs (A,B) (A,C) (A,D) (B,C) (B,D) (C,D) with following schema
- team_a: name for first team
- team_b: name for second team
- team_a_score: score for first team against second team, calculated using rules described above
- team_b_score: same for second team  

after generating data frame, pick one team from each bracket with most wins: team1, team2, team3 and team4
perform semi-finals: team1 vs team2, team3 vs team4 and finals with winner from these games

**if you get a tie at any point when facing teams with each other, you can pick any team, you may also drop some teams if there is too much missing data**  

---

გაწვდით 3 ფაილს, რომელიც ამ დავალებისთვის დაგჭირდებათ. ფაილებში პერსონაჟები არ დუბლირდება და შეგიძლიათ, ეგ არ შეამოწმოთ.

ამოხსნა ატვირთეთ ნოუთბუქის სახით.

წარმატებები!

In [25]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"


# import findspark
# findspark.init()


# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").appName("Test Setup").getOrCreate()
# sc = spark.sparkContext

In [26]:
df_result = spark.read.option("header", True).csv("result_1.csv")

In [27]:
from pyspark.sql.types import *

df_result = df_result.select("group_number", "team_name", "top_average_speed", "top_average_height", "flight_count")

df_result = df_result.withColumn("group_number", df_result.group_number.cast(IntegerType()))\
  .withColumn("team_name", df_result.team_name.cast(StringType()))\
  .withColumn("top_average_speed", df_result.top_average_speed.cast(DoubleType()))\
  .withColumn("top_average_height", df_result.top_average_height.cast(DoubleType()))\
  .withColumn("flight_count", df_result.flight_count.cast(IntegerType()))

df_result_cp = df_result

In [28]:
from math import ceil
count = df_result.count()
bracket_num = ceil(count / 4)

bracket_1 = df_result.limit(bracket_num)
df_result = df_result.subtract(bracket_1)
df_result = df_result.orderBy("group_number")

bracket_2 = df_result.limit(bracket_num)
df_result = df_result.subtract(bracket_2)
df_result = df_result.orderBy("group_number")

bracket_3 = df_result.limit(bracket_num)
df_result = df_result.subtract(bracket_3)
df_result = df_result.orderBy("group_number")

bracket_4 = df_result.limit(bracket_num)
df_result = df_result.subtract(bracket_4)
del df_result
del count
del bracket_num

In [29]:
bracket_1_1 = bracket_1.withColumnRenamed("group_number", "group_number_1")\
  .withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")

bracket_2_1 = bracket_2.withColumnRenamed("group_number", "group_number_1")\
  .withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")

bracket_3_1 = bracket_3.withColumnRenamed("group_number", "group_number_1")\
  .withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")

bracket_4_1 = bracket_4.withColumnRenamed("group_number", "group_number_1")\
  .withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")
bracket_1 = bracket_1.crossJoin(bracket_1_1)
bracket_2 = bracket_2.crossJoin(bracket_2_1)
bracket_3 = bracket_3.crossJoin(bracket_3_1)
bracket_4 = bracket_4.crossJoin(bracket_4_1)

del bracket_1_1
del bracket_2_1
del bracket_3_1
del bracket_4_1

In [30]:
from pyspark.sql.functions import udf, col, when

bracket_1 = bracket_1.select(
    when(col('team_name') > col('team_name_1'), col('group_number')).otherwise(col('group_number_1')).alias('group_number'),
    when(col('team_name') > col('team_name_1'), col('group_number_1')).otherwise(col('group_number')).alias('group_number_1'),
    when(col('team_name') > col('team_name_1'), col('team_name')).otherwise(col('team_name_1')).alias('team_name'),
    when(col('team_name') > col('team_name_1'), col('team_name_1')).otherwise(col('team_name')).alias('team_name_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed')).otherwise(col('top_average_speed_1')).alias('top_average_speed'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed_1')).otherwise(col('top_average_speed')).alias('top_average_speed_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_height')).otherwise(col('top_average_height_1')).alias('top_average_height'),
    when(col('team_name') > col('team_name_1'), col('top_average_height_1')).otherwise(col('top_average_height')).alias('top_average_height_1'),
    when(col('team_name') > col('team_name_1'), col('flight_count')).otherwise(col('flight_count_1')).alias('flight_count'),
    when(col('team_name') > col('team_name_1'), col('flight_count_1')).otherwise(col('flight_count')).alias('flight_count_1')
)
bracket_1 = bracket_1.dropDuplicates(['team_name', 'team_name_1']).orderBy('group_number', 'group_number_1')
bracket_1 = bracket_1.filter(bracket_1.group_number != bracket_1.group_number_1)

In [31]:
bracket_2 = bracket_2.select(
    when(col('team_name') > col('team_name_1'), col('group_number')).otherwise(col('group_number_1')).alias('group_number'),
    when(col('team_name') > col('team_name_1'), col('group_number_1')).otherwise(col('group_number')).alias('group_number_1'),
    when(col('team_name') > col('team_name_1'), col('team_name')).otherwise(col('team_name_1')).alias('team_name'),
    when(col('team_name') > col('team_name_1'), col('team_name_1')).otherwise(col('team_name')).alias('team_name_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed')).otherwise(col('top_average_speed_1')).alias('top_average_speed'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed_1')).otherwise(col('top_average_speed')).alias('top_average_speed_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_height')).otherwise(col('top_average_height_1')).alias('top_average_height'),
    when(col('team_name') > col('team_name_1'), col('top_average_height_1')).otherwise(col('top_average_height')).alias('top_average_height_1'),
    when(col('team_name') > col('team_name_1'), col('flight_count')).otherwise(col('flight_count_1')).alias('flight_count'),
    when(col('team_name') > col('team_name_1'), col('flight_count_1')).otherwise(col('flight_count')).alias('flight_count_1')
)
bracket_2 = bracket_2.dropDuplicates(['team_name', 'team_name_1']).orderBy('group_number', 'group_number_1')
bracket_2 = bracket_2.filter(bracket_2.group_number != bracket_2.group_number_1)

In [32]:
bracket_3 = bracket_3.select(
    when(col('team_name') > col('team_name_1'), col('group_number')).otherwise(col('group_number_1')).alias('group_number'),
    when(col('team_name') > col('team_name_1'), col('group_number_1')).otherwise(col('group_number')).alias('group_number_1'),
    when(col('team_name') > col('team_name_1'), col('team_name')).otherwise(col('team_name_1')).alias('team_name'),
    when(col('team_name') > col('team_name_1'), col('team_name_1')).otherwise(col('team_name')).alias('team_name_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed')).otherwise(col('top_average_speed_1')).alias('top_average_speed'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed_1')).otherwise(col('top_average_speed')).alias('top_average_speed_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_height')).otherwise(col('top_average_height_1')).alias('top_average_height'),
    when(col('team_name') > col('team_name_1'), col('top_average_height_1')).otherwise(col('top_average_height')).alias('top_average_height_1'),
    when(col('team_name') > col('team_name_1'), col('flight_count')).otherwise(col('flight_count_1')).alias('flight_count'),
    when(col('team_name') > col('team_name_1'), col('flight_count_1')).otherwise(col('flight_count')).alias('flight_count_1')
)
bracket_3 = bracket_3.dropDuplicates(['team_name', 'team_name_1']).orderBy('group_number', 'group_number_1')
bracket_3 = bracket_3.filter(bracket_3.group_number != bracket_3.group_number_1)

In [33]:
bracket_4 = bracket_4.select(
    when(col('team_name') > col('team_name_1'), col('group_number')).otherwise(col('group_number_1')).alias('group_number'),
    when(col('team_name') > col('team_name_1'), col('group_number_1')).otherwise(col('group_number')).alias('group_number_1'),
    when(col('team_name') > col('team_name_1'), col('team_name')).otherwise(col('team_name_1')).alias('team_name'),
    when(col('team_name') > col('team_name_1'), col('team_name_1')).otherwise(col('team_name')).alias('team_name_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed')).otherwise(col('top_average_speed_1')).alias('top_average_speed'),
    when(col('team_name') > col('team_name_1'), col('top_average_speed_1')).otherwise(col('top_average_speed')).alias('top_average_speed_1'),
    when(col('team_name') > col('team_name_1'), col('top_average_height')).otherwise(col('top_average_height_1')).alias('top_average_height'),
    when(col('team_name') > col('team_name_1'), col('top_average_height_1')).otherwise(col('top_average_height')).alias('top_average_height_1'),
    when(col('team_name') > col('team_name_1'), col('flight_count')).otherwise(col('flight_count_1')).alias('flight_count'),
    when(col('team_name') > col('team_name_1'), col('flight_count_1')).otherwise(col('flight_count')).alias('flight_count_1')
)
bracket_4 = bracket_4.dropDuplicates(['team_name', 'team_name_1']).orderBy('group_number', 'group_number_1')
bracket_4 = bracket_4.filter(bracket_4.group_number != bracket_4.group_number_1)

In [34]:
from pyspark.sql.functions import udf

@udf(returnType=IntegerType())
def calculate_score(a_flight, b_flight, a_speed, b_speed, a_height, b_height):
    score = a_flight
    if a_speed > b_speed:
      score += 1
    if a_height > b_height:
      score += 1
    return score

In [35]:
bracket_1 = bracket_1.withColumn("team_a_score", calculate_score(bracket_1.flight_count, bracket_1.flight_count_1, bracket_1.top_average_speed, bracket_1.top_average_speed_1, bracket_1.top_average_height, bracket_1.top_average_height_1))
bracket_1 = bracket_1.withColumn("team_b_score", calculate_score(bracket_1.flight_count_1, bracket_1.flight_count, bracket_1.top_average_speed_1, bracket_1.top_average_speed, bracket_1.top_average_height_1, bracket_1.top_average_height))
bracket_1 = bracket_1.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")

bracket_2 = bracket_2.withColumn("team_a_score", calculate_score(bracket_2.flight_count, bracket_2.flight_count_1, bracket_2.top_average_speed, bracket_2.top_average_speed_1, bracket_2.top_average_height, bracket_2.top_average_height_1))
bracket_2 = bracket_2.withColumn("team_b_score", calculate_score(bracket_2.flight_count_1, bracket_2.flight_count, bracket_2.top_average_speed_1, bracket_2.top_average_speed, bracket_2.top_average_height_1, bracket_2.top_average_height))
bracket_2 = bracket_2.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")


bracket_3 = bracket_3.withColumn("team_a_score", calculate_score(bracket_3.flight_count, bracket_3.flight_count_1, bracket_3.top_average_speed, bracket_3.top_average_speed_1, bracket_3.top_average_height, bracket_3.top_average_height_1))
bracket_3 = bracket_3.withColumn("team_b_score", calculate_score(bracket_3.flight_count_1, bracket_3.flight_count, bracket_3.top_average_speed_1, bracket_3.top_average_speed, bracket_3.top_average_height_1, bracket_3.top_average_height))
bracket_3 = bracket_3.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")


bracket_4 = bracket_4.withColumn("team_a_score", calculate_score(bracket_4.flight_count, bracket_4.flight_count_1, bracket_4.top_average_speed, bracket_4.top_average_speed_1, bracket_4.top_average_height, bracket_4.top_average_height_1))
bracket_4 = bracket_4.withColumn("team_b_score", calculate_score(bracket_4.flight_count_1, bracket_4.flight_count, bracket_4.top_average_speed_1, bracket_4.top_average_speed, bracket_4.top_average_height_1, bracket_4.top_average_height))
bracket_4 = bracket_4.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")

In [36]:
@udf(returnType=StringType())
def determine_winner(team_a, team_b, a_score, b_score):
         return team_a if a_score > b_score else team_b 

In [37]:
bracket_1 = bracket_1.withColumn("winner", determine_winner(bracket_1.team_a, bracket_1.team_b, bracket_1.team_a_score, bracket_1.team_b_score))
bracket_2 = bracket_2.withColumn("winner", determine_winner(bracket_2.team_a, bracket_2.team_b, bracket_2.team_a_score, bracket_2.team_b_score))
bracket_3 = bracket_3.withColumn("winner", determine_winner(bracket_3.team_a, bracket_3.team_b, bracket_3.team_a_score, bracket_3.team_b_score))
bracket_4 = bracket_4.withColumn("winner", determine_winner(bracket_4.team_a, bracket_4.team_b, bracket_4.team_a_score, bracket_4.team_b_score))

In [38]:
win_counts = bracket_1.groupBy(bracket_1.winner).count()
max_wins = win_counts.groupBy().max('count').withColumnRenamed("max(count)", "count")
winner_team_1 = max_wins.join(win_counts, "count", 'inner').drop_duplicates(['count'])

win_counts = bracket_2.groupBy(bracket_2.winner).count()
max_wins = win_counts.groupBy().max('count').withColumnRenamed("max(count)", "count")
winner_team_2 = max_wins.join(win_counts, "count", 'inner').drop_duplicates(['count'])

win_counts = bracket_3.groupBy(bracket_3.winner).count()
max_wins = win_counts.groupBy().max('count').withColumnRenamed("max(count)", "count")
winner_team_3 = max_wins.join(win_counts, "count", 'inner').drop_duplicates(['count'])

win_counts = bracket_4.groupBy(bracket_4.winner).count()
max_wins = win_counts.groupBy().max('count').withColumnRenamed("max(count)", "count")
winner_team_4 = max_wins.join(win_counts, "count", 'inner').drop_duplicates(['count'])

del bracket_1
del bracket_2
del bracket_3
del bracket_4

In [39]:
df_result = df_result_cp
del df_result_cp
winner_team_1 = winner_team_1.join(df_result, winner_team_1.winner == df_result.team_name, 'left').drop("count", "group_number", "winner")
winner_team_2 = winner_team_2.join(df_result, winner_team_2.winner == df_result.team_name, 'left').drop("count", "group_number", "winner")
winner_team_3 = winner_team_3.join(df_result, winner_team_3.winner == df_result.team_name, 'left').drop("count", "group_number", "winner")
winner_team_4 = winner_team_4.join(df_result, winner_team_4.winner == df_result.team_name, 'left').drop("count", "group_number", "winner")

In [40]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window


winner_team_1 = winner_team_1.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
winner_team_2 = winner_team_2.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
winner_team_2 = winner_team_2.withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")
semifinal_1 = winner_team_1.join(winner_team_2, "row_id", "left").drop("row_id")



winner_team_3 = winner_team_3.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
winner_team_4 = winner_team_4.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
winner_team_4 = winner_team_4.withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")
semifinal_2 = winner_team_3.join(winner_team_4, "row_id", "left").drop("row_id")

del winner_team_1
del winner_team_2
del winner_team_3
del winner_team_4

In [41]:
semifinal_1 = semifinal_1.withColumn("team_a_score", calculate_score(semifinal_1.flight_count, semifinal_1.flight_count_1, semifinal_1.top_average_speed, semifinal_1.top_average_speed_1, semifinal_1.top_average_height, semifinal_1.top_average_height_1))
semifinal_1 = semifinal_1.withColumn("team_b_score", calculate_score(semifinal_1.flight_count_1, semifinal_1.flight_count, semifinal_1.top_average_speed_1, semifinal_1.top_average_speed, semifinal_1.top_average_height_1, semifinal_1.top_average_height))
semifinal_1 = semifinal_1.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")

semifinal_2 = semifinal_2.withColumn("team_a_score", calculate_score(semifinal_2.flight_count, semifinal_2.flight_count_1, semifinal_2.top_average_speed, semifinal_2.top_average_speed_1, semifinal_2.top_average_height, semifinal_2.top_average_height_1))
semifinal_2 = semifinal_2.withColumn("team_b_score", calculate_score(semifinal_2.flight_count_1, semifinal_2.flight_count, semifinal_2.top_average_speed_1, semifinal_2.top_average_speed, semifinal_2.top_average_height_1, semifinal_2.top_average_height))
semifinal_2 = semifinal_2.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")

In [42]:
semifinal_1 = semifinal_1.withColumn("winner", determine_winner(semifinal_1.team_a, semifinal_1.team_b, semifinal_1.team_a_score, semifinal_1.team_b_score))
semifinal_2 = semifinal_2.withColumn("winner", determine_winner(semifinal_2.team_a, semifinal_2.team_b, semifinal_2.team_a_score, semifinal_2.team_b_score))

In [43]:
semifinal_1_winner = semifinal_1.select("winner")
semifinal_2_winner = semifinal_2.select("winner")
del semifinal_1
del semifinal_2

In [44]:
semifinal_1_winner = semifinal_1_winner.join(df_result, semifinal_1_winner.winner == df_result.team_name, 'left').drop("group_number", "winner")
semifinal_2_winner = semifinal_2_winner.join(df_result, semifinal_2_winner.winner == df_result.team_name, 'left').drop("group_number", "winner")

In [45]:
semifinal_1_winner = semifinal_1_winner.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
semifinal_2_winner = semifinal_2_winner.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())))
semifinal_2_winner = semifinal_2_winner.withColumnRenamed("team_name", "team_name_1")\
  .withColumnRenamed("top_average_speed", "top_average_speed_1")\
  .withColumnRenamed("top_average_height", "top_average_height_1")\
  .withColumnRenamed("flight_count", "flight_count_1")
final = semifinal_1_winner.join(semifinal_2_winner, "row_id", "left").drop("row_id")
del semifinal_1_winner
del semifinal_2_winner

In [46]:
final = final.withColumn("team_a_score", calculate_score(final.flight_count, final.flight_count_1, final.top_average_speed, final.top_average_speed_1, final.top_average_height, final.top_average_height_1))
final = final.withColumn("team_b_score", calculate_score(final.flight_count_1, final.flight_count, final.top_average_speed_1, final.top_average_speed, final.top_average_height_1, final.top_average_height))
final = final.withColumnRenamed("team_name", "team_a").withColumnRenamed("team_name_1", "team_b").select("team_a", "team_b", "team_a_score", "team_b_score")

In [47]:
final = final.withColumn("winner", determine_winner(final.team_a, final.team_b, final.team_a_score, final.team_b_score))
# final.show()

In [48]:
final.write.csv('result_2.csv', mode='overwrite', header=True)

In [49]:
# final_winner = final.select("winner")
# final_winner.show()
# del final_winner

del final