In [1]:
# Data lake locations used throughout the notebook.
# load_path: folder in the "raw" container holding the poker game-detail files.
load_path = "abfss://raw@azrawdatalake4djynq.dfs.core.windows.net/Poker Data/PokerGameDetails/"

# save_path: root folder in the "curated" container that receives every output.
save_path = "abfss://curated@azcurateddatalake4djynq.dfs.core.windows.net/Poker Data/"



# Load Dataframe from External Tables created in SQL Serverless

In [2]:
%%pyspark

from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DecimalType, TimestampType

# (column name, Spark type) pairs describing the raw game-detail files.
# Every column is declared nullable.
_game_columns = [
    ("GameId", StringType()),
    ("TableName", StringType()),
    ("Board", StringType()),
    ("GameTime", TimestampType()),
    ("GameActionId", LongType()),
    ("Player", StringType()),
    ("StreetName", StringType()),
    ("Action", StringType()),
    ("Amount", DecimalType(9, 2)),
    ("Cards", StringType()),
    ("StreetSort", IntegerType()),
]
game_schema = StructType(
    fields=[StructField(col_name, col_type, True) for col_name, col_type in _game_columns]
)

# Read the raw Parquet files with the explicit schema (no schema inference).
df = spark.read.format("parquet").schema(game_schema).load(load_path)

# Derive year / month / day columns by slicing the string form of GameTime.
# NOTE(review): the offsets (1-4, 6-7, 9-10) assume the timestamp renders as
# "yyyy-MM-dd ..." when cast to string -- confirm.
game_time_col = df["GameTime"]
df = (
    df.withColumn("year", game_time_col.substr(1, 4))
      .withColumn("month", game_time_col.substr(6, 2))
      .withColumn("day", game_time_col.substr(9, 2))
)

# Preview only the first few rows.
display(df.limit(10))

# Persist transformed data back to lake

In [3]:
%%pyspark

#Write data to target location partition by year, month, & day in Delta
from pyspark.sql.functions import lit

# Target folder for the curated game details.
gd_file_path = f"{save_path}PokerGameDetails/"

# Read the raw Parquet files again with the explicit schema.
df_output = spark.read.format("parquet").schema(game_schema).load(load_path)

# Partition columns are sliced out of the string form of GameTime.
# NOTE(review): offsets assume a "yyyy-MM-dd ..." rendering -- confirm.
output_game_time = df_output["GameTime"]
df_output = (
    df_output.withColumn("year", output_game_time.substr(1, 4))
             .withColumn("month", output_game_time.substr(6, 2))
             .withColumn("day", output_game_time.substr(9, 2))
)

# Overwrite the Delta dataset, partitioned by year / month / day.
(
    df_output.write
    .partitionBy("year", "month", "day")
    .format("delta")
    .mode("overwrite")
    .save(gd_file_path)
)

# Create SQL View over new dataset

In [4]:
# Expose the game-detail DataFrame to Spark SQL as the temp view 'vwDeltaGameDetails'
# (replacing any existing view of that name) so later %%sql cells can query it.
df_output.createOrReplaceTempView('vwDeltaGameDetails')

Create a temporary view (`PlayerGameSummary`) in the Spark session to simplify shaping the data. Here we look at individual game details and flag whether the player
voluntarily put money in the pot (VPIP) or made a pre-flop raise (PFR).

In [5]:
%%sql
-- PlayerGameSummary: one row per (GameId, Player, GameDate) with pre-flop
-- behaviour flags, a game counter and the summed amount for that game.
CREATE OR REPLACE TEMPORARY VIEW PlayerGameSummary AS
WITH PreFlop AS (
    -- Pre-flop rows only: flag whether the player bet/raised/called (vpipct)
    -- and whether the player raised (pfrCt) in each game.
    SELECT player, GameId, gameTime
        , CAST(gameTime AS DATE) AS GameDate
        , MAX(CASE WHEN action IN ('bet', 'raise', 'call') THEN 1 ELSE 0 END) AS vpipct
        , MAX(CASE WHEN action = 'raise' THEN 1 ELSE 0 END) AS pfrCt
        , 1 AS GameCt
    FROM vwDeltaGameDetails
    WHERE streetName = 'Pre-flop'
    GROUP BY player, GameId, gameTime
)
, GameProfit AS (
    -- All streets: sum of the amounts for the player in each game.
    SELECT player, GameId, gameTime
        , SUM(amount) AS Profit
    FROM vwDeltaGameDetails
    GROUP BY player, GameId, gameTime
)
SELECT CAST(PreFlop.GameId AS VARCHAR(100)) AS GameId
    , CAST(PreFlop.Player AS VARCHAR(100)) AS Player
    , PreFlop.GameDate
    , SUM(PreFlop.vpipct) AS vpipCt
    , SUM(PreFlop.pfrct)  AS pfrCt
    , SUM(PreFlop.GameCt) AS GameCt
    , SUM(GameProfit.Profit) AS Winnings
FROM PreFlop
INNER JOIN GameProfit
    ON PreFlop.GameId = GameProfit.GameId
    AND PreFlop.Player = GameProfit.Player
GROUP BY PreFlop.GameId, PreFlop.Player, PreFlop.GameDate

In [6]:
import pandas as pd
import numpy as np
import plotly
import random
import plotly .express as px
from sklearn import preprocessing
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt


# Roll PlayerGameSummary up to one row per player and pull it into pandas.
# NOTE(review): 'Player000009' is excluded -- the reason is not visible here; confirm.
player_totals_sql = (
    "SELECT  player, SUM(vpipCt) as VPIP_Ct, SUM(pfrCt) as PFR_Ct, "
    "SUM(gameCt) as Game_Ct, SUM(Winnings) as Winnings, "
    "(SUM(Winnings)/SUM(gameCt))*100 as WinningsPer100 "
    "FROM PlayerGameSummary where player <> 'Player000009' "
    "GROUP BY player"
)
# Sort by player name so the row order is stable.
df = spark.sql(player_totals_sql).toPandas().sort_values(by='player')

In [7]:
# Derive per-player rate columns from the raw counts.

# PFR: how often the player raises before the flop (share of games).
df['PFR'] = df['PFR_Ct'].div(df['Game_Ct'])
# VP$IP: how often the player voluntarily puts money in the pot
# (i.e. bet, call or raise, but not a blind).
df['VP$IP'] = df['VPIP_Ct'].div(df['Game_Ct'])
# pfr_rate: of the games the player put money into, how often it was a
# pre-flop raise -- a measure of aggression. Missing values are filled with 0.
df['pfr_rate'] = df['PFR'].div(df['VP$IP']).fillna(0)
# Placeholders that later cells overwrite.
df['playertype'] = ''
df['labels'] = -1

In [8]:
# Split the players into 'Silver' and 'Bronze'.
# Silver: at least MIN_GAMES_FOR_CLUSTERING games and at least one pre-flop raise
# and one voluntary pot entry. Everyone else is Bronze.
MIN_GAMES_FOR_CLUSTERING = 500

is_silver = (
    (df['Game_Ct'] >= MIN_GAMES_FOR_CLUSTERING)
    & (df['PFR_Ct'] > 0)
    & (df['VPIP_Ct'] > 0)
)

df_S = df[is_silver].copy()
df_S['playertype'] = 'Silver'

# Bronze is the exact complement of the Silver mask, so the two groups always
# partition the frame: no player can fall into both or neither, and the
# threshold lives in one place.
df_B = df[~is_silver].copy()
df_B['playertype'] = 'Bronze'

In [9]:
# Fix NumPy's global seed so the clustering cells are repeatable.
np.random.seed(42)

# Only these two behavioural rates are used as clustering features.
attributes = ['pfr_rate', 'VP$IP']

# Feature matrix for the clustering models: Silver players only.
x = df_S.loc[:, attributes]

In [10]:
# Elbow method: fit K-Means for k = 1..10 and record the distortion (inertia)
# of each fit to judge how many clusters the data naturally supports.
cluster_counts = range(1, 11)
distortions = [
    KMeans(
        n_clusters=k,
        init='k-means++',
        n_init=25,
        max_iter=300,
        tol=1e-04,
        random_state=42,
    ).fit(x).inertia_
    for k in cluster_counts
]

# Distortion against number of clusters; look for the "elbow".
plt.plot(cluster_counts, distortions, marker='o')
plt.xlabel('Number of clusters')
plt.ylabel('Distortion')
plt.show()

In [11]:
# Spectral Clustering gave better results than K-Means for this data.
import pickle
from sklearn.cluster import SpectralClustering

# Seed NumPy's global RNG; the model itself also gets an explicit random_state.
np.random.seed(42)

# Define and fit the model once: four clusters over the Silver players' features.
model = SpectralClustering(n_clusters=4, random_state=42)
model.fit(x)

# Round-trip the fitted model through pickle (save, then load).
model_s = pickle.dumps(model)
model_l = pickle.loads(model_s)

# Cluster assignment for each Silver player. The fitted model already holds
# the labels for x in `labels_`; calling fit_predict(x) again would refit the
# whole model only to return the same labels (random_state is fixed).
yhat = model_l.labels_

df_S['labels'] = yhat
df_S['labels'] = df_S['labels'].astype('category')   # discrete colours in the plot
df_S['Winnings'] = df_S['Winnings'].astype('float64')

# Scatter of the two features, coloured by cluster, rendered as an HTML div.
fig = px.scatter(
    df_S,
    x="pfr_rate",
    y="VP$IP",
    color='labels',
    title='Cluster Algorithm: Spectral Clustering',
)
alg = plotly.offline.plot(fig, output_type='div')
displayHTML(alg)



In [12]:
# Legend mapping the four regions of (pfr_rate, VP$IP) space to player archetypes.
archetype_legend = (
    'Calling Station =  low pfr_rate and high VP$IP',
    'Tight Aggressive = high pfr_rate and low VP$IP',
    'Rock = low pfr_rate and low VP$IP',
    'Loose Aggressive = high pfr_rate and high VP$IP',
)
for legend_line in archetype_legend:
    print(legend_line)
# Trailing blank line.
print()

In [13]:
# Summarise each cluster: per label, compute count / sum / mean of the four metrics.
# NOTE(review): whether 'labels' ends up as the index or as a column here depends on
# how the installed pandas treats as_index=False with a list of aggregations -- confirm.
df_S_agg = df_S.groupby('labels', as_index= False)[['Winnings','Game_Ct','pfr_rate','VP$IP']].agg(['count','sum','mean'])
# The aggregation yields two-part column names (metric, statistic); flatten them
# to single strings of the form '<metric>_<statistic>', e.g. 'Winnings_sum'.
df_S_agg.columns = df_S_agg.columns.map(lambda x: f'{x[0]}_{x[1]}')
# Cluster-level winnings per 100 games: total winnings / total games * 100.
df_S_agg['Avg_winnings'] = (df_S_agg['Winnings_sum']/df_S_agg['Game_Ct_sum'])*100
# Columns kept for the summary table ('Winnings_count' is the number of players in the cluster).
agg_col_select = ['Winnings_count', 'pfr_rate_mean', 'VP$IP_mean', 'Avg_winnings','Winnings_sum'  ]
df_S_summary = df_S_agg[agg_col_select].reset_index()
display(df_S_summary)

In [14]:
# Feature columns of the Bronze players (kept for reference; not fed to the model).
y = df_B[attributes]

# Bronze players are not clustered. Everyone with fewer than 500 games, or with
# 'PFR_Ct' == 0 or 'VPIP_Ct' == 0, receives the fixed label 99 instead.
labels = 99

# Write the label into the Bronze frame and align the dtypes with the Silver frame.
df_B['labels'] = labels
df_B = df_B.astype({'labels': 'category', 'Winnings': 'float64'})
print(df_B.head())

# Persist the dataframe

Convert the pandas DataFrame to a Spark DataFrame, then save it to the lake in Delta format, which creates the underlying Delta structure.

In [15]:
# Merge the Silver and Bronze frames into one before persisting the results.
# pd.concat replaces DataFrame.append, which was deprecated in pandas 1.4 and
# removed in pandas 2.0. Row order and index are the same as with append.
df_allplayers = pd.concat([df_S, df_B])

In [16]:
# Target folder for the per-player summary.
ps_file_path = f"{save_path}PlayerSummary/"

# pandas -> Spark so the result can be written with the Spark Delta writer.
df_output = spark.createDataFrame(df_allplayers)

# This cell is expected to run often, so overwrite whatever is already there.
(
    df_output.write
    .format('delta')
    .mode('overwrite')
    .save(ps_file_path)
)

# Persist model data to the Lake database table

Create the Lake database [lakePokerData] and persist the model output to the table [lakePokerData].[playerSummary].

In [22]:
%%sql
-- Create the lake database that will hold the player summary table; no-op if it already exists.
CREATE DATABASE IF NOT EXISTS lakePokerData

In [23]:
%%pyspark
# Read the Delta player summary back from the lake ...
df = spark.read.format('delta').load(ps_file_path)

# ... and publish it as a table in the lake database, replacing any previous version.
(
    df.write
    .mode("overwrite")
    .saveAsTable("lakePokerData.playerSummary")
)