In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import LogisticRegression
import findspark
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

In [4]:
# Create a SparkConf object with the desired configurations

conf = SparkConf() \
    .setAppName("xgboost") \
    .setMaster("local[*]")

conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.executor.cores", 4)
conf.set("spark.dynamicAllocation.minExecutors","2")
conf.set("spark.dynamicAllocation.maxExecutors","10")
# Create a SparkContext using the SparkConf object
sc = SparkContext.getOrCreate(conf=conf)
# Create a SparkSession using the SparkContext
spark = SparkSession.builder.master("local[*]") \
                    .appName('xgboost') \
                    .getOrCreate()




In [5]:
df = (spark.read
            .option("HEADER", True)
            .option("inferSchema", True)
            .csv("./data/pl_matches_modified.csv")
           )
df.show(5)

# df['home_captain'] = df['home_captain'].astype(str)

+----------+-----------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+--------------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+----------+----------+
|Match_Date|        Home_Team|Home_Sh|Home_SoT|Home_Touches|Home_Tkl|Home_Int|Home_Blocks|  Home_xG_Expected|Home_npxG_Expected|Home_SCA_SCA|Home_GCA_SCA|Home_Cmp_Passes|Home_Att_Passes|Home_Cmp_percent_Passes|Home_PrgP_Passes|Home_Carries_Carries|Home_PrgC_Carries|Home_Att_Take_Ons|Home_Succ_Take_Ons|           Away_Team|Away_Sh|Away_SoT|Away_Touches|Away_Tkl|Away_Int|Away_Blo

# Onehot Encoder

https://stackoverflow.com/questions/56585434/pyspark-pipeline-error-when-using-indexer-and-encoder

In [6]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer,VectorAssembler

In [7]:
# unusedCols = ['home_fbrefMatchId ','away_fbrefMatchId']
# outputCols = ['home_score','away_score']
# inputCols = [column for column in df.columns if column not in outputCols and column not in unusedCols]
# encodeCols = ['home_captain','away_captain','home_manager','away_manager','home_team','away_team','away_is_home_team']

unusedCols = ['Match_Date']
outputCols = ['Home_Score','Away_Score']
inputCols = [column for column in df.columns if column not in outputCols and column not in unusedCols]
encodeCols = ['Away_Team','Home_Team']




In [8]:
indexer = StringIndexer(inputCols=encodeCols,outputCols = [encodeCol+"_Index" for encodeCol in encodeCols])
indexerModel = indexer.fit(df)
indexer_df = indexerModel.transform(df)
indexer_df.show(5)

+----------+-----------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+--------------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+----------+----------+---------------+---------------+
|Match_Date|        Home_Team|Home_Sh|Home_SoT|Home_Touches|Home_Tkl|Home_Int|Home_Blocks|  Home_xG_Expected|Home_npxG_Expected|Home_SCA_SCA|Home_GCA_SCA|Home_Cmp_Passes|Home_Att_Passes|Home_Cmp_percent_Passes|Home_PrgP_Passes|Home_Carries_Carries|Home_PrgC_Carries|Home_Att_Take_Ons|Home_Succ_Take_Ons|           Away_Team|Away_Sh|Away_SoT|Away_To

In [9]:
encodeer = OneHotEncoder(inputCols=[encodeCol+"_Index" for encodeCol in encodeCols],outputCols=[encodeCol+"_Onehot" for encodeCol in encodeCols])
encodeer_df = encodeer.fit(indexer_df).transform(indexer_df)
encodeer_df.show(5)

+----------+-----------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+--------------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+----------+----------+---------------+---------------+----------------+----------------+
|Match_Date|        Home_Team|Home_Sh|Home_SoT|Home_Touches|Home_Tkl|Home_Int|Home_Blocks|  Home_xG_Expected|Home_npxG_Expected|Home_SCA_SCA|Home_GCA_SCA|Home_Cmp_Passes|Home_Att_Passes|Home_Cmp_percent_Passes|Home_PrgP_Passes|Home_Carries_Carries|Home_PrgC_Carries|Home_Att_Take_Ons|Home_Succ_Take_Ons|           

In [10]:

indexedCols = [encodeCol+"_Index" for encodeCol in encodeCols]
assembler_inputCols = encodeer_df.columns
assembler_inputCols = [assembler_inputCol for assembler_inputCol in assembler_inputCols \
                       if assembler_inputCol not in encodeCols and assembler_inputCol not in indexedCols and assembler_inputCol not in unusedCols]
assembler_df = encodeer_df.select(assembler_inputCols)
assembler = VectorAssembler(inputCols=assembler_inputCols,outputCol="features")

In [11]:
assembler_inputCols

['Home_Sh',
 'Home_SoT',
 'Home_Touches',
 'Home_Tkl',
 'Home_Int',
 'Home_Blocks',
 'Home_xG_Expected',
 'Home_npxG_Expected',
 'Home_SCA_SCA',
 'Home_GCA_SCA',
 'Home_Cmp_Passes',
 'Home_Att_Passes',
 'Home_Cmp_percent_Passes',
 'Home_PrgP_Passes',
 'Home_Carries_Carries',
 'Home_PrgC_Carries',
 'Home_Att_Take_Ons',
 'Home_Succ_Take_Ons',
 'Away_Sh',
 'Away_SoT',
 'Away_Touches',
 'Away_Tkl',
 'Away_Int',
 'Away_Blocks',
 'Away_xG_Expected',
 'Away_npxG_Expected',
 'Away_SCA_SCA',
 'Away_GCA_SCA',
 'Away_Cmp_Passes',
 'Away_Att_Passes',
 'Away_Cmp_percent_Passes',
 'Away_PrgP_Passes',
 'Away_Carries_Carries',
 'Away_PrgC_Carries',
 'Away_Att_Take_Ons',
 'Away_Succ_Take_Ons',
 'Home_Score',
 'Away_Score',
 'Away_Team_Onehot',
 'Home_Team_Onehot']

In [12]:
output_vector = assembler.transform(assembler_df)
output_vector.show(5)

+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+-------+--------+------------+--------+--------+-----------+------------------+------------------+------------+------------+---------------+---------------+-----------------------+----------------+--------------------+-----------------+-----------------+------------------+----------+----------+----------------+----------------+--------------------+
|Home_Sh|Home_SoT|Home_Touches|Home_Tkl|Home_Int|Home_Blocks|  Home_xG_Expected|Home_npxG_Expected|Home_SCA_SCA|Home_GCA_SCA|Home_Cmp_Passes|Home_Att_Passes|Home_Cmp_percent_Passes|Home_PrgP_Passes|Home_Carries_Carries|Home_PrgC_Carries|Home_Att_Take_Ons|Home_Succ_Take_Ons|Away_Sh|Away_SoT|Away_Touches|Away_Tkl|Away_Int|Away_Blocks|  Away_xG_Expected|Away_npxG_Expected|Awa

In [13]:
output_vector.select("features","Home_Score","Away_Score").show()

+--------------------+----------+----------+
|            features|Home_Score|Away_Score|
+--------------------+----------+----------+
|(90,[0,1,2,3,4,5,...|         1|         1|
|(90,[0,1,2,3,4,5,...|         1|         0|
|(90,[0,1,2,3,4,5,...|         1|         1|
|(90,[0,1,2,3,4,5,...|         2|         1|
|(90,[0,1,2,3,4,5,...|         5|         0|
|(90,[0,1,2,3,4,5,...|         4|         4|
|(90,[0,1,2,3,4,5,...|         1|         0|
|(90,[0,1,2,3,4,5,...|         2|         1|
|(90,[0,1,2,3,4,5,...|         1|         4|
|(90,[0,1,2,3,4,5,...|         2|         1|
|(90,[0,1,2,3,4,5,...|         4|         1|
|(90,[0,1,2,3,4,5,...|         1|         1|
|(90,[0,1,2,3,4,5,...|         0|         0|
|(90,[0,1,2,3,4,5,...|         1|         0|
|(90,[0,1,2,3,4,5,...|         3|         1|
|(90,[0,1,2,3,4,5,...|         3|         1|
|(90,[0,1,2,3,4,5,...|         1|         1|
|(90,[0,1,2,3,4,5,...|         1|         0|
|(90,[0,1,2,3,4,5,...|         0|         1|
|(90,[0,1,

# XGBoost

In [14]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [15]:
model_df = output_vector.select("features","Home_Score")
# model_df = output_vector.select("Home_Blocks","Home_Score")
model_df.show(5)

+--------------------+----------+
|            features|Home_Score|
+--------------------+----------+
|(90,[0,1,2,3,4,5,...|         1|
|(90,[0,1,2,3,4,5,...|         1|
|(90,[0,1,2,3,4,5,...|         1|
|(90,[0,1,2,3,4,5,...|         2|
|(90,[0,1,2,3,4,5,...|         5|
+--------------------+----------+
only showing top 5 rows



In [16]:
training_df, test_df = model_df.randomSplit([0.7, 0.3])

In [17]:
xgBoost = SparkXGBRegressor(
  features_col="features",
  label_col="Home_Score",
  prediction_col="Home_Score_Prediction",
  num_workers=1
)

# xgBoost = SparkXGBRegressor(
#   features_col="Home_Blocks",
#   label_col="Home_Score",
#   prediction_col="Home_Score_Prediction",
#   num_workers=4
# )

In [18]:
paramGrid = ParamGridBuilder()\
  .addGrid(xgBoost.max_depth, [2, 5])\
  .addGrid(xgBoost.n_estimators, [10, 100])\
  .build()

In [21]:
import os

os.environ['SPARK_SUBMIT_OPTS'] = '--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true'

In [23]:
xgBoost_model = xgBoost.fit(training_df)

2023-11-02 18:43:41,949 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(19, 0) finished unsuccessfully.
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2216)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3042)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [122]:
sc.stop()
spark.stop()