In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_json, array_join


In [2]:
spark_sess = SparkSession.builder.appName("TempSession").getOrCreate()
spark_sess


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 09:34:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# spark_sess.stop()

In [4]:
data = [{"name": "Cedric", "s_year": 6}, {"name": "Harry", "s_year": 4}]
data

[{'name': 'Cedric', 's_year': 6}, {'name': 'Harry', 's_year': 4}]

In [5]:
df = spark_sess.createDataFrame(data)
df

DataFrame[name: string, s_year: bigint]

In [6]:
df.show()


                                                                                

+------+------+
|  name|s_year|
+------+------+
|Cedric|     6|
| Harry|     4|
+------+------+



In [7]:
from pyspark.sql import Row


In [8]:
new_column = [Row({"team": "HufflePuff", "avg_SPI": 8.2}), Row({"team": "Gryffindor", "avg_SPI": 7.8})]
new_column

[<Row({'team': 'HufflePuff', 'avg_SPI': 8.2})>,
 <Row({'team': 'Gryffindor', 'avg_SPI': 7.8})>]

In [9]:
other_df = spark_sess.createDataFrame(new_column, schema="player_stats map<string,string>")
other_df

DataFrame[player_stats: map<string,string>]

In [10]:
other_df.show()

+--------------------+
|        player_stats|
+--------------------+
|{team -> HufflePu...|
|{team -> Gryffind...|
+--------------------+



In [11]:
other_df.dtypes, other_df.count()

([('player_stats', 'map<string,string>')], 2)

In [12]:
# df.union(other_df)
"""
default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

"""
# x = df.join(other_df, how="full")
# x = df.join(other_df, how="inner")
# x = df.join(other_df, how="outer")
x = df.join(other_df, how="full")

In [13]:
x.show()

+------+------+--------------------+
|  name|s_year|        player_stats|
+------+------+--------------------+
|Cedric|     6|{team -> HufflePu...|
|Cedric|     6|{team -> Gryffind...|
| Harry|     4|{team -> HufflePu...|
| Harry|     4|{team -> Gryffind...|
+------+------+--------------------+



In [14]:
x.count()

4

In [15]:
other_df.join(df).show()



+--------------------+------+------+
|        player_stats|  name|s_year|
+--------------------+------+------+
|{team -> HufflePu...|Cedric|     6|
|{team -> HufflePu...| Harry|     4|
|{team -> Gryffind...|Cedric|     6|
|{team -> Gryffind...| Harry|     4|
+--------------------+------+------+



                                                                                

In [16]:
from pyspark.sql.functions import monotonically_increasing_id


In [17]:
temp_col1 = df.withColumn("index", monotonically_increasing_id())
temp_col2 = other_df.withColumn("index", monotonically_increasing_id())

In [18]:
temp_col1.show()

+------+------+-----------+
|  name|s_year|      index|
+------+------+-----------+
|Cedric|     6|34359738368|
| Harry|     4|77309411328|
+------+------+-----------+



In [19]:
temp_col2.show()

+--------------------+-----------+
|        player_stats|      index|
+--------------------+-----------+
|{team -> HufflePu...|34359738368|
|{team -> Gryffind...|77309411328|
+--------------------+-----------+



In [20]:
temp_col1.join(temp_col2, on="index").show()

+-----------+------+------+--------------------+
|      index|  name|s_year|        player_stats|
+-----------+------+------+--------------------+
|34359738368|Cedric|     6|{team -> HufflePu...|
|77309411328| Harry|     4|{team -> Gryffind...|
+-----------+------+------+--------------------+



In [21]:
df.unionAll(other_df)

AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 2 columns and the second input has 1 columns.;
'Union false, false
:- LogicalRDD [name#0, s_year#1L], false
+- LogicalRDD [player_stats#13], false


In [90]:
other_df.union(df)

AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 1 columns and the second input has 2 columns.;
'Union false, false
:- LogicalRDD [player_stats#100], false
+- LogicalRDD [name#0, s_year#1L], false


In [80]:
other_col = df.withColumn("player_stats", other_df)


PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got DataFrame.

In [84]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, col, struct, array

# # Create a SparkSession
# spark = SparkSession.builder.appName("CreateColumnExample").getOrCreate()

# Example data
new_column = [
    {"team": "HufflePuff", "avg_SPI": 8.2},
    {"team": "Gryffindor", "avg_SPI": 7.8}
]

# Convert the list of Rows into a list of dictionaries
new_column_data = [row for row in new_column]

# Create a DataFrame to demonstrate adding the column
datax = [("Team1",), ("Team2",)]
dfx = spark_sess.createDataFrame(data, ["existing_column"])

# Add the new column as an array of structs
new_col = array([lit(row).alias(str(idx)) for idx, row in enumerate(new_column_data)])
dfx = dfx.withColumn("new_column", new_col)

# Show the resulting DataFrame
dfx.show(truncate=False)


SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for '{team=HufflePuff, avg_SPI=8.2}' of class java.util.HashMap.

In [53]:
df.withColumn("x", other_df)

PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got DataFrame.

In [None]:
df.withColumn(new_column, lit("player_stats"))
df.withColumn(new_column, col("player_stats"))

Py4JError: An error occurred while calling o45.withColumn. Trace:
py4j.Py4JException: Method withColumn([class java.util.ArrayList, class org.apache.spark.sql.Column]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)



In [26]:
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import udf, col, monotonically_increasing_id


In [34]:
# Convert player stats to a JSON string before adding it to DataFrame
def get_player_stats(match_url):
    # Extract the player stats based on the match_url (already scraped)
    match_index = new_column.index((match_url,))
    return new_column[match_index]

get_player_stats_udf = udf(get_player_stats, MapType(StringType(), StringType()))


# # Add the player stats as a MapType column
# match_df = df.withColumn("player_stats", get_player_stats_udf("match_url"))

# # Convert the MapType column to a JSON string column
# match_df_with_json = df.withColumn("player_stats_json", to_json(col("player_stats")))


In [35]:
get_player_stats_udf("match_url")


Column<'get_player_stats(match_url)'>

In [110]:
x = "{'age': '[46,36,75]'}"

In [111]:
x

"{'age': '[46,36,75]'}"

In [115]:
import json
import ast

In [116]:
# y = json.loads(x)
y = ast.literal_eval(x)
y

{'age': '[46,36,75]'}

In [117]:
y

{'age': '[46,36,75]'}

In [118]:
type(y)

dict

In [120]:
# for i in zip(y.keys(), y.values()):
#     print(i)

('age', '[46,36,75]')


In [124]:
for i in y.values():
    j = ast.literal_eval(i)
    print(j, type(j))

[46, 36, 75] <class 'list'>


In [125]:
y

{'age': '[46,36,75]'}

In [127]:
y.values()

dict_values(['[46,36,75]'])

In [22]:
# start here

In [23]:
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StringType, ArrayType, FloatType, StructField, IntegerType

In [24]:
competition_name = "mls"
competition_year = 2024

In [25]:
spark_sess = SparkSession.builder.appName("PlayerStatsSession").getOrCreate()

25/01/01 09:36:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [26]:
df = spark_sess.read.csv(f"/Users/druestaples/projects/soccer_mvp_project/MlsMvp/{competition_name}_{competition_year}_player_stats.csv", header=True, inferSchema=True)
print(df.show())


+--------------------+----------------+----------------+--------------------+
|           match_url|competition_name|competition_year|        player_stats|
+--------------------+----------------+----------------+--------------------+
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_Numbers":...|
|https://www.fotmo...|             mls|            2024|{"Shirt_

In [27]:
g = df.select(df.player_stats)
g

DataFrame[player_stats: string]

In [28]:
g.show()

+--------------------+
|        player_stats|
+--------------------+
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
|{"Shirt_Numbers":...|
+--------------------+
only showing top 20 rows



In [29]:
match_rows = g.collect()
match_rows

[Row(player_stats='{"Shirt_Numbers":"[1, 2, 6, 27, 18, 24, 5, 20, 10, 9, 16, 9, 11, 7, 8, 13, 6, 2, 4, 15, 14, 18, 41, 7, 32, 29, 17, 92]","SPI_Scores":"[7.3, 7.2, 7.6, 7.5, 7.7, 6.9, 7.8, 8.6, 8.5, 7.9, 8.4, 6.4, 5.3, 7.0, 6.9, 6.3, 6.9, 6.5, 6.6, 6.5, 6.8, 5.9, 6.3, 6.0, 6.1, 6.1, 6.1, 6.3]","Player_Names":"[Drake Callender, DeAndre Yedlin, Tomas Aviles, Serhiy Kryvtsov, Jordi Alba, Julian Gressel, Sergio Busquets, Diego Gomez, Lionel Messi, Luis Suárez, Robert Taylor, Cristian Arango, Carlos Gomez, Pablo Ruiz, Diego Luna, Nelson Palacio, Braian Ojeda, Andrew Brody, Brayan Vera, Justen Glad, Emeka Eneli, Zac MacMath, David Ruiz, Jean Mota, Noah Allen, Anderson Julio, Fidel Barajas, Noel Caliskan]"}'),
 Row(player_stats='{"Shirt_Numbers":"[49, 24, 13, 12, 22, 6, 55, 17, 10, 26, 16, 7, 28, 10, 16, 11, 31, 27, 26, 5, 15, 76, 35, 27, 8, 9, 7, 9, 20, 33]","SPI_Scores":"[2.9, 5.4, 3.8, 6.0, 4.4, 5.1, 6.0, 6.4, 8.2, 6.4, 7.1, 7.4, 8.3, 8.5, 7.5, 7.6, 7.6, 7.7, 7.6, 8.3, 7.9, 8.1, 6.8, 6.5, 

In [30]:
import json
import ast

"Player_Names":"[Drake Callender, DeAndre Yedlin, Tomas Aviles, Serhiy Kryvtsov, Jordi Alba, Julian Gressel, Sergio Busquets, Diego Gomez, Lionel Messi, Luis Suárez, Robert Taylor, Cristian Arango, Carlos Gomez, Pablo Ruiz, Diego Luna, Nelson Palacio, Braian Ojeda, Andrew Brody, Brayan Vera, Justen Glad, Emeka Eneli, Zac MacMath, David Ruiz, Jean Mota, Noah Allen, Anderson Julio, Fidel Barajas, Noel Caliskan]"

In [31]:
all_player_names = []
all_shirt_numbers = []
all_spi_scores = []
for row in match_rows:
    # print(i, "\n") # Row()
    player_info = json.loads(row.player_stats)
    # print(ast.literal_eval(player_info["Player_Names"]))

    raw_data = player_info["Player_Names"]
    corrected_data = "[" + ", ".join(f'"{name.strip()}"' for name in raw_data.strip("[]").split(", ") if name) + "]"
    player_names = ast.literal_eval(corrected_data)
    # print(player_names)
    all_player_names.extend(player_names)


    match_shirt_numbers = json.loads(player_info["Shirt_Numbers"])
    all_shirt_numbers.extend(match_shirt_numbers) 

    match_spi_scores = json.loads(player_info["SPI_Scores"])
    all_spi_scores.extend(match_spi_scores)

# print(all_player_names)
print(all_spi_scores)
print(len(all_shirt_numbers),  len(all_spi_scores), len(all_player_names))

[7.3, 7.2, 7.6, 7.5, 7.7, 6.9, 7.8, 8.6, 8.5, 7.9, 8.4, 6.4, 5.3, 7.0, 6.9, 6.3, 6.9, 6.5, 6.6, 6.5, 6.8, 5.9, 6.3, 6.0, 6.1, 6.1, 6.1, 6.3, 2.9, 5.4, 3.8, 6.0, 4.4, 5.1, 6.0, 6.4, 8.2, 6.4, 7.1, 7.4, 8.3, 8.5, 7.5, 7.6, 7.6, 7.7, 7.6, 8.3, 7.9, 8.1, 6.8, 6.5, 6.4, 6.1, 6.2, 6.2, 7.1, 6.8, 6.9, 7.5, 7.2, 8.4, 8.1, 7.8, 7.9, 7.3, 7.9, 7.7, 6.9, 6.2, 6.2, 6.0, 6.1, 6.2, 7.3, 6.3, 6.3, 6.6, 6.4, 5.9, 7.4, 6.3, 5.8, 6.2, 5.8, 7.6, 7.8, 8.1, 7.2, 8.9, 7.5, 7.6, 6.7, 7.8, 7.3, 7.2, 6.5, 6.8, 7.8, 7.7, 6.0, 6.2, 6.7, 6.8, 6.6, 6.9, 6.4, 7.3, 6.1, 6.3, 6.3, 8.6, 7.6, 7.7, 7.1, 7.3, 7.4, 7.0, 6.8, 8.6, 8.8, 6.9, 6.0, 6.6, 6.6, 6.4, 6.9, 7.4, 5.8, 6.4, 5.4, 5.7, 4.8, 6.3, 6.2, 6.4, 6.3, 6.2, 5.8, 6.1, 5.6, 6.4, 7.1, 7.3, 5.9, 5.9, 6.8, 6.4, 4.6, 8.0, 8.0, 8.8, 8.0, 7.3, 7.2, 8.0, 7.2, 7.6, 7.6, 7.5, 6.0, 6.1, 6.4, 6.1, 6.2, 6.0, 6.0, 6.1, 5.8, 8.2, 7.7, 7.3, 7.1, 7.0, 7.4, 7.5, 7.6, 9.0, 6.7, 8.0, 6.3, 7.1, 6.9, 6.5, 5.3, 6.6, 5.4, 5.3, 5.2, 5.8, 6.0, 6.3, 7.6, 8.5, 4.9, 5.9, 6.4, 6.3, 6.3, 6.3,

In [32]:
data = list(zip(all_player_names, all_shirt_numbers, all_spi_scores))

In [33]:
new_schema = StructType()
new_schema.add(StructField(name="player_name", dataType=StringType()))
new_schema.add(StructField(name="player_number", dataType=IntegerType()))
new_schema.add(StructField(name="spi_score", dataType=FloatType()))
new_df = spark_sess.createDataFrame(data, schema=new_schema)
new_df

DataFrame[player_name: string, player_number: int, spi_score: float]

In [34]:
new_df.show()

+---------------+-------------+---------+
|    player_name|player_number|spi_score|
+---------------+-------------+---------+
|Drake Callender|            1|      7.3|
| DeAndre Yedlin|            2|      7.2|
|   Tomas Aviles|            6|      7.6|
|Serhiy Kryvtsov|           27|      7.5|
|     Jordi Alba|           18|      7.7|
| Julian Gressel|           24|      6.9|
|Sergio Busquets|            5|      7.8|
|    Diego Gomez|           20|      8.6|
|   Lionel Messi|           10|      8.5|
|    Luis Suárez|            9|      7.9|
|  Robert Taylor|           16|      8.4|
|Cristian Arango|            9|      6.4|
|   Carlos Gomez|           11|      5.3|
|     Pablo Ruiz|            7|      7.0|
|     Diego Luna|            8|      6.9|
| Nelson Palacio|           13|      6.3|
|   Braian Ojeda|            6|      6.9|
|   Andrew Brody|            2|      6.5|
|    Brayan Vera|            4|      6.6|
|    Justen Glad|           15|      6.5|
+---------------+-------------+---

In [None]:
# Find average SPI for each player
# Find number of games for each player
# Find max average SPI
# Find max game count



In [44]:
matches_played_per_player = new_df.groupBy(["player_name", "player_number"]).count()
matches_played_per_player.show()

+------------------+-------------+-----+
|       player_name|player_number|count|
+------------------+-------------+-----+
|      Rudy Camacho|            4|   18|
|        Ian Murphy|           32|   11|
|     Jonathan Dean|           24|    7|
| Keegan Rosenberry|            2|   16|
|       Diego Rossi|           10|   22|
|   Julian Carranza|            9|    5|
|   Cristian Dajome|           11|   20|
|   Drake Callender|            1|   19|
|     Fidel Barajas|           17|    5|
|       Caden Clark|           37|    7|
|     Rafael Santos|            3|   24|
|       Eric Miller|           15|    9|
|       Brayan Vera|            4|   15|
|         Robin Lod|           17|   22|
|       John Nelson|           14|   22|
|Robert Castellanos|           19|    7|
|    Lalas Abubakar|            6|   11|
|    Victor Wanyama|            2|    3|
|  Nökkvi Thorisson|           29|   18|
|  Jeorgio Kocevski|           33|    2|
+------------------+-------------+-----+
only showing top

In [51]:
max_games_played = matches_played_per_player.agg({"count": "max"})
max_games_played.show()

+----------+
|max(count)|
+----------+
|        29|
+----------+



In [107]:
max_games_played_var = max_games_played.collect()[0]["max(count)"]
max_games_played_var

29

In [45]:
avg_spi_per_player = new_df.groupBy(["player_name", "player_number"]).agg({"spi_score": "avg"})
avg_spi_per_player.show()

+------------------+-------------+------------------+
|       player_name|player_number|    avg(spi_score)|
+------------------+-------------+------------------+
|      Rudy Camacho|            4| 6.594444486829969|
|        Ian Murphy|           32| 6.818181731484153|
|     Jonathan Dean|           24|6.6571429797581265|
| Keegan Rosenberry|            2|6.4937500059604645|
|       Diego Rossi|           10| 7.109090913425792|
|   Julian Carranza|            9|               7.5|
|   Cristian Dajome|           11|6.5150000095367435|
|   Drake Callender|            1| 6.621052666714317|
|     Fidel Barajas|           17|6.3799999237060545|
|       Caden Clark|           37| 6.299999918256487|
|     Rafael Santos|            3| 7.204166670640309|
|       Eric Miller|           15| 6.511111153496636|
|       Brayan Vera|            4| 6.793333371480306|
|         Robin Lod|           17| 7.159091060811823|
|       John Nelson|           14| 7.063636389645663|
|Robert Castellanos|        

In [56]:
max_avg_spi = avg_spi_per_player.agg({"avg(spi_score)": "max"})
max_avg_spi.show()

+-------------------+
|max(avg(spi_score))|
+-------------------+
|  8.600000381469727|
+-------------------+



In [124]:
max_avg_spi_var = round(max_avg_spi.collect()[0]["max(avg(spi_score))"], 1)
max_avg_spi_var

8.6

In [74]:
from pyspark.sql.functions import desc

In [76]:
# Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'.
all_player_stats_df = matches_played_per_player.join(avg_spi_per_player, how="inner", on=["player_name", "player_number"]).sort(desc("avg(spi_score)"))
all_player_stats_df.show()

+----------------+-------------+-----+------------------+
|     player_name|player_number|count|    avg(spi_score)|
+----------------+-------------+-----+------------------+
|    Oscar Ustari|           19|    2| 8.600000381469727|
|      Leo Afonso|           73|    1|               8.5|
|     Ricard Puig|           10|   22|  8.39545471018011|
|Evander Ferreira|           10|   14| 8.264285836900983|
|  Elliot Panicco|           30|    4| 8.199999809265137|
|    Lionel Messi|           10|   16| 8.112499952316284|
|     Andrew Rick|           76|    2| 8.100000381469727|
|   Marcel Hartel|           17|   16| 8.006250023841858|
|   Kelvin Yeboah|            9|   15|7.9866666475931805|
|     Gabriel Pec|           11|   23|7.9695652671482256|
|  Juan Hernández|            9|   20|  7.92000002861023|
|    Aidan Morris|            8|    3| 7.866666793823242|
|   Emil Forsberg|           10|   15|7.7933334032694495|
|   Albert Rusnak|           11|   26| 7.742307699643648|
| Cedric Teuch

In [147]:
df_with_scaled_matches = all_player_stats_df.withColumn("matches_scaled", col("count") / max_games_played_var)
df_with_scaled_matches.show()

+----------------+-------------+-----+------------------+--------------------+
|     player_name|player_number|count|    avg(spi_score)|      matches_scaled|
+----------------+-------------+-----+------------------+--------------------+
|    Oscar Ustari|           19|    2| 8.600000381469727| 0.06896551724137931|
|      Leo Afonso|           73|    1|               8.5|0.034482758620689655|
|     Ricard Puig|           10|   22|  8.39545471018011|  0.7586206896551724|
|Evander Ferreira|           10|   14| 8.264285836900983|  0.4827586206896552|
|  Elliot Panicco|           30|    4| 8.199999809265137| 0.13793103448275862|
|    Lionel Messi|           10|   16| 8.112499952316284|  0.5517241379310345|
|     Andrew Rick|           76|    2| 8.100000381469727| 0.06896551724137931|
|   Marcel Hartel|           17|   16| 8.006250023841858|  0.5517241379310345|
|   Kelvin Yeboah|            9|   15|7.9866666475931805|  0.5172413793103449|
|     Gabriel Pec|           11|   23|7.969565267148

In [127]:
all_player_stats_df.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- player_number: integer (nullable = true)
 |-- count: long (nullable = false)
 |-- avg(spi_score): double (nullable = true)



In [141]:
from pyspark.sql.functions import round as pyspark_round

In [None]:
df_with_scaled_spi = all_player_stats_df.withColumn("spi_scaled", pyspark_round(col("avg(spi_score)") / float(max_avg_spi_var),3))
df_with_scaled_spi.show()

+----------------+-------------+-----+------------------+----------+
|     player_name|player_number|count|    avg(spi_score)|spi_scaled|
+----------------+-------------+-----+------------------+----------+
|    Oscar Ustari|           19|    2| 8.600000381469727|       1.0|
|      Leo Afonso|           73|    1|               8.5|     0.988|
|     Ricard Puig|           10|   22|  8.39545471018011|     0.976|
|Evander Ferreira|           10|   14| 8.264285836900983|     0.961|
|  Elliot Panicco|           30|    4| 8.199999809265137|     0.953|
|    Lionel Messi|           10|   16| 8.112499952316284|     0.943|
|     Andrew Rick|           76|    2| 8.100000381469727|     0.942|
|   Marcel Hartel|           17|   16| 8.006250023841858|     0.931|
|   Kelvin Yeboah|            9|   15|7.9866666475931805|     0.929|
|     Gabriel Pec|           11|   23|7.9695652671482256|     0.927|
|  Juan Hernández|            9|   20|  7.92000002861023|     0.921|
|    Aidan Morris|            8|  

In [160]:
min_required_matches = max_games_played_var // 2
min_required_matches

14

In [163]:
# spi_scaled = x^n / max_games_played, where n will be included in other function that will allow me to scale up by n times (exponentionally) then scaled back down by the max scaled-up spi.
mvp_df = df_with_scaled_matches.join(df_with_scaled_spi, on=["player_name", "player_number", "count", "avg(spi_score)"]).filter(col("count") >= min_required_matches).sort(desc("spi_scaled"))
mvp_df.show()

                                                                                

+-----------------+-------------+-----+------------------+------------------+----------+
|      player_name|player_number|count|    avg(spi_score)|    matches_scaled|spi_scaled|
+-----------------+-------------+-----+------------------+------------------+----------+
|      Ricard Puig|           10|   22|  8.39545471018011|0.7586206896551724|     0.976|
| Evander Ferreira|           10|   14| 8.264285836900983|0.4827586206896552|     0.961|
|     Lionel Messi|           10|   16| 8.112499952316284|0.5517241379310345|     0.943|
|    Marcel Hartel|           17|   16| 8.006250023841858|0.5517241379310345|     0.931|
|    Kelvin Yeboah|            9|   15|7.9866666475931805|0.5172413793103449|     0.929|
|      Gabriel Pec|           11|   23|7.9695652671482256|0.7931034482758621|     0.927|
|   Juan Hernández|            9|   20|  7.92000002861023|0.6896551724137931|     0.921|
|    Emil Forsberg|           10|   15|7.7933334032694495|0.5172413793103449|     0.906|
|  Cedric Teuchert|  