In [1]:
# Create (or reuse) the local SparkSession used throughout this notebook.
import os
import sys

from pyspark.sql import SparkSession

# Make Spark launch its Python workers (which run the UDFs below) with the same
# interpreter as this kernel. When PYSPARK_PYTHON is unset PySpark falls back to
# "python3", which may be missing or a different environment; a typical symptom
# is "Python worker failed to connect back" as soon as a Python UDF executes.
# setdefault keeps any value already configured by the user. This only takes
# effect if it runs before the SparkContext is first created (getOrCreate
# reuses an existing one), so restart the kernel if a session already exists.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

spark = SparkSession.builder.appName('goal_model').getOrCreate()

In [2]:
# Shot feature 1: distance from the shot location to the centre of the goal,
# computed row-by-row with a Python UDF.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType  # StringType is still used by later cells

# DoubleType (not StringType) so the resulting column is numeric and can be
# fed to a model without an extra cast.
@udf(DoubleType())
def GetShotDistanceToGoal(x, y):
    '''
        Distance in metres from a shot location to the centre of the goal.

        The 0-100 (x, y) coordinates are scaled to an "average" 105 x 68 m pitch
        before the 2D Euclidean distance is taken; the goal centre is the point
        (x=100, y=50) in the 0-100 coordinate system.

        Parameters:
            x (int): X-coordinate of the shot, 0-100.
            y (int): Y-coordinate of the shot, 0-100.

        Returns:
            float: distance in metres, or None when either coordinate is null.
    '''
    if x is None or y is None:
        # Null coordinates (e.g. values that failed the integer cast) would
        # otherwise raise TypeError and abort the whole Spark task.
        return None
    dx_m = (100 - x) * (105 / 100)
    dy_m = (50 - y) * (68 / 100)
    return math.hypot(dx_m, dy_m)

In [3]:
from pyspark.sql.functions import try_divide
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

# Shot feature 2: the angle the goal mouth subtends at the shot location.
# This is plain Python arithmetic: Spark SQL column functions such as
# try_divide cannot be evaluated on Python floats inside a UDF.
@udf(DoubleType())
def GetShotAngleToGoal(x, y):
    '''
        Angle in degrees subtended by the goal mouth at the shot location.

        The 0-100 (x, y) coordinates are scaled to a 105 x 68 m pitch. The posts
        are 7.32 m apart, centred on y = 34 m at x = 105 m. With a and b the
        distances from the shot to each post and c the goal width, the law of
        cosines gives  angle = acos((a^2 + b^2 - c^2) / (2ab)).

        Parameters:
            x (int): X-coordinate of the shot, 0-100.
            y (int): Y-coordinate of the shot, 0-100.

        Returns:
            float: angle in degrees, in [0, 180]; None when a coordinate is
            null or the shot lies exactly on a post (a or b is zero).
    '''
    if x is None or y is None:
        return None

    goal_width_m = 7.32
    dx_m = 105 - x * (105 / 100)   # distance to the goal line
    dy_m = 34 - y * (68 / 100)     # lateral offset from the goal centre
    a = math.hypot(dx_m, dy_m + goal_width_m / 2)  # distance to one post
    b = math.hypot(dx_m, dy_m - goal_width_m / 2)  # distance to the other post

    denominator = 2 * a * b
    if denominator == 0:
        # Angle undefined on a post; return null, as try_divide would on /0.
        return None
    cos_angle = (a * a + b * b - goal_width_m ** 2) / denominator
    # Clamp float round-off so acos never sees a value just outside [-1, 1].
    cos_angle = max(-1.0, min(1.0, cos_angle))
    # Convert radians -> degrees AFTER acos (the cosine itself is unitless).
    return math.degrees(math.acos(cos_angle))

In [4]:
# Keep only shot attempts from the raw event log: open-play "Shot" events plus
# "Free Kick" events whose sub-event is a direct free-kick shot or a penalty.
from pyspark.sql.functions import col

events_df = (
    spark.read.csv("../Dataset/events_England.csv", header=True)
    .where(
        (
            (col("eventName") == "Free Kick")
            & col("subEventName").isin(['Free kick shot', 'Penalty'])
        )
        | (col("eventName") == "Shot")
    )
)
events_df.show(truncate=False)


+-------+------------+--------------------------------------------------------------------+--------+------------------------------------------+-------+---------+------+-----------+-----------------+----------+---------+----------------------------+----------+----------+----------+----------+
|eventId|subEventName|tags                                                                |playerId|positions                                 |matchId|eventName|teamId|matchPeriod|eventSec         |subEventId|id       |tagsList                    |pos_orig_y|pos_orig_x|pos_dest_y|pos_dest_x|
+-------+------------+--------------------------------------------------------------------+--------+------------------------------------------+-------+---------+------+-----------+-----------------+----------+---------+----------------------------+----------+----------+----------+----------+
|10     |Shot        |[{'id': 101}, {'id': 402}, {'id': 201}, {'id': 1205}, {'id': 1801}] |25413   |[{'y': 41, 'x': 88}, 

In [5]:
# Match-level metadata, keyed by `wyId` (the events' `matchId` refers to it).
matches_df = spark.read.option("header", True).csv("../Dataset/matches_England.csv")
matches_df.show()

+------+-------+--------+--------------------+--------+-------------------+------+--------------------+-------+--------------------+--------------------+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+
|status|roundId|gameweek|           teamsData|seasonId|            dateutc|winner|               venue|   wyId|               label|                date|            referees|duration|competitionId|team1.scoreET|team1.coachId|team1.side|team1.teamId|team1.score|team1.scoreP|team1.hasFormation|     team1.formation|team1.scoreHT|team1.formation.bench|team1.formation.lineup|team1.formation.s

In [6]:
# Left-join every shot event to the metadata of its match; shots with no
# matching row in matches_df are kept, with null match columns.
combine_df = events_df.join(
    matches_df,
    on=events_df.matchId == matches_df.wyId,
    how="left",
)
combine_df.show()

+-------+------------+--------------------+--------+--------------------+-------+---------+------+-----------+-----------------+----------+---------+--------------------+----------+----------+----------+----------+------+-------+--------+--------------------+--------+-------------------+------+----------------+-------+--------------------+--------------------+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+
|eventId|subEventName|                tags|playerId|           positions|matchId|eventName|teamId|matchPeriod|         eventSec|subEventId|       id|            tagsList|pos_orig_y|

In [7]:
# Label each shot: isGoal is true when its tag list contains the tag id 101.

from pyspark.sql.functions import contains

# Match 101 only as a whole number. A plain substring test would also fire on
# longer tag ids containing the digits "101" (Wyscout has e.g. 1101 and 2101),
# mislabelling non-goals as goals. Null tagsList still yields null.
combine_df = combine_df.withColumn('isGoal', col('tagsList').rlike(r'(?<![0-9])101(?![0-9])'))
combine_df.show()


+-------+------------+--------------------+--------+--------------------+-------+---------+------+-----------+-----------------+----------+---------+--------------------+----------+----------+----------+----------+------+-------+--------+--------------------+--------+-------------------+------+----------------+-------+--------------------+--------------------+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+------+
|eventId|subEventName|                tags|playerId|           positions|matchId|eventName|teamId|matchPeriod|         eventSec|subEventId|       id|            tagsList|pos_

In [8]:
from pyspark.sql.types import IntegerType

# The CSVs were read without schema inference, so every column is a string;
# cast the shot-origin coordinates to integers for the geometry UDFs.
combine_df = (
    combine_df
    .withColumn("pos_orig_x", col("pos_orig_x").cast(IntegerType()))
    .withColumn("pos_orig_y", col("pos_orig_y").cast(IntegerType()))
)
combine_df.show()

+-------+------------+--------------------+--------+--------------------+-------+---------+------+-----------+-----------------+----------+---------+--------------------+----------+----------+----------+----------+------+-------+--------+--------------------+--------+-------------------+------+----------------+-------+--------------------+--------------------+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+------+
|eventId|subEventName|                tags|playerId|           positions|matchId|eventName|teamId|matchPeriod|         eventSec|subEventId|       id|            tagsList|pos_

In [9]:
# Add the two geometric shot features, both derived from the shot's origin.
new_df = (
    combine_df
    .withColumn("shotDistance", GetShotDistanceToGoal(col("pos_orig_x"), col("pos_orig_y")))
    .withColumn("shotAngle", GetShotAngleToGoal(col("pos_orig_x"), col("pos_orig_y")))
)

In [11]:
# Inspect the column names and types of the feature table.
# NOTE(review): the saved output below is a failure raised from `showString`
# (i.e. a `.show()` call), not output of printSchema(); it is stale and should
# be refreshed with Restart & Run All.
new_df.printSchema()

Py4JJavaError: An error occurred while calling o127.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 18) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 27 more
