In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

Once upon a time, in a boardroom far, far away, a group of bored CEOs decided to play a game of bowling. Being high-school dropouts with limited understanding of maths, they couldn't grok the scoring rules of regular bowling and decided on a simplified version instead.

As the latest hire, you're tasked with calculating the scores for the latest meeting. The rules for CEO bowling are very simple:

* Players take turns rolling a ball and trying to knock down all ten pins in what is called a frame. If they don't knock down all ten pins on the first roll, they get to roll the ball one more time at the remaining pins before passing the turn to the next player.
* The score for a frame is the total number of pins knocked down in that frame.
* There is no limit on the number of frames in a game of CEO bowling; the game continues until the CEOs are bored and decide to do something else.
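
The frame rules above can be sketched in plain Python. This is only an illustration, not part of the assignment's helper code: the function names `split_into_frames` and `frame_scores` are hypothetical, the roll sequence in the usage lines is made up, and the sketch assumes the rolls arrive as a flat list of integers in the order they were thrown.

```python
def split_into_frames(rolls):
    """Group a flat list of rolls into frames under the CEO-bowling rules."""
    frames = []
    i = 0
    while i < len(rolls):
        if rolls[i] == 10:
            # All ten pins down on the first roll: the frame ends immediately.
            frames.append((rolls[i],))
            i += 1
        else:
            # Otherwise the player gets one more roll (if it was recorded).
            frames.append(tuple(rolls[i:i + 2]))
            i += 2
    return frames


def frame_scores(rolls):
    """The score of a frame is the number of pins knocked down in it."""
    return [sum(frame) for frame in split_into_frames(rolls)]


# Made-up rolls for illustration: an open frame, a ten on the first roll, an open frame.
example_rolls = [3, 5, 10, 4, 3]
print(split_into_frames(example_rolls))  # [(3, 5), (10,), (4, 3)]
print(frame_scores(example_rolls))       # [8, 10, 7]
```

Because a frame's score is just its pin count, a player's total is the same whether you sum the frames or sum the raw rolls.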

The input to your assignment is a file where each row consists of the name of the CEO and a series of numbers representing all their rolls. Your assignment is to figure out who won and how many points they scored.

To ease your implementation, the CEOs have provided an old scorecard for a short game played years ago.

```
Yatas Del Lana 3 5 3 5 7 2 3 0 10 4 3
Eve Stojbs 3 7 3 3 9 1 6 4 2 3 1 0
```

Summing up the scores you arrive at:

* Yatas Del Lana: 3 + 5 + 3 + 5 + 7 + 2 + 3 + 0 + 10 + 4 + 3 = 45
* Eve Stojbs: 3 + 7 + 3 + 3 + 9 + 1 + 6 + 4 + 2 + 3 + 1 + 0 = 42 

You can therefore declare Yatas Del Lana the winner with 45 points.
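
The same sums can be checked with a few lines of plain Python. This is a minimal sketch rather than the intended solution: `parse_row` is a hypothetical helper, and it assumes that a name never contains a purely numeric word, so that the first numeric token on a row marks the start of the rolls.

```python
scorecard = """\
Yatas Del Lana 3 5 3 5 7 2 3 0 10 4 3
Eve Stojbs 3 7 3 3 9 1 6 4 2 3 1 0
"""


def parse_row(row):
    """Split a row into (name, rolls); the name is everything before the first number."""
    tokens = row.split()
    first_roll = next((i for i, t in enumerate(tokens) if t.isdigit()), len(tokens))
    name = " ".join(tokens[:first_roll])
    rolls = [int(t) for t in tokens[first_roll:]]
    return name, rolls


# Under the simplified rules the total is just the sum of all rolls.
totals = {}
for line in scorecard.splitlines():
    if line.strip():
        name, rolls = parse_row(line)
        totals[name] = sum(rolls)

winner = max(totals, key=totals.get)
print(winner, totals[winner])  # Yatas Del Lana 45
```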

Given the scores in the provided series, calculate the winner and their final score. 

In [0]:
from utils import load_input, load_all_input_into_dataframes

input = load_input(3)
players, frames = load_all_input_into_dataframes(spark)
players.write.format("delta").mode("overwrite").saveAsTable("ceo_bowling_players")
frames.write.format("delta").mode("overwrite").saveAsTable("ceo_bowling_frames")

In [0]:
from pyspark.sql import functions as F  # namespaced so Python's built-in sum is not shadowed

frames = spark.table("ceo_bowling_frames").where("Game = 1")
players = spark.table("ceo_bowling_players")

# Frame score = pins from both throws; a missing second throw counts as 0.
# A spare needs both throws, so a strike (ten on the first throw) is excluded.
frames_with_score = (
    frames
    .withColumn("Score", F.col("Throw1") + F.coalesce(F.col("Throw2"), F.lit(0)))
    .withColumn("Strike", F.when(F.col("Throw1") == 10, True).otherwise(False))
    .withColumn(
        "Spare",
        F.when((F.col("Throw1") < 10) & (F.col("Throw1") + F.col("Throw2") == 10), True).otherwise(False),
    )
)

# Total score per player, joined with the player details, highest score first.
base_scores = (
    frames_with_score
    .groupBy("PlayerID")
    .agg(F.sum("Score").alias("Score"))
    .join(players, ["PlayerID"])
    .orderBy(F.col("Score").desc())
)

display(base_scores)