In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, FloatType
from joblib import load

# Create (or reuse) the Spark session for this consumer.
spark = SparkSession.builder.appName("HorseRaceConsumer").getOrCreate()

# Schema of the JSON payload carried in each Kafka message: every field is a
# nullable float, declared in the order listed below.
schema = StructType([
    StructField(field_name, FloatType(), True)
    for field_name in (
        "jockey_score",
        "horse_track_relative_score",
        "horse_course_relative_score",
        "horse_race_relative_score",
        "distance_id",
        "weight_carried",
        "odds",
        "position_at_finish",  # Target column
    )
])

# Load the serialized model from disk and wrap it in a Spark broadcast
# variable; the prediction function reads it through `broadcast_model.value`.
model = load("horsemodel_randomforest.pkl")
broadcast_model = spark.sparkContext.broadcast(model)

def predict(jockey_score, horse_track_relative_score, horse_course_relative_score,
            horse_race_relative_score, distance_id, weight_carried, odds):
    """Predict a finishing position for one runner using the broadcast model.

    The seven arguments form a single feature row, passed to the model in
    exactly the order they appear in the signature.

    Every field in the message schema is nullable, so any argument can arrive
    as ``None`` (for example when a message is malformed or a key is missing).
    In that case no prediction is attempted and ``None`` is returned, which
    Spark stores as a NULL prediction, rather than handing ``None`` values to
    the model.

    Returns:
        The model's prediction for the row as a Python ``float``, or ``None``
        when at least one feature is missing.
    """
    feature_list = [jockey_score, horse_track_relative_score, horse_course_relative_score,
                    horse_race_relative_score, distance_id, weight_carried, odds]

    # Guard against incomplete rows before touching the model.
    if any(feature is None for feature in feature_list):
        return None

    # The model expects a 2-D input (a list of rows); take the single result
    # and convert it to a plain float so it fits the UDF's FloatType.
    return float(broadcast_model.value.predict([feature_list])[0])

# Expose the Python prediction function to Spark SQL as a float-returning UDF.
predict_udf = udf(predict, FloatType())

kafka_topic = "data_analytics"
kafka_bootstrap_servers = "localhost:9092"

# Streaming source: subscribe to the Kafka topic on the configured brokers.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)

# Decode the message value as a string, parse it as JSON against `schema`,
# and flatten the parsed struct into top-level columns.
parsed_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), schema).alias("parsed_value"))
    .select("parsed_value.*")
)

# Append a "prediction" column computed from the seven feature columns,
# passed to the UDF in the same order as the parameters of `predict`.
predictions_df = parsed_df.withColumn(
    "prediction",
    predict_udf(*(
        col(feature_name)
        for feature_name in (
            "jockey_score",
            "horse_track_relative_score",
            "horse_course_relative_score",
            "horse_race_relative_score",
            "distance_id",
            "weight_carried",
            "odds",
        )
    )),
)

# Running tallies shared across micro-batches, each starting at zero:
# rows scored, exact hits, and near misses.
total_predictions, correct_predictions, close_predictions = (
    spark.sparkContext.accumulator(0) for _ in range(3)
)

def process_batch(batch_df, batch_id):
    """Score one micro-batch and print running accuracy statistics.

    Collects the batch to the driver, prints the rounded prediction next to
    the actual finishing position for every row, and updates the module-level
    accumulators:

    * ``total_predictions``   - rows that were scored,
    * ``correct_predictions`` - rounded prediction equals the actual value,
    * ``close_predictions``   - not exact, but within 2 of the actual value.

    Rows whose actual value or prediction is missing (``None``) are reported
    and skipped, so a malformed message cannot raise ``TypeError`` here and
    stop the streaming query; they do not count towards any tally.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch; must
            expose the ``position_at_finish`` and ``prediction`` columns.
        batch_id: Identifier of the micro-batch supplied by ``foreachBatch``
            (unused).
    """
    # `+=` rebinds the names, so they must be declared global.
    global total_predictions, correct_predictions, close_predictions
    rows = batch_df.collect()

    for row in rows:
        actual = row["position_at_finish"]
        raw_prediction = row["prediction"]

        # Nullable schema fields mean either value can be missing; neither
        # round() nor the subtraction below can work on None.
        if actual is None or raw_prediction is None:
            print("Skipping row with missing actual or predicted value")
            continue

        predicted = round(raw_prediction)

        print(f"Predicted: {predicted}, Actual: {actual}")

        total_predictions += 1
        if actual == predicted:
            correct_predictions += 1
        elif abs(actual - predicted) <= 2:
            close_predictions += 1

    # Percentages are cumulative over every batch seen so far; report 0.00%
    # until at least one row has been scored to avoid dividing by zero.
    if total_predictions.value > 0:
        correct_percentage = (correct_predictions.value / total_predictions.value) * 100
        close_percentage = (close_predictions.value / total_predictions.value) * 100
    else:
        correct_percentage = 0.0
        close_percentage = 0.0

    # Print percentages
    print(f"Correct Predictions: {correct_percentage:.2f}%")
    print(f"Close Predictions: {close_percentage:.2f}%")
    print("-" * 50)

# Start the streaming query, handing each micro-batch to process_batch.
query = (
    predictions_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .start()
)

# Keep the cell running until the query terminates.
query.awaitTermination()

Correct Predictions: 0.00%
Close Predictions: 0.00%
--------------------------------------------------
Predicted: 5, Actual: 6.0
Predicted: 5, Actual: 4.0
Correct Predictions: 0.00%
Close Predictions: 100.00%
--------------------------------------------------
Predicted: 6, Actual: 6.0
Predicted: 5, Actual: 5.0
Predicted: 5, Actual: 5.0
Correct Predictions: 60.00%
Close Predictions: 40.00%
--------------------------------------------------
Predicted: 5, Actual: 6.0
Predicted: 6, Actual: 9.0
Correct Predictions: 42.86%
Close Predictions: 42.86%
--------------------------------------------------
Predicted: 4, Actual: 2.0
Predicted: 7, Actual: 7.0
Predicted: 5, Actual: 3.0
Correct Predictions: 40.00%
Close Predictions: 50.00%
--------------------------------------------------
Predicted: 5, Actual: 3.0
Predicted: 5, Actual: 2.0
Correct Predictions: 33.33%
Close Predictions: 50.00%
--------------------------------------------------
Predicted: 5, Actual: 7.0
Predicted: 5, Actual: 8.0
Correct 

In [None]:
# The run was complete; I stopped the streaming query manually.