In [None]:
from kafka import KafkaConsumer
import pandas as pd
from tqdm import tqdm
import uuid
import time
from collections import defaultdict, deque
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from apscheduler.schedulers.background import BackgroundScheduler
import sys
import schedule
import time
import threading
import uuid
from kafka import KafkaConsumer
import pandas as pd
from tqdm import tqdm
import queue
import sys


def _elo_update(rating_w, rating_l, k):
    """Return the (winner, loser) ratings after one match under the Elo formula with K-factor *k*."""
    exp_w = 1 / (1 + 10 ** ((rating_l - rating_w) / 400))
    exp_l = 1 / (1 + 10 ** ((rating_w - rating_l) / 400))
    return rating_w + k * (1 - exp_w), rating_l + k * (0 - exp_l)


def data_engineering(df, k=24):
    """Build the model feature table from raw match rows and fit Elo ratings.

    The 'Player_1', 'Player_2' and 'Winner' columns of *df* are lowercased
    IN PLACE.  Every player gets an integer id, and the matches are replayed
    in row order to compute overall and per-surface Elo ratings.  Everybody
    starts at 1500, and *k* is the Elo K-factor (the largest possible rating
    change from one match, as in chess).

    Side effects -- sets these module globals, which create_plot() and
    use_a_model() read:
        player_id_map: lowercased player name -> integer id
        elo_players:   player id -> overall Elo rating after the last match
        all_elo:       player id -> deque of that player's overall rating after each match
        elo_surfaces:  surface -> {player id -> Elo rating on that surface}

    Args:
        df: raw match DataFrame containing every column dropped below plus
            'Rank_1', 'Rank_2', 'Odd_1', 'Odd_2' (convertible to float).
        k: Elo K-factor, default 24.

    Returns:
        A new DataFrame with the float columns Rank_1, Rank_2, Odd_1, Odd_2,
        Label, Elo_diff and ELO_SURFACE_DIFF.
        - Label is 1.0 if Player_1 won, 0.0 if Player_2 won, and NaN if
          'Winner' matches neither player.
        - Elo_diff and ELO_SURFACE_DIFF are Player_1's rating minus
          Player_2's, taken BEFORE the match is played.  They therefore
          carry no information about the result, and they match the
          features use_a_model() builds at prediction time.
    """
    global player_id_map, elo_players, all_elo, elo_surfaces

    for col in ('Player_1', 'Player_2', 'Winner'):
        df[col] = df[col].str.lower()

    # Ids are assigned in order of first appearance so they are the same on
    # every run (iterating a set of strings is not: str hashing is randomised).
    unique_players = pd.unique(pd.concat([df['Player_1'], df['Player_2']], ignore_index=True))
    player_id_map = {player: i for i, player in enumerate(unique_players)}

    new_df = df.copy()
    new_df['Player_1_ID'] = new_df['Player_1'].map(player_id_map)
    new_df['Player_2_ID'] = new_df['Player_2'].map(player_id_map)
    new_df['Winner_id'] = new_df['Winner'].map(player_id_map)

    p1_won = (new_df['Winner_id'] == new_df['Player_1_ID']).to_numpy()
    p2_won = (new_df['Winner_id'] == new_df['Player_2_ID']).to_numpy()
    # The Player_2 condition is listed first so it takes precedence if both
    # hold, the same as the original row-by-row assignment order.
    new_df['Label'] = np.select([p2_won, p1_won], [0.0, 1.0], default=np.nan)
    new_df['Loser_id'] = np.select(
        [p2_won, p1_won],
        [new_df['Player_1_ID'].to_numpy(dtype=float), new_df['Player_2_ID'].to_numpy(dtype=float)],
        default=np.nan,
    )

    elo_players = defaultdict(int)
    all_elo = defaultdict(deque)
    elo_surfaces = defaultdict(lambda: defaultdict(int))

    elo_diff, surface_diff = [], []
    for p1, p2, label, surface in zip(new_df['Player_1_ID'], new_df['Player_2_ID'],
                                      new_df['Label'], new_df['Surface']):
        surface_elo = elo_surfaces[surface]
        # Features use the ratings as they stand before this match, from Player_1's side.
        elo_diff.append(elo_players.get(p1, 1500) - elo_players.get(p2, 1500))
        surface_diff.append(surface_elo.get(p1, 1500) - surface_elo.get(p2, 1500))

        if np.isnan(label):
            continue  # winner matches neither player: no rating update is possible
        winner, loser = (p1, p2) if label == 1.0 else (p2, p1)

        elo_players[winner], elo_players[loser] = _elo_update(
            elo_players.get(winner, 1500), elo_players.get(loser, 1500), k)
        all_elo[winner].append(elo_players[winner])
        all_elo[loser].append(elo_players[loser])

        surface_elo[winner], surface_elo[loser] = _elo_update(
            surface_elo.get(winner, 1500), surface_elo.get(loser, 1500), k)

    new_df['Elo_diff'] = np.asarray(elo_diff, dtype=float)
    new_df['ELO_SURFACE_DIFF'] = np.asarray(surface_diff, dtype=float)
    new_df.drop(['Tournament', "Date", 'Series', "Court", 'Surface', "Round", "Best of",
                 'Player_1', 'Player_2', 'Winner', 'Score', 'Player_1_ID', 'Player_2_ID',
                 'Winner_id', 'Loser_id', 'Pts_1', "Pts_2"], axis=1, inplace=True)
    for col in ('Rank_1', 'Rank_2', 'Odd_1', 'Odd_2'):
        new_df[col] = new_df[col].astype(float)

    return new_df


def create_plot():
    """Plot every player's overall Elo history and highlight Nadal, Djokovic and Federer.

    Reads the module globals ``all_elo`` and ``player_id_map``, which
    data_engineering() sets, so it must run after data has been loaded.
    Each curve's x axis is that player's own match count.
    """
    highlighted = (
        ('nadal r.', 'blue', "Rafa Nadal"),
        ('djokovic n.', 'red', "Novak Djokovic"),
        ('federer r.', 'green', "Roger Federer"),
    )

    plt.figure(figsize=(8, 4))
    for history in all_elo.values():
        plt.plot(list(history), marker='.', linewidth=0.1, markersize=1, linestyle='-', color='black')

    # Draw the three named players in colour on top of the thin black curves.
    # .get() is used instead of all_elo[...] because indexing the defaultdict
    # would insert an empty entry (keyed None) for a player missing from the data.
    for name, color, label in highlighted:
        history = all_elo.get(player_id_map.get(name))
        if history is None:
            continue
        plt.plot(list(history), marker='.', linewidth=0.5, markersize=1, linestyle='-', color=color, label=label)

    plt.title("Elo Ratings Over Time")
    plt.xlabel("Match Number")
    plt.ylabel("Elo Rating")
    plt.legend(loc="lower right")
    plt.show()



def lr_spark(df):
    """Train a Spark logistic-regression classifier on the engineered features.

    Args:
        df: pandas DataFrame from data_engineering() that holds the six
            feature columns listed below and a 'Label' column.

    Returns:
        A tuple (fitted model, accuracy on a 20% hold-out split, seed 42).
    """
    session = SparkSession.builder.appName("Spark Logistic regression model").getOrCreate()

    feature_names = ['Rank_1', 'Rank_2', 'Odd_1', 'Odd_2', 'Elo_diff', 'ELO_SURFACE_DIFF']
    vectorizer = VectorAssembler(inputCols=feature_names, outputCol='features')
    assembled = vectorizer.transform(session.createDataFrame(df))
    train_part, holdout_part = assembled.randomSplit([0.8, 0.2], seed=42)

    fitted = LogisticRegression(featuresCol='features', labelCol='Label').fit(train_part)
    scorer = MulticlassClassificationEvaluator(labelCol='Label', predictionCol='prediction', metricName='accuracy')
    return fitted, scorer.evaluate(fitted.transform(holdout_part))



def rf_spark(df):
    """Train a Spark random-forest classifier (100 trees) on the engineered features.

    Args:
        df: pandas DataFrame from data_engineering() that holds the six
            feature columns listed below and a 'Label' column.

    Returns:
        A tuple (fitted model, accuracy on a 20% hold-out split, seed 42).
    """
    # getOrCreate() returns an existing session if one is running, in which
    # case this app name is ignored.
    spark = SparkSession.builder.appName("Spark Random forest model").getOrCreate()

    spark_df = spark.createDataFrame(df)
    feature_cols = ['Rank_1', 'Rank_2', 'Odd_1', 'Odd_2', 'Elo_diff', 'ELO_SURFACE_DIFF']
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    spark_df = assembler.transform(spark_df)
    spark_train, spark_test = spark_df.randomSplit([0.8, 0.2], seed=42)

    rf = RandomForestClassifier(featuresCol='features', labelCol='Label', numTrees=100, seed=42)
    rf_model = rf.fit(spark_train)
    predictions = rf_model.transform(spark_test)
    evaluator = MulticlassClassificationEvaluator(labelCol='Label', predictionCol='prediction', metricName='accuracy')
    accuracy = evaluator.evaluate(predictions)
    return rf_model, accuracy


def get_an_input_for_prediction(text):
    """Show *text* as a prompt and return the line the user typed."""
    return input(text)


def _player_key(raw_name):
    """Map user input like "Roger Federer" to the dataset's "federer r." key.

    Input that is already a known key (case-insensitive, e.g. "Federer R.")
    is accepted as-is.  Returns None for input with fewer than two words
    that is not a known key.
    """
    name = raw_name.strip().lower()
    if name in player_id_map:
        return name
    words = name.split()
    if len(words) < 2:
        return None
    # "<first> <last>" -> "<last> <first initial>." (the second word is taken as the surname)
    return f"{words[1]} {words[0][0]}."


def _ask_player(prompt, opponent=None):
    """Prompt until the user names a known player other than *opponent*; return that player's key."""
    while True:
        key = _player_key(get_an_input_for_prediction(prompt))
        if key not in player_id_map:
            print("Player not found try again")
        elif key == opponent:
            print('Player cannot play himself')
        else:
            return key


def _ask_number(prompt, cast):
    """Prompt until the answer parses with *cast* (e.g. int or float); return the parsed value."""
    while True:
        answer = get_an_input_for_prediction(prompt)
        try:
            return cast(answer)
        except ValueError:
            print("Please enter a valid number.")


def use_a_model(model):
    """Interactively collect match details and print *model*'s predicted winner.

    Asks for both players, the surface, both rankings and both odds.  It then
    builds the same six features the models were trained on; the Elo values
    are the current ratings from the globals data_engineering() sets.
    Finally it prints which player the model predicts will win.

    Args:
        model: a fitted Spark ML model (e.g. from lr_spark() or rf_spark())
            that reads its input from a 'features' vector column.
    """
    surfaces = ["Clay", "Grass", "Hard", "Carpet"]

    player_1 = _ask_player("What is the Name of Player 1")
    player_2 = _ask_player("What is the Name of Player 2", opponent=player_1)

    surface = None
    while surface not in surfaces:
        surface = get_an_input_for_prediction("What is the surface? (Clay,Grass,Hard,Carpet)")
        if surface not in surfaces:
            print('Surface is invalid try again')

    rank_1 = _ask_number("What is player 1 ranking?", int)
    rank_2 = _ask_number("What is player 2 ranking", int)
    odd_1 = _ask_number("What is player 1 odds?", float)
    odd_2 = _ask_number("What is player 2 odds?", float)

    id_1 = player_id_map[player_1]
    id_2 = player_id_map[player_2]
    # 1500 is the rating data_engineering() uses for a player with no matches
    # yet.  A bare .get() would return None, and crash the subtraction, for a
    # player who never played on the chosen surface.
    elo_diff = float(elo_players.get(id_1, 1500) - elo_players.get(id_2, 1500))
    surface_elo = elo_surfaces.get(surface, {})
    elo_surface_diff = float(surface_elo.get(id_1, 1500) - surface_elo.get(id_2, 1500))

    columns = ["Rank_1", "Rank_2", "Odd_1", "Odd_2", "Elo_diff", "ELO_SURFACE_DIFF"]
    row = [(rank_1, rank_2, odd_1, odd_2, elo_diff, elo_surface_diff)]
    # Fetch the session explicitly; getOrCreate() reuses the one started for training.
    spark = SparkSession.builder.getOrCreate()
    assembler = VectorAssembler(inputCols=columns, outputCol="features")
    features = assembler.transform(spark.createDataFrame(row, columns))

    predicted = model.transform(features).select('prediction').first()[0]
    if predicted == 1:
        print('Model predict Player 1 to win')
    else:
        print('Model predict Player 2 to win')



# Cross-thread signals shared by the retrieval code and the UI:
#   data_ready_event -- signalled by data_retrivel(); ui() waits on it while a
#                       retrieval is in progress.
#   stop_event       -- set to ask the scheduler loop and ui() to exit.
data_ready_event, stop_event = threading.Event(), threading.Event()

def data_retrivel():
    """
    Read the match data from Kafka, rebuild the features and retrain both models.

    Results go into the module-level ``global_data`` dict:
    - 'retrieval_in_progress' is True while this runs.
    - On success, 'new_df', both models and both accuracies are replaced
      together in one update.
    - Whatever the outcome, the flag is then cleared and
      ``data_ready_event`` is set, so a ui() waiting on it always wakes up.

    Errors are printed with a traceback and not re-raised.
    """
    import traceback

    global_data['retrieval_in_progress'] = True
    consumer = None
    try:
        consumer = KafkaConsumer(
            'tennis_daily_data',
            bootstrap_servers='localhost:9092',
            group_id=str(uuid.uuid4()),
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda m: m.decode('utf-8')
        )

        # Read the data: a fresh group id plus 'earliest' starts at the
        # beginning of the topic.  Stop when the first row shows up again
        # (presumably the producer replays the dataset in a loop -- confirm).
        # NOTE(review): there is no consumer_timeout_ms, so this blocks if the
        # first row never reappears.
        data = []
        for message in tqdm(consumer):
            row = message.value.split(',')  # split the message on commas
            if len(data) > 1 and row == data[0]:
                break
            data.append(row)

        df = pd.DataFrame(data, columns=['Tournament', 'Date', 'Series', 'Court', 'Surface', 'Round', 'Best of', 'Player_1', 'Player_2', 'Winner', 'Rank_1', 'Rank_2', 'Pts_1', 'Pts_2', 'Odd_1', 'Odd_2', 'Score'])
        new_df = data_engineering(df)
        lr_model, acc_lr = lr_spark(new_df)
        rf_model, acc_rf = rf_spark(new_df)
        # Publish only after every step succeeded, so readers never see a
        # new dataset paired with stale or missing models.
        global_data.update(
            new_df=new_df,
            lr_spark_model=lr_model,
            acc_lr_spark=acc_lr,
            rf_spark_model=rf_model,
            acc_rf_spark=acc_rf,
        )
    except Exception as e:
        print(f"Error in data retrieval: {e}")
        traceback.print_exc()
    finally:
        if consumer is not None:
            consumer.close()
        # Clear the flag BEFORE signalling: ui() re-checks it as soon as it wakes.
        global_data['retrieval_in_progress'] = False
        data_ready_event.set()
        
        
        
        
    
def schedule_data_retrieval(at_time="20:00"):
    """
    Run data_retrivel() once a day at *at_time* until stop_event is set.

    Args:
        at_time: time of day in the format schedule's ``.at()`` accepts for
            daily jobs, "HH:MM" or "HH:MM:SS".  Minutes must have two
            digits; "20:0" is rejected with ScheduleValueError.  Defaults to
            20:00 (8 PM).

    The job is cancelled when the loop exits.  Re-running this (e.g.
    re-executing the notebook cell) therefore does not leave duplicate daily
    jobs in the schedule module's default scheduler.
    """
    job = schedule.every().day.at(at_time).do(data_retrivel)
    try:
        while not stop_event.is_set():
            schedule.run_pending()
            # Waits up to 1 s like time.sleep(1), but returns as soon as
            # stop_event is set, so shutdown is not delayed.
            stop_event.wait(1)
    finally:
        schedule.cancel_job(job)

def ui():
    """
    Interactive text menu: Elo graph, model accuracies, or a match prediction.

    Loops until the user enters 9 or ``stop_event`` is set.  While
    ``global_data['retrieval_in_progress']`` is True the menu is held back
    until the retrieval finishes.  Non-numeric menu input is reported and
    the menu is shown again.
    """
    while not stop_event.is_set():
        if global_data['retrieval_in_progress']:
            print("Data retrieval in progress. Please wait...")
            # Wait with a timeout and re-check the flag, so the menu cannot
            # hang forever if a retrieval ends without signalling the event.
            while global_data['retrieval_in_progress'] and not stop_event.is_set():
                data_ready_event.wait(timeout=1)
                data_ready_event.clear()
            continue

        try:
            user_input = int(input(f'Input 1 To see Elo Graph\n Input 2 To see Models accuracy\n Input 3 To Predict Match on a model\n Input 9 To Exit\n'))
            # Checked after input() returns: a background retrieval may have
            # finished while the user was typing.
            models_ready = (global_data['lr_spark_model'] is not None
                            and global_data['rf_spark_model'] is not None)

            if user_input == 1:
                # create_plot() reads globals that exist only after data has been loaded.
                if global_data['new_df'] is None:
                    print("No data loaded yet. Run data retrieval first.")
                else:
                    create_plot()
            elif user_input == 2:
                if models_ready:
                    print(f'The accuracy for Logistic Regression is {global_data["acc_lr_spark"]}\n The accuracy for Random Forest is {global_data["acc_rf_spark"]}')
                else:
                    print("No models trained yet. Run data retrieval first.")
            elif user_input == 3:
                if not models_ready:
                    print("No models available. Run data retrieval first.")
                    continue

                model_choice = int(input(f'Input 1 To use Logistic Regression\n Input 2 To use Random Forest\n'))
                if model_choice == 1:
                    use_a_model(global_data['lr_spark_model'])
                elif model_choice == 2:
                    use_a_model(global_data['rf_spark_model'])
                else:
                    print("Invalid input. Try again.")
            elif user_input == 9:
                stop_event.set()
                break
            else:
                print("Invalid input. Try again.")

        except ValueError:
            print("Please enter a valid number.")

def main():
    """
    Start the daily scheduler thread, load the data once, then run the UI until exit.

    ``stop_event`` is always set on the way out, which ends the scheduler
    loop; that thread is a daemon and is given a short time to finish.
    KeyboardInterrupt is reported.  Any other exception propagates to the
    caller rather than being masked by an exit call.
    """
    scheduler_thread = None
    try:
        scheduler_thread = threading.Thread(target=schedule_data_retrieval, daemon=True)
        scheduler_thread.start()

        # Initial load, so the UI has data without waiting for the daily job.
        data_retrivel()

        ui()

    except KeyboardInterrupt:
        print("\nApplication interrupted by user.")
    finally:
        stop_event.set()
        if scheduler_thread is not None:
            # Bounded join: a retrieval in progress on that thread can take a while.
            scheduler_thread.join(timeout=2)

# Shared state between data retrieval and the UI.  It is defined at module
# level, not only under the __main__ guard, so the functions in this file
# also work when it is imported and driven from elsewhere.
global_data = {
    'new_df': None,                   # feature DataFrame from data_engineering()
    'lr_spark_model': None,           # model returned by lr_spark()
    'rf_spark_model': None,           # model returned by rf_spark()
    'acc_lr_spark': None,             # hold-out accuracy returned by lr_spark()
    'acc_rf_spark': None,             # hold-out accuracy returned by rf_spark()
    'retrieval_in_progress': False,   # True while data_retrivel() runs
}

if __name__ == "__main__":
    main()




Exception in thread Thread-16:
Traceback (most recent call last):
  File "/home/linuxu/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/linuxu/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_174493/3917353308.py", line 302, in schedule_data_retrieval
  File "/home/linuxu/anaconda3/lib/python3.9/site-packages/schedule/__init__.py", line 514, in at
    raise ScheduleValueError(
schedule.ScheduleValueError: Invalid time format for a daily job (valid format is HH:MM(:SS)?)
64781it [00:01, 46229.19it/s]


Error in data retrieval: name 'new_df' is not defined
{'new_df':        Rank_1  Rank_2  Odd_1  Odd_2  Label    Elo_diff  ELO_SURFACE_DIFF
0        63.0    77.0  -1.00  -1.00    1.0   24.000000         24.000000
1        56.0     5.0  -1.00  -1.00    0.0   24.000000         24.000000
2        40.0   655.0  -1.00  -1.00    1.0   24.000000         24.000000
3        87.0    65.0  -1.00  -1.00    0.0   24.000000         24.000000
4        81.0   198.0  -1.00  -1.00    1.0   24.000000         24.000000
...       ...     ...    ...    ...    ...         ...               ...
64776    14.0    12.0   1.44   2.75    1.0  158.527290        131.938448
64777    26.0     3.0  10.00   1.06    0.0  312.206606        333.226833
64778    13.0     6.0   2.50   1.53    1.0  -27.161527        -44.764316
64779     3.0    14.0   1.30   3.50    0.0  -76.428717        -41.946634
64780    14.0    13.0   1.57   2.38    1.0   93.018432         79.630298

[64781 rows x 7 columns], 'lr_spark_model': None, 'rf_spar