In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id

# The chances of taking a card based on the opponent's card and the current hand.
# This helps to understand how the model is working.

# Window helpers are only needed here, to build a gap-free row index.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Local Spark session for this notebook; getOrCreate() reuses an existing one.
spark = (
    SparkSession.builder
    .appName("TGVD_GenericQuery")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

path_training = "CardsParquetData/trained_blackjack.parquet"
path_match = "CardsParquetData/played_blackjack.parquet"


def _with_row_index(df):
    """Return ``df`` with a consecutive, 1-based ``index`` column.

    ``monotonically_increasing_id()`` alone is unique and increasing but NOT
    consecutive: the partition id is encoded in the upper bits, so any
    DataFrame read into more than one partition gets huge gaps between ids.
    Downstream cells divide ``index`` by a chunk size, which only makes sense
    for a dense index, so the generated id is used purely as the sort key for
    ``row_number()``.

    Note: a window without ``partitionBy`` moves all rows to one partition.
    That is acceptable for the small parquet files used here; revisit if the
    datasets grow.
    """
    ordering = Window.orderBy(monotonically_increasing_id())
    return df.withColumn("index", row_number().over(ordering))


df_train = _with_row_index(spark.read.parquet(path_training))
df_play = _with_row_index(spark.read.parquet(path_match))

df_train.show(5)
df_play.show(5)

25/05/16 22:11:59 WARN Utils: Your hostname, ASUS-DIEGO resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/16 22:11:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/16 22:12:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/16 22:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/16 22:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/16 22:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/16 22:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
                                                                                

+-------------------+-----------+--------+--------+--------+--------+------+------+------+------+-----+
|          Timestamp|Shown_cards|  Hand 0|  Hand 1|  Hand 2|  Hand 3|Hand 4|Hand 5|Hand 6|Hand 7|index|
+-------------------+-----------+--------+--------+--------+--------+------+------+------+------+-----+
|2025-05-12 19:24:20|    [10, 4]| [10, 7]|[10, 11]|[10, 21]|[10, 21]|  NULL|  NULL|  NULL|  NULL|    1|
|2025-05-12 19:24:20|     [4, 7]|[13, 12]|[23, 21]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|    2|
|2025-05-12 19:24:20|   [-1, 10]|[14, 18]|[14, 29]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|    3|
|2025-05-12 19:24:20|    [8, 12]|[20, 12]|[22, 12]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|    4|
|2025-05-12 19:24:20|     [3, 6]| [3, 15]| [3, 23]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|    5|
+-------------------+-----------+--------+--------+--------+--------+------+------+------+------+-----+
only showing top 5 rows

+-------------------+-----------+------

In [5]:
from pyspark.sql.functions import floor

# Number of consecutive rows (by ``index``) grouped into one chunk.
CHUNK_SIZE = 500

from pyspark.sql.functions import expr, when, coalesce

# How many "Hand N" columns are selected from each dataset.
N_HANDS_TRAIN = 8
N_HANDS_PLAY = 6


def _select_hands(df, n_hands):
    """Keep ``index`` and the hand columns, renamed to space-free names.

    ``Shown_cards`` becomes ``Hand_-1`` and ``Hand N`` becomes ``Hand_N`` for
    N in ``range(n_hands)``, in that column order.
    """
    renamed_hands = [col(f"Hand {i}").alias(f"Hand_{i}") for i in range(n_hands)]
    return df.select(
        col("index"),
        col("Shown_cards").alias("Hand_-1"),
        *renamed_hands,
    )


def _add_chunk_number(df, chunk_size=CHUNK_SIZE):
    """Add ``Chunk Number`` so that every chunk holds ``chunk_size`` rows.

    ``index`` is 1-based, so it is shifted by one before dividing; without the
    shift chunk 0 would contain only ``chunk_size - 1`` rows and every later
    boundary would be off by one.
    """
    return df.withColumn("Chunk Number", floor((col("index") - 1) / chunk_size))


def _add_n_hits(df, hand_cols):
    """Add ``n_hits``: 1 plus the number of hand columns whose first element
    is non-null and differs from the first element of the preceding column.

    Every column in ``hand_cols`` is compared with its predecessor
    (``Hand_-1`` precedes the first one), including the LAST column, which the
    previous ``range(1, len(hand_cols))`` loop never inspected.

    NOTE(review): element 0 is presumably the agent's running total - confirm.
    A step that leaves element 0 unchanged is not counted as a hit.
    """
    previous_cols = ["Hand_-1"] + hand_cols[:-1]
    hit_flags = [
        when(
            col(current)[0].isNotNull() & (col(current)[0] != col(previous)[0]),
            1,
        ).otherwise(0)
        for previous, current in zip(previous_cols, hand_cols)
    ]
    # Built-in sum() folds the Column expressions with "+".
    return df.withColumn("n_hits", 1 + sum(hit_flags))


# --- Training games ---
hand_cols = [f"Hand_{i}" for i in range(N_HANDS_TRAIN)]
df_clean = _select_hands(df_train, N_HANDS_TRAIN)
df_chunks = _add_chunk_number(df_clean)
df_moves_train = _add_n_hits(df_chunks, hand_cols)

df_moves_train.show(5)

# --- Played games (same pipeline, fewer hand columns) ---
hand_cols = [f"Hand_{i}" for i in range(N_HANDS_PLAY)]
df_clean = _select_hands(df_play, N_HANDS_PLAY)
df_chunks = _add_chunk_number(df_clean)
df_moves_play = _add_n_hits(df_chunks, hand_cols)

df_moves_play.show(5)

+-----+--------+--------+--------+--------+--------+------+------+------+------+------------+------+
|index| Hand_-1|  Hand_0|  Hand_1|  Hand_2|  Hand_3|Hand_4|Hand_5|Hand_6|Hand_7|Chunk Number|n_hits|
+-----+--------+--------+--------+--------+--------+------+------+------+------+------------+------+
|    1| [10, 4]| [10, 7]|[10, 11]|[10, 21]|[10, 21]|  NULL|  NULL|  NULL|  NULL|           0|     1|
|    2|  [4, 7]|[13, 12]|[23, 21]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|           0|     3|
|    3|[-1, 10]|[14, 18]|[14, 29]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|           0|     2|
|    4| [8, 12]|[20, 12]|[22, 12]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|           0|     3|
|    5|  [3, 6]| [3, 15]| [3, 23]|    NULL|    NULL|  NULL|  NULL|  NULL|  NULL|           0|     1|
+-----+--------+--------+--------+--------+--------+------+------+------+------+------------+------+
only showing top 5 rows

+-----+--------+--------+--------+--------+--------+--------+-----

In [9]:
from pyspark.sql.functions import variance

'''
Variance:
    - It helps you know if for a combination of (my card, opponent's card) the number of hits is constant (low variance, consistent decisions) or changes a lot (high variance, doubtful decisions).

Percentile:
    - You can see if the number of hits is generally low, medium, or high, without depending on the average (which is a restricted function).

Collect Set:
    - It gives you the different number of hits the model has made for a combination (my card, opponent's card). If there are many different options, it's a sign of inconsistency or exploration.
'''


def _summarise_hits(moves_df, *group_keys):
    """Tag each row with both first cards and aggregate ``n_hits`` per group.

    Adds ``Agent_1st_Card`` / ``Opponent_1st_Card`` (elements 0 and 1 of
    ``Hand_-1``), then groups by ``group_keys`` and computes the variance,
    the 25/50/75 percentiles and the set of distinct values of ``n_hits``.

    Returns a ``(moves_with_cards, stats)`` tuple so the caller can keep both
    the enriched rows and the aggregated table.
    """
    moves_with_cards = (
        moves_df
        .withColumn("Agent_1st_Card", col("Hand_-1")[0])
        .withColumn("Opponent_1st_Card", col("Hand_-1")[1])
    )
    aggregations = [
        variance("n_hits").alias("Hits_Variance"),
        expr("percentile(n_hits, array(0.25, 0.5, 0.75)) as Hits_Quartiles"),
        expr("collect_set(n_hits) as Unique_Hits"),
    ]
    stats = moves_with_cards.groupBy(*group_keys).agg(*aggregations)
    return moves_with_cards, stats


# Training data: statistics per card pair and per chunk.
df_moves_train, df_hits_stats = _summarise_hits(
    df_moves_train, "Agent_1st_Card", "Opponent_1st_Card", "Chunk Number"
)
df_hits_stats.show()

# Played games: statistics per card pair only.
df_moves_play, df_stats_play = _summarise_hits(
    df_moves_play, "Agent_1st_Card", "Opponent_1st_Card"
)
df_stats_play.show()

+--------------+-----------------+------------+-------------+-----------------+-----------+
|Agent_1st_Card|Opponent_1st_Card|Chunk Number|Hits_Variance|   Hits_Quartiles|Unique_Hits|
+--------------+-----------------+------------+-------------+-----------------+-----------+
|            -1|                0|           3|         NULL|  [2.0, 2.0, 2.0]|        [2]|
|            -1|                1|           1|         NULL|  [2.0, 2.0, 2.0]|        [2]|
|            -1|                2|           0|          0.0|  [2.0, 2.0, 2.0]|        [2]|
|            -1|                3|           0|         NULL|  [2.0, 2.0, 2.0]|        [2]|
|            -1|                3|           3|          0.5|[2.25, 2.5, 2.75]|     [2, 3]|
|            -1|                4|           1|         NULL|  [4.0, 4.0, 4.0]|        [4]|
|            -1|                4|           2|         NULL|  [3.0, 3.0, 3.0]|        [3]|
|            -1|                4|           3|         NULL|  [2.0, 2.0, 2.0]| 