In [1]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import numpy as np
from PIL import Image
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
from pyspark.sql import SparkSession
import io

In [2]:
# Create the session
conf = SparkConf().\
                set('spark.ui.port', "4050").\
                set('spark.executor.memory', '4G').\
                set('spark.driver.memory', '45G').\
                set('spark.driver.maxResultSize', '10G').\
                setAppName("PySparkTutorial").\
                setMaster("local[*]")

# Create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
sc._conf.getAll()

[('spark.sql.warehouse.dir',
  'file:/C:/Users/marco/Desktop/BDC_project/BDC_project/spark-warehouse'),
 ('spark.app.name', 'PySparkTutorial'),
 ('spark.driver.host', 'LAPTOP-JLLVBEPM'),
 ('spark.driver.memory', '45G'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.maxResultSize', '10G'),
 ('spark.driver.port', '54177'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.acti

In [5]:
path = "C:/Users/marco/Desktop"

In [6]:

try:
    df_appearances = spark.read.csv("archive/appearances.csv", header=True, inferSchema=True)
    df_club_games = spark.read.csv("archive/club_games.csv", header=True, inferSchema=True)
    df_clubs = spark.read.csv("archive/clubs.csv", header=True, inferSchema=True)
    df_competitions = spark.read.csv("archive/competitions.csv", header=True, inferSchema=True)
    df_game_events = spark.read.csv("archive/game_events.csv", header=True, inferSchema=True)
    df_games = spark.read.csv("archive/games.csv", header=True, inferSchema=True)
    df_player_valuations = spark.read.csv("archive/player_valuations.csv", header=True, inferSchema=True)
    df_players = spark.read.csv("archive/players.csv", header=True, inferSchema=True)
except Exception:
    df_appearances = spark.read.csv(path+"/archive/appearances.csv", header=True, inferSchema=True)
    df_club_games = spark.read.csv(path+"/archive/club_games.csv", header=True, inferSchema=True)
    df_clubs = spark.read.csv(path+"/archive/clubs.csv", header=True, inferSchema=True)
    df_competitions = spark.read.csv(path+"/archive/competitions.csv", header=True, inferSchema=True)
    df_game_events = spark.read.csv(path+"/archive/game_events.csv", header=True, inferSchema=True)
    df_games = spark.read.csv(path+"/archive/games.csv", header=True, inferSchema=True)
    df_player_valuations = spark.read.csv(path+"/archive/player_valuations.csv", header=True, inferSchema=True)
    df_players = spark.read.csv(path+"/archive/players.csv", header=True, inferSchema=True)


In [7]:
# join players and appearances
df_players_appearances = df_players.join(df_appearances, ["player_id"], how='inner')

In [8]:
# drop useless and duplicated features from df_players_appearances
df_players_appearances = df_players_appearances.drop("current_club_id", "appearance_id",
                                                        "highest_market_value_in_eur", "current_club_name",
                                                        "city_of_birth", "market_value_in_eur",
                                                        "contract_expiration_date", "agent_name",
                                                        "current_club_domestic_competition_id", "image_url",
                                                        "last_season", "url", "game_id", "player_current_club_id",
                                                        "first_name", "last_name", "player_name", "player_code")

In [9]:
# drop useless and duplicated features from df_players_valuations
df_player_valuations = df_player_valuations.drop("datetime", "dateweek")

In [10]:
# rename the column date of df_players_valuations in date_v to avoid confusion with the date of df_players_appearances
df_player_valuations = df_player_valuations.withColumnRenamed("date", "date_v")

In [11]:
# Join the two dataframes on player_id
df_valuations_appearances = df_player_valuations.join(df_players_appearances, "player_id")

In [12]:
# TODO decide if we want to keep the players with no appearances, in case we have to do a union with valuations
# adding before the zeroed column of df_players_appearances

In [13]:
# Assign aliases to the tables
df_v = df_valuations_appearances.alias("df_v")

In [14]:
# we want to keep only the rows where the appearence date is within 1 year from the valuation date
df_filtered = df_v.filter(
    (year(df_v.date_v) == year(df_v.date) + 1) & (month(df_v.date_v) <= month(df_v.date)) |
    (year(df_v.date_v) == year(df_v.date)) & (month(df_v.date_v) > month(df_v.date)) |
    (year(df_v.date_v) == year(df_v.date)) & (month(df_v.date_v) == month(df_v.date)) & (dayofmonth(df_v.date_v) > dayofmonth(df_v.date)) |
    (year(df_v.date_v) == year(df_v.date) + 1) & (month(df_v.date_v) == month(df_v.date)) & (dayofmonth(df_v.date_v) <= dayofmonth(df_v.date))
)

In [15]:
# Group by the player_id and the valuation date and extract all the important features
from pyspark.sql.functions import col, year, collect_set, count, sum

df_result = df_filtered.groupBy(
    "player_id", col("market_value_in_eur").alias("market_value"), "date_v",
    col("current_club_id").alias("club_id"), col("height_in_cm").alias("height"),
    col("country_of_citizenship").alias("citizenship"), year("date_of_birth").alias("year_b"), "position",
    "sub_position") \
    .agg(collect_set("competition_id").alias("competition_id"),
            collect_set("player_club_id").alias("player_club_id"),
            count("date").alias("app"),
            sum("assists").alias("assists"),
            sum("goals").alias("goals"),
            sum("minutes_played").alias("minutes_played"),
            sum("red_cards").alias("red_cards"),
            sum("yellow_cards").alias("yellow_cards"))

In [16]:
#add last valuation in temporal terms
df_result = df_result.withColumn("last_valuation", lag(df_result.market_value).over(Window.partitionBy("player_id").orderBy("date_v")))

In [17]:
# Adding clubs stats to the instances (player_id, date_v)

In [18]:
df_result.filter(df_result.player_id == 148455).show(100)

+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+
|player_id|market_value|    date_v|club_id|height|citizenship|year_b|position|sub_position|      competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|
+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+
|   148455|    10000000|2015-02-12|     31|   175|      Egypt|  1992|  Attack|Right Winger| [CL, FAC, IT1, GB1]|    [430, 631]|  7|      1|    0|           202|        0|           0|          null|
|   148455|    12000000|2015-03-20|     31|   175|      Egypt|  1992|  Attack|Right Winger|[CL, FAC, CIT, EL...|    [430, 631]| 16|      3|    6|           879|        0|           1|      10000000|
|   1

In [19]:
#join the two tables on game_id but of df_games we need only the date and of df_club_games we need to have everything
df_club_games_j = df_club_games.join(df_games.select("game_id", "date"), "game_id", how='inner')

In [20]:
df_club_games_j.show()

+-------+-------+---------+------------+------------------+-----------+--------------+-----------------+---------------------+-------+------+----------+
|game_id|club_id|own_goals|own_position|  own_manager_name|opponent_id|opponent_goals|opponent_position|opponent_manager_name|hosting|is_win|      date|
+-------+-------+---------+------------+------------------+-----------+--------------+-----------------+---------------------+-------+------+----------+
|2229332|     27|        2|          -1|     Jupp Heynckes|         16|             1|               -1|         Jürgen Klopp|   Home|     1|2012-08-12|
|2244388|    131|        3|          -1|     Tito Vilanova|        418|             2|               -1|        José Mourinho|   Home|     1|2012-08-22|
|2269557|   3709|        0|          -1|       Luis García|       4032|             0|               -1|     Claudio Barragán|   Home|     0|2012-11-28|
|2254432|  21322|        1|          -1|Pedro Buenaventura|       7077|           

In [21]:
from pyspark.sql.functions import explode, col, expr
from pyspark.sql.window import Window

# Expands the player_club_id list into separate columns
df_result_expanded = df_result.withColumn("club_id", explode(col("player_club_id")))

# Peform join based on club_id and apply condition on date
df_join = df_result_expanded.join(df_club_games_j, ["club_id"]) \
    .where(expr("date <= date_v AND date >= date_v - INTERVAL 1 YEAR"))

# Select columns
df_join = df_join.select(df_result.columns + df_club_games_j.columns)

In [22]:
df_join.show()

+---------+------------+----------+-------+------+-------------+------+--------+--------------+--------------+--------------+---+-------+-----+--------------+---------+------------+--------------+-------+-------+---------+------------+----------------+-----------+--------------+-----------------+---------------------+-------+------+----------+
|player_id|market_value|    date_v|club_id|height|  citizenship|year_b|position|  sub_position|competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|game_id|club_id|own_goals|own_position|own_manager_name|opponent_id|opponent_goals|opponent_position|opponent_manager_name|hosting|is_win|      date|
+---------+------------+----------+-------+------+-------------+------+--------+--------------+--------------+--------------+---+-------+-----+--------------+---------+------------+--------------+-------+-------+---------+------------+----------------+-----------+--------------+-----------------+-----------

In [23]:
df_join.filter(df_join.player_id == 148455).show(1000)

+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+-------+-------+---------+------------+--------------------+-----------+--------------+-----------------+---------------------+-------+------+----------+
|player_id|market_value|    date_v|club_id|height|citizenship|year_b|position|sub_position|      competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|game_id|club_id|own_goals|own_position|    own_manager_name|opponent_id|opponent_goals|opponent_position|opponent_manager_name|hosting|is_win|      date|
+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+-------+-------+---------+------------+--------------------+-----------+--------------+-----------

In [24]:
# add column draw that is equal to 1 if own_goals = opponent_goals
df_joinn = df_join.withColumn("is_draw", when(df_join.own_goals == df_join.opponent_goals, 1).otherwise(0))

#add column games_played that counts the instances with key (player_id, date_v)
df_joinn = df_joinn.withColumn("games_played", count(df_joinn.date_v).over(Window.partitionBy("date_v", "player_id")))

# add column games_won that counts the instances with key (player_id, date_v) and is_win = 1
df_joinn = df_joinn.withColumn("games_won", count(when(df_joinn.is_win == 1, 1)).over(Window.partitionBy("date_v", "player_id")))

# add column games_draw that counts the instances with key (player_id, date_v) and is_draw = 1
df_joinn = df_joinn.withColumn("games_draw", count(when(df_joinn.is_draw == 1, 1)).over(Window.partitionBy("date_v", "player_id")))

# add column games_lost that counts the instances with key (player_id, date_v) and is_win = 0 and is_draw = 0
df_joinn = df_joinn.withColumn("games_lost", df_joinn.games_played - df_joinn.games_won - df_joinn.games_draw)

df_joinn.filter(df_joinn.player_id == 148455).show(1000)

+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+-------+-------+---------+------------+--------------------+-----------+--------------+-----------------+---------------------+-------+------+----------+-------+------------+---------+----------+----------+
|player_id|market_value|    date_v|club_id|height|citizenship|year_b|position|sub_position|      competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|game_id|club_id|own_goals|own_position|    own_manager_name|opponent_id|opponent_goals|opponent_position|opponent_manager_name|hosting|is_win|      date|is_draw|games_played|games_won|games_draw|games_lost|
+---------+------------+----------+-------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+-------

In [25]:
#delete some columns
df_join = df_joinn.drop("game_id", "club_id", "own_goals",  "own_position", "own_manager_name", "opponent_id", "opponent_goals", "opponent_position", "opponent_manager_name", "hosting", "is_win", "date", "is_draw")

In [26]:
df_join.filter(df_join.player_id == 148455).show(100)

+---------+------------+----------+------+-----------+------+--------+------------+-------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+
|player_id|market_value|    date_v|height|citizenship|year_b|position|sub_position|     competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|games_played|games_won|games_draw|games_lost|
+---------+------------+----------+------+-----------+------+--------+------------+-------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+
|   148455|    10000000|2015-02-12|   175|      Egypt|  1992|  Attack|Right Winger|[CL, FAC, IT1, GB1]|    [430, 631]|  7|      1|    0|           202|        0|           0|          null|         102|       55|        24|        23|
|   148455|    10000000|2015-02-12|   175|      Egypt|  1992

In [27]:
# delete duplicates
df_joinn = df_join.dropDuplicates(["player_id", "date_v"])

In [28]:
df_joinn.show(100)

+---------+------------+----------+------+----------------+------+----------+------------------+--------------------+---------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+
|player_id|market_value|    date_v|height|     citizenship|year_b|  position|      sub_position|      competition_id| player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|games_played|games_won|games_draw|games_lost|
+---------+------------+----------+------+----------------+------+----------+------------------+--------------------+---------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+
|       26|     1000000|2017-02-07|   190|         Germany|  1980|Goalkeeper|              null|   [CL, DFB, L1, EL]|           [16]| 16|      0|    0|          1470|        0|           1|       1000000|          51|       31|        13|         7|


In [29]:
# for each (player_id, date_v) add a column with the result of (games_won*3 + games_draw)/games_played
df_join = df_joinn.withColumn("winning_rate", (df_joinn.games_won*3 + df_joinn.games_draw)/df_joinn.games_played)

In [30]:
df_join.filter(df_join.player_id == 148455).show(100)

+---------+------------+----------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+------------------+
|player_id|market_value|    date_v|height|citizenship|year_b|position|sub_position|      competition_id|player_club_id|app|assists|goals|minutes_played|red_cards|yellow_cards|last_valuation|games_played|games_won|games_draw|games_lost|      winning_rate|
+---------+------------+----------+------+-----------+------+--------+------------+--------------------+--------------+---+-------+-----+--------------+---------+------------+--------------+------------+---------+----------+----------+------------------+
|   148455|    10000000|2015-02-12|   175|      Egypt|  1992|  Attack|Right Winger| [CL, FAC, IT1, GB1]|    [430, 631]|  7|      1|    0|           202|        0|           0|          null|         102|       55|        24|        23|