# Tournament Winners

### Description

Table: Players

| Column Name | Type  |
|-------------|-------|
| player_id   | int   |
| group_id    | int   |

player_id is the primary key (column with unique values) of this table.
Each row of this table indicates the group of each player.

Table: Matches

| Column Name   | Type    |
|---------------|---------|
| match_id      | int     |
| first_player  | int     |
| second_player | int     | 
| first_score   | int     |
| second_score  | int     |

match_id is the primary key (column with unique values) of this table.
Each row is a record of a match. first_player and second_player contain the player_id of the two players in that match.
first_score and second_score contain the number of points of the first_player and second_player respectively.
You may assume that, in each match, players belong to the same group.
 
The winner in each group is the player who scored the maximum total points within the group. In the case of a tie, the lowest player_id wins.

Write a solution to find the winner in each group.

Return the result table in any order.
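
The winner rule above can be checked by hand with a small plain-Python sketch before moving to Spark. It is only an illustration: the function name `find_winners` and the dictionary layout are hypothetical, and it assumes that a player with no recorded matches counts as having 0 points, which the problem statement does not say explicitly.

```python
from collections import defaultdict

# Sample rows copied from the Players and Matches tables in this notebook.
# players maps player_id -> group_id.
players = {15: 1, 25: 1, 30: 1, 45: 1, 10: 2, 35: 2, 50: 2, 20: 3, 40: 3}
# Each match is (match_id, first_player, second_player, first_score, second_score).
matches = [
    (1, 15, 45, 3, 0),
    (2, 30, 25, 1, 2),
    (3, 30, 15, 2, 0),
    (4, 40, 20, 5, 2),
    (5, 35, 50, 1, 1),
]


def find_winners(players, matches):
    """Return {group_id: winning player_id} (hypothetical helper)."""
    # Total points per player, counting both sides of every match.
    totals = defaultdict(int)
    for _, first, second, first_score, second_score in matches:
        totals[first] += first_score
        totals[second] += second_score

    best = {}
    for player_id, group_id in players.items():
        # Assumption: players without matches have 0 points.
        # Sorting key: highest total first, then lowest player_id.
        key = (-totals[player_id], player_id)
        if group_id not in best or key < best[group_id]:
            best[group_id] = key
    return {group_id: key[1] for group_id, key in best.items()}


winners = find_winners(players, matches)
print(winners)  # {1: 15, 2: 35, 3: 40}
```

In group 1, players 15 and 30 both total 3 points, so the tie goes to the lower player_id, 15.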

### Imports

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

### Sample data

In [0]:
players_schema = StructType([
    StructField("player_id", IntegerType(), nullable=False),
    StructField("group_id", IntegerType(), nullable=False)
])

matches_schema = StructType([
    StructField("match_id", IntegerType(), nullable=False),
    StructField("first_player", IntegerType(), nullable=False),
    StructField("second_player", IntegerType(), nullable=False),
    StructField("first_score", IntegerType(), nullable=False),
    StructField("second_score", IntegerType(), nullable=False)
])

players_data = [
    (15, 1),
    (25, 1),
    (30, 1),
    (45, 1),
    (10, 2),
    (35, 2),
    (50, 2),
    (20, 3),
    (40, 3)
]

matches_data = [
    (1, 15, 45, 3, 0),
    (2, 30, 25, 1, 2),
    (3, 30, 15, 2, 0),
    (4, 40, 20, 5, 2),
    (5, 35, 50, 1, 1)
]

spark.createDataFrame(players_data, schema=players_schema).createOrReplaceTempView("Players")
spark.createDataFrame(matches_data, schema=matches_schema).createOrReplaceTempView("Matches")

display(spark.table("Players"))
display(spark.table("Matches"))

player_id,group_id
15,1
25,1
30,1
45,1
10,2
35,2
50,2
20,3
40,3


match_id,first_player,second_player,first_score,second_score
1,15,45,3,0
2,30,25,1,2
3,30,15,2,0
4,40,20,5,2
5,35,50,1,1


### Solution

In [0]:
matches = spark.table("Matches")

# Points each player earned when listed as first_player.
first_player_scores = matches.select(
    F.col("first_player").alias("player_id"),
    F.col("first_score").alias("score"),
)

# Points each player earned when listed as second_player.
second_player_scores = matches.select(
    F.col("second_player").alias("player_id"),
    F.col("second_score").alias("score"),
)

# Stack both views and sum to get each player's total points.
scores_total = (
    first_player_scores.union(second_player_scores)
    .groupBy("player_id")
    .agg(F.sum("score").alias("score"))
)

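# Start from Players with a left join so that players (and groups) with no
# recorded matches are kept; their missing totals are treated as 0 points.
scores_total_with_group_id = (
    spark.table("Players").alias("p")
    .join(scores_total.alias("s"), "player_id", "left")
    .withColumn("score", F.coalesce(F.col("score"), F.lit(0)))
)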

window = Window.partitionBy("group_id").orderBy(F.col("score").desc(), F.col("player_id").asc())

result = (
    scores_total_with_group_id.withColumn("row_no", F.row_number().over(window))
        .filter("row_no = 1")
        .select("group_id", "player_id")
)

display(result)

group_id,player_id
1,15
2,35
3,40
