# MSBX 5420 Assignment 2
## Task 1 - Warm Up with Spark

### Replicate the MapReduce Calculations in Assignment 1 with Spark RDD, DataFrame and SQL
As a warm-up, your first task is to replicate the MapReduce jobs from Assignment 1 with Spark RDD and DataFrame/SQL.
First, let's load the NFL dataset into Spark. For convenience, we will use a Spark session and a DataFrame.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "2g").appName('spark_nfl_data').getOrCreate()

In the NFL dataset, the missing values we skipped in Assignment 1 are encoded as "NA". By default, Spark treats empty or null fields as missing values, so here we need to tell Spark to treat "NA" as missing as well.
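
To make the effect of a null marker concrete, here is a plain-Python sketch (not Spark code) that mimics the idea on a tiny in-memory CSV. The sample rows and the helper `parse_cell` are made up for illustration; Spark's reader does the equivalent work internally when `nullValue='NA'` is set.

```python
import csv
import io

# Hypothetical three-row sample; the real file has many more columns.
sample = "game_id,yards_gained\n2009091300,5\n2009091300,NA\n2009091300,-2\n"

def parse_cell(value, null_value="NA"):
    """Return None for the null marker, otherwise the integer value."""
    return None if value == null_value else int(value)

rows = [
    {"game_id": int(r["game_id"]), "yards_gained": parse_cell(r["yards_gained"])}
    for r in csv.DictReader(io.StringIO(sample))
]
yards = [r["yards_gained"] for r in rows]
print(yards)  # [5, None, -2]
```

Without the marker, a column that mixes numbers with the text "NA" could not be read as integers, which is why the option matters for the schema inference used in the next cell.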

In [3]:
df_nfl = spark.read.options(header=True, nullValue='NA', inferSchema=True).csv('./NFL_Play_by_Play_2009-2018.csv')
df_nfl.show(5)

+-------+----------+---------+---------+-------+------------+-------+-------------+----------+-------------------------+----------------------+----------------------+---------+----+----------+---------+------------+----------------+----------------+-------------+-------------+------------------+---------------------+------------------+---------+--------------+--------------+----------------+
|play_id|   game_id|home_team|away_team|posteam|posteam_type|defteam|side_of_field| game_date|quarter_seconds_remaining|half_seconds_remaining|game_seconds_remaining|game_half|down|goal_to_go|play_type|yards_gained|total_home_score|total_away_score|posteam_score|defteam_score|score_differential|fourth_down_converted|fourth_down_failed|touchdown|pass_touchdown|rush_touchdown|return_touchdown|
+-------+----------+---------+---------+-------+------------+-------+-------------+----------+-------------------------+----------------------+----------------------+---------+----+----------+---------+--------

In [4]:
#let's check and clean the data with dataframe
print(df_nfl.count())
print(df_nfl.distinct().count())

449371
446982


In [5]:
#drop duplicate rows
df_nfl = df_nfl.dropDuplicates()

In [6]:
#let's look at the data
df_nfl.printSchema()

root
 |-- play_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- home_team: string (nullable = true)
 |-- away_team: string (nullable = true)
 |-- posteam: string (nullable = true)
 |-- posteam_type: string (nullable = true)
 |-- defteam: string (nullable = true)
 |-- side_of_field: string (nullable = true)
 |-- game_date: date (nullable = true)
 |-- quarter_seconds_remaining: integer (nullable = true)
 |-- half_seconds_remaining: double (nullable = true)
 |-- game_seconds_remaining: double (nullable = true)
 |-- game_half: string (nullable = true)
 |-- down: integer (nullable = true)
 |-- goal_to_go: integer (nullable = true)
 |-- play_type: string (nullable = true)
 |-- yards_gained: integer (nullable = true)
 |-- total_home_score: integer (nullable = true)
 |-- total_away_score: integer (nullable = true)
 |-- posteam_score: integer (nullable = true)
 |-- defteam_score: integer (nullable = true)
 |-- score_differential: integer (nullable = true)
 |-- fourth_d

In [7]:
#take a look at the data via pandas
import pandas as pd
#disable the row/column limits to not truncate the displayed data
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
df_nfl.limit(20).toPandas().head(20)

Unnamed: 0,play_id,game_id,home_team,away_team,posteam,posteam_type,defteam,side_of_field,game_date,quarter_seconds_remaining,half_seconds_remaining,game_seconds_remaining,game_half,down,goal_to_go,play_type,yards_gained,total_home_score,total_away_score,posteam_score,defteam_score,score_differential,fourth_down_converted,fourth_down_failed,touchdown,pass_touchdown,rush_touchdown,return_touchdown
0,2781,2009091304,CLE,MIN,CLE,home,MIN,CLE,2009-09-13,900,900.0,900.0,Half2,1,0,pass,12,12,24,12,24,-12,0,0,0,0,0,0
1,868,2009091307,NO,DET,NO,home,DET,NO,2009-09-13,91,991.0,2791.0,Half1,3,0,pass,0,14,3,14,3,11,0,0,0,0,0,0
2,1738,2009091305,HOU,NYJ,HOU,home,NYJ,HOU,2009-09-13,120,120.0,1920.0,Half1,2,0,pass,7,0,10,0,10,-10,0,0,0,0,0,0
3,158,2009091301,BAL,KC,BAL,home,KC,BAL,2009-09-13,815,1715.0,3515.0,Half1,1,0,run,4,0,0,0,0,0,0,0,0,0,0,0
4,1327,2009091300,ATL,MIA,ATL,home,MIA,ATL,2009-09-13,322,322.0,2122.0,Half1,1,0,run,3,7,0,7,0,7,0,0,0,0,0,0
5,1765,2009091309,ARI,SF,ARI,home,SF,ARI,2009-09-13,420,420.0,2220.0,Half1,1,0,run,3,3,6,3,6,-3,0,0,0,0,0,0
6,2483,2009091309,ARI,SF,SF,away,ARI,SF,2009-09-13,900,1800.0,1800.0,Half2,1,0,run,4,6,13,13,6,7,0,0,0,0,0,0
7,4094,2009091309,ARI,SF,SF,away,ARI,SF,2009-09-13,331,331.0,331.0,Half2,1,0,run,2,16,20,20,16,4,0,0,0,0,0,0
8,2354,2009091311,SEA,STL,SEA,home,STL,SEA,2009-09-13,781,1681.0,1681.0,Half2,3,0,pass,6,14,0,14,0,14,0,0,0,0,0,0
9,115,2009091310,NYG,WAS,NYG,home,WAS,NYG,2009-09-13,823,1723.0,3523.0,Half1,3,0,pass,7,0,0,0,0,0,0,0,0,0,0,0


In [8]:
#convert the dataframe to an RDD and check the number of partitions
rdd_nfl = df_nfl.rdd
rdd_nfl.getNumPartitions()

4

In [9]:
#repartition the RDD into 4 partitions (it already has 4, so this only reshuffles the rows)
rdd_nfl = rdd_nfl.repartition(4)

Now that the DataFrame has been converted to an RDD, we can run RDD operations on `rdd_nfl`. Let's replicate the two calculations: (1) the number of plays in each game and (2) the average yards gained in each game.
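
Before running this on Spark, it may help to see the same pattern in plain Python. The sketch below is not Spark code: `reduce_by_key` is a hypothetical stand-in for what `reduceByKey` does conceptually, and the five plays are invented.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

# Hypothetical plays: (game_id, yards_gained)
plays = [(1, 4), (1, 6), (2, 10), (1, -1), (2, 0)]

# (1) number of plays per game: map each play to (game_id, 1), then add
plays_per_game = reduce_by_key(((g, 1) for g, _ in plays), lambda x, y: x + y)

# (2) average yards per game: carry (sum, count) pairs, divide at the end
sum_count = reduce_by_key(
    ((g, (y, 1)) for g, y in plays),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
avg_yards = {g: s / n for g, (s, n) in sum_count.items()}

print(plays_per_game)  # {1: 3, 2: 2}
print(avg_yards)       # {1: 3.0, 2: 5.0}
```

The average is carried as a (sum, count) pair because averaging partial averages is not associative, while adding sums and counts is, so the reduce function can be applied in any grouping across partitions.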

In [10]:
#mapreduce with spark RDD for sum of plays
#sum of plays = rdd.reduceByKey(lambda x,y: x + y)
plays_per_game = rdd_nfl.map(lambda x: (x['game_id'],1)).reduceByKey(lambda x, y: x+y)

results_sum = plays_per_game.collect()
for game_id, num_plays in results_sum[:20]:
    print(f'Game ID: {game_id}, Plays: {num_plays}')

Game ID: 2009092712, Plays: 154
Game ID: 2009092800, Plays: 164
Game ID: 2009100404, Plays: 185
Game ID: 2009100408, Plays: 165
Game ID: 2009102508, Plays: 162
Game ID: 2009102500, Plays: 163
Game ID: 2009113000, Plays: 161
Game ID: 2009120300, Plays: 165
Game ID: 2009120608, Plays: 197
Game ID: 2010010304, Plays: 184
Game ID: 2010010312, Plays: 192
Game ID: 2010092612, Plays: 188
Game ID: 2010092700, Plays: 159
Game ID: 2010100300, Plays: 172
Game ID: 2010101800, Plays: 166
Game ID: 2010102500, Plays: 188
Game ID: 2010103104, Plays: 196
Game ID: 2010100400, Plays: 167
Game ID: 2010101000, Plays: 169
Game ID: 2010101008, Plays: 173


In [11]:
#mapreduce with spark RDD for average yards gained
#hint: "NA" in the column "yards_gained" was read as null, so in the RDD it is None; all other values are integers
#avg yards gained = total yards / number of plays, so carry (yards, 1) pairs through reduceByKey
#note: a None is counted as 0 yards and still counted as a play
yards_per_game = rdd_nfl.map(lambda x: (x['game_id'], (0 if x['yards_gained'] is None else x['yards_gained'], 1))) \
                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Calculate average yards gained per game
average_yards_per_game = yards_per_game.mapValues(lambda x: x[0] / x[1])

# Collect and print the first 20 results; report any error instead of raising it
try:
    results_avg = average_yards_per_game.collect()
    for game_id, avg_yards in results_avg[:20]:  # Adjust as necessary to print more results
        print(f'Game ID: {game_id}, Average Yards Gained: {avg_yards}')
except Exception as e:
    print(f"An error occurred: {e}")

Game ID: 2009092712, Average Yards Gained: 3.3051948051948052
Game ID: 2009092800, Average Yards Gained: 4.390243902439025
Game ID: 2009100404, Average Yards Gained: 4.448648648648649
Game ID: 2009100408, Average Yards Gained: 3.2848484848484847
Game ID: 2009102508, Average Yards Gained: 4.382716049382716
Game ID: 2009102500, Average Yards Gained: 4.460122699386503
Game ID: 2009113000, Average Yards Gained: 5.254658385093168
Game ID: 2009120300, Average Yards Gained: 3.1818181818181817
Game ID: 2009120608, Average Yards Gained: 4.659898477157361
Game ID: 2010010304, Average Yards Gained: 4.168478260869565
Game ID: 2010010312, Average Yards Gained: 5.395833333333333
Game ID: 2010092612, Average Yards Gained: 3.143617021276596
Game ID: 2010092700, Average Yards Gained: 4.119496855345912
Game ID: 2010100300, Average Yards Gained: 3.877906976744186
Game ID: 2010101800, Average Yards Gained: 3.4518072289156625
Game ID: 2010102500, Average Yards Gained: 4.00531914893617
Game ID: 2010103104, 

Next, let's do the same with Spark DataFrame and SQL. The DataFrame is `df_nfl`.

In [12]:
#register the dataframe as a temp view so it can be queried with spark sql
df_nfl.createOrReplaceTempView('nfl_table')


In [13]:
#Number of plays in each game
plays_per_game_sql = """
SELECT game_id, COUNT(*) as num_plays
FROM nfl_table
GROUP BY game_id
"""

plays_per_game_result_sql = spark.sql(plays_per_game_sql)
plays_per_game_result_sql.show()

+----------+---------+
|   game_id|num_plays|
+----------+---------+
|2011103008|      181|
|2009092011|      164|
|2010110703|      157|
|2010112111|      161|
|2012111100|      178|
|2013101302|      186|
|2014092105|      177|
|2013110700|      173|
|2015110810|      202|
|2016102301|      165|
|2017102208|      163|
|2017112609|      188|
|2018092303|      174|
|2018111110|      184|
|2018120205|      171|
|2015121312|      180|
|2009122012|      163|
|2010100304|      165|
|2009121302|      166|
|2011010203|      170|
+----------+---------+
only showing top 20 rows



In [14]:
#use spark sql - don't forget to create temp view for df_nfl before querying the dataframe
avg_yards_sql = '''
SELECT game_id, AVG(CAST(yards_gained AS FLOAT)) as avg_yards
FROM nfl_table
GROUP BY game_id
'''

avg_yards_per_game_sql = spark.sql(avg_yards_sql)
avg_yards_per_game_sql.show()

+----------+------------------+
|   game_id|         avg_yards|
+----------+------------------+
|2011103008|3.5359116022099446|
|2009092011|3.5548780487804876|
|2010110703| 4.840764331210191|
|2010112111| 3.031055900621118|
|2012111100|3.4269662921348316|
|2013101302| 4.091397849462366|
|2014092105|3.0395480225988702|
|2013110700| 4.277456647398844|
|2015110810| 4.306930693069307|
|2016102301|5.5212121212121215|
|2017102208|3.6748466257668713|
|2017112609|  3.00531914893617|
|2018092303| 4.632183908045977|
|2018111110| 4.728260869565218|
|2018120205| 2.783625730994152|
|2015121312|3.9166666666666665|
|2009122012|3.8895705521472395|
|2010100304| 4.193939393939394|
|2009121302|3.4397590361445785|
|2011010203|3.7823529411764705|
+----------+------------------+
only showing top 20 rows
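
One difference between the two approaches is worth keeping in mind. The RDD cell replaces a missing `yards_gained` with 0 and still counts the play, whereas SQL `AVG` leaves NULLs out of both the sum and the count. The plain-Python sketch below, on invented values, shows how the two averages can differ for a game that has a missing value.

```python
yards = [4, None, 8]  # hypothetical plays in one game; None stands for "NA"

# RDD-style: None counts as 0 yards and still counts as a play
rdd_style = sum(0 if y is None else y for y in yards) / len(yards)

# SQL AVG-style: NULLs are left out of both the sum and the count
known = [y for y in yards if y is not None]
sql_style = sum(known) / len(known)

print(rdd_style, sql_style)  # 4.0 6.0
```

For games without missing values the two results agree, so which convention to use only matters where "NA" actually occurs.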



## Task 2 - Data Analytics with Spark DataFrame and SQL
### Answer four data analytics questions on the NFL dataset
With the NFL Dataframe `df_nfl`, use either dataframe operations/API or spark SQL to answer the following questions.
First of all, let's build a data viewer to look at the data so we can understand the values better.

In [15]:
#build a data viewer to check data for a game; the list basically contains all the columns to use in this task
game_info_all = ['play_id', 'game_id', 'home_team', 'away_team', 'game_date', 
                 'posteam', 'posteam_type', 'defteam',
                 'total_home_score', 'total_away_score',
                 'touchdown', 'pass_touchdown', 'rush_touchdown', 'return_touchdown']
#df_nfl.select(game_info).limit(200).toPandas().head(200)
df_nfl.select(game_info_all).where('game_id = 2018111110').toPandas().head(200)

Unnamed: 0,play_id,game_id,home_team,away_team,game_date,posteam,posteam_type,defteam,total_home_score,total_away_score,touchdown,pass_touchdown,rush_touchdown,return_touchdown
0,1625,2018111110,LA,SEA,2018-11-11,LA,home,SEA,16,14,1.0,0.0,1.0,0.0
1,3405,2018111110,LA,SEA,2018-11-11,SEA,away,LA,29,24,0.0,0.0,0.0,0.0
2,3594,2018111110,LA,SEA,2018-11-11,SEA,away,LA,36,24,0.0,0.0,0.0,0.0
3,3918,2018111110,LA,SEA,2018-11-11,LA,home,SEA,36,31,0.0,0.0,0.0,0.0
4,653,2018111110,LA,SEA,2018-11-11,SEA,away,LA,7,7,0.0,0.0,0.0,0.0
5,2669,2018111110,LA,SEA,2018-11-11,LA,home,SEA,20,21,0.0,0.0,0.0,0.0
6,1120,2018111110,LA,SEA,2018-11-11,LA,home,SEA,10,14,0.0,0.0,0.0,0.0
7,4083,2018111110,LA,SEA,2018-11-11,SEA,away,LA,36,31,0.0,0.0,0.0,0.0
8,3120,2018111110,LA,SEA,2018-11-11,SEA,away,LA,26,21,0.0,0.0,0.0,0.0
9,190,2018111110,LA,SEA,2018-11-11,SEA,away,LA,0,0,0.0,0.0,0.0,0.0


Now you are going to answer the following questions using Spark DataFrame or Spark SQL. You can choose either one to solve the problem and output the results.
1. Which game(s) had the highest number of plays from 2009 to 2018? And which game had the largest final score difference?
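
The logic for this question can be sketched in plain Python before writing it in Spark. The sketch assumes, as the cells below do, that `play_id` increases over the course of a game, so the play with the largest `play_id` carries the final running totals. The games and scores are invented.

```python
# Hypothetical plays: (game_id, play_id, total_home_score, total_away_score)
plays = [
    ("A", 10, 0, 0), ("A", 55, 7, 3), ("A", 30, 7, 0),
    ("B", 12, 0, 3), ("B", 40, 14, 3),
    ("C", 5, 0, 0), ("C", 9, 3, 0), ("C", 20, 3, 6),
]

# Keep the row with the largest play_id per game: its totals are the final score
last_play = {}
num_plays = {}
for game_id, play_id, home, away in plays:
    num_plays[game_id] = num_plays.get(game_id, 0) + 1
    if game_id not in last_play or play_id > last_play[game_id][0]:
        last_play[game_id] = (play_id, home, away)

final_score = {g: (h, a) for g, (_, h, a) in last_play.items()}

# Return every game that attains the maximum, so ties are not silently dropped
most = max(num_plays.values())
games_with_most_plays = sorted(g for g, n in num_plays.items() if n == most)

print(final_score)            # {'A': (7, 3), 'B': (14, 3), 'C': (3, 6)}
print(games_with_most_plays)  # ['A', 'C']
```

Since the question asks for "game(s)", comparing against the maximum keeps all tied games, while sorting and taking one row would return only one of them.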

In [16]:
from pyspark.sql import functions as fn
from pyspark.sql import Window

#you need to show the game info with the highest plays, so let's obtain game level information
game_info = ['game_id', 'home_team', 'away_team', 'game_date', 'total_home_score', 'total_away_score']
#we need the final scores as game-level info; the play with the maximum play_id in each game carries them, so filter on it
window = Window.partitionBy('game_id')
nfl_game_info = df_nfl.withColumn("max_play_id", fn.max("play_id").over(window)).filter("max_play_id = play_id").drop("max_play_id").select(game_info)
nfl_game_info.show()

+----------+---------+---------+----------+----------------+----------------+
|   game_id|home_team|away_team| game_date|total_home_score|total_away_score|
+----------+---------+---------+----------+----------------+----------------+
|2009091300|      ATL|      MIA|2009-09-13|              19|               7|
|2009091400|       NE|      BUF|2009-09-14|              25|              23|
|2009091401|      OAK|       SD|2009-09-14|              20|              24|
|2009092001|      DET|      MIN|2009-09-20|              13|              27|
|2009092004|       KC|      OAK|2009-09-20|              10|              13|
|2009092010|       SF|      SEA|2009-09-20|              23|              10|
|2009092011|      CHI|      PIT|2009-09-20|              17|              14|
|2009092012|      DEN|      CLE|2009-09-20|              27|               6|
|2009092013|       SD|      BAL|2009-09-20|              26|              31|
|2009092100|      MIA|      IND|2009-09-21|              23|    

In [17]:
#get number of plays in each game
nfl_num_play = df_nfl.groupBy('game_id').agg(fn.count('play_id').alias('num_plays'))
nfl_num_play.show()

#join the two dataframes
nfl_game_info = nfl_game_info.join(nfl_num_play, 'game_id')

+----------+---------+
|   game_id|num_plays|
+----------+---------+
|2011103008|      181|
|2009092011|      164|
|2010110703|      157|
|2010112111|      161|
|2012111100|      178|
|2013101302|      186|
|2014092105|      177|
|2013110700|      173|
|2015110810|      202|
|2016102301|      165|
|2017102208|      163|
|2017112609|      188|
|2018092303|      174|
|2018111110|      184|
|2018120205|      171|
|2015121312|      180|
|2009122012|      163|
|2010100304|      165|
|2009121302|      166|
|2011010203|      170|
+----------+---------+
only showing top 20 rows



In [18]:
#[Your Code] to get the game with highest number of plays
highest_game_df = nfl_game_info.orderBy(nfl_game_info.num_plays.desc()).limit(1)

# To show the result
highest_game_df.show()

+----------+---------+---------+----------+----------------+----------------+---------+
|   game_id|home_team|away_team| game_date|total_home_score|total_away_score|num_plays|
+----------+---------+---------+----------+----------------+----------------+---------+
|2011120406|       NO|      DET|2011-12-04|              52|              38|      272|
+----------+---------+---------+----------+----------------+----------------+---------+



In [19]:
#now it is the score difference
nfl_game_info = nfl_game_info.withColumn('score_diff', fn.abs(nfl_game_info['total_home_score'] - nfl_game_info['total_away_score']))

In [20]:
#[Your Code] to get the game with highest score difference
highest_game_df = nfl_game_info.orderBy(nfl_game_info.score_diff.desc()).limit(1)

# To show the result
highest_game_df.show()

+----------+---------+---------+----------+----------------+----------------+---------+----------+
|   game_id|home_team|away_team| game_date|total_home_score|total_away_score|num_plays|score_diff|
+----------+---------+---------+----------+----------------+----------------+---------+----------+
|2009101810|       NE|      TEN|2009-10-18|              59|               0|      175|        59|
+----------+---------+---------+----------+----------------+----------------+---------+----------+



2. On average, how many plays are needed for a successful touchdown? And how many plays are needed for the home team and the away team, respectively?

In [21]:
nfl_game_play = df_nfl.groupBy('game_id').agg(fn.count('play_id').alias('total_plays'), fn.sum('touchdown').alias('total_touchdowns'))
nfl_game_play.show()

+----------+-----------+----------------+
|   game_id|total_plays|total_touchdowns|
+----------+-----------+----------------+
|2011103008|        181|               4|
|2009092011|        164|               4|
|2010110703|        157|               6|
|2010112111|        161|               3|
|2012111100|        178|               6|
|2013101302|        186|               6|
|2014092105|        177|               1|
|2013110700|        173|               7|
|2015110810|        202|               7|
|2016102301|        165|               6|
|2017102208|        163|               4|
|2017112609|        188|               5|
|2018092303|        174|               5|
|2018111110|        184|               8|
|2018120205|        171|               0|
|2015121312|        180|               5|
|2009122012|        163|               5|
|2010100304|        165|               6|
|2009121302|        166|               4|
|2011010203|        170|               6|
+----------+-----------+----------

In [22]:
#[Your Code] to take average for total_plays/total_touchdowns
nfl_game_play.createOrReplaceTempView("nfl_game_play_view")

In [26]:
avg_plays_sql = spark.sql('''
SELECT AVG(total_plays/total_touchdowns)
FROM nfl_game_play_view
''')

avg_plays_sql.show()

+-------------------------------------+
|avg((total_plays / total_touchdowns))|
+-------------------------------------+
|                    43.57610031013333|
+-------------------------------------+
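
Two details affect how to read this number. First, the per-game table includes a game with 0 touchdowns (2018120205) and the query still returned a value, which suggests that a division by zero produced NULL in this session and was skipped by `AVG`; that is an assumption about the session's settings, not something the output states. Second, the mean of per-game ratios is not the same quantity as total plays divided by total touchdowns. The plain-Python sketch below shows both points on invented totals.

```python
# Hypothetical per-game totals: (total_plays, total_touchdowns)
games = [(180, 6), (160, 4), (170, 0)]

# Mean of per-game ratios, skipping games without a touchdown
ratios = [p / t for p, t in games if t > 0]
mean_of_ratios = sum(ratios) / len(ratios)

# Pooled ratio: all plays divided by all touchdowns
pooled = sum(p for p, _ in games) / sum(t for _, t in games)

print(mean_of_ratios, pooled)  # 35.0 51.0
```

The pooled ratio counts the plays of the scoreless game, while the mean of ratios drops that game entirely, so the two can be far apart when scoreless games are common.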



In [33]:
nfl_team_play = df_nfl.groupBy('game_id', 'posteam_type').agg(fn.count('play_id').alias('total_plays'), fn.sum('touchdown').alias('total_touchdowns'))
nfl_team_play.show()

+----------+------------+-----------+----------------+
|   game_id|posteam_type|total_plays|total_touchdowns|
+----------+------------+-----------+----------------+
|2010091912|        home|         95|               0|
|2010112106|        home|         75|               0|
|2011100909|        home|        102|               3|
|2009092003|        home|         90|               3|
|2010121213|        home|         85|               3|
|2012110410|        home|         68|               1|
|2013092212|        home|         86|               6|
|2012120911|        away|         91|               2|
|2013120106|        home|         99|               3|
|2012010110|        home|         74|               6|
|2013112407|        home|         73|               4|
|2014101203|        home|         77|               4|
|2011120800|        home|         72|               2|
|2014090705|        home|         94|               3|
|2015091400|        home|         98|               2|
|201609110

In [34]:
#[Your Code] to take average for total_plays/total_touchdowns by posteam_type
nfl_team_play.createOrReplaceTempView("nfl_team_play_view")

In [35]:
query = spark.sql('''
SELECT DISTINCT(posteam_type)
FROM nfl_team_play_view
WHERE posteam_type IS NOT NULL
''')
query.show()

+------------+
|posteam_type|
+------------+
|        away|
|        home|
+------------+



In [37]:
avg_posteam_plays_sql = spark.sql('''
SELECT posteam_type, AVG(total_plays/total_touchdowns)
FROM nfl_team_play_view
WHERE posteam_type IS NOT NULL
GROUP BY posteam_type
''')

avg_posteam_plays_sql.show()

+------------+-------------------------------------+
|posteam_type|avg((total_plays / total_touchdowns))|
+------------+-------------------------------------+
|        away|                    42.94638401085656|
|        home|                    40.42116942690338|
+------------+-------------------------------------+



3. For touchdowns, which type is most likely on average: rush touchdown, pass touchdown, or return touchdown? Do the probabilities differ between the home and away teams?

In [39]:
#[Your Code] to calculate total touchdowns and the total of each type of touchdown by game and posteam_type
#then take the average of each type's share of total touchdowns
nfl_touch_prob = df_nfl.groupBy('game_id','posteam_type').agg(fn.sum('touchdown').alias('total_touchdowns'), fn.sum('pass_touchdown').alias('total_pass_touchdowns'), fn.sum('rush_touchdown').alias('total_rush_touchdowns'), fn.sum('return_touchdown').alias('total_return_touchdowns'))
nfl_touch_prob.show()

+----------+------------+----------------+---------------------+---------------------+-----------------------+
|   game_id|posteam_type|total_touchdowns|total_pass_touchdowns|total_rush_touchdowns|total_return_touchdowns|
+----------+------------+----------------+---------------------+---------------------+-----------------------+
|2010091912|        home|               0|                    0|                    0|                      0|
|2010112106|        home|               0|                    0|                    0|                      0|
|2011100909|        home|               3|                    1|                    2|                      0|
|2009092003|        home|               3|                    2|                    0|                      0|
|2010121213|        home|               3|                    2|                    1|                      0|
|2012110410|        home|               1|                    0|                    1|                      0|
|

In [40]:
nfl_touch_prob.createOrReplaceTempView("nfl_touch_prob_view")

In [41]:
query = spark.sql('''
SELECT AVG(total_pass_touchdowns)
FROM nfl_touch_prob_view
''')
query.show()

+--------------------------+
|avg(total_pass_touchdowns)|
+--------------------------+
|         1.520878685929151|
+--------------------------+



In [45]:
prob_sql = spark.sql('''
SELECT AVG(total_pass_touchdowns/total_touchdowns) AS Prob_Pass, AVG(total_rush_touchdowns/total_touchdowns) AS Prob_Rush, AVG(total_return_touchdowns/total_touchdowns) AS Prob_Return
FROM nfl_touch_prob_view
''')
prob_sql.show()

+------------------+------------------+--------------------+
|         Prob_Pass|         Prob_Rush|         Prob_Return|
+------------------+------------------+--------------------+
|0.6096223265048273|0.3095852259623573|0.055884697530307184|
+------------------+------------------+--------------------+



In [44]:
#[Your Code] to calculate total touchdowns and total of each type of touchdowns by game and posteam_type
#then take average for each type of touchdown divided by total touchdowns by posteam_type
prob_sql = spark.sql('''
SELECT posteam_type, AVG(total_pass_touchdowns/total_touchdowns) AS Prob_Pass, AVG(total_rush_touchdowns/total_touchdowns) AS Prob_Rush, AVG(total_return_touchdowns/total_touchdowns) AS Prob_Return
FROM nfl_touch_prob_view
WHERE posteam_type IS NOT NULL
GROUP BY posteam_type
''')
prob_sql.show()

+------------+------------------+------------------+--------------------+
|posteam_type|         Prob_Pass|         Prob_Rush|         Prob_Return|
+------------+------------------+------------------+--------------------+
|        away|0.6106175690182498|0.3009621032589985| 0.06078011383200662|
|        home|0.6086465904126451|0.3180393383375042|0.051085229754954484|
+------------+------------------+------------------+--------------------+
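
The three overall probabilities reported above sum to roughly 0.975 rather than 1. The grouped table gives a hint why: the row for game 2009092003 shows 3 touchdowns but only 2 pass, 0 rush and 0 return touchdowns, so some touchdowns fall outside the three flagged types. The plain-Python sketch below, on invented rows, computes the same kind of mean share and makes the remainder explicit; the label `other` is a name chosen here for illustration.

```python
# Hypothetical per-row totals: touchdowns, pass, rush, return
rows = [
    {"td": 3, "pass": 2, "rush": 0, "ret": 0},  # one touchdown of another kind
    {"td": 4, "pass": 2, "rush": 2, "ret": 0},
    {"td": 0, "pass": 0, "rush": 0, "ret": 0},  # no touchdowns: skipped
]

scoring = [r for r in rows if r["td"] > 0]

def mean_share(key):
    """Average of key/td over rows that have at least one touchdown."""
    return sum(r[key] / r["td"] for r in scoring) / len(scoring)

shares = {k: mean_share(k) for k in ("pass", "rush", "ret")}
# Whatever the three types do not cover is reported as a remainder
shares["other"] = 1 - sum(shares.values())

print(shares)
```

Reporting the remainder alongside the three types makes it clear that the shares describe all touchdowns, not only the flagged ones.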



4. For each calendar year, which team(s) had the highest winning rate?

In [46]:
#let's look at the available teams
df_nfl.select('home_team').distinct().show(50)

+---------+
|home_team|
+---------+
|      NYJ|
|      CAR|
|       TB|
|      OAK|
|      DET|
|      TEN|
|      BUF|
|      BAL|
|       NE|
|       GB|
|      JAC|
|      DEN|
|      ARI|
|       SF|
|       KC|
|      SEA|
|      CIN|
|      DAL|
|      CLE|
|      MIA|
|       SD|
|      STL|
|      MIN|
|      ATL|
|      PHI|
|      WAS|
|      NYG|
|      PIT|
|       NO|
|      IND|
|      HOU|
|      CHI|
|       LA|
|      JAX|
|      LAC|
+---------+
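
The list has 35 codes although the league has 32 teams. Some codes appear to be alternate or older abbreviations of the same franchise (for example `JAC` and `JAX`). If the analysis should follow franchises rather than raw codes, one option is to normalize the codes first. The mapping below is an assumption for illustration and should be checked against the data before use; `TEAM_ALIASES` and `normalize_team` are hypothetical names.

```python
# Assumed aliases: codes that appear to refer to the same franchise.
TEAM_ALIASES = {"JAC": "JAX", "SD": "LAC", "STL": "LA"}

def normalize_team(code):
    """Map an older or alternate abbreviation to one canonical code."""
    return TEAM_ALIASES.get(code, code)

codes = ["JAC", "JAX", "SD", "LAC", "STL", "LA", "NE"]
canonical = sorted({normalize_team(c) for c in codes})
print(canonical)  # ['JAX', 'LA', 'LAC', 'NE']
```

For a per-year winning rate this only changes the result if two codes for the same franchise occur within one calendar year, so it may well be unnecessary here.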



In [47]:
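#note: with this rule a tied game is credited to the away team by the otherwise() branch
nfl_game_info = nfl_game_info.withColumn('win_team', fn.when(fn.col('total_home_score') > fn.col('total_away_score'), fn.col('home_team')).otherwise(fn.col('away_team')))
#substring positions are 1-based in Spark
nfl_game_info = nfl_game_info.withColumn('game_year', fn.substring('game_date', 1, 4))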
nfl_game_info.show()

+----------+---------+---------+----------+----------------+----------------+---------+----------+--------+---------+
|   game_id|home_team|away_team| game_date|total_home_score|total_away_score|num_plays|score_diff|win_team|game_year|
+----------+---------+---------+----------+----------------+----------------+---------+----------+--------+---------+
|2009091300|      ATL|      MIA|2009-09-13|              19|               7|      161|        12|     ATL|     2009|
|2009091400|       NE|      BUF|2009-09-14|              25|              23|      171|         2|      NE|     2009|
|2009091401|      OAK|       SD|2009-09-14|              20|              24|      184|         4|      SD|     2009|
|2009092001|      DET|      MIN|2009-09-20|              13|              27|      167|        14|     MIN|     2009|
|2009092004|       KC|      OAK|2009-09-20|              10|              13|      183|         3|     OAK|     2009|
|2009092010|       SF|      SEA|2009-09-20|             

In [51]:
#create three sub-dataframes by team-year: wins, home games and away games
win_count = nfl_game_info.groupBy(fn.col('win_team').alias('team'), 'game_year').agg(fn.count('win_team').alias('win_count'))
home_count = nfl_game_info.groupBy(fn.col('home_team').alias('team'), 'game_year').agg(fn.count('home_team').alias('home_count'))
away_count = nfl_game_info.groupBy(fn.col('away_team').alias('team'), 'game_year').agg(fn.count('away_team').alias('away_count'))

In [49]:
#[Your Code] to join the three dataframes by 'team' and 'game_year' for subsequent calculations
team_count = win_count.join(home_count, ['team', 'game_year']).join(away_count, ['team', 'game_year'])
team_count.show()

+----+---------+---------+----------+----------+
|team|game_year|win_count|home_count|away_count|
+----+---------+---------+----------+----------+
| ATL|     2012|       14|         9|         8|
| STL|     2012|        8|         9|         8|
| WAS|     2015|        8|         8|         7|
| WAS|     2014|        4|         8|         8|
| PHI|     2012|        5|         9|         8|
|  NE|     2013|       11|         8|         8|
|  GB|     2010|       11|         7|         9|
| PHI|     2010|       10|         7|         9|
| DET|     2012|        4|         8|         9|
| BAL|     2011|       12|         9|         7|
| DEN|     2012|       13|         9|         8|
| TEN|     2016|        8|         7|         9|
| DET|     2009|        2|         7|         8|
| MIN|     2009|       11|         7|         8|
| CLE|     2018|        6|         7|         7|
| OAK|     2010|        7|         9|         7|
| HOU|     2011|       11|         8|         8|
| SEA|     2015|    

In [66]:
#generate total game counts and winning rate
team_count = team_count.withColumn('game_count', team_count['home_count'] + team_count['away_count'])
team_count = team_count.withColumn('win_rate', team_count['win_count'] / team_count['game_count'])
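
The winning-rate calculation can be checked on a small example in plain Python. This sketch is not the notebook's code: it treats a tied game as a win for nobody, and it keeps teams with zero wins, both of which are choices made here for illustration. The games are invented.

```python
from collections import defaultdict

# Hypothetical finals: (year, home_team, away_team, home_score, away_score)
games = [
    (2009, "A", "B", 20, 10),
    (2009, "B", "C", 17, 17),  # tie: nobody gets a win
    (2009, "C", "A", 3, 7),
    (2009, "C", "B", 14, 21),
]

wins = defaultdict(int)
played = defaultdict(int)
for year, home, away, h_pts, a_pts in games:
    played[(year, home)] += 1
    played[(year, away)] += 1
    if h_pts != a_pts:
        winner = home if h_pts > a_pts else away
        wins[(year, winner)] += 1

# Teams without a win still get a rate of 0.0
win_rate = {key: wins[key] / n for key, n in played.items()}

# Keep every team whose rate equals the yearly maximum (all tied leaders)
best = defaultdict(float)
for (year, _), rate in win_rate.items():
    best[year] = max(best[year], rate)
top_teams = sorted(
    team for (year, team), rate in win_rate.items() if rate == best[year]
)

print(top_teams)  # ['A']
```

Comparing each rate against the yearly maximum returns all tied leaders, which matches the "team(s)" wording of the question.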

In [71]:
#[Your Code] to obtain the team(s) with highest winning rate in each calendar year
windowSpec = Window.partitionBy('game_year').orderBy(fn.desc('win_rate'))

# Rank teams within each year based on win_rate
team_count = team_count.withColumn('rank', fn.rank().over(windowSpec))

# Filter for the top team(s) in each year (rank = 1)
top_teams_each_year = team_count.filter(fn.col('rank') == 1)

# Optionally, select only the columns of interest
top_teams_each_year = top_teams_each_year.select('team', 'game_year', 'win_rate', 'game_count', 'win_count')
top_teams_each_year.show()

+----+---------+------------------+----------+---------+
|team|game_year|          win_rate|game_count|win_count|
+----+---------+------------------+----------+---------+
| IND|     2009|0.9333333333333333|        15|       14|
| ATL|     2010|            0.8125|        16|       13|
|  NE|     2010|            0.8125|        16|       13|
|  GB|     2011|            0.9375|        16|       15|
| ATL|     2012|0.8235294117647058|        17|       14|
| DEN|     2013|            0.8125|        16|       13|
| SEA|     2013|            0.8125|        16|       13|
| DAL|     2014|              0.75|        16|       12|
|  GB|     2014|              0.75|        16|       12|
|  NE|     2014|              0.75|        16|       12|
| SEA|     2014|              0.75|        16|       12|
| DEN|     2014|              0.75|        16|       12|
| CAR|     2015|0.9333333333333333|        15|       14|
| DAL|     2016|            0.8125|        16|       13|
|  NE|     2016|            0.8