In [0]:
school_details = spark.read.csv('/FileStore/tables/school_details.csv', header = True)

In [0]:
school = spark.read.csv('/FileStore/tables/schools.csv', header = True)

TASK . In each decade, how many schools were there that produced players? 

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col, round, countDistinct

In [0]:
df_school = (
            school
            .withColumn('decade', floor(col('yearID') / 10 )* 10)
            .groupBy('decade')
            .agg(countDistinct('schoolID').alias('num_schools'))
            .orderBy('decade')
)
df_school.show()
            

+------+-----------+
|decade|num_schools|
+------+-----------+
|  1860|          2|
|  1870|         14|
|  1880|         34|
|  1890|         89|
|  1900|        148|
|  1910|        178|
|  1920|        196|
|  1930|        162|
|  1940|        142|
|  1950|        176|
|  1960|        301|
|  1970|        427|
|  1980|        473|
|  1990|        494|
|  2000|        372|
|  2010|         57|
+------+-----------+



TASK: What are the names of the top 5 schools that produced the most players?


In [0]:
df_school1 = ( school_details
               .join(school, on = 'schoolID', how = 'inner')
               .groupBy('name_full')
               .agg(countDistinct('playerID').alias('num_players'))
               .orderBy(col('num_players').desc())
               .limit(5)
)
df_school1.show()           

+--------------------+-----------+
|           name_full|num_players|
+--------------------+-----------+
|University of Tex...|        107|
|University of Sou...|        105|
|Arizona State Uni...|        101|
| Stanford University|         86|
|University of Mic...|         76|
+--------------------+-----------+



TASK : For each decade, what were the names of the top 3 schools that produced the most players?


In [0]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


In [0]:
df1 = ( school
       .join(school_details, on = 'schoolID', how ='inner')
       .withColumn('decade', floor(col('yearID')/10)*10)
       .groupBy('decade','name_full')
       .agg(countDistinct('playerID').alias('num_players')) 
)

window_spec = Window.partitionBy('decade').orderBy(col('num_players').desc())
df2 = (df1
       .withColumn('rn', row_number().over(window_spec))
)

df3 = (df2
       .filter(col('rn').isin(1,2,3))
       .orderBy(col('decade').desc(), col('rn'))
)

df3.show()

+------+--------------------+-----------+---+
|decade|           name_full|num_players| rn|
+------+--------------------+-----------+---+
|  2010|University of Flo...|          5|  1|
|  2010|University of Tex...|          4|  2|
|  2010|University of Sou...|          3|  3|
|  2000|California State ...|         23|  1|
|  2000|Arizona State Uni...|         23|  2|
|  2000| Stanford University|         22|  3|
|  1990| Stanford University|         25|  1|
|  1990|University of Sou...|         23|  2|
|  1990|Louisiana State U...|         22|  3|
|  1980|University of Ari...|         24|  1|
|  1980|Arizona State Uni...|         23|  2|
|  1980|University of Cal...|         22|  3|
|  1970|Arizona State Uni...|         32|  1|
|  1970|University of Sou...|         24|  2|
|  1970|University of Tex...|         20|  3|
|  1960|Arizona State Uni...|         18|  1|
|  1960|University of Sou...|         17|  2|
|  1960|University of Mic...|         14|  3|
|  1950|University of Sou...|     

TASK: Return the top 20% of teams in terms of average annual spending (Salary Analysis )

In [0]:
salary = spark.read.csv('/FileStore/tables/salaries.csv', header = True)

In [0]:
from pyspark.sql.functions import sum

In [0]:
df1 =( salary
       .groupBy('yearID','teamID')
       .agg(sum('salary').alias('total_amount'))
) 
window_spec = Window.orderBy(col('avg_spend').desc())

df2 = ( df1
        .groupBy('teamID')
        .agg(avg('total_amount').alias('avg_spend'))
        .withColumn('NT',ntile(5).over(window_spec))
        
)

df3 = (df2
       .filter(col('NT')==1)
       .select('teamID',round(col('avg_spend') / 1000000,1).alias('spending_millions'))
)
df3.show()

+------+-----------------+
|teamID|spending_millions|
+------+-----------------+
|   SFG|            143.5|
|   LAA|            118.5|
|   NYA|            109.4|
|   BOS|             81.1|
|   LAN|             74.6|
|   WAS|             71.5|
|   ARI|             71.2|
|   PHI|             66.1|
+------+-----------------+



TASK : For each team, show the cumulative sum of spending over the years

In [0]:
df = (salary
      .groupBy('teamID','yearID')
      .agg(sum('salary').alias('total_salary'))
    
)

window_spec = Window.partitionBy(col('teamID')).orderBy(col('yearID'))
df1 = (df
       .withColumn('cummulative_sum', round(sum('total_salary').over(window_spec) / 1000000,1))
)
df1.show()

+------+------+------------+---------------+
|teamID|yearID|total_salary|cummulative_sum|
+------+------+------------+---------------+
|   ANA|  1997| 3.1135472E7|           31.1|
|   ANA|  1998|    4.1281E7|           72.4|
|   ANA|  1999| 5.5388166E7|          127.8|
|   ANA|  2000| 5.1464167E7|          179.3|
|   ANA|  2001| 4.7535167E7|          226.8|
|   ANA|  2002| 6.1721667E7|          288.5|
|   ANA|  2003| 7.9031667E7|          367.6|
|   ANA|  2004|1.00534667E8|          468.1|
|   ARI|  1998|    3.2347E7|           32.3|
|   ARI|  1999| 6.8703999E7|          101.1|
|   ARI|  2000| 8.1027833E7|          182.1|
|   ARI|  2001| 8.5082999E7|          267.2|
|   ARI|  2002|1.02819999E8|          370.0|
|   ARI|  2003|    8.0657E7|          450.6|
|   ARI|  2004|  6.978075E7|          520.4|
|   ARI|  2005| 6.2329166E7|          582.7|
|   ARI|  2006| 5.9684226E7|          642.4|
|   ARI|  2007| 5.2067546E7|          694.5|
|   ARI|  2008| 6.6202712E7|          760.7|
|   ARI|  

TASK : Return the first year that each team's cumulative spending surpassed 1 billion

In [0]:
from pyspark.sql.functions import sum as Fsum

In [0]:
df = (salary
      .groupBy('yearID','teamID')
      .agg(Fsum('salary').alias('total_spend'))
)

window_spec = Window.partitionBy('teamID').orderBy('yearID')
df1 = (df
       .withColumn('cummulative_sum', round(sum('total_spend').over(window_spec) / 100000,1))
)

window_spec1 = Window.partitionBy('teamID').orderBy('cummulative_sum','yearID')
df2 = (df1
       .withColumn('rn', row_number().over(window_spec1))
       .filter(col('cummulative_sum') > 1000)
)

df3 = (df2
       .filter(col('rn')==1)
       .select('teamID','yearID','cummulative_sum')
       
)

df3.show()

+------+------+---------------+
|teamID|yearID|cummulative_sum|
+------+------+---------------+
|   MIA|  2012|         1180.8|
|   SFG|  2014|         1435.1|
+------+------+---------------+



Player Analysis 

 TASK : For each player, calculate their age at their first (debut) game,
  their last game, and their career length (all in years). Sort from longest career to shortest career.


In [0]:
player = spark.read.csv('/FileStore/tables/players.csv', header = True)

In [0]:
from pyspark.sql.functions import year, col, expr

In [0]:
df = (player
      .withColumn('birthYear', col('birthYear'))
      .withColumn('debut', col('debut').cast('date'))
      .withColumn('finalGame',col('finalGame').cast('date'))
)

df1 = (df
       .withColumn('first_game', year(col('debut'))- col('birthYear'))
       .withColumn('last_game',year(col('finalGame'))- col('birthYear'))
       .withColumn('career_length', year(col('finalGame')) - year(col('debut')))
)

df2 = (df1
       .select('nameFirst', 'first_game', 'last_game', 'career_length')
       .orderBy(col("career_length").desc())
       )

df2.show()

+---------+----------+---------+-------------+
|nameFirst|first_game|last_game|career_length|
+---------+----------+---------+-------------+
|      Cap|      19.0|     45.0|           26|
|     Paul|      17.0|     36.0|           19|
|     Jack|      20.0|     39.0|           19|
|      Joe|      18.0|     37.0|           19|
|   Deacon|      24.0|     43.0|           19|
|      Joe|      18.0|     36.0|           18|
|    Candy|      23.0|     41.0|           18|
|      Pop|      19.0|     37.0|           18|
|    Roger|      23.0|     40.0|           17|
|     Buck|      21.0|     38.0|           17|
|      Pud|      19.0|     36.0|           17|
|      Bid|      23.0|     40.0|           17|
|     Ezra|      22.0|     39.0|           17|
|      Lip|      26.0|     42.0|           16|
|    Bobby|      20.0|     36.0|           16|
|      Jim|      17.0|     33.0|           16|
|   Orator|      23.0|     39.0|           16|
|      Tom|      22.0|     38.0|           16|
|     Jack|  

TASK : How have average height and weight at debut game changed over the years, and what's the decade-over-decade difference?

In [0]:
df = (player
      .withColumn('debut_year', year(col('debut')))
      .withColumn('decade', round(col('debut_year') / 10) * 10)

)

df1 = (df
       .groupBy('decade')
       .agg( avg('height').alias('avg_height'), avg('weight').alias('avg_weight'))

)

window_spec = Window.orderBy('decade')
df2 = (df1
       .withColumn('height_diff',col('avg_height') - lag('avg_height').over(window_spec))
       .withColumn('weight_diff', col('avg_weight') - lag('avg_weight').over(window_spec))
       .filter(col('decade').isNotNull())
)

df2.show()

+------+-----------------+------------------+------------------+------------------+
|decade|       avg_height|        avg_weight|       height_diff|       weight_diff|
+------+-----------------+------------------+------------------+------------------+
|1870.0|68.54304635761589|            159.16|-3.953335607873626|-28.11291356184799|
|1880.0|69.41340782122904|168.50912408759123|0.8703614636131505| 9.349124087591235|
|1890.0|69.82142857142857|169.41788617886178|0.4080207501995261|0.9087620912705461|
|1900.0|70.07817589576547|171.20666666666668|0.2567473243368994|1.7887804878048996|
+------+-----------------+------------------+------------------+------------------+



TASK : Create a summary table that shows for each team, what percent of players bat right, left and both.

In [0]:
from pyspark.sql.functions import col, when, sum, count, round

df = (salary
      .join(player, 'playerID', 'left')
)

df1 = (df
       .groupBy('teamID')
       .agg(
           round((sum(when(col('bats') == 'R', 1).otherwise(0)) / count('playerID')) * 100, 1).alias('bats_right'),
           round((sum(when(col('bats') == 'L', 1).otherwise(0)) / count('playerID')) * 100, 1).alias('bats_left'),
           round((sum(when(col('bats') == 'B', 1).otherwise(0)) / count('playerID')) * 100, 1).alias('bats_both')
       )
)
df1.show()

+------+----------+---------+---------+
|teamID|bats_right|bats_left|bats_both|
+------+----------+---------+---------+
|   NYM|      66.7|     29.2|      4.2|
|   TBA|      59.8|     31.0|      9.2|
|   OAK|      62.7|     27.5|      9.9|
|   NYA|      58.8|     30.7|     10.5|
|   DET|      60.8|     28.6|     10.6|
|   FLO|      66.3|     24.3|      9.4|
|   ANA|      61.1|     31.6|      7.3|
|   ML4|      59.6|     29.4|     11.0|
|   SFN|      61.1|     27.5|     11.3|
|   SLN|      61.9|     26.5|     11.6|
|   BAL|      61.8|     29.6|      8.6|
|   BOS|      61.9|     29.3|      8.6|
|   TOR|      64.0|     26.6|      9.4|
|   SFG|      55.6|     25.9|     18.5|
|   ARI|      61.6|     30.4|      7.9|
|   LAA|      68.2|     16.6|     15.2|
|   SEA|      61.7|     28.9|      9.4|
|   LAN|      62.9|     27.8|      9.2|
|   CIN|      62.6|     29.4|      8.0|
|   CLE|      59.6|     29.7|     10.8|
+------+----------+---------+---------+
only showing top 20 rows

