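* Download this SQL script and load it into a MySQL database: http://seanlahman.com/files/database/lahman2016-sql.zip
* Name the database `lahman2016`.
* Put the DB username and password (plus your AWS access key ID and secret access key) in `credentials.json` under the keys `username`, `password`, `aws_access_key_id` and `aws_secret_access_key`.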


In [1]:
import json
# credentials.json supplies both the MySQL login and the AWS key pair.
with open('credentials.json') as credentials_file:
    data = json.load(credentials_file)

my_user = data['username']
my_password = data['password']
aws_access_key_id = data['aws_access_key_id']
aws_secret_access_key = data['aws_secret_access_key']


In [21]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as funcs
from pyspark.sql.functions import col, when, round, rank, mean
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
import mysql.connector
import pandas as pd
import boto3
# S3 handle used by every ETL's load() step. Pass the key pair read from
# credentials.json explicitly; without these arguments boto3 falls back to its
# default credential chain and the keys from credentials.json go unused.
s3 = boto3.resource(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Single local Spark session shared by all ETL classes below.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Lahmans2016")
    .getOrCreate()
)


def exception_handler(func):
    """Decorate *func* so that a failure prints a hint to check credentials.

    The wrapper forwards all positional and keyword arguments and returns
    ``func``'s result unchanged. If ``func`` raises, a hint naming the
    function is printed and the original exception is re-raised.

    Raises:
        Whatever ``func`` raises (re-raised after printing the hint).
    """
    from functools import wraps

    @wraps(func)
    def inner_function(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"{func.__name__} input not valid. Please check credentials")
            # Re-raise: swallowing the error would hand callers such as the
            # ETL constructors a None "connection" and fail later, obscurely.
            raise

    return inner_function


@exception_handler
def connect_to_db():
    """Open a MySQL connection to the ``lahman2016`` database.

    Authenticates with the module-level ``my_user`` / ``my_password`` values
    loaded from credentials.json.
    """
    db_settings = {
        'user': my_user,
        'password': my_password,
        'database': 'lahman2016',
    }
    return mysql.connector.connect(**db_settings)
    

class AverageSalary():
    """ETL: average infield vs. pitcher salary per year.

    extract() joins Salaries to Fielding positions in MySQL, transform()
    pivots average salary by position, and load() writes AverageSalary.csv
    locally and uploads it to the ``lahman2016`` S3 bucket.
    """

    def __init__(self):
        self.cnx = connect_to_db()

    def extract(self):
        """Load (playerID, salary, yearID, POS) rows into a Spark DataFrame.

        Returns:
            The Spark DataFrame, also stored as ``self.salaries_sc``.
        """
        # No trailing ';"' here: the stray double quote opened an
        # unterminated string literal in the SQL sent to MySQL.
        salaries_data = pd.read_sql_query(
            '''select s.playerID, salary, f.yearID, POS
                from lahman2016.Salaries s
                join lahman2016.Fielding f
                on s.playerId=f.playerId
                and f.yearId=s.yearId''', self.cnx)
        self.salaries_sc = spark.createDataFrame(salaries_data)
        return self.salaries_sc

    def transform(self):
        """Compute yearly 'Fielding' (infield) and 'Pitching' average salaries.

        'Fielding' is the mean of the 1B, 2B, 3B and SS averages; 'Pitching'
        is the P average. Both are rounded to whole numbers and cast to int.

        Returns:
            Spark DataFrame with columns Year, Fielding, Pitching ordered by
            Year; also stored as ``self.salaries_transformed``.
        """
        by_position = (self.salaries_sc.groupby('yearID')
                       .pivot('POS')
                       .agg(funcs.avg('salary')))
        with_infield = by_position.withColumn(
            'infield',
            (by_position['1B'] + by_position['2B']
             + by_position['3B'] + by_position['SS']) / 4)
        self.salaries_transformed = with_infield.select(
            col('yearID').alias('Year'),
            round(col('infield'), 0).cast(IntegerType()).alias('Fielding'),
            round(col('P'), 0).cast(IntegerType()).alias('Pitching'),
        ).orderBy('Year')
        return self.salaries_transformed

    @staticmethod
    def _format_thousands(value):
        """Format a whole-number amount as '1,234,567'; missing values become ''."""
        if pd.isna(value):
            return ''
        return f'{int(value):,}'

    def load(self):
        """Write AverageSalary.csv with comma-grouped amounts and upload it to S3."""
        salaries_pd = self.salaries_transformed.toPandas()
        # Integer columns containing nulls can come back from toPandas() as
        # float64, which '{:,}' would render as '1,234.0' / 'nan'; format via
        # int() and leave missing values blank instead.
        for column in ('Pitching', 'Fielding'):
            salaries_pd[column] = salaries_pd[column].map(self._format_thousands)
        salaries_pd.to_csv('AverageSalary.csv', index=False)
        # Context manager closes the handle once the upload is done.
        with open('AverageSalary.csv', 'rb') as csv_file:
            s3.Bucket('lahman2016').put_object(Key='AverageSalary.csv', Body=csv_file)

    def run_ETL(self):
        """Run extract, transform and load in order."""
        self.extract()
        self.transform()
        self.load()
    
    
class HallofFameAllStarPitchers():
    """ETL: inducted Hall of Famers who pitched and played in All-Star games.

    Output columns: Player, ERA (mean of the player's per-row ERA values in
    Pitching, rounded to 2 places), ' # All Star Appearances' and
    'Hall of Fame Induction Year'. Written to HallofFameAllStarPitchers.csv
    and uploaded to the ``lahman2016`` S3 bucket.
    """

    def __init__(self):
        self.cnx = connect_to_db()

    def extract(self):
        """Pull inductees, per-row pitching ERA and All-Star games played."""
        self.hof_data = pd.read_sql_query(
            '''select playerID, yearid from lahman2016.HallOfFame
        where inducted="Y"''', self.cnx)

        self.pitchers_data = pd.read_sql_query(
            '''SELECT playerID, ERA
                FROM lahman2016.Pitching
                ''', self.cnx)

        # GP=1 keeps only rows where the player actually appeared in the game.
        self.allstar_data = pd.read_sql_query(
            '''
            select playerID, yearID as allstar_years, gameNum
            from lahman2016.AllStarFull
            where GP=1
            ''', self.cnx)

    def transform(self):
        """Join inductees, average ERA and All-Star game counts per player."""
        hof_sc = spark.createDataFrame(self.hof_data).alias('hof_sc')
        pitchers_sc = spark.createDataFrame(self.pitchers_data)
        allstar_sc = spark.createDataFrame(self.allstar_data)

        pitchers_sc = (pitchers_sc.groupby('playerID').avg('ERA')
                       .select(col('playerID'),
                               round(col('avg(ERA)'), 2).alias('ERA'))
                       .alias('pitchers_sc'))
        hof_pitchers = (pitchers_sc
                        .join(hof_sc, hof_sc.playerID == pitchers_sc.playerID,
                              how='inner')
                        .select(col('pitchers_sc.playerID'), col('yearid'), col('ERA'))
                        .alias('hof_pitchers'))
        # Each remaining AllStarFull row is one game played. gameNum is a game
        # index (0 when a season had one All-Star game, 1 or 2 when it had
        # two), so summing it counted a player in both games as 3; count rows.
        allstar_agg_sc = (allstar_sc.groupby('playerID').count()
                          .select(col('playerID'), col('count').alias('total_games'))
                          .alias('allstar_pitcher_num_games'))
        self.hof_allstar_pitchers_transformed = (
            hof_pitchers
            .join(allstar_agg_sc, allstar_agg_sc.playerID == hof_pitchers.playerID,
                  how='inner')
            .select(col('hof_pitchers.playerID').alias('Player'),
                    col('ERA'),
                    col('total_games').alias(' # All Star Appearances'),
                    col('yearid').alias('Hall of Fame Induction Year')))

    def load(self):
        """Write HallofFameAllStarPitchers.csv and upload it to S3."""
        self.hof_allstar_pitchers_transformed.toPandas().to_csv(
            'HallofFameAllStarPitchers.csv', index=False)
        # Context manager closes the handle once the upload is done.
        with open('HallofFameAllStarPitchers.csv', 'rb') as csv_file:
            s3.Bucket('lahman2016').put_object(
                Key='HallofFameAllStarPitchers.csv', Body=csv_file)

    def run_ETL(self):
        """Run extract, transform and load in order."""
        self.extract()
        self.transform()
        self.load()

        

class Pitching():
    """ETL: each year's ten pitchers with the lowest combined ER per inning.

    Regular-season (Pitching) and postseason (PitchingPost) rows are
    aggregated per (playerID, yearID), joined for pitchers with at least one
    postseason decision, and ranked within each year by earned runs per
    inning pitched. The top 10 per year go to Pitching.csv and S3.
    """

    def __init__(self):
        self.cnx = connect_to_db()

    def extract(self):
        """Load postseason and regular-season pitching rows into Spark."""
        post_players = pd.read_sql_query(
            """select  playerID, yearID, ER, IPouts, W, L, ERA
            FROM lahman2016.PitchingPost""", self.cnx)
        reg_players = pd.read_sql_query(
            '''select  playerID, yearID, ER, IPouts,
            W, L, ERA FROM lahman2016.Pitching''', self.cnx)
        self.post_players_sc = spark.createDataFrame(post_players)
        self.reg_players_sc = spark.createDataFrame(reg_players)

    def transform(self):
        """Build the per-year top-10 table in ``self.pitching_transformed``."""
        # One row per (playerID, yearID) for each source. ER and IPouts are
        # both summed so ER / innings is a real per-inning rate; the ERA
        # columns are the mean of the per-row ERA values. Distinct raw/aggregated
        # view names keep each view pointing at a single relation.
        self.post_players_sc.createOrReplaceTempView("post_players_raw")
        spark.sql('''SELECT playerID, yearID, avg(ERA) as post_ERA, sum(ER) as ER,
                            sum(IPouts) as IPouts, sum(W) as W, sum(L) as L,
                            sum(W) + sum(L) as G
                     from post_players_raw
                     group by playerID, yearID
                     ''').createOrReplaceTempView("post_players_sc")

        self.reg_players_sc.createOrReplaceTempView("reg_players_raw")
        spark.sql('''SELECT playerID, yearID, avg(ERA) as reg_ERA, sum(ER) as ER,
                            sum(IPouts) as IPouts, sum(W) as W, sum(L) as L,
                            sum(W) + sum(L) as G
                     from reg_players_raw
                     group by playerID, yearID
                     ''').createOrReplaceTempView("reg_players_sc")

        # Keep player-seasons with at least one postseason decision (G = W + L).
        spark.sql('''select reg.yearID, reg.playerID, (reg.ER + post.ER) as ER,
                            (reg.IPouts + post.IPouts) / 3 as inningsPitched,
                            reg.W as reg_W, reg.L as reg_L,
                            post.W as post_W, post.L as post_L,
                            reg_ERA, post_ERA
                     FROM post_players_sc post
                     join reg_players_sc reg
                       on post.playerID = reg.playerID
                      and post.yearID = reg.yearID
                     where post.G > 0''').createOrReplaceTempView("all_players_sc")

        # Skip zero-inning rows: ER / 0 is NULL (or an error under ANSI mode),
        # and NULLs sort first in an ascending ORDER BY, taking rank 1.
        spark.sql('''select * from (
                         select yearID, playerID,
                                rank() over (partition by yearID
                                             order by ER / inningsPitched) as Rank,
                                reg_W / (reg_W + reg_L) as RegSeasonWinLoss,
                                post_W / (post_W + post_L) as PostSeasonWinLoss,
                                reg_ERA, post_ERA
                         from all_players_sc
                         where inningsPitched > 0) as a
                     where Rank <= 10''').createOrReplaceTempView('best_pitchers_per_year')

        self.pitching_transformed = spark.sql('''
                select playerID, yearID, round(reg_ERA, 2) as RegularERA,
                       round(RegSeasonWinLoss, 2) as RegSeasonWinLoss,
                       round(PostSeasonWinLoss, 2) as PostSeasonWinLoss,
                       round(post_ERA, 2) as PostERA
                from best_pitchers_per_year
                order by yearID desc, Rank
                ''')

    def load(self):
        """Write Pitching.csv and upload it to S3."""
        self.pitching_transformed.toPandas().to_csv('Pitching.csv', index=False)
        # Context manager closes the handle once the upload is done.
        with open('Pitching.csv', 'rb') as csv_file:
            s3.Bucket('lahman2016').put_object(Key='Pitching.csv', Body=csv_file)

    def run_ETL(self):
        """Run extract, transform and load in order, like the other ETL classes."""
        self.extract()
        self.transform()
        self.load()


class Rankings():
    """ETL: best- and worst-ranked team of every season by win percentage.

    Output columns: Team ID, Year, Rank (1 = best record that year) and
    At Bats. Written to Rankings.csv and uploaded to the ``lahman2016`` bucket.
    """

    def __init__(self):
        self.cnx = connect_to_db()

    def extract(self):
        """Load per-season team wins, losses and at-bats into Spark."""
        team_data = pd.read_sql_query(
            '''select yearID, teamID, W, L, AB FROM lahman2016.Teams''', self.cnx)
        self.team_sc = spark.createDataFrame(team_data).alias('team_sc')

    def transform(self):
        """Rank teams within each year and keep the first and last ranked."""
        # Win percentage orders teams the same way as W/L but stays defined
        # for a team with no losses (W/L would divide by zero).
        team_sc_pct = self.team_sc.withColumn(
            'win_pct', col('W') / (col('W') + col('L')))
        by_year = Window.partitionBy('yearID')
        # Descending, so rank 1 is the best record in the season.
        rank_sc = (team_sc_pct
                   .withColumn('rank',
                               rank().over(by_year.orderBy(col('win_pct').desc())))
                   .withColumn('max_rank', funcs.max('rank').over(by_year)))
        # Best = rank 1, worst = highest rank in that year; a single filter
        # avoids the union and any positional column juggling.
        extremes = rank_sc.filter((col('rank') == 1)
                                  | (col('rank') == col('max_rank')))
        self.rankings_transformed = (
            extremes
            .orderBy('yearID', 'rank')
            .select(col('teamID').alias('Team ID'),
                    col('yearID').alias('Year'),
                    col('rank').alias('Rank'),
                    col('AB').alias('At Bats')))

    def load(self):
        """Write Rankings.csv and upload it to S3."""
        self.rankings_transformed.toPandas().to_csv('Rankings.csv', index=False)
        # Context manager closes the handle once the upload is done.
        with open('Rankings.csv', 'rb') as csv_file:
            s3.Bucket('lahman2016').put_object(Key='Rankings.csv', Body=csv_file)

    def run_ETL(self):
        """Run extract, transform and load in order."""
        self.extract()
        self.transform()
        self.load()


def main():
    """Run each ETL job, one after another, in a fixed order."""
    etl_jobs = (HallofFameAllStarPitchers, AverageSalary, Rankings, Pitching)
    for job_class in etl_jobs:
        job_class().run_ETL()


if __name__ == '__main__':
    main()


In [22]:
! open .