In [38]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.types import *

from data_vault_loader import DataVaultLoader

spark = SparkSession.builder.master("local")\
                            .appName("Lahman Teams Load")\
                            .config("spark.executor.memory", "6gb")\
                            .enableHiveSupport()\
                            .getOrCreate()

data_set = spark.read.format("csv").option("header", "true").load("/home/ameadows/Downloads/baseballdatabank-master_2018-03-28/baseballdatabank-master/core/Teams.csv")

oldColumns = data_set.schema.names
newColumns = ["year_id"
              , "league_id"
              , "team_id"
              , 'franchise_id'
              , 'division_id'
              , 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id']

data_set = reduce(lambda data_set, idx: data_set.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), data_set)
data_set.printSchema()
data_set.show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- games_played: string (nullable = true)
 |-- home_games_played: string (nullable = true)
 |-- wins: string (nullable = true)
 |-- losses: string (nullable = true)
 |-- division_winner: string (nullable = true)
 |-- wild_card_winner: string (nullable = true)
 |-- league_champion: string (nullable = true)
 |-- world_series_winner: string (nullable = true)
 |-- runs_scored: string (nullable = true)
 |-- at_bats: string (nullable = true)
 |-- hits_by_batters: string (nullable = true)
 |-- doubles: string (nullable = true)
 |-- triples: string (nullable = true)
 |-- homeruns_by_batters: string (nullable = true)
 |-- walks_by_batters: string (nullable = true)
 |-- strikeouts_by_batters: string (nullable = true)
 |-- stolen_bases: string (nullabl

In [39]:
data_set = data_set.withColumn('rank', data_set['rank'].cast(IntegerType()))
data_set = data_set.withColumn('games_played', data_set['games_played'].cast(IntegerType()))
data_set = data_set.withColumn('home_games_played', data_set['home_games_played'].cast(IntegerType()))
data_set = data_set.withColumn('wins', data_set['wins'].cast(IntegerType()))
data_set = data_set.withColumn('losses', data_set['losses'].cast(IntegerType()))
data_set = data_set.withColumn('division_winner', data_set['division_winner'].cast(BooleanType()))
data_set = data_set.withColumn('wild_card_winner', data_set['wild_card_winner'].cast(BooleanType()))
data_set = data_set.withColumn('league_champion', data_set['league_champion'].cast(BooleanType()))
data_set = data_set.withColumn('world_series_winner', data_set['world_series_winner'].cast(BooleanType()))
data_set = data_set.withColumn('runs_scored', data_set['runs_scored'].cast(IntegerType()))
data_set = data_set.withColumn('at_bats', data_set['at_bats'].cast(IntegerType()))
data_set = data_set.withColumn('hits_by_batters', data_set['hits_by_batters'].cast(IntegerType()))
data_set = data_set.withColumn('doubles', data_set['doubles'].cast(IntegerType()))
data_set = data_set.withColumn('triples', data_set['triples'].cast(IntegerType()))
data_set = data_set.withColumn('homeruns_by_batters', data_set['homeruns_by_batters'].cast(IntegerType()))
data_set = data_set.withColumn('walks_by_batters', data_set['walks_by_batters'].cast(IntegerType()))
data_set = data_set.withColumn('strikeouts_by_batters', data_set['strikeouts_by_batters'].cast(IntegerType()))
data_set = data_set.withColumn('stolen_bases', data_set['stolen_bases'].cast(IntegerType()))
data_set = data_set.withColumn('caught_stealing', data_set['caught_stealing'].cast(IntegerType()))
data_set = data_set.withColumn('batters_hit_by_pitch', data_set['batters_hit_by_pitch'].cast(IntegerType()))
data_set = data_set.withColumn('sacrifice_flies', data_set['sacrifice_flies'].cast(IntegerType()))
data_set = data_set.withColumn('opponents_run_scored', data_set['opponents_runs_scored'].cast(IntegerType()))
data_set = data_set.withColumn('earned_runs_allowed', data_set['earned_runs_allowed'].cast(IntegerType()))
data_set = data_set.withColumn('complete_games', data_set['complete_games'].cast(IntegerType()))
data_set = data_set.withColumn('shutouts', data_set['shutouts'].cast(IntegerType()))
data_set = data_set.withColumn('saves', data_set['saves'].cast(IntegerType()))
data_set = data_set.withColumn('outs_pitched', data_set['outs_pitched'].cast(IntegerType()))
data_set = data_set.withColumn('hits_allowed', data_set['hits_allowed'].cast(IntegerType()))
data_set = data_set.withColumn('homeruns_allowed', data_set['homeruns_allowed'].cast(IntegerType()))
data_set = data_set.withColumn('walks_allowed', data_set['walks_allowed'].cast(IntegerType()))
data_set = data_set.withColumn('strikeouts_by_pitchers', data_set['strikeouts_by_pitchers'].cast(IntegerType()))
data_set = data_set.withColumn('errors', data_set['errors'].cast(IntegerType()))
data_set = data_set.withColumn('double_plays', data_set['double_plays'].cast(IntegerType()))
data_set = data_set.withColumn('fielding_percentage', data_set['fielding_percentage'].cast(FloatType()))
data_set = data_set.withColumn('home_attendance_total', data_set['home_attendance_total'].cast(IntegerType()))
data_set = data_set.withColumn('runs_scored', data_set['runs_scored'].cast(IntegerType()))
data_set = data_set.withColumn('three_year_park_factor_for_batters', data_set['three_year_park_factor_for_batters'].cast(IntegerType()))
data_set = data_set.withColumn('three_year_park_factor_for_pitchers', data_set['three_year_park_factor_for_pitchers'].cast(IntegerType()))

data_set.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [40]:
# Add Hub UUIDs to data_set

data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field='year_id', key_name='year_uuid')
data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field='league_id', key_name='league_uuid')
data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field='team_id', key_name='team_uuid')
data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field='franchise_id', key_name='franchise_uuid')
data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field='division_id', key_name='division_uuid')

data_set.printSchema()
data_set.select('year_uuid').show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [41]:
# Build the Teams Link UUID

data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field=['year_id', 'league_id', 'team_id', 'franchise_id', 'division_id'], key_name='team_link_uuid')
data_set.printSchema()
data_set.select('team_link_uuid').show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [42]:
# Build the Teams Sat Hash
column_list = [ 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id']

data_set = DataVaultLoader().universal_identifier_generator(data_set=data_set, key_field=column_list, key_name='team_sat_hash')
data_set.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [43]:
# Add create audit fields to data_set
from data_vault_loader import DataVaultLoader

data_set = DataVaultLoader().audit_field_manager(data_set=data_set, audit_type='create', process='Spark Test', actor='Me', source='Lahman CSV')
data_set.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [7]:
# Build the Year Hub
year_hub = data_set.select('year_uuid', 'year_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
year_hub = year_hub.dropDuplicates()




In [8]:
# Build the League Hub
league_hub = data_set.select('league_uuid', 'league_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
league_hub = league_hub.dropDuplicates()



In [9]:
# Build the Team Hub

team_hub = data_set.select('team_uuid', 'team_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
team_hub = team_hub.dropDuplicates()



In [10]:
# Build the Franchise Hub

franchise_hub = data_set.select('franchise_uuid', 'franchise_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
franchise_hub = franchise_hub.dropDuplicates()



In [11]:
# Build the Division Hub

division_hub = data_set.select('division_uuid', 'division_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
division_hub = division_hub.dropDuplicates()



In [12]:
# Build the Team Link
team_link = data_set.select('team_link_uuid', 'year_uuid', 'league_uuid', 'team_uuid', 'franchise_uuid', 'division_uuid', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
team_link = team_link.dropDuplicates()




In [44]:
# Build the Team Sat
team_sat = data_set.select('team_link_uuid'
                           ,'team_sat_hash'
                           , 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id')




In [15]:
# Write all the hub, link, and sat files

year_hub.write.parquet('/home/ameadows/Documents/testing/data_vault/hub/year_hub.parquet')
league_hub.write.parquet('/home/ameadows/Documents/testing/data_vault/hub/league_hub.parquet')
team_hub.write.parquet('/home/ameadows/Documents/testing/data_vault/hub/team_hub.parquet')
franchise_hub.write.parquet('/home/ameadows/Documents/testing/data_vault/hub/franchise_hub.parquet')
division_hub.write.parquet('/home/ameadows/Documents/testing/data_vault/hub/division_hub.parquet')
team_link.write.parquet('/home/ameadows/Documents/testing/data_vault/link/team_link.parquet')
team_sat.write.parquet('/home/ameadows/Documents/testing/data_vault/sat/team_sat.parquet')


AnalysisException: 'path file:/home/ameadows/Documents/testing/data_vault/hub/year_hub.parquet already exists.;'

In [14]:
# Attempting to build persistent tables from data.
year_hub.write.saveAsTable('data_vault.year_hub')
league_hub.write.saveAsTable('data_vault.league_hub')
team_hub.write.saveAsTable('data_vault.team_hub')
franchise_hub.write.saveAsTable('data_vault.franchise_hub')
division_hub.write.saveAsTable('data_vault.division_hub')
team_link.write.saveAsTable('data_vault.team_link')
team_sat.write.saveAsTable('data_vault.team_sat')


AnalysisException: 'Table `data_vault`.`year_hub` already exists.;'

In [58]:
# Needed to rebuild the year_hub_table
spark.sql("DROP TABLE year_hub")


DataFrame[]

In [17]:
#Test query
spark.sql("SELECT * FROM year_hub ORDER BY year_id").show()


+--------------------+-------+---------------+--------------------+-----------------+----------------+
|           year_uuid|year_id|create_actor_id|    create_date_time|create_process_id|create_source_id|
+--------------------+-------+---------------+--------------------+-----------------+----------------+
|47eb859381079ad4e...|   1871|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|6d9556e4e91bbb35a...|   1872|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|d9c4f767efdc3bf9f...|   1873|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|accaa848de69f2c43...|   1874|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|6036d88bfac7ab6af...|   1875|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|60fb2786f038c2a4d...|   1876|             Me|2019-04-24 15:37:...|       Spark Test|      Lahman CSV|
|a84d8719ce5a93ddc...|   1877|             Me|2019-04-24 15:37:...|      

In [25]:
# Testing with new spark session

spark2 = SparkSession.builder.master("local").appName("Lahman Teams Load 2").config("spark.executor.memory", "6gb").getOrCreate()
spark2.sql("SELECT * FROM team_sat").show()


+--------------------+--------------------+----+------------+-----------------+----+------+---------------+----------------+---------------+-------------------+-----------+-------+---------------+-------+-------+-------------------+----------------+---------------------+------------+---------------+--------------------+---------------+---------------------+-------------------+------------------+--------------+--------+-----+------------+------------+----------------+-------------+----------------------+------+------------+-------------------+--------------------+--------------------+---------------------+----------------------------------+-----------------------------------+--------------------------+-----------------------+------------------+
|      team_link_uuid|       team_sat_hash|rank|games_played|home_games_played|wins|losses|division_winner|wild_card_winner|league_champion|world_series_winner|runs_scored|at_bats|hits_by_batters|doubles|triples|homeruns_by_batters|walks_by_batte

In [22]:
spark.sql('USE data_vault').show()


++
||
++
++



In [45]:
# Creating incremental data set for 2018
data_set_incremental = spark.read.format("csv").option("header", "true").load("/home/ameadows/Downloads/baseballdatabank-2019.2/core/Teams.csv")

oldColumns = data_set_incremental.schema.names
newColumns = ["year_id"
              , "league_id"
              , "team_id"
              , 'franchise_id'
              , 'division_id'
              , 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id']

data_set_incremental = reduce(lambda data_set_incremental, idx: data_set_incremental.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), data_set_incremental)
data_set_incremental.printSchema()
data_set_incremental.show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- games_played: string (nullable = true)
 |-- home_games_played: string (nullable = true)
 |-- wins: string (nullable = true)
 |-- losses: string (nullable = true)
 |-- division_winner: string (nullable = true)
 |-- wild_card_winner: string (nullable = true)
 |-- league_champion: string (nullable = true)
 |-- world_series_winner: string (nullable = true)
 |-- runs_scored: string (nullable = true)
 |-- at_bats: string (nullable = true)
 |-- hits_by_batters: string (nullable = true)
 |-- doubles: string (nullable = true)
 |-- triples: string (nullable = true)
 |-- homeruns_by_batters: string (nullable = true)
 |-- walks_by_batters: string (nullable = true)
 |-- strikeouts_by_batters: string (nullable = true)
 |-- stolen_bases: string (nullabl

In [46]:
data_set_incremental = data_set_incremental.withColumn('rank', data_set_incremental['rank'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('games_played', data_set_incremental['games_played'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('home_games_played', data_set_incremental['home_games_played'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('wins', data_set_incremental['wins'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('losses', data_set_incremental['losses'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('division_winner', data_set_incremental['division_winner'].cast(BooleanType()))
data_set_incremental = data_set_incremental.withColumn('wild_card_winner', data_set_incremental['wild_card_winner'].cast(BooleanType()))
data_set_incremental = data_set_incremental.withColumn('league_champion', data_set_incremental['league_champion'].cast(BooleanType()))
data_set_incremental = data_set_incremental.withColumn('world_series_winner', data_set_incremental['world_series_winner'].cast(BooleanType()))
data_set_incremental = data_set_incremental.withColumn('runs_scored', data_set_incremental['runs_scored'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('at_bats', data_set_incremental['at_bats'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('hits_by_batters', data_set_incremental['hits_by_batters'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('doubles', data_set_incremental['doubles'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('triples', data_set_incremental['triples'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('homeruns_by_batters', data_set_incremental['homeruns_by_batters'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('walks_by_batters', data_set_incremental['walks_by_batters'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('strikeouts_by_batters', data_set_incremental['strikeouts_by_batters'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('stolen_bases', data_set_incremental['stolen_bases'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('caught_stealing', data_set_incremental['caught_stealing'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('batters_hit_by_pitch', data_set_incremental['batters_hit_by_pitch'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('sacrifice_flies', data_set_incremental['sacrifice_flies'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('opponents_run_scored', data_set_incremental['opponents_runs_scored'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('earned_runs_allowed', data_set_incremental['earned_runs_allowed'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('complete_games', data_set_incremental['complete_games'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('shutouts', data_set_incremental['shutouts'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('saves', data_set_incremental['saves'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('outs_pitched', data_set_incremental['outs_pitched'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('hits_allowed', data_set_incremental['hits_allowed'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('homeruns_allowed', data_set_incremental['homeruns_allowed'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('walks_allowed', data_set_incremental['walks_allowed'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('strikeouts_by_pitchers', data_set_incremental['strikeouts_by_pitchers'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('errors', data_set_incremental['errors'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('double_plays', data_set_incremental['double_plays'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('fielding_percentage', data_set_incremental['fielding_percentage'].cast(FloatType()))
data_set_incremental = data_set_incremental.withColumn('home_attendance_total', data_set_incremental['home_attendance_total'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('runs_scored', data_set_incremental['runs_scored'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('three_year_park_factor_for_batters', data_set_incremental['three_year_park_factor_for_batters'].cast(IntegerType()))
data_set_incremental = data_set_incremental.withColumn('three_year_park_factor_for_pitchers', data_set_incremental['three_year_park_factor_for_pitchers'].cast(IntegerType()))

data_set_incremental.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [47]:
# Add Hub UUIDs to data_set

data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field='year_id', key_name='year_uuid')
data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field='league_id', key_name='league_uuid')
data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field='team_id', key_name='team_uuid')
data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field='franchise_id', key_name='franchise_uuid')
data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field='division_id', key_name='division_uuid')

data_set_incremental.printSchema()
data_set_incremental.select('year_uuid').show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [48]:
# Build the Teams Link UUID

data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field=['year_id', 'league_id', 'team_id', 'franchise_id', 'division_id'], key_name='team_link_uuid')
data_set_incremental.printSchema()
data_set_incremental.select('team_link_uuid').show()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [49]:
# Build the Teams Sat Hash
column_list = [ 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id']

data_set_incremental = DataVaultLoader().universal_identifier_generator(data_set=data_set_incremental, key_field=column_list, key_name='team_sat_hash')
data_set_incremental.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [50]:
# Add create audit fields to data_set
data_set_incremental = DataVaultLoader().audit_field_manager(data_set=data_set_incremental, audit_type='create', process='Spark Test', actor='Me', source='Lahman CSV')
data_set_incremental.printSchema()


root
 |-- year_id: string (nullable = true)
 |-- league_id: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- division_id: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- home_games_played: integer (nullable = true)
 |-- wins: integer (nullable = true)
 |-- losses: integer (nullable = true)
 |-- division_winner: boolean (nullable = true)
 |-- wild_card_winner: boolean (nullable = true)
 |-- league_champion: boolean (nullable = true)
 |-- world_series_winner: boolean (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- at_bats: integer (nullable = true)
 |-- hits_by_batters: integer (nullable = true)
 |-- doubles: integer (nullable = true)
 |-- triples: integer (nullable = true)
 |-- homeruns_by_batters: integer (nullable = true)
 |-- walks_by_batters: integer (nullable = true)
 |-- strikeouts_by_batters: integer (nullable = true)
 |-- stolen_bases

In [22]:
# Build the Year Hub
year_hub_incremental = data_set_incremental.select('year_uuid', 'year_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
year_hub_incremental = year_hub_incremental.dropDuplicates()




In [23]:
# Build the League Hub
league_hub_incremental = data_set_incremental.select('league_uuid', 'league_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
league_hub_incremental = league_hub_incremental.dropDuplicates()



In [24]:
# Build the Team Hub

team_hub_incremental = data_set_incremental.select('team_uuid', 'team_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
team_hub_incremental = team_hub_incremental.dropDuplicates()



In [25]:
# Build the Franchise Hub

franchise_hub_incremental = data_set_incremental.select('franchise_uuid', 'franchise_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
franchise_hub_incremental = franchise_hub_incremental.dropDuplicates()



In [26]:
# Build the Division Hub

division_hub_incremental = data_set_incremental.select('division_uuid', 'division_id', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
division_hub_incremental = division_hub_incremental.dropDuplicates()



In [27]:
# Build the Team Link
team_link_incremental = data_set_incremental.select('team_link_uuid', 'year_uuid', 'league_uuid', 'team_uuid', 'franchise_uuid', 'division_uuid', 'create_actor_id', 'create_date_time', 'create_process_id', 'create_source_id')
team_link_incremental = team_link_incremental.dropDuplicates()




In [51]:
# Build the Team Sat
team_sat_incremental = data_set_incremental.select('team_link_uuid'
                           ,'team_sat_hash'
                           , 'rank'
              , 'games_played'
              , 'home_games_played'
              , 'wins'
              , 'losses'
              , 'division_winner'
              , 'wild_card_winner'
              , 'league_champion'
              , 'world_series_winner'
              , 'runs_scored'
              , 'at_bats'
              , 'hits_by_batters'
              , 'doubles'
              , 'triples'
              , 'homeruns_by_batters'
              , 'walks_by_batters'
              , 'strikeouts_by_batters'
              , 'stolen_bases'
              , 'caught_stealing'
              , 'batters_hit_by_pitch'
              , 'sacrifice_flies'
              , 'opponents_runs_scored'
              , 'earned_runs_allowed'
              , 'earned_run_average'
              , 'complete_games'
              , 'shutouts'
              , 'saves'
              , 'outs_pitched'
              , 'hits_allowed'
              , 'homeruns_allowed'
              , 'walks_allowed'
              , 'strikeouts_by_pitchers'
              , 'errors'
              , 'double_plays'
              , 'fielding_percentage'
              , 'team_full_name'
              , 'home_ballpark_name'
              , 'home_attendance_total'
              , 'three_year_park_factor_for_batters'
              , 'three_year_park_factor_for_pitchers'
              , 'baseball_reference_team_id'
              , 'lahman_previous_team_id'
              , 'retrosheet_team_id')
team_sat_incremental.show()


+--------------------+--------------------+----+------------+-----------------+----+------+---------------+----------------+---------------+-------------------+-----------+-------+---------------+-------+-------+-------------------+----------------+---------------------+------------+---------------+--------------------+---------------+---------------------+-------------------+------------------+--------------+--------+-----+------------+------------+----------------+-------------+----------------------+------+------------+-------------------+--------------------+--------------------+---------------------+----------------------------------+-----------------------------------+--------------------------+-----------------------+------------------+
|      team_link_uuid|       team_sat_hash|rank|games_played|home_games_played|wins|losses|division_winner|wild_card_winner|league_champion|world_series_winner|runs_scored|at_bats|hits_by_batters|doubles|triples|homeruns_by_batters|walks_by_batte

In [29]:
# Need to compare current with previous - starting with year_hub
current_year_hub = year_hub.select('year_uuid', 'year_id')
year_hub_delta = year_hub_incremental.select('year_uuid', 'year_id').subtract(current_year_hub.select('year_uuid', 'year_id'))
year_hub_delta.show()
year_hub_delta.printSchema()


+--------------------+-------+
|           year_uuid|year_id|
+--------------------+-------+
|db08fc98c9897a7ea...|   2018|
+--------------------+-------+

root
 |-- year_uuid: string (nullable = true)
 |-- year_id: string (nullable = true)



In [87]:
# Testing dynamic select
filter_field = ['year_uuid', 'year_id']
test_year_hub = year_hub.select(*filter_field)
test_year_hub.show()


+--------------------+-------+
|           year_uuid|year_id|
+--------------------+-------+
|86bfa887cec45b0cd...|   1894|
|f73a6a9ac4a14ecbe...|   1949|
|0272e80f4c075448b...|   1900|
|d5d1a7acda37f7be9...|   1930|
|7a3d2964b02fa7802...|   1898|
|0a228d2af72dcac77...|   1979|
|0d8f218692cb46718...|   1925|
|98f7a21edf3a12c0b...|   1985|
|6d9556e4e91bbb35a...|   1872|
|eb195e87b50c3af1a...|   1882|
|5388b4baffece7664...|   1956|
|1e8c92ad2f022b12b...|   1927|
|298a67083b66c73a2...|   1913|
|730bb68d3f5dff9d6...|   2004|
|385241d8c706c5b9f...|   1881|
|9513eb867cbbee254...|   1980|
|e45de8a892bcb7636...|   1972|
|1ce00c02830d33e65...|   1921|
|f6dda54793349a9e2...|   2002|
|919d5469c0a0b74bb...|   2006|
+--------------------+-------+
only showing top 20 rows



In [83]:
# Playing with filter
year_hub_incremental.filter(year_hub_incremental.year_id.isin(['2018'])).show()



+--------------------+-------+---------------+--------------------+-----------------+----------------+
|           year_uuid|year_id|create_actor_id|    create_date_time|create_process_id|create_source_id|
+--------------------+-------+---------------+--------------------+-----------------+----------------+
|db08fc98c9897a7ea...|   2018|             Me|2019-04-26 09:24:...|       Spark Test|      Lahman CSV|
+--------------------+-------+---------------+--------------------+-----------------+----------------+



In [80]:
# Getting the delta records and loading them into the year_hub table.
# Had to create the filter list first, then use that list to get the records we actually care about.
year_uuid_list = [row.year_uuid for row in year_hub_delta.select('year_uuid').collect()]
print(year_uuid_list)


['db08fc98c9897a7ea9ccd50ff639f5197124ff1890f0f26adc6ae8b20a877098d380c99994adb43a3a2018dcfc47b9caf26f4c4b49702b4b11227435109829aa']


In [81]:
year_hub_delta = year_hub_incremental.filter(year_hub_incremental.year_uuid.isin(year_uuid_list))
year_hub_delta.show()


+--------------------+-------+---------------+--------------------+-----------------+----------------+
|           year_uuid|year_id|create_actor_id|    create_date_time|create_process_id|create_source_id|
+--------------------+-------+---------------+--------------------+-----------------+----------------+
|db08fc98c9897a7ea...|   2018|             Me|2019-04-26 09:24:...|       Spark Test|      Lahman CSV|
+--------------------+-------+---------------+--------------------+-----------------+----------------+



In [67]:
# Testing join conditions
key_fields = ['team_link_uuid']
orig = team_sat.alias('orig')
temp_team_sat = orig.select('team_link_uuid', 'team_sat_hash')

delta = team_sat_incremental.alias('delta')
temp_team_sat_incremental = delta.select('team_link_uuid', 'team_sat_hash')

potential_update_list = orig.join(delta, (orig.team_link_uuid == delta.team_link_uuid) & (orig.team_sat_hash != delta.team_sat_hash))
potential_update_list = potential_update_list.select('orig.team_link_uuid')
potential_update_list.show()


+--------------------+
|      team_link_uuid|
+--------------------+
|aa5cd45417d543601...|
|fff2a86ca503b9904...|
|f808ba92ed6443230...|
|e367558ea0b5c4912...|
|0ac6c7cbd29cce5e4...|
|d10f78b4ed47d7792...|
|584b9b559a222b3af...|
|9e4021357682a38e6...|
|434970d67e4979e30...|
|067a74870bde04f60...|
|787793f15fd93d781...|
|ede2aed79283c6cc9...|
|3e7465b47edfdb5a8...|
|ebd3edac2dee122ab...|
|71f8f2d39236d09e7...|
|0953830a6a1f1d873...|
|10638867f132ae4d4...|
|04df124eb67c1bb7c...|
|4cebd4f5d9ccdb436...|
|a023de1505063ae4e...|
+--------------------+
only showing top 20 rows



In [69]:
team_link_delta_list = [row.team_link_uuid for row in potential_update_list.select('team_link_uuid').collect()]

team_link_sat_delta = team_sat_incremental.filter(team_sat_incremental.team_link_uuid.isin(team_link_delta_list))
team_link_sat_delta.show()


+--------------------+--------------------+----+------------+-----------------+----+------+---------------+----------------+---------------+-------------------+-----------+-------+---------------+-------+-------+-------------------+----------------+---------------------+------------+---------------+--------------------+---------------+---------------------+-------------------+------------------+--------------+--------+-----+------------+------------+----------------+-------------+----------------------+------+------------+-------------------+--------------------+--------------------+---------------------+----------------------------------+-----------------------------------+--------------------------+-----------------------+------------------+
|      team_link_uuid|       team_sat_hash|rank|games_played|home_games_played|wins|losses|division_winner|wild_card_winner|league_champion|world_series_winner|runs_scored|at_bats|hits_by_batters|doubles|triples|homeruns_by_batters|walks_by_batte

In [100]:
#Testing join with multiple keys
from functools import reduce
key_cols = ['team_link_uuid', 'team_sat_hash']
temp_team_sat = orig.select(*key_cols)
temp_team_sat_incremental = delta.select(*key_cols)
hash_field = ['team_sat_hash']

comparison_query = reduce(
    lambda  a,b: (a | b),
    [temp_team_sat[col] != temp_team_sat_incremental[col]
    for col in hash_field
    ]
)

test_update_list = temp_team_sat.join(temp_team_sat_incremental, on=(temp_team_sat.team_link_uuid == temp_team_sat_incremental.team_link_uuid) & comparison_query)

test_update_list.count()


171

SyntaxError: invalid syntax (<ipython-input-83-93a3c54a17f5>, line 2)