# PYSPARK CODE:

In [29]:
from pyspark.sql.functions import *
from pyspark.sql import Window, Row
from pyspark.sql.types import *

In [30]:
# Display the active SparkSession. `spark` is not created anywhere in this
# notebook, so it is presumably injected by the kernel / pyspark launcher -- confirm.
spark

### EXTRACT:

In [31]:
def load_dataframe(filename, header=True, infer_schema=False, session=None):
    """Load a CSV file into a Spark DataFrame.

    Parameters
    ----------
    filename : str
        Path of the CSV file to read.
    header : bool, default True
        Treat the first line of the file as column names.
    infer_schema : bool, default False
        Ask Spark to infer column types. Off by default, so every column is read
        as a string, which preserves the original behaviour of this notebook
        (the goal sums in the outputs are doubles because of it).
    session : SparkSession, optional
        Session used for reading; defaults to the notebook-global ``spark``.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    reader_session = spark if session is None else session
    return (
        reader_session.read.format('csv')
        .option('header', header)
        .option('inferSchema', infer_schema)
        .load(filename)
    )

# creating a dataframe from the raw match results
df_matches = load_dataframe('../data/Matches.csv')
df_matches.limit(5).show()

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+



In [48]:
# number of raw match rows loaded from Matches.csv
df_matches.count()

24625

### TRANSFORM:
#### Data Cleaning:

In [32]:
# Rename the terse source columns to descriptive names.
# An explicit mapping (rather than "the last three columns") stays correct if the
# CSV column order changes, and it keeps this cell safe to re-run after later cells
# have appended columns: withColumnRenamed is a no-op when the column is absent.
rename_map = {
    'FTHG': 'HomeTeamGoals',
    'FTAG': 'AwayTeamGoals',
    'FTR': 'FinalResult',  # values 'H' / 'A' / 'D' (see the indicator columns below)
}
for old_col, new_col in rename_map.items():
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

In [33]:
# preview the renamed columns (Spark's defaults made explicit: 20 rows, truncated cells)
df_matches.show(n=20, truncate=True)

+--------+---+------+----------+------------------+--------------+-------------+-------------+-----------+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|
+--------+---+------+----------+------------------+--------------+-------------+-------------+-----------+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|            2|            1|          H|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|            0|            1|          A|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|            1|            1|          D|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|            2|            1|          H|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|            1|            3|          A|
|       6| D2|  2009|2010-04-03|      Union Berlin|     Karlsruhe|            1|            1|          D|
|       7| D2|  2009|2009-08-14|     

### TRANSFORM:
#### Data Transformations:

In [34]:
# 0/1 indicator columns derived from FinalResult: 'H' home win, 'A' away win, 'D' tie
df_matches = (
    df_matches
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0))
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0))
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))
)

In [35]:
# keep division 'D1' matches from the 2000 through 2010 seasons (both ends inclusive)
seasons = df_matches.filter(
    col('Season').between(2000, 2010) & (col('Div') == 'D1')
)

In [36]:
# home team features
home = seasons.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

In [37]:
#away game features 
away =  seasons.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

In [38]:
# Rank teams inside each season: highest win percentage first, goal differential
# as the tie-breaker. rank() gives equal rows the same position, so an exact tie on
# both keys would produce more than one "winner" for that season.
# (Named season_window so it does not shadow pyspark.sql.functions.window,
# which the star import brings into scope.)
season_window = Window.partitionBy('Season').orderBy(
    col('WinPct').desc(), col('GoalDifferentials').desc()
)

winners = (
    home.join(away, ['Season', 'Team'], 'inner')
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals'))
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals'))
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst'))
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin'))
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss'))
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie'))
    # percentage of the season's games won, rounded to 2 decimals
    .withColumn('WinPct', round((100 * col('Win') / (col('Win') + col('Loss') + col('Tie'))), 2))
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals')
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie')
    .withColumn('TeamPosition', rank().over(season_window))
)

# one row per season: the top-ranked team(s)
winners_df = (
    winners
    .filter(col('TeamPosition') == 1)
    .orderBy(asc('Season'))
    .drop('TeamPosition')
    .withColumnRenamed('Team', 'SeasonWinnerTeam')
)
winners_df.show()

+------+----------------+-----------+------------+-----------------+---+----+---+------+
|Season|SeasonWinnerTeam|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|
+------+----------------+-----------+------------+-----------------+---+----+---+------+
|  2000|   Bayern Munich|       62.0|        37.0|             25.0| 19|   9|  6| 55.88|
|  2001|      Leverkusen|       77.0|        38.0|             39.0| 21|   7|  6| 61.76|
|  2002|   Bayern Munich|       70.0|        25.0|             45.0| 23|   5|  6| 67.65|
|  2003|   Werder Bremen|       79.0|        38.0|             41.0| 22|   4|  8| 64.71|
|  2004|   Bayern Munich|       75.0|        33.0|             42.0| 24|   5|  5| 70.59|
|  2005|   Bayern Munich|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|
|  2006|       Stuttgart|       61.0|        37.0|             24.0| 21|   6|  7| 61.76|
|  2007|   Bayern Munich|       68.0|        21.0|             47.0| 22|   2| 10| 64.71|
|  2008|       Wolfsb

### LOAD:
#### Write output in parquet format:

In [39]:
# persist the season winners as Parquet, replacing any previous run's output
winners_df.write.parquet("../output/pyspark_output/", mode="overwrite")

In [40]:
# Read the Parquet output back to verify it. Parquet stores its own schema, so the
# CSV-only `header` option is not needed (it was silently ignored before).
spark.read.parquet("../output/pyspark_output/").show()

+------+----------------+-----------+------------+-----------------+---+----+---+------+
|Season|SeasonWinnerTeam|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|
+------+----------------+-----------+------------+-----------------+---+----+---+------+
|  2000|   Bayern Munich|       62.0|        37.0|             25.0| 19|   9|  6| 55.88|
|  2001|      Leverkusen|       77.0|        38.0|             39.0| 21|   7|  6| 61.76|
|  2002|   Bayern Munich|       70.0|        25.0|             45.0| 23|   5|  6| 67.65|
|  2003|   Werder Bremen|       79.0|        38.0|             41.0| 22|   4|  8| 64.71|
|  2004|   Bayern Munich|       75.0|        33.0|             42.0| 24|   5|  5| 70.59|
|  2005|   Bayern Munich|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|
|  2006|       Stuttgart|       61.0|        37.0|             24.0| 21|   6|  7| 61.76|
|  2007|   Bayern Munich|       68.0|        21.0|             47.0| 22|   2| 10| 64.71|
|  2008|       Wolfsb

# DUCKDB CODE:

In [15]:
import duckdb

In [16]:
# open (or create) the on-disk DuckDB database used by the cells below
conn = duckdb.connect(database="../duckdb_database/myduckdb.db")

### EXTRACT:

In [17]:
# Extract: read Matches.csv with DuckDB's read_csv_auto and materialize the
# result as a pandas DataFrame via .df().
# NOTE(review): this pandas round-trip (plus the register() in the next cell) could
# likely be replaced by a view defined directly over read_csv_auto -- consider.
duck_db_df = conn.execute("""
select
*
from read_csv_auto('../data/Matches.csv', header=True)
""").df()

In [18]:
# Expose the pandas DataFrame to SQL under the name `duckdb_view`.
# The trailing `;` suppresses the meaningless <DuckDBPyConnection ...> cell output.
conn.register("duckdb_view", duck_db_df);

<duckdb.DuckDBPyConnection at 0x20ec9db42b0>

In [19]:
# preview the registered view as a pandas DataFrame (fetchdf is an alias of df)
conn.execute("select * from duckdb_view").fetchdf()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A
...,...,...,...,...,...,...,...,...,...
24620,46770,E0,2016,2017-05-21,Liverpool,Middlesbrough,3,0,H
24621,46771,E0,2016,2017-05-21,Man United,Crystal Palace,2,0,H
24622,46772,E0,2016,2017-05-21,Southampton,Stoke,0,1,A
24623,46773,E0,2016,2017-05-21,Swansea,West Brom,2,1,H


### TRANSFORM:
#### Data Cleaning:

In [20]:
# Data cleaning: persist the raw matches as `duckdb_table`, renaming the terse
# FTHG / FTAG / FTR columns to descriptive names.
# The trailing `;` suppresses the <DuckDBPyConnection ...> cell output.
conn.execute("""
create or replace table duckdb_table as
select
Match_ID,Div, Season, Date, HomeTeam, AwayTeam,
FTHG as HomeTeamGoals,
FTAG as AwayTeamGoals,
FTR as FinalResult
from duckdb_view
""");

<duckdb.DuckDBPyConnection at 0x20ec9db42b0>

In [21]:
conn.execute("""
select * from duckdb_table
""").df()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A
...,...,...,...,...,...,...,...,...,...
24620,46770,E0,2016,2017-05-21,Liverpool,Middlesbrough,3,0,H
24621,46771,E0,2016,2017-05-21,Man United,Crystal Palace,2,0,H
24622,46772,E0,2016,2017-05-21,Southampton,Stoke,0,1,A
24623,46773,E0,2016,2017-05-21,Swansea,West Brom,2,1,H


### TRANSFORM:
#### Data Transformations:

In [22]:
# Add 0/1 indicator columns derived from FinalResult ('H' -> HomeTeamWin,
# 'A' -> AwayTeamWin, 'D' -> GameTie), mirroring the PySpark transformation.
conn.execute("""
create or replace view duckdb_matches as
select *,
case when FinalResult = 'H' then 1 else 0 end as HomeTeamWin,
case when FinalResult = 'A' then 1 else 0 end as AwayTeamWin,
case when FinalResult = 'D' then 1 else 0 end as GameTie
from duckdb_table
""")
# preview the view
conn.execute("select * from duckdb_matches").df()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult,HomeTeamWin,AwayTeamWin,GameTie
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H,1,0,0
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A,0,1,0
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D,0,0,1
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H,1,0,0
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...
24620,46770,E0,2016,2017-05-21,Liverpool,Middlesbrough,3,0,H,1,0,0
24621,46771,E0,2016,2017-05-21,Man United,Crystal Palace,2,0,H,1,0,0
24622,46772,E0,2016,2017-05-21,Southampton,Stoke,0,1,A,0,1,0
24623,46773,E0,2016,2017-05-21,Swansea,West Brom,2,1,H,1,0,0


In [23]:
# Keep only division 'D1' matches from the 2000 through 2010 seasons (inclusive).
# The SQL holds exactly one statement; anything after the `;` would be executed by
# DuckDB as an additional, unrelated query.
conn.execute("""
create or replace view duckdb_seasons as
select *
from duckdb_matches
where Season >= 2000 and Season <= 2010 and Div = 'D1';
""")
conn.execute("select * from duckdb_seasons").df()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult,HomeTeamWin,AwayTeamWin,GameTie
0,21,D1,2009,2010-02-06,Bochum,Leverkusen,1,1,D,0,0,1
1,22,D1,2009,2009-11-22,Bayern Munich,Leverkusen,1,1,D,0,0,1
2,23,D1,2009,2010-05-08,M'gladbach,Leverkusen,1,1,D,0,0,1
3,24,D1,2009,2009-08-08,Mainz,Leverkusen,2,2,D,0,0,1
4,25,D1,2009,2009-10-17,Hamburg,Leverkusen,0,0,D,0,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...
3361,37179,D1,2004,2005-05-21,Hertha,Hannover,0,0,D,0,0,1
3362,37180,D1,2004,2005-05-21,Kaiserslautern,Werder Bremen,1,2,A,0,1,0
3363,37181,D1,2004,2005-05-21,Leverkusen,M'gladbach,5,1,H,1,0,0
3364,37182,D1,2004,2005-05-21,Nurnberg,Mainz,1,2,A,0,1,0


In [24]:
# Per-(Season, HomeTeam) totals seen from the home side: home wins, home losses
# (= away-team wins), ties, goals scored and goals conceded. HomeTeam is exposed
# as Team so it can be joined with duckdb_away.
conn.execute("""
create or replace view duckdb_home as
select
    Season,
    HomeTeam as Team,
    sum(HomeTeamWin) as TotalHomeWin,
    sum(AwayTeamWin) as TotalHomeLoss,
    sum(GameTie) as TotalHomeTie,
    sum(HomeTeamGoals) as HomeScoredGoals,
    sum(AwayTeamGoals) as HomeAgainstGoals
from
    duckdb_seasons
group by 
    Season, HomeTeam;
""")
# preview the view
conn.execute("select * from duckdb_home").df()

Unnamed: 0,Season,Team,TotalHomeWin,TotalHomeLoss,TotalHomeTie,HomeScoredGoals,HomeAgainstGoals
0,2009,Bochum,2.0,9.0,6.0,18.0,35.0
1,2009,Bayern Munich,12.0,1.0,4.0,39.0,13.0
2,2009,M'gladbach,8.0,4.0,5.0,25.0,22.0
3,2009,Mainz,9.0,2.0,6.0,22.0,14.0
4,2009,Hamburg,8.0,3.0,6.0,25.0,12.0
...,...,...,...,...,...,...,...
193,2001,Bayern Munich,12.0,0.0,5.0,42.0,10.0
194,2001,Nurnberg,7.0,7.0,3.0,21.0,23.0
195,2002,Nurnberg,4.0,9.0,4.0,16.0,24.0
196,2002,Hannover,4.0,7.0,6.0,28.0,33.0


In [25]:
# Per-(Season, AwayTeam) totals seen from the away side: away wins, away losses
# (= home-team wins), ties, goals scored and goals conceded. AwayTeam is exposed
# as Team so it can be joined with duckdb_home.
conn.execute("""
create or replace view duckdb_away as
select
    Season,
    AwayTeam as Team,
    sum(AwayTeamWin) as TotalAwayWin,
    sum(HomeTeamWin) as TotalAwayLoss,
    sum(GameTie) as TotalAwayTie,
    sum(AwayTeamGoals) as AwayScoredGoals,
    sum(HomeTeamGoals) as AwayAgainstGoals
from
    duckdb_seasons
group by 
    Season, AwayTeam;
""")
# preview the view
conn.execute("select * from duckdb_away").df()

Unnamed: 0,Season,Team,TotalAwayWin,TotalAwayLoss,TotalAwayTie,AwayScoredGoals,AwayAgainstGoals
0,2009,Leverkusen,4.0,4.0,9.0,28.0,24.0
1,2009,Nurnberg,3.0,11.0,3.0,14.0,35.0
2,2009,Schalke 04,8.0,3.0,6.0,24.0,16.0
3,2009,Stuttgart,7.0,5.0,5.0,23.0,20.0
4,2009,Werder Bremen,9.0,4.0,4.0,37.0,19.0
...,...,...,...,...,...,...,...
193,2001,Cottbus,1.0,12.0,4.0,9.0,39.0
194,2001,Freiburg,1.0,12.0,4.0,12.0,38.0
195,2002,Hannover,8.0,8.0,1.0,19.0,24.0
196,2002,Nurnberg,4.0,11.0,2.0,17.0,36.0


In [26]:
# Season winners: combine home and away totals per team, compute WinPct once, rank
# teams within each season by WinPct then goal differential, and keep position 1.
# WinPct is defined a single time and the window refers to it by name, instead of
# repeating the whole formula inside rank() (where the two copies could drift apart).
# rank() gives tied rows the same position, so an exact tie would keep several teams.
conn.execute("""
create or replace table duckdb_winners as
with totals as (
    select
        h.Season,
        h.Team,
        h.HomeScoredGoals + a.AwayScoredGoals as GoalsScored,
        h.HomeAgainstGoals + a.AwayAgainstGoals as GoalsAgainst,
        h.TotalHomeWin + a.TotalAwayWin as Win,
        h.TotalHomeLoss + a.TotalAwayLoss as Loss,
        h.TotalHomeTie + a.TotalAwayTie as Tie
    from duckdb_home h
    join duckdb_away a
        on h.Season = a.Season and h.Team = a.Team
),
stats as (
    select
        Season,
        Team as SeasonWinnerTeam,
        GoalsScored,
        GoalsAgainst,
        GoalsScored - GoalsAgainst as GoalDifferentials,
        Win,
        Loss,
        Tie,
        round(100 * Win / (Win + Loss + Tie), 2) as WinPct
    from totals
),
ranked as (
    select
        *,
        rank() over (partition by Season order by WinPct desc, GoalDifferentials desc) as TeamPosition
    from stats
)
select
    Season,
    SeasonWinnerTeam,
    GoalsScored, GoalsAgainst, GoalDifferentials, Win, Loss, Tie, WinPct
from
    ranked
where
    TeamPosition = 1
order by
    Season asc;
""")
conn.execute("select * from duckdb_winners").df()

Unnamed: 0,Season,SeasonWinnerTeam,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct
0,2000,Bayern Munich,62.0,37.0,25.0,19.0,9.0,6.0,55.88
1,2001,Leverkusen,77.0,38.0,39.0,21.0,7.0,6.0,61.76
2,2002,Bayern Munich,70.0,25.0,45.0,23.0,5.0,6.0,67.65
3,2003,Werder Bremen,79.0,38.0,41.0,22.0,4.0,8.0,64.71
4,2004,Bayern Munich,75.0,33.0,42.0,24.0,5.0,5.0,70.59
5,2005,Bayern Munich,67.0,32.0,35.0,22.0,3.0,9.0,64.71
6,2006,Stuttgart,61.0,37.0,24.0,21.0,6.0,7.0,61.76
7,2007,Bayern Munich,68.0,21.0,47.0,22.0,2.0,10.0,64.71
8,2008,Wolfsburg,80.0,41.0,39.0,21.0,7.0,6.0,61.76
9,2009,Bayern Munich,72.0,31.0,41.0,20.0,4.0,10.0,58.82


### LOAD:
#### Write output in parquet format:

In [27]:
# Load: write the season winners to a Parquet file.
# The trailing `;` suppresses the <DuckDBPyConnection ...> cell output.
conn.execute("copy (from duckdb_winners) to '../output/duckdb_output.parquet' (format 'parquet')");

<duckdb.DuckDBPyConnection at 0x20ec9db42b0>

In [28]:
# read the Parquet output back to verify it (fetchdf is an alias of df)
conn.execute("select * from '../output/duckdb_output.parquet'").fetchdf()

Unnamed: 0,Season,SeasonWinnerTeam,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct
0,2000,Bayern Munich,62.0,37.0,25.0,19.0,9.0,6.0,55.88
1,2001,Leverkusen,77.0,38.0,39.0,21.0,7.0,6.0,61.76
2,2002,Bayern Munich,70.0,25.0,45.0,23.0,5.0,6.0,67.65
3,2003,Werder Bremen,79.0,38.0,41.0,22.0,4.0,8.0,64.71
4,2004,Bayern Munich,75.0,33.0,42.0,24.0,5.0,5.0,70.59
5,2005,Bayern Munich,67.0,32.0,35.0,22.0,3.0,9.0,64.71
6,2006,Stuttgart,61.0,37.0,24.0,21.0,6.0,7.0,61.76
7,2007,Bayern Munich,68.0,21.0,47.0,22.0,2.0,10.0,64.71
8,2008,Wolfsburg,80.0,41.0,39.0,21.0,7.0,6.0,61.76
9,2009,Bayern Munich,72.0,31.0,41.0,20.0,4.0,10.0,58.82


In [41]:
# scratch: square each element of a small list
lst = list(range(1, 6))
print([n * n for n in lst])

[1, 4, 9, 16, 25]


In [45]:
# display the scratch list (unchanged by the comprehension above)
lst

[1, 2, 3, 4, 5]

In [47]:
# same squares as above, written with map + lambda
print(list(map(lambda value: value * value, lst)))

[1, 4, 9, 16, 25]
