## importing necessary libs

i'm using poetry for managing dependencies and my virtual env, so i'd recommend to look `pyproject.toml` to know what's been used here.

In [8]:
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

## setting up spark session and dataframe

In [None]:
#https://spark.apache.org/docs/latest/sql-getting-started.html
spark = SparkSession \
        .builder \
        .appName("firstSpark") \
        .getOrCreate()

In [11]:
# loading dataframe
def load_dataframe(filename):
    df = spark.read.format('csv').options(header='true').load(filename)
    return df

#creating a dataframe
df_matches = load_dataframe('./data/matches.csv')
df_matches.limit(5).show()

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+



In [25]:
# im doing this just to understand zip command

slice_a = 'abcdefg'
slice_b = range(3)
slice_c = 'vwxyz'

print(f'lets go! \nslices:\n\t{slice_a}\n\t{list(slice_b)}\n\t{slice_c}')

# so if i use zip i expect a list with the array values intercalated, until the shorter list reachs its end
print(f'ZIP doing magic!\n\t{list(zip(slice_a, slice_b, slice_c))}') # got what was expected, yay!

# now im using starred expressions used for unpacking lists
unpacked = [*zip(slice_a, slice_b, slice_c)]
print(f'STARRED ZIP doing magic!\n\t{unpacked}') # need to practice more of this, seems useful

lets go! 
slices:
	abcdefg
	[0, 1, 2]
	vwxyz
ZIP doing magic!
	[('a', 0, 'v'), ('b', 1, 'w'), ('c', 2, 'x')]
STARRED ZIP doing magic!
	[('a', 0, 'v'), ('b', 1, 'w'), ('c', 2, 'x')]


In [12]:
# lets rename some of the columns (FTHG, FTAG, FTR)
old_cols = df_matches.columns[-3:]
new_cols = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]
old_new_cols = [*zip(old_cols, new_cols)]
for old_col, new_col in old_new_cols:
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

df_matches.limit(5).toPandas()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A


## shine pyspark!

### 1) Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?

In [26]:
df_matches = df_matches \
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))


#bundesliga is a D1 division and we are interested in season <= 2010 and >= 2000
bundesliga = df_matches \
                    .filter((col('Season') >= 2000) & 
                            (col('Season') <= 2010) & 
                            (col('Div') == 'D1'))

# home team features
home = bundesliga.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

#away game features 
away =  bundesliga.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

#### seeing both results

In [28]:
home.toPandas()

Unnamed: 0,Season,Team,TotalHomeWin,TotalHomeLoss,TotalHomeTie,HomeScoredGoals,HomeAgainstGoals
0,2005,Kaiserslautern,5,7,5,26.0,33.0
1,2006,Cottbus,6,6,5,21.0,22.0
2,2001,St Pauli,4,9,4,19.0,28.0
3,2005,Mainz,6,4,7,31.0,23.0
4,2006,Hamburg,4,4,9,22.0,19.0
...,...,...,...,...,...,...,...
193,2003,Kaiserslautern,8,4,5,25.0,19.0
194,2004,Bielefeld,7,7,3,21.0,21.0
195,2004,Werder Bremen,9,4,4,33.0,15.0
196,2004,Stuttgart,12,3,2,34.0,15.0


In [29]:
away.toPandas()

Unnamed: 0,Season,Team,TotalAwayWin,TotalAwayLoss,TotalAwayTie,AwayScoredGoals,AwayAgainstGoals
0,2005,Kaiserslautern,3,10,4,21.0,38.0
1,2006,Cottbus,5,9,3,17.0,27.0
2,2001,St Pauli,0,11,6,18.0,42.0
3,2005,Mainz,3,10,4,15.0,24.0
4,2006,Hamburg,6,5,6,21.0,18.0
...,...,...,...,...,...,...,...
193,2003,Kaiserslautern,3,13,1,14.0,43.0
194,2004,Bielefeld,4,9,4,16.0,28.0
195,2004,Werder Bremen,9,7,1,35.0,22.0
196,2004,Stuttgart,5,7,5,20.0,25.0


## answering first question

In [31]:
# it seems window turns easy to use some aggregation functions like rank and statistical analysis methods
# https://sparkbyexamples.com/pyspark/pyspark-window-functions/
window = ['Season']
window = Window.partitionBy(window).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())
table = home.join(away, ['Team', 'Season'],  'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window)) 

table_df = table.filter(col('TeamPosition') == 1).orderBy(desc('Season')).toPandas()
table_df

Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct,TeamPosition
0,Dortmund,2010,67.0,22.0,45.0,23,5,6,67.65,1
1,Bayern Munich,2009,72.0,31.0,41.0,20,4,10,58.82,1
2,Wolfsburg,2008,80.0,41.0,39.0,21,7,6,61.76,1
3,Bayern Munich,2007,68.0,21.0,47.0,22,2,10,64.71,1
4,Stuttgart,2006,61.0,37.0,24.0,21,6,7,61.76,1
5,Bayern Munich,2005,67.0,32.0,35.0,22,3,9,64.71,1
6,Bayern Munich,2004,75.0,33.0,42.0,24,5,5,70.59,1
7,Werder Bremen,2003,79.0,38.0,41.0,22,4,8,64.71,1
8,Bayern Munich,2002,70.0,25.0,45.0,23,5,6,67.65,1
9,Leverkusen,2001,77.0,38.0,39.0,21,7,6,61.76,1
