In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DoubleType
from pyspark.sql.functions import from_json, col, avg, abs, size, split, count, when, rank
from pyspark.sql.window import Window
import findspark
import pandas as pd
import time
import sqlite3


In [2]:
# Open the SQLite database file that receives the KPI tables
KPI_DB_PATH = r'C:\databases\kpi_data.db'
conn = sqlite3.connect(KPI_DB_PATH)
# Cursor for running raw SQL statements on this connection
cursor = conn.cursor()

In [3]:
# Initialize the Spark environment via findspark before the SparkSession is built.
# NOTE(review): the pyspark imports in the first cell already ran before this call,
# so it presumably matters only for environment setup (e.g. SPARK_HOME) — confirm
# whether it is still required.
findspark.init()

In [4]:
# JAR files handed to Spark for the Spark-Kafka integration
jar_files = [
    "C:\\Spark\\jars\\spark-sql-kafka-0-10_2.12-3.5.1.jar",
    "C:\\spark\\jars\\kafka-clients-3.7.1.jar",
    "C:\\Spark\\jars\\commons-pool2-2.11.1.jar",
    "C:\\Spark\\jars\\spark-token-provider-kafka-0-10_2.12-3.5.1.jar",
    "C:\\Spark\\jars\\spark-streaming-kafka-0-10-assembly_2.12-3.5.1.jar",
]

# "spark.jars" is given as one comma-separated string of paths
jars = ",".join(jar_files)

# Build (or reuse) the Spark session with the JAR files attached
spark = (
    SparkSession.builder
    .appName("KafkaSparkConsumer")
    .config("spark.jars", jars)
    .getOrCreate()
)

In [5]:
# Schema of the JSON documents coming from Kafka. Every field is nullable.
def _nullable(name, data_type):
    """Return a nullable StructField with the given name and data type."""
    return StructField(name, data_type, True)


# Nested struct of the top-level '_id' field
_event_id_type = StructType([_nullable("_data", StringType())])

# Nested struct of the 'clusterTime' field
_cluster_time_type = StructType([
    _nullable("$timestamp", StructType([
        _nullable("t", LongType()),
        _nullable("i", LongType()),
    ])),
])

# Nested struct of the 'wallTime' field
_wall_time_type = StructType([_nullable("$date", LongType())])

# Nested struct of the 'fullDocument' field: one chess game per document
_full_document_type = StructType([
    _nullable("_id", StructType([_nullable("$oid", StringType())])),
    _nullable("id", StringType()),
    _nullable("rated", BooleanType()),
    _nullable("created_at", DoubleType()),
    _nullable("last_move_at", DoubleType()),
    _nullable("turns", LongType()),
    _nullable("victory_status", StringType()),
    _nullable("winner", StringType()),
    _nullable("increment_code", StringType()),
    _nullable("white_id", StringType()),
    _nullable("white_rating", LongType()),
    _nullable("black_id", StringType()),
    _nullable("black_rating", LongType()),
    _nullable("moves", StringType()),
    _nullable("opening_eco", StringType()),
    _nullable("opening_name", StringType()),
    _nullable("opening_ply", LongType()),
])

# Full message schema, assembled from the pieces above
schema = StructType([
    _nullable("_id", _event_id_type),
    _nullable("operationType", StringType()),
    _nullable("clusterTime", _cluster_time_type),
    _nullable("wallTime", _wall_time_type),
    _nullable("fullDocument", _full_document_type),
])

In [6]:
# Streaming reader subscribed to the Kafka topic, starting from the earliest offset
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "chess-.chess.games")
    .option("startingOffsets", "earliest")
)

# Keep only the message value, cast to a string column named 'json_string'
df = kafka_reader.load().selectExpr("CAST(value AS STRING) as json_string")

In [7]:
# Parse every JSON string with the schema into a single struct column 'data' ...
parsed_df = df.select(from_json(col("json_string"), schema).alias("data"))

# ... then expand the nested 'fullDocument' struct into top-level columns
chess_streaming_df = parsed_df.select("data.fullDocument.*")

In [8]:
# Global variable to hold the accumulated Spark DataFrame
chess_df = None


def process_batch(batch_df, epoch_id):
    """Append one micro-batch to the global ``chess_df`` accumulator.

    Intended as a ``foreachBatch`` callback for the streaming query.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        epoch_id: Batch identifier supplied by ``foreachBatch``; not used here.
    """
    global chess_df
    if chess_df is None:
        # First batch: it becomes the accumulator as-is
        chess_df = batch_df
    else:
        # Later batches: append the new rows to everything collected so far
        chess_df = chess_df.union(batch_df)

# Write the output of the stream using foreachBatch to process each batch.
# The availableNow trigger processes all data available in the topic and then
# terminates the query on its own, so no fixed sleep is needed to guess how
# long the collection takes.
query = (
    chess_streaming_df.writeStream
    .outputMode("append")
    .trigger(availableNow=True)
    .foreachBatch(process_batch)
    .start()
)

# Block until the available data has been processed; this also re-raises any
# error that occurred inside the batch callback instead of hiding it
query.awaitTermination()

# Make sure the query is stopped before moving on
query.stop()


In [9]:
# Unused columns to discard
unused_columns = ["created_at", "last_move_at", "rated", "white_id", "black_id", "opening_eco"]

# Report the column count, drop the unused columns, then report the count again
print("Number of columns before removing unused columns: ", len(chess_df.columns))
chess_df = chess_df.drop(*unused_columns)
print("Number of columns after removing unused columns: ", len(chess_df.columns))

Number of columns before removing unused columns:  17
Number of columns after removing unused columns:  11


In [10]:
# Count the games, remove every row that contains a null, then count again
games_before = chess_df.count()
print("Number of games before deleting rows with missing values: ",games_before)

chess_df = chess_df.dropna()

games_after = chess_df.count()
print("Number of games after deleting rows with missing values: ",games_after)

Number of games before deleting rows with missing values:  20058
Number of games after deleting rows with missing values:  20058


In [11]:
# Remove duplicate games based on the 'id' column and print counts before and after removal
print("Number of games before removing duplicates: ", chess_df.count())

# Cache the cleaned result: the following cells run many separate actions on
# chess_df, and without caching each one would recompute the whole lineage.
# dropDuplicates is also not guaranteed to keep the same row among duplicates on
# every recomputation, so caching keeps the KPIs consistent with one another.
chess_df = chess_df.dropDuplicates(["id"]).cache()

# This count is the first action on the cached DataFrame and materializes it
print("Number of games after removing duplicates: ", chess_df.count())

Number of games before removing duplicates:  20058
Number of games after removing duplicates:  19113


In [12]:
# Calculate the total number of games (rows currently in chess_df).
# The following cells use this value as the denominator of percentage KPIs.
total_games = chess_df.count()

In [13]:
def _percentage(part, whole):
    """Return ``part / whole * 100``, or None when ``whole`` is zero."""
    return (part / whole) * 100 if whole else None


def _games_matching(condition):
    """Count the games in ``chess_df`` that satisfy the given Column condition."""
    return chess_df.filter(condition).count()


# List of (KPI name, value) tuples; a value is None when it cannot be computed
data = []

# Percentage of games won by white, won by black, and drawn
for kpi_name, winner_value in [("White winning %", "white"),
                               ("Black winning %", "black"),
                               ("Draw %", "draw")]:
    wins = _games_matching(col("winner") == winner_value)
    data.append((kpi_name, _percentage(wins, total_games)))

# Average number of turns per game and average number of opening-phase moves,
# computed together in a single pass over the data
avg_turns, avg_opening_ply = chess_df.agg(avg("turns"), avg("opening_ply")).first()
data.append(("Average Turns", avg_turns))
data.append(("Average Opening Ply", avg_opening_ply))

# Loop through different rating difference thresholds (100, 200, 300).
# For each threshold, calculate the percentage of games won by the lower-rated player
# among the games where the rating difference is greater than the threshold.
white_lead = col("white_rating") - col("black_rating")
black_lead = col("black_rating") - col("white_rating")
for threshold in [100, 200, 300]:
    # The two cases are mutually exclusive, so one OR-filter counts the same
    # games as adding the two separate counts
    upsets = _games_matching(
        ((white_lead > threshold) & (col("winner") == "black"))
        | ((black_lead > threshold) & (col("winner") == "white"))
    )
    mismatched_games = _games_matching(abs(white_lead) > threshold)
    data.append((
        f"% of games won by lower rated Player (when rating difference > {threshold})",
        _percentage(upsets, mismatched_games),
    ))

# Percentage of games that ended with scholar's mate: white wins by mate and
# the 'moves' string splits into exactly 7 space-separated entries
scholar_mates = _games_matching(
    (size(split(col("moves"), " ")) == 7)
    & (col("winner") == "white")
    & (col("victory_status") == "mate")
)
data.append(("% of games that ended with scholar mate", _percentage(scholar_mates, total_games)))



In [14]:
# Create a DataFrame for the calculated KPIs.
# The schema gets its own name so that the Kafka JSON 'schema' defined earlier
# is not overwritten (re-running the JSON-parsing cell would otherwise break).
kpi_schema = StructType([
    StructField("KPI Name", StringType(), True),
    StructField("Value", DoubleType(), True)
])
kpi_df = spark.createDataFrame(data, schema=kpi_schema)

# Display the DataFrame
kpi_df.show()

# Convert the DataFrame to a Pandas DataFrame for insertion into SQLite
pandas_df = kpi_df.toPandas()

# Write the KPIs to table1. if_exists='replace' drops any existing table1 and
# recreates it from the DataFrame's own columns, so no separate CREATE TABLE
# statement is needed (a pre-created table would be dropped anyway).
pandas_df.to_sql('table1', conn, if_exists='replace', index=False)

+--------------------+-------------------+
|            KPI Name|              Value|
+--------------------+-------------------+
|     White winning %| 49.939831528279186|
|     Black winning %| 45.414116046669804|
|              Draw %|  4.646052425051012|
|       Average Turns| 60.513838748495786|
| Average Opening Ply|  4.815779835713912|
|% of games won by...|  24.98313903073514|
|% of games won by...| 19.004676018704075|
|% of games won by...| 15.685706001739636|
|% of games that e...|0.10987286140323341|
+--------------------+-------------------+



In [15]:
# Derive the major opening name: the part of 'opening_name' before the first ':'
chess_df = chess_df.withColumn("major_opening_name", split(col("opening_name"), ":")[0])

# Calculate usage % of each major opening (its share of all games), most used first
kpi_df = (
    chess_df.groupBy("major_opening_name").count()
    .withColumn("Usage %", (col("count") / total_games) * 100)
    .select("major_opening_name", "Usage %")
    .orderBy(col("Usage %").desc())
)
kpi_df.show()

# Convert the DataFrame to a Pandas DataFrame for insertion into SQLite
pandas_df = kpi_df.toPandas()

# Write the result to table2. if_exists='replace' drops any existing table2 and
# recreates it from the DataFrame's own columns, so no separate CREATE TABLE
# statement is needed (a pre-created table would be dropped anyway).
pandas_df.to_sql('table2', conn, if_exists='replace', index=False)

+--------------------+------------------+
|  major_opening_name|           Usage %|
+--------------------+------------------+
|    Sicilian Defense|12.938837440485534|
|      French Defense| 6.581907602155601|
|   Queen's Pawn Game| 5.294825511432009|
|        Italian Game| 4.844869983780673|
|    King's Pawn Game| 4.557107727724585|
|           Ruy Lopez| 4.248417307591692|
|     English Opening| 3.657196672421912|
|Scandinavian Defense| 3.599644221210694|
|   Caro-Kann Defense|2.9247109297336893|
|         Scotch Game|2.3073300894679014|
|Queen's Gambit De...|1.9724794642389996|
|   Four Knights Game| 1.799822110605347|
|Van't Kruijs Opening| 1.789358028566944|
|         Indian Game|1.6323967979908962|
|    Bishop's Opening| 1.621932715952493|
|   Zukertort Opening| 1.559148223722074|
|    Philidor Defense|1.3760267880500183|
|Queen's Gambit Re...| 1.276618008685188|
|        Russian Game| 1.266153926646785|
|Queen's Gambit Ac...| 1.266153926646785|
+--------------------+------------------+

In [16]:
# Group chess_df by 'increment_code' and compute, in a single aggregation:
#   - the total number of games,
#   - the number of games lost on time (victory_status == 'outoftime'),
#   - the number of games ended by resignation (victory_status == 'resign').
# when(...) without otherwise() yields null for non-matching rows and count()
# skips nulls, so each conditional count is 0 (not missing) for an increment
# code that has no such games. Every increment code therefore stays in the result.
kpi_df = chess_df.groupBy("increment_code").agg(
    count("*").alias("Total Games Count"),
    count(when(col("victory_status") == "outoftime", 1)).alias("Time Loss Count"),
    count(when(col("victory_status") == "resign", 1)).alias("Resignations Count"),
)

# Calculate the usage percentage for each increment code (i.e., the percentage of games using that increment code)
kpi_df = kpi_df.withColumn("Usage %", (col("Total Games Count")/total_games)*100)

# Calculate the percentage of games that ended due to time running out for each increment code
kpi_df = kpi_df.withColumn("Time Loss %", (col("Time Loss Count")/col('Total Games Count')*100))

# Calculate the percentage of games that ended with resignation for each increment code
kpi_df = kpi_df.withColumn("Resignation %", (col("Resignations Count")/col('Total Games Count')*100))

# Keep only the relevant columns and list the most used increment codes first
kpi_df = (
    kpi_df.select("increment_code", "Usage %", "Time Loss %", "Resignation %")
    .orderBy(col("Usage %").desc())
)

# Display the resulting DataFrame
kpi_df.show()

# Convert the DataFrame to a Pandas DataFrame for insertion into SQLite
pandas_df = kpi_df.toPandas()

# Write the result to table3. if_exists='replace' drops any existing table3 and
# recreates it from the DataFrame's own columns, so no separate CREATE TABLE
# statement is needed (a pre-created table would be dropped anyway).
pandas_df.to_sql('table3', conn, if_exists='replace', index=False)

+--------------+------------------+------------------+------------------+
|increment_code|           Usage %|       Time Loss %|     Resignation %|
+--------------+------------------+------------------+------------------+
|          10+0|  38.4868937372469| 9.883088635127788| 54.74442631865144|
|          15+0| 6.581907602155601|  8.34658187599364| 55.16693163751988|
|         15+15| 4.295505676764506|3.6540803897685747|56.881851400730824|
|           5+5|  3.78276565688275| 9.266943291839558|56.293222683264176|
|           5+8| 3.547323811018678| 8.702064896755163| 56.78466076696165|
|           8+0|2.9456390938104953|17.761989342806395|49.733570159857905|
|          10+5|2.7939099042536495| 7.490636704119851| 58.80149812734082|
|         15+10| 2.249777638256684| 4.651162790697675| 64.88372093023256|
|          20+0| 2.239313556218281| 5.607476635514018| 52.10280373831776|
|          30+0|1.8835347669125726|3.3333333333333335| 59.72222222222222|
|         10+10|1.6951812902213155|  4

In [17]:
# Classify a rating column into labelled ranges ("rookie", "beginner", "intermediate", ...).
def rating_range(rating):
    """Build a Column expression that labels the rating column named ``rating``.

    Args:
        rating: Name of the column holding the player's rating.

    Returns:
        A Column expression evaluating to one of the range labels; values
        matching none of the bounded ranges get "expert - > 2300".
    """
    score = col(rating)
    label = when(score <= 1100, "rookie - <= 1100")
    label = label.when((score > 1100) & (score <= 1600), "beginner - (1100, 1600]")
    label = label.when((score > 1600) & (score <= 2000), "intermediate - (1600, 2000]")
    label = label.when((score > 2000) & (score <= 2300), "advance - (2000, 2300]")
    return label.otherwise("expert - > 2300")

# Add new columns to the DataFrame representing the rating ranges for both white and black players.
processed_chess_df = (
    chess_df
    .withColumn("White Rating Range", rating_range("white_rating"))
    .withColumn("Black Rating Range", rating_range("black_rating"))
)

# Add a 'Game Level' column, which stores the rating range if both players are in the same range.
# when(...) without otherwise() leaves the value null when the ranges differ.
processed_chess_df = processed_chess_df.withColumn(
    "Game Level",
    when(col("White Rating Range") == col("Black Rating Range"), col("White Rating Range")),
)

# Remove rows where 'Game Level' is null (i.e., games where the players' ratings are in different ranges).
# The subset restricts the null check to that column, so rows are never dropped
# because of a null somewhere else.
processed_chess_df = processed_chess_df.dropna(subset=["Game Level"])

# Calculate the total number of games that have a defined 'Game Level'.
total_defined_games = processed_chess_df.count()


def _outcome_count(status):
    """Return an aggregate counting the rows whose 'victory_status' equals ``status``."""
    # when(...) without otherwise() is null for non-matching rows; count() skips nulls
    return count(when(col("victory_status") == status, 1))


# Aggregate, per 'Game Level', the total number of games and the number of games
# per victory status in a single pass. A level with no games of some status gets
# a count of 0 for it, so no level is lost from the result.
kpi_df = processed_chess_df.groupBy("Game Level").agg(
    count("*").alias("Total Games Count"),
    _outcome_count("outoftime").alias("Time Loss Count"),
    _outcome_count("resign").alias("Resignations Count"),
    _outcome_count("draw").alias("Draws Count"),
    _outcome_count("mate").alias("Mates Count"),
)

# Group the DataFrame by 'Game Level' and 'major_opening_name', then count the occurrences
# of each opening within each game level.
grouped_df = processed_chess_df.groupBy("Game Level", "major_opening_name").count()

# Define a window specification that partitions the data by 'Game Level' and orders
# the openings within each level by their frequency in descending order.
# The opening name is a secondary sort key so that openings with equal counts
# get distinct ranks and exactly one opening per level has rank 1.
window_spec = Window.partitionBy("Game Level").orderBy(
    col("count").desc(), col("major_opening_name").asc()
)

# Apply ranking to the openings within each game level based on their frequency,
# then filter to keep only the most common (highest ranked) opening for each game level.
ranked_df = grouped_df.withColumn("rank", rank().over(window_spec))
most_common_opening_df = (
    ranked_df.filter(col("rank") == 1)
    .select("Game Level", "major_opening_name")
    .withColumnRenamed("major_opening_name", "Most Common Opening")
)

# Attach the most common opening to the per-level counts.
kpi_df = kpi_df.join(most_common_opening_df, on="Game Level", how="inner")

# Calculate the percentage of games for each 'Game Level' relative to the total number of defined games.
kpi_df = kpi_df.withColumn("% Of Games", (col("Total Games Count") / total_defined_games) * 100)

# Calculate the percentage of games that ended due to time running out within each 'Game Level'.
kpi_df = kpi_df.withColumn("Time Loss %", (col("Time Loss Count") / col("Total Games Count")) * 100)

# Calculate the percentage of games that ended in resignation within each 'Game Level'.
kpi_df = kpi_df.withColumn("Resignation %", (col("Resignations Count") / col("Total Games Count")) * 100)

# Calculate the percentage of games that ended in a draw within each 'Game Level'.
kpi_df = kpi_df.withColumn("Draw %", (col("Draws Count") / col("Total Games Count")) * 100)

# Calculate the percentage of games that ended with a checkmate within each 'Game Level'.
kpi_df = kpi_df.withColumn("Mate %", (col("Mates Count") / col("Total Games Count")) * 100)

# Select the desired columns and order the data by the percentage of games in descending order.
kpi_df = kpi_df.select("Game Level", "% Of Games", "Time Loss %", "Resignation %", "Draw %", "Mate %", "Most Common Opening") \
    .orderBy(col("% Of Games").desc())

# Convert the DataFrame to a Pandas DataFrame for insertion into SQLite
pandas_df = kpi_df.toPandas()

# Write the result to table4. if_exists='replace' drops any existing table4 and
# recreates it from the DataFrame's own columns, so no separate CREATE TABLE
# statement is needed (a pre-created table would be dropped anyway).
pandas_df.to_sql('table4', conn, if_exists='replace', index=False)

