In [1]:
import os
import warnings
from contextlib import closing
from getpass import getpass

import mysql.connector
import pandas as pd
import pyspark
import pyspark.sql.functions as funcs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round, rank, mean
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
warnings.filterwarnings('ignore')

In [2]:
# Connection settings. Credentials are read from the environment (or prompted for)
# instead of being hard-coded, so they never end up in the saved notebook.
my_user = os.environ.get('LAHMAN_DB_USER', 'root')
my_password = os.environ.get('LAHMAN_DB_PASSWORD')  # None -> prompt on first connect
my_database = os.environ.get('LAHMAN_DB_NAME', 'lahman2016')

def connect_to_db():
    """Open a new MySQL connection to the configured Lahman database.

    If LAHMAN_DB_PASSWORD is not set, the password is requested interactively
    on the first call and reused for later calls.

    Returns:
        The connection object returned by ``mysql.connector.connect``. The caller
        owns it and should close it (e.g. with ``contextlib.closing``).
    """
    global my_password
    if my_password is None:
        my_password = getpass('MySQL password for {}: '.format(my_user))
    return mysql.connector.connect(user=my_user, password=my_password, database=my_database)

In [3]:
import os
import sys

# Point both the PySpark driver and its Python workers at this kernel's interpreter.
# This has to happen before the SparkSession is created in the next cell.
os.environ.update(dict.fromkeys(('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'), sys.executable))

In [4]:
# Start the Spark session used by every cell below (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("lahman2016")
    .getOrCreate()
)

In [5]:
# Bare expression: Jupyter displays the session object as this cell's output.
spark

## Average Salary

In [None]:
# Average salary per season for pitchers vs. infielders
# (infield = mean of the 1B / 2B / 3B / SS position averages).
with closing(connect_to_db()) as db:
    # extract
    # NOTE(review): the join does not match on teamID, so a player with several fielding
    # rows in a season (positions / stints / teams) contributes his salary once per row.
    # Confirm this weighting is intended.
    salaries_data = pd.read_sql_query(
        '''select s.playerID, salary, f.yearID, POS
        from salaries as s join fielding as f
        on s.playerId=f.playerId and f.yearId=s.yearId;''', db)

salaries_sc = spark.createDataFrame(salaries_data)

# transform
# One row per season, one column per fielding position holding the mean salary.
salaries_by_pos = salaries_sc.groupby('yearID').pivot('POS').avg('salary')
infield_avg = (col('1B') + col('2B') + col('3B') + col('SS')) / 4
salaries_transformed = (
    salaries_by_pos
    .withColumn('infield', infield_avg)
    .select(col('yearID').alias('Year'),
            round(col('infield'), 0).cast(IntegerType()).alias('Fielding'),
            round(col('P'), 0).cast(IntegerType()).alias('Pitching'))
    .orderBy('Year')
)

# load
salaries_transformed_pd = salaries_transformed.toPandas()
for money_column in ('Pitching', 'Fielding'):
    # Thousands separators for display, e.g. 1234567 -> '1,234,567'.
    salaries_transformed_pd[money_column] = salaries_transformed_pd[money_column].map('{0:,}'.format)
# os.path.join instead of 'Results\AverageSalary.csv': that literal contains the invalid
# escape '\A' and only means "file in the Results folder" on Windows.
os.makedirs('Results', exist_ok=True)
salaries_transformed_pd.to_csv(os.path.join('Results', 'AverageSalary.csv'), index=False)

## Hall of Fame All Star Pitchers

In [7]:
# Hall of Fame inductees who pitched and also played in at least one All-Star game,
# with their average ERA and number of All-Star appearances.
with closing(connect_to_db()) as db:
    # extract
    hof_data = pd.read_sql_query(
        '''select playerID, yearid
        from halloffame
        where inducted="Y"''', db)

    pitchers_data = pd.read_sql_query(
        '''select playerID, ERA
        from pitching''', db)

    allstar_data = pd.read_sql_query(
        '''select playerID, yearID as allstar_years, gameNum
        from allstarfull
        where GP=1''', db)

# transform
hof_sc = spark.createDataFrame(hof_data).alias('hof_sc')
allstar_sc = spark.createDataFrame(allstar_data).alias('allstar_sc')

# Unweighted mean of the ERA values across all of the player's `pitching` rows, 2 decimals.
pitchers_sc = (
    spark.createDataFrame(pitchers_data)
    .groupby('playerID')
    .agg(round(mean('ERA'), 2).alias('ERA'))
    .alias('pitchers_sc')
)

hof_pitchers = pitchers_sc.join(hof_sc, hof_sc.playerID == pitchers_sc.playerID, how='inner')\
    .select(col('pitchers_sc.playerID'), col('yearid'), col('ERA')).alias('hof_pitchers')

# Every allstarfull row with GP=1 is one game played, so appearances = number of rows.
# (Summing gameNum, as before, counted a gameNum=2 row as two appearances.)
allstar_agg_sc = (
    allstar_sc
    .groupby('playerID')
    .agg(funcs.count(funcs.lit(1)).alias('total_games'))
    .alias('allstar_pitcher_num_games')
)

hof_allstar_pitchers_transformed = hof_pitchers.join(
        allstar_agg_sc, allstar_agg_sc.playerID == hof_pitchers.playerID, how='inner')\
    .select(col('hof_pitchers.playerID').alias('Player'),
            col('ERA'),
            col('total_games').alias('# All Star Appearances'),
            col('yearid').alias('Hall of Fame Induction Year'))

# load
hof_allstar_pitchers_transformed.toPandas().to_csv('HallofFameAllStarPitchers.csv', index=False)

## Pitching

In [8]:
# Top 10 pitchers of each season, ranked by earned runs per inning pitched over the
# regular season and post-season combined (only pitchers with a post-season decision).
with closing(connect_to_db()) as db:
    # extract
    post_players = pd.read_sql_query(
        '''select playerID, yearID, ER, IPouts, W, L, ERA
        from pitchingpost''', db)
    reg_players = pd.read_sql_query(
        '''select playerID, yearID, ER, IPouts, W, L, ERA
        from pitching''', db)

# transform
post_players_sc = spark.createDataFrame(post_players)
reg_players_sc = spark.createDataFrame(reg_players)

# Keep the raw rows under their own view names rather than overwriting a view with a
# query that reads from it (newer Spark versions may reject that as a recursive view).
post_players_sc.createOrReplaceTempView('post_players_raw')
reg_players_sc.createOrReplaceTempView('reg_players_raw')

# One row per pitcher-season. ER / IPouts / W / L are summed so ER and IPouts stay on the
# same scale for the ranking ratio; ERA is the unweighted mean of the per-row ERAs.
# Each view's ERA is named after the table it came from (the aliases were swapped before).
spark.sql('''
    select playerID, yearID, avg(ERA) as post_ERA, sum(ER) as ER, sum(IPouts) as IPouts,
        sum(W) as W, sum(L) as L, sum(W) + sum(L) as G
    from post_players_raw
    group by playerID, yearID''').createOrReplaceTempView('post_players_sc')

spark.sql('''
    select playerID, yearID, avg(ERA) as reg_ERA, sum(ER) as ER, sum(IPouts) as IPouts,
        sum(W) as W, sum(L) as L, sum(W) + sum(L) as G
    from reg_players_raw
    group by playerID, yearID''').createOrReplaceTempView('reg_players_sc')

# Require at least one recorded out so ER / inningsPitched below is always defined.
spark.sql('''
    select reg.yearID, reg.playerID, (reg.ER + post.ER) as ER,
        (reg.IPouts + post.IPouts) / 3 as inningsPitched,
        reg.W as reg_W, reg.L as reg_L, post.W as post_W, post.L as post_L,
        reg.reg_ERA, post.post_ERA
    from post_players_sc as post join reg_players_sc as reg
    on post.playerID = reg.playerID and post.yearID = reg.yearID
    where post.G > 0 and (reg.IPouts + post.IPouts) > 0''').createOrReplaceTempView('all_players_sc')

# nullif: a pitcher without regular-season decisions gets a null win rate instead of 0/0.
# nulls last: a missing ER total must not be ranked as the best pitcher.
spark.sql('''
    select * from (
        select yearID, playerID,
            rank() over (partition by yearID order by ER / inningsPitched asc nulls last) as Rank,
            reg_W / nullif(reg_W + reg_L, 0) as RegSeasonWinLoss,
            post_W / nullif(post_W + post_L, 0) as PostSeasonWinLoss,
            reg_ERA, post_ERA
        from all_players_sc) as ranked
    where Rank <= 10''').createOrReplaceTempView('best_pitchers_per_year')

# Final report columns; ordered by season, then by rank within the season.
pitching_transformed = spark.sql('''
    select yearID as `Year`, playerID as `Player`,
        round(reg_ERA, 2) as `Regular Season ERA`,
        round(RegSeasonWinLoss, 2) as `Regular Season Win/Loss`,
        round(post_ERA, 2) as `Post-season ERA`,
        round(PostSeasonWinLoss, 2) as `Post-season Win/Loss`
    from best_pitchers_per_year
    order by yearID, Rank''')

# load
pitching_transformed.toPandas().to_csv('Pitching.csv', index=False)

## Rankings

In [9]:
# Best (rank 1) and worst team of each season by winning record, with their at-bats.
with closing(connect_to_db()) as db:
    # extract
    team_data = pd.read_sql_query(
        '''select yearID, teamID, W, L, AB from teams''', db)

team_sc = spark.createDataFrame(team_data).alias('team_sc')

# transform
# Rank 1 must be the best record, so order by win percentage DESCENDING (the old ascending
# W/L order gave rank 1 to the worst team). W / (W + L) orders teams exactly like W / L
# but stays defined for a team with no losses.
season = Window.partitionBy('yearID')
win_pct = col('W') / (col('W') + col('L'))
rank_sc = (
    team_sc
    .withColumn('rank', rank().over(season.orderBy(win_pct.desc())))
    .withColumn('worst_rank', funcs.max('rank').over(season))
)
team_columns = ['yearID', 'rank', 'teamID', 'AB']
best_team = rank_sc.filter(col('rank') == 1).select(*team_columns)
worst_team = rank_sc.filter(col('rank') == col('worst_rank')).select(*team_columns)
rankings_transformed = (
    best_team.union(worst_team)
    .orderBy('yearID', 'rank')
    .select(col('teamID').alias('Team ID'), col('yearID').alias('Year'),
            col('rank').alias('Rank'), col('AB').alias('At Bats'))
)

# load
rankings_transformed.toPandas().to_csv('Rankings.csv', index=False)