In [None]:
import numpy as np
import pandas as pd
import sys

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Soccer Betting Odds Analyzation") \
      .getOrCreate()

sc = spark.sparkContext

In [None]:
# csv files retrieved from: https://www.kaggle.com/datasets/hugomathien/soccer

# read csv files into dataframes and remove first row
leagues_df = spark.read.options(header='True', delimiter=',').csv('leagues.csv')
matches_df = spark.read.options(header='True', delimiter=',').csv('matches.csv')
p_att_df = spark.read.options(header='True', delimiter=',').csv('player_atts.csv')
players_df = spark.read.options(header='True', delimiter=',').csv('players.csv')
t_att_df = spark.read.options(header='True', delimiter=',').csv('team_atts.csv')
teams_df = spark.read.options(header='True', delimiter=',').csv('teams.csv')

In [None]:
from pyspark.sql.functions import col

def checkMLResult(x):
    homeWinOdds = float(x.B365H)
    tieOdds = float(x.B365D)
    awayWinOdds = float(x.B365A)

    homeGoals = int(x.home_team_goal)
    awayGoals = int(x.away_team_goal)

    list = [str(homeWinOdds), str(tieOdds), str(awayWinOdds) ]
    float_arr = np.array(list, dtype=float)
    minVal = np.min(float_arr) 
    maxVal = np.max(float_arr) 

    if (homeGoals > awayGoals):
        if homeWinOdds == minVal:
            return (x.league_id + " MIN", 1)
        if homeWinOdds != minVal and homeWinOdds != maxVal:
            return (x.league_id + " MID", 1)
        if homeWinOdds == maxVal:
            return (x.league_id + " MAX", 1)
    elif (homeGoals == awayGoals):
        if tieOdds == minVal:
            return (x.league_id + " MIN", 1)
        if tieOdds != minVal and homeWinOdds != maxVal:
            return (x.league_id + " MID", 1)
        if tieOdds == maxVal:
            return (x.league_id + " MAX", 1)
    else:
        if awayWinOdds == minVal:
            return (x.league_id + " MIN", 1)
        if awayWinOdds != minVal and homeWinOdds != maxVal:
            return (x.league_id + " MID", 1)
        if awayWinOdds == maxVal:
            return (x.league_id + " MAX", 1)

In [None]:
df = matches_df.where(col("B365H").isNotNull())
# create an rdd in which we map the league ID with corresponding result value
matches_rdd = df.rdd.map(lambda x: checkMLResult(x))
# filter out results in which the database does not have values in place
matches_rdd = matches_rdd.filter(lambda x: x is not None)

In [None]:
# reduce by key (league and 1, 2, or 3)
result_counts = matches_rdd.reduceByKey(lambda x, y: x + y)

In [None]:
# for each record in the RDD we should take the initial string which is the league ID and divide the second index
# by the number of matches in the matches dataframe for that league. then the one with the highest coefficient will
# be the most common 'bang for the buck' and we will focus on the features of that league

# first, transform the rdd to split the first index on the space between the league ID and min/mid/max identifier
split_league_rdd = result_counts.map(lambda line: (line[0].split(" ")[0], line[0].split(" ")[1] + " " + str(line[1])))

In [None]:
# create an RDD in which we count the number of matches for each league ID
def getMatches(x):
    return(x.league_id, 1)

# create an rdd in which we map the league ID with corresponding result value
match_tot_rdd = df.rdd.map(lambda x: getMatches(x))
# filter out results in which the database does not have values in place
match_tot_rdd = match_tot_rdd.filter(lambda x: x is not None)
match_totals = match_tot_rdd.reduceByKey(lambda x, y: x + y)

In [None]:
# join the RDDs in order to get rdd which contains first the League ID and then a tuple of odds occurence and match numbers
join_rdd = match_totals.join(split_league_rdd)

In [None]:
# create coefficient RDD

# first split the tuple of the second index
all_elements_split_rdd = join_rdd.map(lambda line: (line[0], line[1][0], line[1][1].split(" ")[0], int(line[1][1].split(" ")[1])))

# then divide each last element (number of matches of this type of occurrence) by the 2nd element (number of matches in this league)
coeff_rdd = all_elements_split_rdd.map(lambda line: (line[0], line[2], float(line[3]/line[1])))

coeff_rdd.collect()

In [None]:
# we will review the highest coefficients of each sector:
# min: 21518 with 0.566 (la liga)
# mid: 19694 with 0.423 (scotland)
# max: 10257 with 0.0465 (serie a)

In [None]:
# create rdds of most often min results and rdd of the most often max results

init_df = matches_df.where(col("B365H").isNotNull())
min_df = init_df.where(init_df.league_id == 21518)
max_df = init_df.where(init_df.league_id == 10257)

def checkResult(x):
    homeWinOdds = float(x.B365H)
    tieOdds = float(x.B365D)
    awayWinOdds = float(x.B365A)

    homeGoals = int(x.home_team_goal)
    awayGoals = int(x.away_team_goal)

    list = [str(homeWinOdds), str(tieOdds), str(awayWinOdds) ]
    float_arr = np.array(list, dtype=float)
    maxVal = np.max(float_arr) 

    if (homeWinOdds == maxVal):
        if (homeGoals > awayGoals):
            return ("HOME", x.home_team_api_id, x.away_team_api_id)
    if (tieOdds == maxVal):
        if (homeGoals == awayGoals):
            return ("TIE", x.home_team_api_id, x.away_team_api_id)
    if (awayWinOdds == maxVal):
        if (awayGoals > homeGoals):
            return ("AWAY", x.home_team_api_id, x.away_team_api_id)

max_matches_rdd = max_df.rdd.map(lambda x: checkResult(x))
max_matches = max_matches_rdd.filter(lambda x: x is not None)

min_matches_rdd = min_df.rdd.map(lambda x: checkResult(x))
min_matches = min_matches_rdd.filter(lambda x: x is not None)

In [None]:
max_matches = max_matches.filter(lambda x: x[0] == 'HOME' or x[0] == 'AWAY')
max_matches = max_matches.map(lambda x: (x[0], int(x[1])))

min_matches = min_matches.filter(lambda x: x[0] == 'HOME' or x[0] == 'AWAY')
min_matches = min_matches.map(lambda x: (x[0], int(x[1])))

In [None]:
fin_max_matches = max_matches.toDF()
fin_min_matches = min_matches.toDF()

_max = fin_max_matches.join(t_att_df, fin_max_matches._2 == t_att_df.team_api_id, 'inner')
_min = fin_min_matches.join(t_att_df, fin_min_matches._2 == t_att_df.team_api_id, 'inner')

In [None]:
# now we can compare the traits of teams who win matches with highest odds versus teams who win matches with lowest odds
from pyspark.sql.functions import mean

In [None]:
_max.select(mean("buildUpPlaySpeed")).show()
_min.select(mean("buildUpPlaySpeed")).show()

In [None]:
_max.select(mean("buildUpPlayPassing")).show()
_min.select(mean("buildUpPlayPassing")).show()

In [None]:
_max.select(mean("chanceCreationPassing")).show()
_max.select(mean("chanceCreationCrossing")).show()
_max.select(mean("chanceCreationShooting")).show()

_min.select(mean("chanceCreationPassing")).show()
_min.select(mean("chanceCreationCrossing")).show()
_min.select(mean("chanceCreationShooting")).show()

In [None]:
_max.select(mean("defencePressure")).show()
_max.select(mean("defenceAggression")).show()
_max.select(mean("defenceTeamWidth")).show()

_min.select(mean("defencePressure")).show()
_min.select(mean("defenceAggression")).show()
_min.select(mean("defenceTeamWidth")).show()

In [None]:
# we can see a clear disparity in the build up speed of a team who takes advantage of being an underdog versus the team who is expected to win.
# the build up speed is nearly 8 points higher, showing that teams who often win from underdog positions are playing the ball quickly, likely in a counter attacking motion, while the build up speed of expected winners are considerably more patient

# not as large of a disparity in the teams orientation of pass vs. dribble. as such, we can assume that this varies more between teams whether they are expected winners or underdogs

# we can see that the more expected teams generate more chances through all outputs. however the underdogs tend to create better chances via shooting, likely intending to take advantage of their slim chances. 
# we can assume a team who sits back, counters and takes their chances will have a good chance of achieving a win with great odds if they are near these metrics

In [None]:
import pandas as pd

init_df = matches_df.where(col("B365H").isNotNull())
min_df = init_df.where(init_df.league_id == 21518)
max_df = init_df.where(init_df.league_id == 10257)

def result(x):
    homeWinOdds = float(x.B365H)
    tieOdds = float(x.B365D)
    awayWinOdds = float(x.B365A)

    homeGoals = int(x.home_team_goal)
    awayGoals = int(x.away_team_goal)

    list = [str(homeWinOdds), str(tieOdds), str(awayWinOdds) ]
    float_arr = np.array(list, dtype=float)
    maxVal = np.max(float_arr) 

    if (homeWinOdds == maxVal):
        if (homeGoals > awayGoals):
            return ("HOME", x.home_team_api_id, x.away_team_api_id)
    if (tieOdds == maxVal):
        if (homeGoals == awayGoals):
            return ("TIE", x.home_team_api_id, x.away_team_api_id)
    if (awayWinOdds == maxVal):
        if (awayGoals > homeGoals):
            return ("AWAY", x.away_team_api_id, x.home_team_api_id)
    

max_matches_rdd = max_df.rdd.map(lambda x: result(x))
max_matches = max_matches_rdd.filter(lambda x: x is not None)

min_matches_rdd = min_df.rdd.map(lambda x: result(x))
min_matches = min_matches_rdd.filter(lambda x: x is not None)

In [None]:
t1_df = min_matches.toDF()
t2_df = max_matches.toDF()

In [None]:
el = []
el2 = []

# we need to take the dataframe of all the teams in this league as well as the teams who have achieved these results and put a column with a 1 for said team if they were able to achieve winning at lowest odds at some point
for row in t1_df.rdd.collect():
    el.append(row['_2'])
    el.append(row['_3'])
    
for row in t2_df.rdd.collect():
    el2.append(row['_2'])
    el2.append(row['_3'])

In [None]:
arr_min_teams = list(set(el))
arr_max_teams = list(set(el2))

rdd1 = sc.parallelize(arr_min_teams)
rdd2 = sc.parallelize(arr_max_teams)

df1 = rdd1.map(lambda x: (x, )).toDF()
df1 = df1.withColumnRenamed("_1", "team_api_id")
df2 = rdd2.map(lambda x: (x, )).toDF()
df2 = df2.withColumnRenamed("_1", "team_api_id")

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

# create schema to resemble the features of team identity
schema = StructType([
  StructField('team_api_id', IntegerType(), True),
  StructField('buildUpPlaySpeed', IntegerType(), True),
  StructField('buildUpPlayPassing', IntegerType(), True),
  StructField('chanceCreationPassing', IntegerType(), True),
  StructField('chanceCreationCrossing', IntegerType(), True),
  StructField('chanceCreationShooting', IntegerType(), True),
  StructField('defencePressure', IntegerType(), True),
  StructField('defenceAggression', IntegerType(), True),
  StructField('defenceTeamWidth', IntegerType(), True),
])
fin_min_att_df = spark.createDataFrame([], schema)
fin_max_att_df = spark.createDataFrame([], schema)

In [None]:
import pandas as pd

init_df = matches_df.where(col("B365H").isNotNull())
min_df = init_df.where(init_df.league_id == 21518)
max_df = init_df.where(init_df.league_id == 10257)

def check(x):
    homeWinOdds = float(x.B365H)
    tieOdds = float(x.B365D)
    awayWinOdds = float(x.B365A)

    homeGoals = int(x.home_team_goal)
    awayGoals = int(x.away_team_goal)

    list = [str(homeWinOdds), str(tieOdds), str(awayWinOdds) ]
    float_arr = np.array(list, dtype=float)
    maxVal = np.max(float_arr) 

    if (homeWinOdds == maxVal):
        if (homeGoals > awayGoals):
            return (x.home_team_api_id, 1)
    if (tieOdds == maxVal):
        if (homeGoals == awayGoals):
            return (x.home_team_api_id, 1)
    if (awayWinOdds == maxVal):
        if (awayGoals > homeGoals):
            return ( x.away_team_api_id, 1)
    

rdd1 = max_df.rdd.map(lambda x: check(x))
rdd_max = rdd1.filter(lambda x: x is not None)

rdd2 = min_df.rdd.map(lambda x: check(x))
rdd_min = rdd2.filter(lambda x: x is not None)

In [None]:
# retrieve the list of teams who achieved success with lowest/max odds in respective league

min_rdd = rdd_min.reduceByKey(lambda x, y: x+y)
min_rdd = min_rdd.toDF()

max_rdd = rdd_max.reduceByKey(lambda x, y: x+y)
max_rdd = max_rdd.toDF()

min_rdd = min_rdd.withColumnRenamed('_1', 'team_api_id')
max_rdd = max_rdd.withColumnRenamed('_1', 'team_api_id')

In [None]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import when

In [None]:
# now we can join these dataframes with the team attributes dataframe on each team ID
df_min_atts = df1.join(t_att_df, "team_api_id")
columns = ['team_fifa_api_id', 'date','id', 'buildUpPlaySpeedClass', 'buildUpPlayDribbling', 'buildUpPlayDribblingClass', 'buildUpPlayPassingClass', 'buildUpPlayPositioningClass', 'chanceCreationPassingClass', 'chanceCreationCrossingClass', 'chanceCreationShootingClass', 'chanceCreationPositioningClass', 'defencePressureClass', 'defenceAggressionClass', 'defenceTeamWidthClass', 'defenceDefenderLineClass']
df_min_atts = df_min_atts.drop(*columns)

df_min_atts = df_min_atts.withColumn("buildUpPlaySpeed", col("buildUpPlaySpeed").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("buildUpPlayPassing", col("buildUpPlayPassing").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("chanceCreationPassing", col("chanceCreationPassing").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("chanceCreationCrossing", col("chanceCreationCrossing").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("chanceCreationShooting", col("chanceCreationShooting").cast(IntegerType()))  
df_min_atts = df_min_atts.withColumn("defencePressure", col("defencePressure").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("defenceAggression", col("defenceAggression").cast(IntegerType()))    
df_min_atts = df_min_atts.withColumn("defenceTeamWidth", col("defenceTeamWidth").cast(IntegerType()))    

df_min_atts = df_min_atts.groupBy('team_api_id').mean('buildUpPlaySpeed', 'buildUpPlayPassing', 'chanceCreationPassing', 'chanceCreationCrossing', 'chanceCreationShooting', 'defencePressure', 'defenceAggression', 'defenceTeamWidth')

In [None]:
# we can clearly see from above that some teams achieve this win from lower odds than others: we will give teams with a value greater than 30. everyone else will get 0, then we can split for training and testing
df_min_atts = df_min_atts.join(min_rdd, df_min_atts.team_api_id == min_rdd.team_api_id)
df_min_atts = df_min_atts.withColumn("val", when(df_min_atts._2 >= 20, lit(1)).otherwise(lit(0)))

In [None]:
columns2 = ['team_api_id', '_2']
df_min_atts = df_min_atts.drop(*columns2)

df_min_atts.show()

In [None]:
df_max_atts = df2.join(t_att_df, "team_api_id")
columns = ['team_fifa_api_id', 'date','id', 'buildUpPlaySpeedClass', 'buildUpPlayDribbling', 'buildUpPlayDribblingClass', 'buildUpPlayPassingClass', 'buildUpPlayPositioningClass', 'chanceCreationPassingClass', 'chanceCreationCrossingClass', 'chanceCreationShootingClass', 'chanceCreationPositioningClass', 'defencePressureClass', 'defenceAggressionClass', 'defenceTeamWidthClass', 'defenceDefenderLineClass']
df_max_atts = df_max_atts.drop(*columns)

df_max_atts = df_max_atts.withColumn("buildUpPlaySpeed", col("buildUpPlaySpeed").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("buildUpPlayPassing", col("buildUpPlayPassing").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("chanceCreationPassing", col("chanceCreationPassing").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("chanceCreationCrossing", col("chanceCreationCrossing").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("chanceCreationShooting", col("chanceCreationShooting").cast(IntegerType()))  
df_max_atts = df_max_atts.withColumn("defencePressure", col("defencePressure").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("defenceAggression", col("defenceAggression").cast(IntegerType()))    
df_max_atts = df_max_atts.withColumn("defenceTeamWidth", col("defenceTeamWidth").cast(IntegerType()))    


df_max_atts = df_max_atts.groupBy('team_api_id').mean('buildUpPlaySpeed', 'buildUpPlayPassing', 'chanceCreationPassing', 'chanceCreationCrossing', 'chanceCreationShooting', 'defencePressure', 'defenceAggression', 'defenceTeamWidth')

In [None]:
# in the same fashion for max atts, we will take the teams who achieved this result >= 20 times
# teams: 9857, 8551, 8530, 8600, 10167, 8535, 8540, 9882, 9888, 8543, 8529, 8533, 10233, 8524, 7943
df_max_atts = df_max_atts.join(max_rdd, df_max_atts.team_api_id == max_rdd.team_api_id)
df_max_atts = df_max_atts.withColumn("val", when(df_max_atts._2 >= 20, lit(1)).otherwise(lit(0)))

In [None]:
columns2 = ['team_api_id', '_2']
df_max_atts = df_max_atts.drop(*columns2)

df_max_atts.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['avg(buildUpPlaySpeed)', 'avg(buildUpPlayPassing)', 'avg(chanceCreationPassing)', 'avg(chanceCreationCrossing)', 'avg(chanceCreationShooting)', 'avg(defencePressure)', 'avg(defenceAggression)', 'avg(defenceTeamWidth)'], outputCol='features')

In [None]:
df_max_atts = assembler.transform(df_max_atts)
df_min_atts = assembler.transform(df_min_atts)

fin_max_data = df_max_atts.select('features', 'val')
fin_min_data = df_min_atts.select('features', 'val')

In [None]:
min_train_df, min_test_df = df_min_atts.randomSplit(weights=[0.7,0.3], seed=100)
max_train_df, max_test_df = df_max_atts.randomSplit(weights=[0.7,0.3], seed=100)

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
lr_min = LinearRegression(featuresCol="features", labelCol="val", predictionCol="predicted_val")
lr_model_min = lr_min.fit(min_train_df)

lr_max = LinearRegression(featuresCol="features", labelCol="val", predictionCol="predicted_val")
lr_model_max = lr_max.fit(max_train_df)


min_predictions = lr_model_min.transform(min_test_df)
max_predictions = lr_model_max.transform(max_test_df)

In [None]:
evaluator_min = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="rmse")
rmse_min = evaluator_min.evaluate(min_predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse_min))

evaluator_r2_min = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="r2")
r2_min = evaluator_r2_min.evaluate(min_predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2_min))


evaluator_max = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="rmse")
rmse_max = evaluator_max.evaluate(max_predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse_max))

evaluator_r2_max = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="r2")
r2_max = evaluator_r2_max.evaluate(max_predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2_max))

In [None]:
evaluator = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="mse")
mse_min = evaluator.evaluate(min_predictions)
print("Mean Squared Error (RMSE) on test data: {:.3f}".format(mse_min))

In [None]:
evaluator = RegressionEvaluator(labelCol="val", predictionCol="predicted_val", metricName="mse")
mse_max = evaluator.evaluate(max_predictions)
print("Mean Squared Error (RMSE) on test data: {:.3f}".format(mse_max))

In [None]:
coefficients_min = lr_model_min.coefficients
intercept_min = lr_model_min.intercept

print("Coefficients (min): ", coefficients_min)
print("Intercept (min): {:.3f}".format(intercept_min))


coefficients_max = lr_model_max.coefficients
intercept_max = lr_model_max.intercept

print("Coefficients (max): ", coefficients_max)
print("Intercept (max): {:.3f}".format(intercept_max))