### **Import Libraries**

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType, DateType
from pyspark.sql.functions import *
from pyspark.sql import Window

### Execute notebook with common/reusable functions 

In [0]:
%run "../01-General/02-CommonFunctions"

### Connect to the storage

In [0]:
wasbs_path = connect()

Remote blob path: wasbs://nfl@bgupb202402juanbarriento.blob.core.windows.net/


### Create Paths

In [0]:
srcDataDirRoot,destDataDirRoot = route(wasbs_path,"","silver","gold")
print(f"Source data dir: {srcDataDirRoot}")
print(f"Destination data dir: {destDataDirRoot}")

Source data dir: wasbs://nfl@bgupb202402juanbarriento.blob.core.windows.net/silver/nfl-2022/
Destination data dir: wasbs://nfl@bgupb202402juanbarriento.blob.core.windows.net/gold/nfl-2022/


## Read all the files from the silver layer

### Silver Datasets

These datasets are provided by the silver layer and contain all the necessary columns and transformations for data exploration. The five datasets we will be using are:

| Column Name              | Description                                                                                               |
|--------------------------|-----------------------------------------------------------------------------------------------------------|
| `nflGamesSilver`                 | General information about NFL games.                                                   |
| `nflPlaysSilver`                 | 	Contains information on the events that occurred during the games.                       |
| `nflPFFScoutingDataSilver`        | 	Information about special team plays in the NFL.                                                                |
| `nflPlayersSilver`                | Basic information about NFL players.                                           |
| `nflTrackingSilver`                   | 	Specific data on players during game events.

---

#### Games Dataframe

In [0]:
# Read the data in delta format
nflGamesSilver = spark.read.format("delta").load(f"{srcDataDirRoot}/games").cache()

# Check the data
display(nflGamesSilver)

#### Plays Dataframe

In [0]:
# Read the data in delta format
nflPlaysSilver = spark.read.format("delta").load(f"{srcDataDirRoot}/plays").cache()

# Check the data
display(nflPlaysSilver)

#### PFFScoutingData Dataframe

In [0]:
# Read the data in delta format
nflPFFScoutingDataSilver = spark.read.format("delta").load(f"{srcDataDirRoot}/PFFScoutingData").cache()

# Check the data
display(nflPFFScoutingDataSilver)

#### Players Dataframe


In [0]:
# Read the data in delta format
nflPlayersSilver = spark.read.format("delta").load(f"{srcDataDirRoot}/players").cache()

# Check the data
display(nflPlayersSilver)

#### Tracking Dataframe

In [0]:
# Read the data in delta format
nflTrackingSilver = spark.read.format("delta").load(f"{srcDataDirRoot}/tracking").cache()

# Check the data
display(nflTrackingSilver)

## Gold Layer Datasets
---
The data from the silver layer will be aggregated, consolidated, and transformed into a format that aligns with specific business objectives. 

This are the new datasets for the gold layer:



The data from the silver layer will be aggregated, consolidated, and transformed into a format that aligns with specific business objectives. 

This are the new datasets for the gold layer

### 1. `Aggregated Games and Plays Table`
- **Description**: A binary column indicating whether the intended kick direction matched the actual kick direction.
- **Usefulness**: This flag helps evaluate the accuracy and execution of special teams plays, providing insights into whether the kicking team executed their strategy correctly. It can also highlight instances where a mismatch might have led to breakdowns in coverage or field position.

### 2. `gold_df_Players_statistics`
- **Description**: A binary column indicating whether the intended return direction matched the actual return direction.
- **Usefulness**: This flag helps in analyzing whether the return team successfully executed their set-up for returns, potentially impacting field position or leading to key return opportunities.

### 3. `gold_df_TeamsData_by_season`
- **Description**: A column representing the total time for the play, summing the snap time, operation time, and hang time.
- **Usefulness**: This metric helps quantify the duration of special teams plays, which can be critical in evaluating the effectiveness of punts, kickoffs, and coverage. Longer play times might indicate deeper kicks or more time for the coverage team to get downfield.

### 4. `gold_df_SpecialTeamPlays_by_games_and_teams`
- **Description**: A binary column indicating whether a tackle was made during the play.
- **Usefulness**: This flag helps identify whether a play ended in a successful tackle, which is critical for analyzing defensive effectiveness, especially on punt and kickoff returns. It can also aid in studying missed tackles and defensive breakdowns.

---

### 1. `Aggregated Games and Plays Table`

- **Description**: The `Aggregated Games and Plays Table` table provides a detailed summary of each game, covering scores, penalties, touchdowns, return plays, yards gained, and tackle performance. This table gives an overview of each match within a season, making it valuable for performance analysis and trend identification at the team and season levels.
- **Usefulness**: This table is essential for high-level analysis of game dynamics and team performance. By grouping play-level data at the game level, it allows for insights into scoring patterns, frequency of key events (like touchdowns and penalties), and efficiency in gaining yards. Additionally, with data available by week and season, it supports trend analysis and team improvement tracking over time.

#### Dataset Columns

#### `gameId`
- **Description**: Unique identifier for the game.

#### `season`
- **Description**: Season in which the game took place.

#### `week`
- **Description**: Week of the season corresponding to the game.

#### `homeTeamAbbr`
- **Description**: Three-letter code for the home team.

#### `visitorTeamAbbr`
- **Description**: Three-letter code for the visiting team.

#### `homeScore`
- **Description**: Maximum score reached by the home team during the game.

#### `visitorScore`
- **Description**: Maximum score reached by the visiting team during the game.

#### `penalty_count`
- **Description**: Total number of penalties in the game.

#### `touchdown_count`
- **Description**: Total number of plays resulting in a touchdown.

#### `return_count`
- **Description**: Total number of return plays in the game.

#### `total_yards_gained`
- **Description**: Total yards gained during the game.

#### `total_successful_tackles`
- **Description**: Total successful tackles made in the game.

#### `total_missed_tackles`
- **Description**: Total missed tackles in the game.

**Join between the necessary tables**

In [0]:
#join the tables on the gameId and select the columns needed
gamesPlaysPFFScoutingDataframe = nflPlaysSilver.join(
  nflGamesSilver, 
  nflPlaysSilver.gameId == nflGamesSilver.gameId, "left"
  ).join(
    nflPFFScoutingDataSilver, 
    (nflPlaysSilver.gameId == nflPFFScoutingDataSilver.gameId) & 
    (nflPlaysSilver.playId == nflPFFScoutingDataSilver.playId), "left"     
  ).select(
    nflPlaysSilver.gameId, nflGamesSilver.season, nflGamesSilver.week,
    nflGamesSilver.homeTeamAbbr, nflGamesSilver.visitorTeamAbbr, 
    nflPlaysSilver.playResult, nflPlaysSilver.is_penalty_flag, 
    nflPlaysSilver.is_Touchdown_flag, nflPlaysSilver.is_ReturnPlay_flag,
    nflPlaysSilver.preSnapVisitorScore, nflPlaysSilver.preSnapHomeScore,
    nflPFFScoutingDataSilver.MissedTacklerCount,nflPFFScoutingDataSilver.is_successfulTackle_flag
  )

display(gamesPlaysPFFScoutingDataframe)

**Aggregations and transformations**

In [0]:
# Create the games statistics table
gold_df_Plays_statistics_by_games = gamesPlaysPFFScoutingDataframe.groupBy(
    "gameid", "season","week","homeTeamAbbr", "visitorTeamAbbr"
).agg(
    max("preSnapHomeScore").alias("homeScore"),                        # Score of the home team
    max("preSnapVisitorScore").alias("visitorScore"),                  # Score of the visitor team
    sum("is_penalty_flag").alias("penalty_count"),                     # Count number of penalty plays 
    sum("is_Touchdown_flag").alias("touchdown_count"),                 # Count number of touchdown plays 
    sum("is_ReturnPlay_flag").alias("return_count"),                   # Count number of return plays 
    sum("playResult").alias("total_yards_gained"),                     # Number of total yards gain in the game
    sum("is_successfulTackle_flag").alias("total_successful_tackles"), # Number of successful tackles
    sum("MissedTacklerCount").alias("total_missed_tackles")            # Number of missed tackles   
).orderBy(
    "season", "week", "gameid"
)

display(gold_df_Plays_statistics_by_games)

### Save the data in the **Golden** Layer

In [0]:
#Delete any residual data from prior executions for an idempotent run
dbutils.fs.rm(f"{destDataDirRoot}/aggregated_games_and_plays",recurse=True)

True

In [0]:
#To make Hive Parquet format compatible with Spark Parquet format
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

#Save the dataset
gold_df_Plays_statistics_by_games.write.format("delta").mode("overwrite").partitionBy("season","week").save(f"{destDataDirRoot}/aggregated_games_and_plays")

### 2. `Player Performance and Participation Table`

- **Description**: The `Player Performance and Participation Table` provides a comprehensive view of each player's performance within each game and season.
- **Usefulness**: This table is valuable for assessing individual player impact in games, including performance metrics like speed and acceleration, as well as endurance measures through distance traveled. It is particularly useful for coaches, analysts, and scouts to evaluate player stamina, play style, and potential for improvement in real-time game scenarios.

#### Dataset Columns

#### `nflId`
- **Description**: Unique identifier for each player across games.

#### `gameId`
- **Description**: Identifier for the game in which the player participated.

#### `season`
- **Description**: Season in which the game took place.

#### `displayName`
- **Description**: Full name of the player.

#### `Position`
- **Description**: Position played by the player in the team.

#### `avg_speed`
- **Description**: Average speed (yards/second) of the player throughout the game.

#### `max_speed`
- **Description**: Maximum speed reached by the player in the game.

#### `avg_acceleration`
- **Description**: Average acceleration (yards/second²) of the player during the game.

#### `max_acceleration`
- **Description**: Maximum acceleration achieved by the player throughout the game.

#### `total_distance`
- **Description**: Total distance (yards) traveled by the player during the game.

#### `total_plays`
- **Description**: Total number of plays in which the player participated during the game.


**Join between the necessary tables**

In [0]:
# Join the tables of players, tracking, and PFFScoutingData on nflId and playId and select the columns needed
gold_df_Players_Tracking_PFFScoutingData = nflTrackingSilver.join(
  nflPlayersSilver, 
  nflTrackingSilver.nflId == nflPlayersSilver.nflId, "left"

).join(
  nflPFFScoutingDataSilver, 
  (nflTrackingSilver.playId == nflPFFScoutingDataSilver.playId) &
  (nflTrackingSilver.gameId == nflPFFScoutingDataSilver.gameId), "left"
).join(
  nflGamesSilver,
  nflTrackingSilver.gameId == nflGamesSilver.gameId
).select(
  nflTrackingSilver.nflId,nflPlayersSilver.displayName, nflPlayersSilver.Position,
  nflTrackingSilver.gameId,nflGamesSilver.season, nflTrackingSilver.playId,
  nflTrackingSilver.s, nflTrackingSilver.a, nflTrackingSilver.dis, 
  nflPFFScoutingDataSilver.is_kickDirection_correct_flag,
  nflPFFScoutingDataSilver.TotalPlayTime, 
  nflPFFScoutingDataSilver.is_successfulTackle_flag
)

display(gold_df_Players_Tracking_PFFScoutingData)

**Aggregations and transformations**

In [0]:
# Create the Player Statistics table
gold_df_Players_statistics = gold_df_Players_Tracking_PFFScoutingData.groupBy(
    "nflId", "gameid","season"
).agg(
    first("displayName").alias("displayName"),    # Player name
    first("Position").alias("Position"),          # Player position
    avg("s").alias("avg_speed"),                  # Average speed of the player
    max("s").alias("max_speed"),                  # Max speed of the player
    avg("a").alias("avg_acceleration"),           # Average acceleration of the player
    max("a").alias("max_acceleration"),           # Max acceleration of the player
    sum("dis").alias("total_distance"),           # Total distance traveled by the player
    countDistinct("playid").alias("total_plays")  # Total number of plays
).orderBy(
    "nflId", "gameid","season"
)

gold_df_Players_statistics = gold_df_Players_statistics.na.drop()

display(gold_df_Players_statistics)

### Save the data in the **Golden** Layer

In [0]:
#Delete any residual data from prior executions for an idempotent run
dbutils.fs.rm(f"{destDataDirRoot}/player_performance_participation_by_games",recurse=True)

True

In [0]:
#To make Hive Parquet format compatible with Spark Parquet format
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

#Save the dataset
gold_df_Players_statistics.write.format("delta").mode("overwrite").partitionBy("season").save(f"{destDataDirRoot}/player_performance_participation_by_games")

### 3. `Team Season Performance By Season Table`

- **Description**: The `Team Season Performance Summary Table` aggregates performance metrics for each team across an entire season, focusing on offensive and defensive accomplishments, penalties, and success in gaining yards and scoring touchdowns. This table captures how each team performed cumulatively across games in various aspects such as yardage gained, penalties, and successful plays.
- **Usefulness**: This table is critical for season-wide analysis, allowing for a thorough examination of team strengths, weaknesses, and consistency across different game metrics. It supports comparative analysis of team performance throughout a season, aiding analysts, coaches, and strategists in evaluating overall team effectiveness.

#### Dataset Columns

#### `possessionTeam`
- **Description**: Abbreviation representing the team in possession during each play.

#### `season`
- **Description**: Season year for which the performance data is summarized.

#### `total_games`
- **Description**: Total number of games played by the team in the season.

#### `total_yards_gain`
- **Description**: Total yards gained by the team, including yards awarded from penalties.

#### `total_penalty_yards`
- **Description**: Sum of yards lost due to penalties committed by the team.

#### `total_yards_gain_from_kick`
- **Description**: Total yards gained through kicking plays.

#### `total_yards_returned_from_kick`
- **Description**: Total yards gained by the team through returns following opponent kicks.

#### `total_successful_plays`
- **Description**: Total number of plays deemed successful for the team.

#### `total_touchdowns`
- **Description**: Total touchdowns scored by the team throughout the season.

#### `total_return_plays`
- **Description**: Total number of plays where a return was attempted.

#### `total_penalties`
- **Description**: Total number of penalties committed by the team across all games.

**Join between the necessary tables**

In [0]:
#join the tables on the gameId and select the columns needed
gamesPlaysDataframe = nflPlaysSilver.join(
  nflGamesSilver, 
  nflPlaysSilver.gameId == nflGamesSilver.gameId, "left"
  ).select(
    nflPlaysSilver.gameId,nflPlaysSilver.possessionTeam, nflGamesSilver.season,
    nflPlaysSilver.playId,nflPlaysSilver.playResult,nflPlaysSilver.is_penalty_flag, 
    nflPlaysSilver.is_Touchdown_flag, nflPlaysSilver.is_ReturnPlay_flag,
    nflPlaysSilver.penaltyYards, nflPlaysSilver.kickLength,
    nflPlaysSilver.is_SuccessfulPlay_flag, nflPlaysSilver.kickReturnYardage,
  )

display(gamesPlaysDataframe)

**Aggregations and transformations**

In [0]:
# Create the Player Statistics table
gold_df_TeamsData_by_season = gamesPlaysDataframe.groupBy(
    "possessionTeam","season"
).agg(
    countDistinct("gameId").alias("total_games"),                     # Total number of games
    sum("playResult").alias("total_yards_gain"),                      # Total yards gained with penties
    sum("penaltyYards").alias("total_penalty_yards"),                 # Total penalty yards
    sum("kickLength").alias("total_yards_gain_from_kick"),            # Total yards from kicking
    sum("kickReturnYardage").alias("total_yards_returned_from_kick"), # Total yards returned from kicking
    sum("is_SuccessfulPlay_flag").alias("total_successful_plays"),    # Total successful plays
    sum("is_Touchdown_flag").alias("total_touchdowns"),               # Total touchdowns
    sum("is_ReturnPlay_flag").alias("total_return_plays"),            # Total return plays
    sum("is_penalty_flag").alias("total_penalties")                   # Total penalties

).orderBy(
    gamesPlaysDataframe.season, gamesPlaysDataframe.possessionTeam
    )

display(gold_df_TeamsData_by_season)

possessionTeam,season,total_games,total_yards_gain,total_penalty_yards,total_yards_gain_from_kick,total_yards_returned_from_kick,total_successful_plays,total_touchdowns,total_return_plays,total_penalties
ARI,2018,16,6178,37,8850,994,172,1,58,13
ATL,2018,16,5905,18,9630,1002,191,0,58,12
BAL,2018,16,6027,49,10383,1060,189,1,59,11
BUF,2018,16,5453,23,8627,1051,165,0,59,19
CAR,2018,16,5351,32,8740,542,175,1,32,5
CHI,2018,16,5891,53,10026,698,194,2,39,7
CIN,2018,16,5878,-9,9160,925,186,1,53,17
CLE,2018,16,6327,100,9964,961,192,0,51,15
DAL,2018,16,5201,1,9195,924,167,1,49,7
DEN,2018,15,5537,32,8636,899,174,1,51,13


### Save the data in the **Golden** Layer

In [0]:
#Delete any residual data from prior executions for an idempotent run
dbutils.fs.rm(f"{destDataDirRoot}/team_performance_by_season",recurse=True)

False

In [0]:
#To make Hive Parquet format compatible with Spark Parquet format
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

#Save the dataset
gold_df_TeamsData_by_season.write.format("delta").mode("overwrite").partitionBy("season").save(f"{destDataDirRoot}/team_performance_by_season")

### 4. `Special Team Plays Performance by Game and Team`

- **Description**: The `Special Team Plays Performance by Game and Team` aggregates key statistics related to special teams' plays, such as kickoffs, punts, and return attempts, for each team in every game. This table provides insights into specific aspects of special teams' performance, like kick accuracy, return accuracy, and efficiency in gaining yards through kicks.
- **Usefulness**: This table is essential for evaluating the effectiveness of special teams' strategies and execution in each game. By tracking metrics such as kick length, operation time, and hang time, teams can refine special teams' tactics and address areas for improvement. It enables both game-level and team-level analysis, aiding in adjustments for future games and highlighting consistent strengths or weaknesses.

#### Dataset Columns

#### `gameId`
- **Description**: Unique identifier for the game.

#### `possessionTeam`
- **Description**: Abbreviation for the team that possessed the ball during the special team's play.

#### `Total_Special_Team_Plays`
- **Description**: Total number of special teams plays conducted by the possession team in the game.

#### `total_yards_returned_from_kick`
- **Description**: Total yards gained by the possession team from returns on kicks.

#### `Total_Operation_Time`
- **Description**: Cumulative time taken from the snap to kick across all special teams plays.

#### `Total_Hang_Time`
- **Description**: Cumulative hang time for all special teams kicks made by the possession team.

#### `total_succesfull_kick_direction`
- **Description**: Total count of kicks executed in the intended direction.

#### `total_succesfull_return_direction`
- **Description**: Total count of returns made in the intended direction.

#### `total_kick_length`
- **Description**: Total distance of all kicks made by the possession team during the game.

#### `total_successful_plays`
- **Description**: Total number of special teams plays considered successful for the possession team.







**Join between the necessary tables**

In [0]:
PlaysPFFScoutingDataframe = nflPFFScoutingDataSilver.join(
  nflPlaysSilver,
  (nflPFFScoutingDataSilver.gameId == nflPlaysSilver.gameId) & 
  (nflPFFScoutingDataSilver.playId == nflPlaysSilver.playId),
).select(
  nflPlaysSilver.gameId, nflPlaysSilver.playId,nflPlaysSilver.possessionTeam,
  nflPlaysSilver.kickReturnYardage,nflPlaysSilver.is_SuccessfulPlay_flag,
  nflPlaysSilver.kickLength,nflPFFScoutingDataSilver.operationTime, 
  nflPFFScoutingDataSilver.is_kickDirection_correct_flag,
  nflPFFScoutingDataSilver.is_ReturnDirection_correct_flag,
  nflPFFScoutingDataSilver.hangTime
)

display(PlaysPFFScoutingDataframe)

**Aggregations and transformations**

In [0]:
gold_df_SpecialTeamPlays_by_games_and_teams = PlaysPFFScoutingDataframe.groupBy(
    "gameId", "possessionTeam"
).agg(
    countDistinct("playId").alias("Total_Special_Team_Plays"),                         # Total Special Team Plays
    sum("kickReturnYardage").alias("total_yards_returned_from_kick"),                  # Total yards returned from kick
    sum("operationTime").alias("Total_Operation_Time"),                                # Total operation time
    sum("hangTime").alias("Total_Hang_Time"),                                          # Total hang time
    sum("is_kickDirection_correct_flag").alias("total_succesfull_kick_direction"),     # Count of succesfull kick direction
    sum("is_ReturnDirection_correct_flag").alias("total_succesfull_return_direction"), # Count of succesfull return direction
    sum("kickLength").alias("total_kick_length"),                                      # Total kick length
    sum("is_SuccessfulPlay_flag").alias("total_successful_plays")                      # Total successful plays
).orderBy(
    PlaysPFFScoutingDataframe.gameId, PlaysPFFScoutingDataframe.possessionTeam
)
display(gold_df_SpecialTeamPlays_by_games_and_teams)


### Save the data in the **Golden** Layer

In [0]:
#Delete any residual data from prior executions for an idempotent run
dbutils.fs.rm(f"{destDataDirRoot}/specialteamplays_by_games_and_teams",recurse=True)

False

In [0]:
#To make Hive Parquet format compatible with Spark Parquet format
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

#Save the dataset
gold_df_TeamsData_by_season.write.format("delta").mode("overwrite").partitionBy("season").save(f"{destDataDirRoot}/specialteamplays_by_games_and_teams")