## ETL PROCESS FOR CHADWICK BASEBALL DATA
This is an ETL process to import data from the Chadwick Databank into a Microsoft SQL Server database, either locally hosted or on AWS.
https://github.com/chadwickbureau/baseballdatabank

## INTENT:
* become part of AWS lambda job
* import from S3
* call SSMS job to do post-import updates

### TODO:
* Build ETL to iterate over whole "core" folder and import everything --> complete except for salary stuff
* Integrate logging framework
* Add other parts of schema
* Define schema and key relationships for entire Chadwick db upon import
* Validation testing on imports -- basic metadata catalog to check on number of rows and full set of tables etc
* Normalization where appropriate in anticipation of new data sets?

In [68]:
import pandas as pd
import os.path
# import matplotlib.pyplot as plt
# import numpy as np
# import zipfile
# import wget
import pyodbc
import sqlalchemy

In [55]:
# import os
# os.getcwd()
# from os.path import exists
# print(exists("../README.md"))
print(sqlalchemy.__version__)

1.4.22


In [111]:
# migrate me to a config file.
root_dir = "../../baseballdatabank/"

server = "(localdb)\MSSQLLocalDB"
database = "baseball"

engine = sqlalchemy.create_engine("mssql+pyodbc://" + server + "/" + database + "?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

In [119]:
conn = engine.connect()
conn.close()

In [134]:
#refactor me
subdirs = ["core","contrib"]

with engine.connect() as conn:

    for subdir in subdirs:
        print(subdir)
    
        for i in os.listdir(root_dir + subdir):

            if i.endswith(".csv"):

                file_name = root_dir + subdir + "/" + i
                table_name = subdir + "_" + i.replace(".csv","")

                df = pd.read_csv(file_name)

                df.replace({np.inf: np.nan, -np.inf: np.nan}, inplace=True) # infinite ERAs are unfortunate.  
                ## didn't realize that inf was an actual valid state for a pandas float

                ## should probably add data validation checks at this step prior to import into sql

                df.to_sql(name=table_name, con=engine, if_exists='replace', index=False)

                print(i + " successfully uploaded.")
    

core
AllstarFull.csv successfully uploaded.
Appearances.csv successfully uploaded.
Batting.csv successfully uploaded.
BattingPost.csv successfully uploaded.
Fielding.csv successfully uploaded.
FieldingOF.csv successfully uploaded.
FieldingOFsplit.csv successfully uploaded.
FieldingPost.csv successfully uploaded.
HomeGames.csv successfully uploaded.
Managers.csv successfully uploaded.
ManagersHalf.csv successfully uploaded.
Parks.csv successfully uploaded.
People.csv successfully uploaded.
Pitching.csv successfully uploaded.
PitchingPost.csv successfully uploaded.
SeriesPost.csv successfully uploaded.
Teams.csv successfully uploaded.
TeamsFranchises.csv successfully uploaded.
TeamsHalf.csv successfully uploaded.
contrib
AwardsManagers.csv successfully uploaded.
AwardsPlayers.csv successfully uploaded.
AwardsShareManagers.csv successfully uploaded.
AwardsSharePlayers.csv successfully uploaded.
CollegePlaying.csv successfully uploaded.
HallOfFame.csv successfully uploaded.
Salaries.csv su

In [None]:
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session

Base = automap_base()
Base.prepare(engine, reflect=True)



# still in progress: generating read call in sqlalchemy to pull data from defined view
#metadata = sqlalchemy.MetaData()
#sqlalchemy.MetaData.reflect(metadata)
#test_view = sqlalchemy.Table('test_view', metadata)

In [28]:
Base.classes.keys()

['people', 'appearances', 'teams']

In [29]:
salaries_csv = zf.open('baseballdatabank-2022.2/contrib/Salaries.csv')


salaries_df = pd.read_csv(salaries_csv)
salaries_df.head(5)

Unnamed: 0,yearID,teamID,lgID,playerID,salary
0,1985,ATL,NL,barkele01,870000
1,1985,ATL,NL,bedrost01,550000
2,1985,ATL,NL,benedbr01,545000
3,1985,ATL,NL,campri01,633333
4,1985,ATL,NL,ceronri01,625000


In [30]:
salaries_df.sample(50)

Unnamed: 0,yearID,teamID,lgID,playerID,salary
21393,2010,TEX,AL,youngmi02,13174974
22822,2012,NYN,NL,santajo01,23145011
11152,1998,SLN,NL,morrima01,300000
12155,1999,SLN,NL,osbordo01,5080000
13219,2001,BOS,AL,bicheda01,7000000
6508,1993,TOR,AL,colesda01,500000
2523,1988,SLN,NL,penato01,1100000
24600,2014,SFN,NL,affelje01,6000000
26054,2016,MIN,AL,plouftr01,7250000
975,1986,MON,NL,tibbsja01,127500


In [31]:
# compare with (maybe scrape?) http://www.stevetheump.com/Payrolls.htm
payroll_df = salaries_df.groupby(['teamID', 'yearID']).sum()
payroll_df.sample(15)

Unnamed: 0_level_0,Unnamed: 1_level_0,salary
teamID,yearID,Unnamed: 2_level_1
FLO,2001,35762500
TEX,2000,70795921
TBA,2009,63313034
NYN,1996,24479500
SFN,1988,12380000
LAN,1994,38000001
PIT,2010,34943000
CIN,1989,11072000
LAA,2015,120005415
CIN,2000,46867200


In [32]:
payroll_df['pct_of_year_avg'] = payroll_df[['salary']] / payroll_df.groupby(['yearID']).mean()

In [33]:
payroll_df.groupby(['yearID']).mean()

Unnamed: 0_level_0,salary,pct_of_year_avg
yearID,Unnamed: 1_level_1,Unnamed: 2_level_1
1985,10075570.0,1.0
1986,11840560.0,1.0
1987,10483670.0,1.0
1988,11555860.0,1.0
1989,13845990.0,1.0
1990,17072350.0,1.0
1991,23578790.0,1.0
1992,30982440.0,1.0
1993,32205000.0,1.0
1994,33137010.0,1.0


In [34]:
payroll_df

Unnamed: 0_level_0,Unnamed: 1_level_0,salary,pct_of_year_avg
teamID,yearID,Unnamed: 2_level_1,Unnamed: 3_level_1
ANA,1997,31135472,0.773356
ANA,1998,41281000,0.968823
ANA,1999,55388166,1.112042
ANA,2000,51464167,0.926651
ANA,2001,47535167,0.727333
...,...,...,...
WAS,2012,80855143,0.827095
WAS,2013,113703270,1.124096
WAS,2014,131983680,1.240325
WAS,2015,155587472,1.328240


In [35]:
payroll_df.reset_index(inplace=True)

In [36]:
for col in range(0,len(payroll_df.columns)):
    col_dict[payroll_df.columns[col]] = str.lower(payroll_df.columns[col])

payroll_df.rename(columns=col_dict, inplace=True)
payroll_df.columns

Index(['teamid', 'yearid', 'salary', 'pct_of_year_avg'], dtype='object')

In [37]:
# Inflation data from US Federal Reserve: https://www.bea.gov/data/prices-inflation/gdp-price-deflator
# "Price deflator" pegs value of dollar to 2012.  Grab the january values and re-baseline them to 2021
# df = df[df['Date'].dt.month == 11]
inflation_csv = 'Resources/GDPDEF.csv'
inflation_df = pd.read_csv(inflation_csv)
inflation_df

Unnamed: 0,DATE,GDPDEF
0,1947-01-01,11.952
1,1947-04-01,12.122
2,1947-07-01,12.326
3,1947-10-01,12.630
4,1948-01-01,12.731
...,...,...
295,2020-10-01,114.439
296,2021-01-01,115.652
297,2021-04-01,117.413
298,2021-07-01,119.115


In [38]:
inflation_df.dtypes

DATE       object
GDPDEF    float64
dtype: object

In [39]:
inflation_df['date_fixed'] = pd.to_datetime(inflation_df['DATE'])
inflation_df.drop(columns='DATE', inplace=True)
inflation_df

Unnamed: 0,GDPDEF,date_fixed
0,11.952,1947-01-01
1,12.122,1947-04-01
2,12.326,1947-07-01
3,12.630,1947-10-01
4,12.731,1948-01-01
...,...,...
295,114.439,2020-10-01
296,115.652,2021-01-01
297,117.413,2021-04-01
298,119.115,2021-07-01


In [40]:
inflation_df = inflation_df.loc[inflation_df['date_fixed'].dt.month == 4]
inflation_df

Unnamed: 0,GDPDEF,date_fixed
1,12.122,1947-04-01
5,12.845,1948-04-01
9,12.923,1949-04-01
13,12.888,1950-04-01
17,13.992,1951-04-01
...,...,...
281,107.369,2017-04-01
285,110.234,2018-04-01
289,112.152,2019-04-01
293,112.859,2020-04-01


In [41]:
inflation_df['yearid'] = inflation_df['date_fixed'].dt.year
inflation_df.drop(columns='date_fixed', inplace=True)
inflation_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


Unnamed: 0,GDPDEF,yearid
1,12.122,1947
5,12.845,1948
9,12.923,1949
13,12.888,1950
17,13.992,1951
...,...,...
281,107.369,2017
285,110.234,2018
289,112.152,2019
293,112.859,2020


In [42]:
# rebaseline index to 2021
inflation_multiplier = inflation_df.sort_values(by='yearid', ascending=False).head(1)['GDPDEF'].values[0]
print(inflation_multiplier)

117.413


In [43]:
inflation_df['inflation_index'] = inflation_df['GDPDEF'] * 100 / inflation_multiplier
inflation_df = inflation_df.reindex(columns=['yearid', 'GDPDEF', 'inflation_index']).set_index('yearid')
inflation_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


Unnamed: 0_level_0,GDPDEF,inflation_index
yearid,Unnamed: 1_level_1,Unnamed: 2_level_1
1947,12.122,10.324240
1948,12.845,10.940015
1949,12.923,11.006447
1950,12.888,10.976638
1951,13.992,11.916909
...,...,...
2017,107.369,91.445581
2018,110.234,93.885686
2019,112.152,95.519236
2020,112.859,96.121383


In [44]:
payroll_inflation_df = payroll_df.merge(right=inflation_df, how='inner', left_on='yearid', right_index=True)
payroll_inflation_df

Unnamed: 0,teamid,yearid,salary,pct_of_year_avg,GDPDEF,inflation_index
0,ANA,1997,31135472,0.773356,74.210,63.204245
39,ATL,1997,52278500,1.298515,74.210,63.204245
71,BAL,1997,58516400,1.453455,74.210,63.204245
103,BOS,1997,43558750,1.081931,74.210,63.204245
147,CHA,1997,57740000,1.434170,74.210,63.204245
...,...,...,...,...,...,...
738,SEA,1996,41328501,1.209214,72.991,62.166029
770,SFN,1996,37144725,1.086803,72.991,62.166029
802,SLN,1996,40269667,1.178234,72.991,62.166029
853,TEX,1996,39041528,1.142300,72.991,62.166029


In [45]:
payroll_inflation_df['salary_adjusted'] = 100 * payroll_inflation_df['salary'] / payroll_inflation_df['inflation_index']
payroll_inflation_df.sample(10)

Unnamed: 0,teamid,yearid,salary,pct_of_year_avg,GDPDEF,inflation_index,salary_adjusted
616,OAK,2002,40004167,0.592924,80.783,68.802432,58143540.0
893,TOR,2004,50017000,0.724651,84.569,72.026948,69442070.0
505,ML4,1988,8402000,0.727077,58.676,49.974023,16812730.0
310,DET,2008,137685196,1.538463,94.13,80.169998,171741500.0
345,HOU,1992,15407500,0.497298,67.097,57.146142,26961570.0
894,TOR,2005,45719500,0.626663,87.082,74.167256,61643780.0
399,KCA,2014,74594075,0.701002,103.525,88.171668,84600960.0
151,CHA,2001,65653667,1.004563,79.683,67.865569,96740760.0
97,BOS,1991,35167500,1.491489,65.545,55.824312,62996750.0
751,SEA,2009,98904166,1.113482,94.852,80.784922,122429000.0


In [46]:
payroll_inflation_df.drop(columns=['GDPDEF', 'inflation_index'], inplace=True)
payroll_inflation_df

Unnamed: 0,teamid,yearid,salary,pct_of_year_avg,salary_adjusted
0,ANA,1997,31135472,0.773356,4.926168e+07
39,ATL,1997,52278500,1.298515,8.271359e+07
71,BAL,1997,58516400,1.453455,9.258302e+07
103,BOS,1997,43558750,1.081931,6.891744e+07
147,CHA,1997,57740000,1.434170,9.135462e+07
...,...,...,...,...,...
738,SEA,1996,41328501,1.209214,6.648084e+07
770,SFN,1996,37144725,1.086803,5.975084e+07
802,SLN,1996,40269667,1.178234,6.477761e+07
853,TEX,1996,39041528,1.142300,6.280203e+07


In [47]:
payroll_inflation_df.to_sql(name='payroll', con=engine, if_exists='replace', index=False)
    
print("Payroll df import complete.")

Payroll df import complete.
