## **1.** Install PySpark and related dependencies

In [14]:
!pip install pyspark==3.2.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.2.1
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 39.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=1fb6fec9ce3670a9b64285985c7766b468bd298e3cadd14f4da0434219fd733e
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


Pandas API on Spark requires a JDBC driver to read so it requires the driver for your particular database to be on the Spark’s classpath. For SQLite JDBC driver, you can download it, for example, as below:

In [15]:
!curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 7125k  100 7125k    0     0  40.2M      0 --:--:-- --:--:-- --:--:-- 39.9M


After that, you should add it into your Spark session first. Once you add them, pandas API on Spark will automatically detect the Spark session and leverage it.

## **2.** Import useful PySpark packages

In [16]:
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"



## **3.** Create Spark context

In [17]:
# Create the session
conf = (SparkConf()
          .setMaster("local")
          .setAppName("Soccer Match Predictor")
          .set("spark.ui.port", "4050")
          .set('spark.executor.memory', '4G')
          .set('spark.driver.memory', '45G')
          .set('spark.driver.maxResultSize', '10G')
          .set("spark.jars","{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
          .set("spark.driver.extraClassPath","{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd())))

# Create the context
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
sc._conf.getAll()

[('spark.master', 'local'),
 ('spark.app.startTime', '1653900934145'),
 ('spark.driver.extraClassPath', '/content/sqlite-jdbc-3.34.0.jar'),
 ('spark.driver.host', 'cdb21eb609fd'),
 ('spark.driver.memory', '45G'),
 ('spark.jars', '/content/sqlite-jdbc-3.34.0.jar'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.maxResultSize', '10G'),
 ('spark.ui.port', '4050'),
 ('spark.app.id', 'local-1653900935812'),
 ('spark.app.initial.jar.urls',
  'spark://cdb21eb609fd:43879/jars/sqlite-jdbc-3.34.0.jar'),
 ('spark.repl.local.jars', 'file:///content/sqlite-jdbc-3.34.0.jar'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.name', 'Soccer Match Predictor'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '43879'),
 ('spark.executor.memory', '4G'),
 ('spark.ui.showConsoleProgress', 'true')]

## **1. Link Google Colab to our Google Drive**

In [1]:
GDRIVE_DIR = "/content/gdrive" # Your own mount point on Google Drive
GDRIVE_DATA_DIR = GDRIVE_DIR + "/MyDrive/big_data/datasets" # Your own data directory

In [2]:
# mounting our google drive
from google.colab import drive
drive.mount(GDRIVE_DIR, force_remount=True)

Mounted at /content/gdrive


# Players

## Read players data into 2 Spark Dataframes


In [61]:
import pandas as pd
import sqlite3

In [60]:
players_df = pd.read_csv(f"{GDRIVE_DATA_DIR}/players.csv")

In [12]:
player_attributes_df = ps.read_sql_table("Player_Attributes", con=f"jdbc:sqlite:{GDRIVE_DATA_DIR}/ESDB.sqlite")

In [62]:
db_conn = sqlite3.connect(f"{GDRIVE_DATA_DIR}/ESDB.sqlite")

In [64]:
player_attributes_df = pd.read_sql("SELECT * FROM Player_Attributes;", db_conn)

## saving each player's attributes as a list into a column in players Dataframe

### Finding player's attributes history

In [66]:
# convert match results from goals count of each team to three categories of "home_win", "away_win" and "draw"
import json
players_attrs = []
players_count = len(players_df)
pfi_index =  players_df.columns.get_loc("player_fifa_api_id")
for i in range(players_count):
    print("iterate number --> ", i)
    player_fifa_id = players_df.iloc[i,pfi_index]
    player_attrs_history = player_attributes_df.loc[(player_attributes_df["player_fifa_api_id"] == player_fifa_id)]
    players_attrs.append(json.dumps(player_attrs_history.to_dict('records')))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
iterate number -->  6060
iterate number -->  6061
iterate number -->  6062
iterate number -->  6063
iterate number -->  6064
iterate number -->  6065
iterate number -->  6066
iterate number -->  6067
iterate number -->  6068
iterate number -->  6069
iterate number -->  6070
iterate number -->  6071
iterate number -->  6072
iterate number -->  6073
iterate number -->  6074
iterate number -->  6075
iterate number -->  6076
iterate number -->  6077
iterate number -->  6078
iterate number -->  6079
iterate number -->  6080
iterate number -->  6081
iterate number -->  6082
iterate number -->  6083
iterate number -->  6084
iterate number -->  6085
iterate number -->  6086
iterate number -->  6087
iterate number -->  6088
iterate number -->  6089
iterate number -->  6090
iterate number -->  6091
iterate number -->  6092
iterate number -->  6093
iterate number -->  6094
iterate number -->  6095
iterate number -->  6096
iterate nu

### Saving attribute history of players in players Spark Dataframe

In [68]:
players_df.insert(4, 'attributes_history', players_attrs)


In [13]:
players_df.to_csv(f"{GDRIVE_DATA_DIR}/players_with_attributes.csv", index=False)

In [21]:
players_with_attributes_df = ps.read_csv(f"{GDRIVE_DATA_DIR}/players_with_attributes.csv")

## Functions to parse players data and find their attributes in a specific date

In [54]:
def find_player_attr_of_date(date, attrs):
    sorted_attrs = sorted(attrs, key=lambda d: d['date']) 
    matched_attr = {}
    for i in range(len(attrs)):
        if date == sorted_attrs[i]["date"]:
          return sorted_attrs[i]
        elif date < sorted_attrs[i]["date"]:
            if i == 0:
                return sorted_attrs[i]
        else:
            matched_attr = sorted_attrs[i]
    return matched_attr

In [20]:
from datetime import datetime


def parse_date_str(date_str: str):
    date = None
    date_str_list = date_str.split()
    try:
        date = datetime.strptime(" ".join(date_str_list[i] for i in range(3)), '%b %d, %Y')
    except Exception as e:
        print("error in saving birth_data ---> ", str(e))
    finally:
        return date

# Match

## **1.Read dataset file into a Spark Dataframe**

In [22]:
matches_df = ps.read_sql_table("Match", con=f"jdbc:sqlite:{GDRIVE_DATA_DIR}/ESDB.sqlite")

## Here is column descpritions of Match table:

### Match 

**id**: Primary key of match table

**country_id**: Foreign key of Country table

**league_id**: Foreign key of League table

**season**: The football season which this match was held on

**stage**: Which week of season was this match for

**date**: The date of the match

**match_api_id**: Match ID in http://football-data.mx-api.enetscores.com/ where match information was retrieved from

**home_team_api_id**: Home team ID in http://football-data.mx-api.enetscores.com/

**away_team_api_id**: Away team ID in http://football-data.mx-api.enetscores.com/

**home_team_goal**: How many goals home team scored

**away_team_goal**: How many goals away team scored

#home_player_X(1-11)

**goal**: details of goals of the match

**shoton**: match stats of teams shot on target during the match

**shotoff**: match stats of teams shot off target during the match

**foulcommit**: match stats of fouls commited by each teams during the match

**card**: match stats of red or yellow cards which players recieved during the match

**cross**: match stats of how many times plyers tried to cross during the match

**corner**: match stats of corners which each teams got during the match

**possession**: match stats of each teams ball possession during the match

**B365H**: Bet365 home win odds

**B365D**: Bet365 draw odds

**B365A**: Bet365 away win odds

**BSH**: Blue Square home win odds

**BSD**: Blue Square draw odds

**BSA**:Blue Square away win odds

**BWH**: Bet&Win home win odds

**BWD**: Bet&Win draw odds

**BWA**: Bet&Win away win odds

**GBH**: Gamebookers home win odds

**GBD**: Gamebookers draw odds

**GBA**: Gamebookers away win odds

**IWH**: Interwetten home win odds

**IWD**: Interwetten draw odds

**IWA**: Interwetten away win odds

**LBH**: Ladbrokes home win odds

**LBD**: Ladbrokes draw odds

**LBA**: Ladbrokes away win odds

**PSH**: Pinnacle Sports home win odds

**PSD**: Pinnacle Sports draw odds

**PSA**: Pinnacle Sports away win odds

**SOH**: Sporting Odds home win odds

**SOD**: Sporting Odds draw odds

**SOA**: Sporting Odds away win odds

**SBH**: Sportingbet home win odds

**SBD**: Sportingbet draw odds

**SBA**: Sportingbet away win odds

**SJH**: Stan James home win odds

**SJD**: Stan James draw odds

**SJA**: Stan James away win odds

**SYH**:Stanleybet home win odds

**SYD**: Stanleybet draw odds

**SYA**: Stanleybet away win odds

**VCH**: VC Bet home win odds

**VCD**: VC Bet draw odds

**VCA**: VC Bet away win odds

**WHH**: William Hill home win odds

**WHD**: William Hill draw odds

**WHA**: William Hill away win odds

## Parsing Match

In [25]:
player_cordinates_columns = [f"{team}_player_{axis}{i}" for axis in ["X","Y"] for team in ["home", "away"] for i in range(1,12)]
player_id_columns = [f"{team}_player_{i}" for team in ["home", "away"] for i in range(1,12)]
match_stats_columns = ['home_team_goal','away_team_goal','goal','shoton','shotoff','foulcommit','card','cross','corner','possession']
betting_odds_columns = ['B365H','B365D','B365A','BWH','BWD','BWA','IWH','IWD','IWA','LBH','LBD','LBA','PSH','PSD','PSA','WHH','WHD','WHA','SJH','SJD','SJA','VCH','VCD','VCA','GBH','GBD','GBA','BSH','BSD','BSA']

In [23]:
def get_result_category(home_goals: int, away_goals: int):
    if home_goals > away_goals:
        return "home_win"
    elif home_goals < away_goals:
        return "away_win"
    else:
        return "draw"


In [35]:
new_matches_df = matches_df.copy()

In [None]:
match_player_features = ['id', 'player_fifa_api_id', 'player_api_id', 'date', 'tm_player_id', 'player_name']

In [84]:
# convert match results from goals count of each team to three categories of "home_win", "away_win" and "draw"
from datetime import datetime
from dateutil.relativedelta import relativedelta
import math
import ast


matches_info = []
matches_count = len(matches_df)
match_columns = matches_df.columns.values.tolist()
match_columns_index = {}
for idx in range(len(match_columns)):
    match_columns_index[match_columns[idx]] =  idx
for i in range(1000,1001):
    print("match number =========> ", i)
    match_info = (matches_df.iloc[i]).to_dict()
    match_date = datetime.strptime(match_info["date"], '%Y-%m-%d %H:%M:%S')
    # determine match categorical result
    match_info["result"] = get_result_category(match_info["home_team_goal"], match_info["away_team_goal"])
    for player_id_column in player_id_columns:
        player_dict = {}
        if math.isnan(match_info[player_id_column]):
            # player_id is not present in match table
            continue
        player_df = players_df.loc[(players_df["player_api_id"] == match_info[player_id_column])]
        if player_df.empty:
            # we could not find player with specified player_api_id
            continue
        # Since player_api_id is unique in our dataset so we just get the first retrieved dataframe
        player_info = player_df.to_dict("records")[0]
        birthday = datetime.strptime(player_info.pop("birthday"), '%Y-%m-%d %H:%M:%S')
        player_info["age"] = relativedelta(match_date, birthday).years
        value_history = ast.literal_eval(player_info.pop("value_history"))
        for vh in value_history:
            vh["date"] = datetime.strptime(vh["date"], '%b %d, %Y')  
        
        market_value_on_match_date = find_player_attr_of_date(match_date, value_history)
        player_info["market_value"] = market_value_on_match_date.get("value")
        # print(f'player {player_info["player_api_id"]} is {player_info["age"]} years old and worth {player_info["market_value"]} euros.')

        attributes_history = json.loads(player_info.pop("attributes_history"))
        for ah in attributes_history:
            ah["date"] = datetime.strptime(ah["date"], '%Y-%m-%d %H:%M:%S')
        attributes_on_match_date = find_player_attr_of_date(match_date, attributes_history)

        for k, v in attributes_on_match_date.items():
            player_info[k] = v

        unused_player_features = ['id', 'player_fifa_api_id', 'player_api_id', 'date', 'tm_player_id', 'player_name']

        for k, v in player_info.items():
            try:
                if k not in unused_player_features:
                    match_info[f"{player_id_column}_{k}"] = int(v)
            except:
                if v == "None":
                    match_info[f"{player_id_column}_{k}"] = None
                else:
                    match_info[f"{player_id_column}_{k}"] = v
        print(f'player {match_info[player_id_column]} infromation --> {player_info}')
    matches_info.append(match_info)

player 39573 infromation --> {'id': 136059, 'player_api_id': 39573, 'tm_player_id': 5881, 'player_name': 'Olivier Renard', 'player_fifa_api_id': 184412, 'height': 187.96, 'weight': 170, 'age': 33, 'market_value': 1200000, 'date': datetime.datetime(2012, 2, 22, 0, 0), 'overall_rating': 69.0, 'potential': 70.0, 'preferred_foot': 'right', 'attacking_work_rate': 'None', 'defensive_work_rate': '6', 'crossing': 12.0, 'finishing': 13.0, 'heading_accuracy': 11.0, 'short_passing': 32.0, 'volleys': 10.0, 'dribbling': 10.0, 'curve': 18.0, 'free_kick_accuracy': 12.0, 'long_passing': 32.0, 'ball_control': 31.0, 'acceleration': 46.0, 'sprint_speed': 55.0, 'agility': 61.0, 'reactions': 62.0, 'balance': 33.0, 'shot_power': 39.0, 'jumping': 68.0, 'stamina': 47.0, 'strength': 74.0, 'long_shots': 11.0, 'aggression': 30.0, 'interceptions': 22.0, 'positioning': 13.0, 'vision': 33.0, 'penalties': 10.0, 'marking': 10.0, 'standing_tackle': 11.0, 'sliding_tackle': 13.0, 'gk_diving': 69.0, 'gk_handling': 74.0, 

  df[column_name] = series


In [1]:
matches_info

NameError: ignored