In [None]:
import pandas as pd

In [None]:
!pip install pyspark
!pip install findspark
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 34.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=60b85ae5eeb053f1da30d7b81d43fba283ca6d4e913ed25010c317ab7b386f50
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Obtain the Spark context. getOrCreate() reuses an already-running context,
# so re-running this cell does not fail the way a second SparkContext() would.
sc = SparkContext.getOrCreate()

# Build (or reuse) the Spark session used for all DataFrame work below.
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
class MissingDict(dict):
    """A dict whose lookups return the key itself when the key is absent."""

    def __missing__(self, key):
        # Invoked by dict.__getitem__ on a miss; the key is returned, not stored.
        return key


# Long club names and the shorter spellings they are replaced with.
map_values = {
    "Brighton and Hove Albion": "Brighton",
    "Manchester United": "Manchester Utd",
    "Newcastle United": "Newcastle Utd",
    "Tottenham Hotspur": "Tottenham",
    "West Ham United": "West Ham",
    "Wolverhampton Wanderers": "Wolves",
}

# Names without an entry above pass through unchanged.
mapping = MissingDict(map_values)

In [None]:
def frompd(path, team_mapping=None):
    """Load a matches CSV with pandas and add the derived feature columns.

    Parameters
    ----------
    path : str, path object or file-like
        Passed straight to ``pd.read_csv``; the first CSV column becomes the index.
    team_mapping : mapping, optional
        Lookup applied to the ``team`` column to build ``new_team``. Defaults to
        the notebook-level ``mapping`` object.

    Returns
    -------
    pandas.DataFrame
        The loaded frame without the ``notes`` column, with ``date`` parsed to
        datetimes and these columns added:
        ``hour`` (integer part of ``time`` before the first colon),
        ``day_code`` (``date.dt.dayofweek``),
        ``target`` (1 where ``result == "W"``, else 0),
        ``venue_code`` (1 where ``venue == "Away"``, else 0),
        ``new_team`` (``team`` passed through ``team_mapping``).
    """
    if team_mapping is None:
        team_mapping = mapping

    matches = pd.read_csv(path, index_col=0)
    # Drop the notes column if present; a file without it is still usable.
    matches = matches.drop(columns=["notes"], errors="ignore")

    matches["date"] = pd.to_datetime(matches["date"])
    # Strip everything from the first colon onward, leaving the leading number.
    matches["hour"] = matches["time"].str.replace(":.+", "", regex=True).astype("int")
    matches["day_code"] = matches["date"].dt.dayofweek
    matches["target"] = (matches["result"] == "W").astype("int")
    matches["venue_code"] = (matches["venue"] == "Away").astype("int")
    matches["new_team"] = matches["team"].map(team_mapping)

    return matches







In [None]:
# Read and pre-process the two match CSVs with frompd.
matches = frompd("/content/drive/MyDrive/matches.csv")
matches2 = frompd("/content/drive/MyDrive/matches2.csv")

In [None]:
# Convert both pandas frames to Spark DataFrames and stack them into one.
df1 = spark.createDataFrame(matches)
df2 = spark.createDataFrame(matches2)
# unionByName lines columns up by name. Plain union() matches by position and
# would silently misalign data if the two CSVs order their columns differently.
df = df1.unionByName(df2)

from pyspark.ml.feature import StringIndexer

# Encode the new_team string column as a numeric index column.
indexer = StringIndexer(inputCol='new_team', outputCol='new_team_numeric').fit(df)
indexed_df1 = indexer.transform(df)

In [None]:
# Encode the opponent string column as a numeric index column, fitted on the
# frame that already carries new_team_numeric.
indexer = StringIndexer(
    inputCol="opponent",
    outputCol="opponent_numeric",
).fit(indexed_df1)
df = indexer.transform(indexed_df1)

In [None]:
# List each distinct (team, new_team_numeric) pair, ordered by the numeric index.
(
    df.select("team", "new_team_numeric")
    .distinct()
    .orderBy("new_team_numeric")
    .show(100)
)

+--------------------+----------------+
|                team|new_team_numeric|
+--------------------+----------------+
|    Newcastle United|             0.0|
|         Southampton|             1.0|
|     West Ham United|             2.0|
|Brighton and Hove...|             3.0|
|   Manchester United|             4.0|
|   Tottenham Hotspur|             5.0|
|Wolverhampton Wan...|             6.0|
|             Arsenal|             7.0|
|         Aston Villa|             8.0|
|      Crystal Palace|             9.0|
|             Everton|            10.0|
|        Leeds United|            11.0|
|      Leicester City|            12.0|
|     Manchester City|            13.0|
|             Chelsea|            14.0|
|             Burnley|            15.0|
|           Liverpool|            16.0|
|           Brentford|            17.0|
|        Norwich City|            18.0|
|             Watford|            19.0|
|              Fulham|            20.0|
|    Sheffield United|            21.0|


In [None]:
# Preview the first 20 rows of the indexed DataFrame (show()'s default row count).
df.show(n=20)

+-------------------+-----+--------------+------------+---+-----+------+---+---+---------------+---+---+----+----------+---------------+---------+--------------+------------+----+----+----+---+---+-----+------+---------------+----+--------+------+----------+---------------+----------------+----------------+
|               date| time|          comp|       round|day|venue|result| gf| ga|       opponent| xg|xga|poss|attendance|        captain|formation|       referee|match report|  sh| sot|dist| fk| pk|pkatt|season|           team|hour|day_code|target|venue_code|       new_team|new_team_numeric|opponent_numeric|
+-------------------+-----+--------------+------------+---+-----+------+---+---+---------------+---+---+----+----------+---------------+---------+--------------+------------+----+----+----+---+---+-----+------+---------------+----+--------+------+----------+---------------+----------------+----------------+
|2022-08-05 00:00:00|20:00|Premier League| Matchweek 1|Fri| Away|     W|2

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# List the column names of the DataFrame to pick the feature columns from.
df.columns

['date',
 'time',
 'comp',
 'round',
 'day',
 'venue',
 'result',
 'gf',
 'ga',
 'opponent',
 'xg',
 'xga',
 'poss',
 'attendance',
 'captain',
 'formation',
 'referee',
 'match report',
 'sh',
 'sot',
 'dist',
 'fk',
 'pk',
 'pkatt',
 'season',
 'team',
 'hour',
 'day_code',
 'target',
 'venue_code',
 'new_team',
 'new_team_numeric',
 'opponent_numeric']

In [None]:
# Combine the five numeric predictor columns into a single `features` vector column.
assembler = VectorAssembler(
    inputCols=[
        "hour",
        "day_code",
        "venue_code",
        "new_team_numeric",
        "opponent_numeric",
    ],
    outputCol="features",
)

In [None]:
# Append the assembled `features` column to the DataFrame.
output = assembler.transform(df)

In [None]:
# Show the feature vectors next to their labels, without truncating the cells.
output.select("features", "target").show(truncate=False)

+------------------------+------+
|features                |target|
+------------------------+------+
|[20.0,4.0,1.0,7.0,10.0] |1     |
|[15.0,5.0,0.0,7.0,13.0] |1     |
|[17.0,5.0,1.0,7.0,23.0] |1     |
|[17.0,5.0,0.0,7.0,20.0] |1     |
|[19.0,2.0,0.0,7.0,6.0]  |1     |
|[16.0,6.0,1.0,7.0,8.0]  |0     |
|[12.0,6.0,1.0,7.0,17.0] |1     |
|[12.0,5.0,0.0,7.0,3.0]  |1     |
|[16.0,6.0,0.0,7.0,2.0]  |1     |
|[14.0,6.0,1.0,7.0,12.0] |1     |
|[14.0,6.0,1.0,7.0,1.0]  |0     |
|[14.0,6.0,0.0,7.0,24.0] |1     |
|[12.0,6.0,1.0,7.0,15.0] |1     |
|[19.0,5.0,1.0,7.0,5.0]  |1     |
|[16.0,6.0,1.0,13.0,4.0] |1     |
|[15.0,5.0,0.0,13.0,23.0]|1     |
|[16.0,6.0,1.0,13.0,0.0] |0     |
|[15.0,5.0,0.0,13.0,10.0]|1     |
|[19.0,2.0,0.0,13.0,24.0]|1     |
|[17.0,5.0,1.0,13.0,6.0] |0     |
+------------------------+------+
only showing top 20 rows



In [None]:
# Keep only the two columns the classifier needs: feature vector and label.
model_df = output.select("features", "target")

In [None]:
# 70/30 train/test split. The fixed seed keeps the split the same on every run,
# so downstream metrics are comparable between runs.
training_df, test_df = model_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Fit a 50-tree random forest on the training split; `rf` holds the fitted model.
# The explicit seed makes the trained forest reproducible across runs.
rf = RandomForestClassifier(labelCol="target", numTrees=50, seed=42).fit(training_df)

In [None]:
# Score the held-out split with the fitted forest.
rf_predictions = rf.transform(test_df)

In [None]:
# Preview the first 20 scored rows (show()'s default row count).
rf_predictions.show(n=20)

+--------------------+------+--------------------+--------------------+----------+
|            features|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|[12.0,5.0,0.0,4.0...|     0|[22.9437208814053...|[0.45887441762810...|       1.0|
|[12.0,5.0,0.0,4.0...|     0|[22.7546540652325...|[0.45509308130465...|       1.0|
|[12.0,5.0,0.0,6.0...|     0|[39.7016586764725...|[0.79403317352945...|       0.0|
|[12.0,5.0,0.0,10....|     0|[36.6825903012543...|[0.73365180602508...|       0.0|
|[12.0,5.0,0.0,13....|     1|[13.8050194895039...|[0.27610038979007...|       1.0|
|[12.0,5.0,0.0,13....|     0|[14.3389997194242...|[0.28677999438848...|       1.0|
|[12.0,5.0,0.0,16....|     1|[23.1165329477591...|[0.46233065895518...|       1.0|
|[12.0,5.0,1.0,5.0...|     0|[23.7091272773707...|[0.47418254554741...|       1.0|
|[12.0,5.0,1.0,5.0...|     1|[18.5229569547922...|[0.37045913909584...|       1.0|
|[12

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Evaluate the test predictions. The metric is spelled out: areaUnderROC is the
# evaluator's default, so despite its name rf_ac is an AUC, not an accuracy.
rf_ac = BinaryClassificationEvaluator(
    labelCol="target",
    metricName="areaUnderROC",
).evaluate(rf_predictions)

In [None]:
# Display the evaluator score. NOTE(review): BinaryClassificationEvaluator was
# created without a metricName, so this is presumably area under ROC rather
# than accuracy — confirm against the evaluator's default.
rf_ac

0.7155320948591836

In [None]:
# Persist the fitted forest to a `model` directory relative to the working dir.
# The original literal "\model" contained an invalid "\m" escape, which Python
# keeps as a literal backslash in the path; the stray backslash is dropped here.
# write().overwrite() lets this cell be re-run when the path already exists.
rf.write().overwrite().save("model")