# Model 2: Predicting Bot Performance

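The purpose of this model is to predict whether the bot's current episode will end in a successful crew rescue, given the bot's position, the alien and crew probabilities of the cells around it, and its latest sensor readings.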

## Preliminary Setup

### Imports

In [None]:
import os
import numpy as np
import random
import math
import json
import pandas as pd
import ast
import datetime

In [None]:
#%pip install json

In [None]:
#!pip install nbimporter

In [None]:
import nbimporter

### Additional Bot Config

In [None]:
import Bot1

In [None]:
file_name = 'Data/Model2/model2_data_raw.csv'

In [None]:
%run Bot1.ipynb

In [None]:
grid, open_cells = create_grid() # Fixed grid orientation

In [None]:
def determine_probabilities(bot, matrix):
    """Look up the belief values for the bot's cell and its four neighbours.

    Args:
        bot: Bot position. It is indexed as ``bot[0]`` / ``bot[1]`` and is also
            used as-is as the lookup key for the 'stay' entry.
        matrix: Mapping from a cell coordinate to a probability value.

    Returns:
        A list of five values ordered up, down, left, right, stay. A cell that
        is missing from ``matrix`` contributes 0.

    NOTE(review): 'up' is read from ``bot[1] - 1`` here, whereas ``is_valid``
    treats 'up' as ``y + 1``. Confirm which convention the logged
    ``*_up`` / ``*_down`` columns are meant to follow.
    """
    x, y = bot[0], bot[1]
    cells_in_order = (
        (x, y - 1),  # up
        (x, y + 1),  # down
        (x - 1, y),  # left
        (x + 1, y),  # right
        bot,         # stay
    )
    return [matrix.get(cell, 0) for cell in cells_in_order]

In [None]:
def _start_episode(k):
    """Reset the shared grid and set up the pieces for a fresh episode.

    Calls reset_grid, place_bot, place_crew and place_alien in that order and
    then builds new alien/crew belief matrices for the new bot position.

    Returns:
        Tuple ``(bot, ship, crew_list, alien_list, alien_matrix, crew_matrix)``.
    """
    global grid, open_cells

    grid, open_cells = reset_grid(grid, open_cells)
    bot, ship, open_cells = place_bot(grid, open_cells)
    crew_list, ship = place_crew(ship, open_cells, [])
    alien_list, ship = place_alien(ship, open_cells, [], bot, k)
    alien_matrix = initialize_alienmatrix(open_cells, bot, k)
    crew_matrix = initialize_crewmatrix(open_cells, crew_list, bot)
    return bot, ship, crew_list, alien_list, alien_matrix, crew_matrix


def Bot1_collect_data(k, alpha, max_iter, timeout):
    """Run Bot 1 for ``max_iter`` episodes and log one feature row per bot move.

    Every row holds the bot position, the alien/crew belief values for the
    bot's cell and its four neighbours, the most recent sensor flags, and a
    ``successful`` label. Rows of an episode are labelled 1 when that episode
    ends with a rescue and stay 0 when it ends in a loss. All rows are written
    to the module-level ``file_name`` CSV (appended without a header if the
    file already exists, otherwise created with a header).

    Rows are collected in plain lists and turned into a DataFrame once at the
    end: ``DataFrame.append`` no longer exists in pandas 2.x and was quadratic
    when used row by row.

    Args:
        k: Forwarded to place_alien, initialize_alienmatrix, alien_sensor and
            update_alienmatrix (presumably the alien sensor range; confirm in
            Bot1).
        alpha: Forwarded to crew_sensor and update_crewmatrix.
        max_iter: Number of finished episodes (wins + losses) to simulate.
        timeout: An episode whose move count reaches this value is counted as
            a loss.

    Returns:
        Tuple ``(avg_moves_per_rescue, win_fraction, win_count)`` where the
        average uses floor division and both ratios guard against dividing by
        zero.
    """
    global grid, open_cells

    columns = [
        'bot_x', 'bot_y',
        'alien_up', 'alien_down', 'alien_left', 'alien_right', 'alien_stay',
        'crew_up', 'crew_down', 'crew_left', 'crew_right', 'crew_stay',
        'alien_detected', 'crew_detected',
        'successful',
    ]

    bot, ship, crew_list, alien_list, alien_matrix, crew_matrix = _start_episode(k)
    d_lookup_table = {}
    alien_detected = False
    crew_detected = False

    win_count = 0
    loss_count = 0
    move = 0
    win_move_count = []
    logged_rows = []    # rows of finished episodes, already labelled
    episode_rows = []   # rows of the episode in progress (label still 0)

    while (win_count + loss_count) < max_iter:
        neighbors = check_valid_neighbors(len(ship), bot[0], bot[1])
        open_moves = [neigh for neigh in neighbors if grid[neigh] != 1]
        open_moves.append(bot)  # Bot can stay in place
        next_move = determine_move(open_moves, alien_matrix, crew_matrix)

        alien_probs = determine_probabilities(bot, alien_matrix)
        crew_probs = determine_probabilities(bot, crew_matrix)

        # Snapshot of what the bot knows *before* it moves.
        episode_rows.append({
            'bot_x': bot[0],
            'bot_y': bot[1],

            'alien_up': alien_probs[0],
            'alien_down': alien_probs[1],
            'alien_left': alien_probs[2],
            'alien_right': alien_probs[3],
            'alien_stay': alien_probs[4],

            'crew_up': crew_probs[0],
            'crew_down': crew_probs[1],
            'crew_left': crew_probs[2],
            'crew_right': crew_probs[3],
            'crew_stay': crew_probs[4],

            'alien_detected': 1 if alien_detected else 0,
            'crew_detected': 1 if crew_detected else 0,

            'successful': 0,
        })

        prev_win_count = win_count
        bot, crew_list, ship, open_cells, win_count, marker = move_bot(ship, bot, next_move, crew_list, alien_list, open_cells, win_count, 1)
        move += 1

        if marker == 1 or move >= timeout:
            # Loss (capture or timeout): keep the rows with label 0 and restart.
            loss_count += 1
            print(f"Bot captured! Win Count: {win_count}, Loss Count: {loss_count}")
            logged_rows.extend(episode_rows)
            episode_rows = []
            bot, ship, crew_list, alien_list, alien_matrix, crew_matrix = _start_episode(k)
            d_lookup_table = {}
            # Sensor readings belong to the finished episode; do not let them
            # leak into the first row of the next one.
            alien_detected = False
            crew_detected = False
            move = 0
            continue

        if win_count > prev_win_count:
            # Rescue: every row of this episode becomes a positive example.
            print(f"Crew saved! Win Count: {win_count}, Loss Count: {loss_count}")
            for row in episode_rows:
                row['successful'] = 1
            logged_rows.extend(episode_rows)
            episode_rows = []
            win_move_count.append(move)
            move = 0
            d_lookup_table = {}
            alien_matrix = initialize_alienmatrix(open_cells, bot, k)
            crew_matrix = initialize_crewmatrix(open_cells, crew_list, bot)

        alien_matrix, crew_matrix = update_afterbotmove(bot, alien_matrix, crew_matrix)

        # Aliens take their turn after the bot.
        marker, alien_list, ship = move_aliens(ship, alien_list, bot)

        if marker == 1 or move >= timeout:
            loss_count += 1
            print(f"Bot captured! Win Count: {win_count}, Loss Count: {loss_count}")
            logged_rows.extend(episode_rows)
            episode_rows = []
            bot, ship, crew_list, alien_list, alien_matrix, crew_matrix = _start_episode(k)
            d_lookup_table = {}
            alien_detected = False
            crew_detected = False
            move = 0
            continue

        alien_matrix = update_afteralienmove(ship, alien_list, alien_matrix)  # Update after alien move

        alien_detected = alien_sensor(alien_list, bot, k)  # Run Alien Sensor
        crew_detected, d_lookup_table = crew_sensor(ship, bot, alpha, d_lookup_table, crew_list)  # Run Crew Sensor

        alien_matrix = update_alienmatrix(alien_matrix, alien_detected, bot, k)  # Update based on alien sensor
        crew_matrix = update_crewmatrix(crew_matrix, crew_detected, d_lookup_table, bot, alpha)  # Update based on crew sensor

    # Rows of an episode that was still running when the loop ended (label 0).
    logged_rows.extend(episode_rows)

    # Explicit columns keep the CSV layout stable even if no rows were logged.
    df = pd.DataFrame(logged_rows, columns=columns)

    if os.path.isfile(file_name):
        df.to_csv(file_name, mode='a', index=False, header=False)
    else:
        df.to_csv(file_name, mode='w', index=False, header=True)

    avg_rescue_moves = sum(win_move_count) // max(1, len(win_move_count))
    win_fraction = win_count / max(1, (win_count + loss_count))
    return avg_rescue_moves, win_fraction, win_count

In [None]:
def Bot1_simulation(alpha_values, k_values, max_iter, timeout, num_simulations):
    """Average the Bot1_collect_data metrics over repeated runs per (k, alpha).

    For every ``k`` (outer loop) and ``alpha`` (inner loop) the data-collection
    run is repeated ``num_simulations`` times; the three returned metrics are
    averaged, printed, and stored.

    Returns:
        Three dicts keyed by ``k``. Each value is a list with one averaged
        entry per alpha, in ``alpha_values`` order: average rescue moves,
        probability of crew rescue, and average crew saved.
    """
    avg_rescue_moves = {}
    prob_crew_rescue = {}
    avg_crew_saved = {}
    for k in k_values:
        avg_rescue_moves[k] = []
        prob_crew_rescue[k] = []
        avg_crew_saved[k] = []

    for k in k_values:
        for alpha in alpha_values:
            moves_sum = 0
            rescue_prob_sum = 0
            saved_sum = 0

            for _ in range(num_simulations):
                moves, rescue_prob, saved = Bot1_collect_data(k, alpha, max_iter, timeout)
                moves_sum += moves
                rescue_prob_sum += rescue_prob
                saved_sum += saved

            avg_metric1 = moves_sum / num_simulations
            avg_metric2 = rescue_prob_sum / num_simulations
            avg_metric3 = saved_sum / num_simulations

            print(f"k: {k}, Alpha: {alpha}\nAverage Rescue Moves: {avg_metric1}\nProbability of Crew Rescue: {avg_metric2}\nAverage Crew Saved: {avg_metric3}\n")

            avg_rescue_moves[k].append(avg_metric1)
            prob_crew_rescue[k].append(avg_metric2)
            avg_crew_saved[k].append(avg_metric3)

    return avg_rescue_moves, prob_crew_rescue, avg_crew_saved

In [None]:
def one_alien_one_crew(alpha_values, k_values, max_iter, timeout, num_simulations):
    """Run the Bot 1 simulation sweep and print its three result dicts.

    The rescue probabilities are rounded to three decimals before printing.
    Nothing is returned.
    """
    rescue_moves, rescue_probs, crew_saved = Bot1_simulation(
        alpha_values, k_values, max_iter, timeout, num_simulations
    )

    rounded_probs = {}
    for k, probs in rescue_probs.items():
        rounded_probs[k] = [round(prob, 3) for prob in probs]

    print(rescue_moves, rounded_probs, crew_saved, "\n")

In [None]:
# Settings for the data-collection sweep (consumed by the one_alien_one_crew
# call in the next cell, which is currently commented out).
alpha_values = [0.004] # 0.004 > 0.01
k_values = [3]
max_iter = 30  # finished episodes (wins + losses) per Bot1_collect_data call
timeout = 10000  # an episode reaching this many moves is counted as a loss
num_simulations = 50  # Bot1_collect_data calls per (k, alpha) pair

In [None]:
#one_alien_one_crew(alpha_values, k_values, max_iter, timeout, num_simulations)

In [None]:
# k: 3, Alpha: 0.004
# Average Rescue Moves: 585.05
# Probability of Crew Rescue: 0.71
# Average Crew Saved: 21.3

# {3: [585.05]} {3: [0.71]} {3: [21.3]} 

### Model 2 Training Functions

In [None]:
def softmax(z):
    """Row-wise softmax of a 2-D array of scores.

    The maximum of each row is subtracted before exponentiating so that large
    scores do not overflow; the shift cancels out in the ratio.
    """
    row_max = np.max(z, axis=1, keepdims=True)
    exp_scores = np.exp(z - row_max)
    row_totals = np.sum(exp_scores, axis=1, keepdims=True)
    return exp_scores / row_totals

In [None]:
def sigmoid(z):
    """Element-wise logistic function ``1 / (1 + exp(-z))``.

    Evaluated as ``exp(-logaddexp(0, -z))``, which is the same quantity
    (``logaddexp(0, -z) == log(1 + exp(-z))``) but never exponentiates a large
    positive number, so strongly negative inputs return 0 without an overflow
    warning.

    Args:
        z: Scalar or array of pre-activation values.

    Returns:
        Value(s) in [0, 1] with the same shape as ``z``.
    """
    return np.exp(-np.logaddexp(0, -z))

In [None]:
def init_params(num_features, num_classes):
    """Create the starting weights and biases for the linear model.

    Returns:
        ``(W, b)`` where ``W`` has shape ``(num_features, num_classes)`` and is
        drawn from a standard normal scaled by 0.01 (small random values), and
        ``b`` is a ``(1, num_classes)`` array of zeros.
    """
    weights = 0.01 * np.random.randn(num_features, num_classes)
    biases = np.zeros((1, num_classes))
    return weights, biases

In [None]:
def loss_function(y_true, y_pred):
    """Mean cross-entropy between one-hot targets and predicted probabilities.

    A small constant (1e-15) is added inside the log so a predicted
    probability of exactly 0 does not produce ``-inf``. The sum over all
    entries is divided by the number of rows in ``y_true``.
    """
    # A class-weighted variant (weights [1.556, 0.737]) was tried earlier to
    # counter class imbalance; the unweighted form is the one in use.
    num_rows = y_true.shape[0]
    weighted_logs = y_true * np.log(y_pred + 1e-15)
    return -np.sum(weighted_logs) / num_rows

In [None]:
def compute_gradient(X, y_true, y_pred):
    """Gradient of the loss with respect to the weights and biases.

    Uses the prediction error ``y_pred - y_true`` as the derivative with
    respect to the pre-activation and averages over the rows of ``X``.

    Returns:
        ``(dW, db)``: ``dW`` is ``X.T @ error / m`` and ``db`` is the
        column-wise mean error kept as a single row.
    """
    # A class-weighted variant (weights [1.556, 0.737]) was tried earlier to
    # counter class imbalance; the unweighted form is the one in use.
    inv_rows = 1 / X.shape[0]
    error = y_pred - y_true
    grad_W = inv_rows * np.dot(X.T, error)
    grad_b = inv_rows * np.sum(error, axis=0, keepdims=True)
    return grad_W, grad_b

In [None]:
def predict(X, W, b):
    """Return ``sigmoid(X @ W + b)``: one independent score per class column.

    NOTE(review): the scores are not normalised across classes (``softmax`` is
    defined in this notebook but not used here); confirm that is intended.
    """
    scores = np.dot(X, W) + b
    return sigmoid(scores)

In [None]:
def predict_constrained(X, W, b, valid):
    """Predict one class per row while ignoring classes marked invalid.

    Predictions are multiplied by the 0/1 ``valid`` mask and renormalised per
    row. For example, [0, 0.1, 0.2, 0.4, 0.3] with left and right invalid
    becomes [0, 0.1, 0, 0, 0.3] and then [0, 0.25, 0, 0, 0.75].

    Returns:
        An integer one-hot array with a single 1 per row at the highest
        remaining probability.
    """
    masked = predict(X, W, b) * valid
    masked /= masked.sum(axis=1, keepdims=True)

    row_ids = np.arange(len(masked))
    best_class = np.argmax(masked, axis=1)

    one_hot_pred = np.zeros_like(masked, dtype=int)
    one_hot_pred[row_ids, best_class] = 1
    return one_hot_pred

In [None]:
def train(X, y, alpha, epochs, initial_batch_size, loss_threshold):
    """Fit the two-class linear model with batch gradient descent.

    Args:
        X: Feature DataFrame (sliced positionally with ``.iloc``).
        y: DataFrame with a ``'successful'`` column; 0 is encoded as [1, 0]
            and any other value as [0, 1].
        alpha: Learning rate applied to every gradient step.
        epochs: Number of passes over the data.
        initial_batch_size: Currently unused. The batch size is the full
            dataset, as in the original code, where the SGD-then-GD switch was
            commented out.
        loss_threshold: Currently unused (it belonged to the disabled switch).

    Returns:
        ``(W, b, loss_list)`` with one mean loss value per epoch.

    Raises:
        ValueError: If ``X`` has no rows.
    """
    num_features = X.shape[1]
    num_classes = 2
    n = X.shape[0]
    if n == 0:
        raise ValueError("train() needs at least one training row")

    W, b = init_params(num_features, num_classes)

    # Encode the labels once, up front. Doing it per batch re-ran the same
    # work every epoch and assigned into a slice of the caller's DataFrame.
    y_one_hot = np.eye(num_classes)[(y['successful'] != 0).astype(int).values]

    batch_size = n  # full-batch gradient descent
    loss_list = []  # Store loss values over time

    # Number of times iterated through entire dataset
    for epoch in range(epochs):
        epoch_loss = 0
        num_batches = 0

        for start in range(0, n, batch_size):
            X_batch = X.iloc[start:start + batch_size]
            y_batch = y_one_hot[start:start + batch_size]
            y_pred = predict(X_batch, W, b)

            epoch_loss += loss_function(y_batch, y_pred)

            dW, db = compute_gradient(X_batch, y_batch, y_pred)
            W -= alpha * dW
            b -= alpha * db
            num_batches += 1

        # Average over the batches actually processed; n // batch_size would
        # miss a trailing partial batch.
        current_loss = epoch_loss / num_batches
        loss_list.append(current_loss)

        print(f"Epoch {epoch}, Loss: {current_loss}")

    return W, b, loss_list

In [None]:
def is_valid(x, y, move, grid, open_cells):
    """Return 1 if ``move`` from cell ``(x, y)`` is allowed, otherwise 0.

    'stay' is always allowed. 'up', 'down', 'left' and 'right' are allowed
    when the target cell, ``(x, y + 1)``, ``(x, y - 1)``, ``(x - 1, y)`` or
    ``(x + 1, y)`` respectively, is in ``open_cells``. Any other move name
    gives 0. ``grid`` is accepted but not consulted.
    """
    if move == 'stay':
        return 1

    steps = (('up', 0, 1), ('down', 0, -1), ('left', -1, 0), ('right', 1, 0))
    for name, dx, dy in steps:
        if move == name:
            return 1 if (x + dx, y + dy) in open_cells else 0
    return 0

In [None]:
grid, open_cells = reset_grid(grid, open_cells)

def create_valid_matrix(X):
    """Build a 0/1 validity mask with one row per row of ``X``.

    The first two columns of ``X`` are read as the bot's x and y. Each output
    row flags, in the order up, down, left, right, stay, whether ``is_valid``
    allows that move given the module-level ``grid`` and ``open_cells``.
    """
    global grid, open_cells
    directions = ['up', 'down', 'left', 'right', 'stay']

    mask_rows = [
        [is_valid(x, y, move, grid, open_cells) for move in directions]
        for x, y in zip(X.iloc[:, 0], X.iloc[:, 1])
    ]
    return np.array(mask_rows)

In [None]:
def plot_training_loss(loss_list):
    """Plot the per-epoch loss, save it as a timestamped PNG, and display it.

    The image is written to ``Results/Model2`` (created if it does not exist)
    as ``loss_plot_<YYYYmmdd_HHMMSS>.png``.

    NOTE(review): ``plt`` is not imported in this notebook's visible cells;
    presumably it comes from ``%run Bot1.ipynb``. Confirm.
    """
    output_dir = "Results/Model2"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    filepath = os.path.join(output_dir, f"loss_plot_{timestamp}.png")

    plt.figure(figsize=(10, 6))
    plt.plot(loss_list, label='Loss per Epoch')
    plt.title('Model Loss over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    plt.savefig(filepath)
    plt.show()

### Model 2 Testing Functions

In [None]:
def calculate_accuracy(y_true, y_pred):
    """Fraction of array entries where ``y_true`` and ``y_pred`` agree.

    The comparison is element-wise, so with one-hot inputs every column of
    every row counts as its own entry.
    """
    matches = (y_true == y_pred)
    return matches.sum() / matches.size

In [None]:
def test(W, b, X_train, y_train,X_test, y_test):
    """Report element-wise accuracy of the model on the train and test sets.

    Predictions are thresholded at 0.5 per output column and compared with the
    one-hot encoded ``'successful'`` labels (0 -> [1, 0], otherwise [0, 1]).

    The one-hot targets are built as standalone arrays; the label DataFrames
    passed in are only read, never assigned to.

    Args:
        W, b: Model weights and biases as returned by ``train``.
        X_train, X_test: Feature DataFrames.
        y_train, y_test: DataFrames with a ``'successful'`` column.

    Returns:
        ``(train_acc, test_acc)``; both values are also printed.
    """
    def one_hot(labels):
        # Row 0 of the identity is [1, 0], row 1 is [0, 1].
        return np.eye(2)[(labels['successful'] != 0).astype(int).values]

    y_train_true = one_hot(y_train)
    y_test_true = one_hot(y_test)

    y_train_pred_binary = (predict(X_train, W, b) >= 0.5).astype(int)
    train_acc = calculate_accuracy(y_train_true, y_train_pred_binary)

    y_test_pred_binary = (predict(X_test, W, b) >= 0.5).astype(int)
    test_acc = calculate_accuracy(y_test_true, y_test_pred_binary)

    print(f"Training Accuracy: {train_acc}\nTesting Accuracy: {test_acc}")
    return train_acc, test_acc

### Data Preprocessing

In [None]:
data = pd.read_csv('Data/Model2/model2_data_raw.csv')

In [None]:
data.head()

In [None]:
data.shape

In [None]:
# data.head()

In [None]:
model2_df = data.drop_duplicates()
model2_df.shape

In [None]:
model2_df[model2_df['successful'] == 0.0].shape

In [None]:
model2_df.head()

In [None]:
# Hold out 20% of the de-duplicated rows for testing and train on the other 80%.
percentage_training = 0.80
percentage_test = 0.20



num_rows_total = model2_df.shape[0]
# int() truncates, so the two counts can add up to fewer than num_rows_total;
# any leftover shuffled row then lands in neither set.
num_rows_training = int(percentage_training * num_rows_total)
num_rows_test = int(percentage_test * num_rows_total)

# NOTE(review): no random seed is set in the visible cells, so the split
# presumably differs between runs; confirm whether Bot1.ipynb seeds NumPy.
shuffled_indices = np.random.permutation(num_rows_total)


# Split the indices into training and test sets
training_indices = shuffled_indices[:num_rows_training]
test_indices = shuffled_indices[num_rows_training:num_rows_training + num_rows_test]


#train_size = int(0.8 * len(model2_df))

# Positional selection: the shuffled values are row positions, not index labels.
train_df = model2_df.iloc[training_indices, :]
test_df = model2_df.iloc[test_indices, :]

#Make sure equal number of points from both classes in training matrix 
# num_rows_total = model2_df.shape[0]
# shuffled_indices = np.random.permutation(num_rows_total)
# x = model2_df[model2_df['successful'] == 1.0]
# filtered_1 = np.random.choice(x.shape[0], model2_df[model2_df['successful'] == 0.0].shape[0], replace=False)
# train_df = pd.concat([model2_df[model2_df['successful'] == 0.0], x.iloc[filtered_1, :]])
# percentage_test = 0.20
# num_rows_test = int(percentage_test * num_rows_total)
# test_indices = shuffled_indices[num_rows_training:num_rows_training + num_rows_test]
# test_df = model2_df.iloc[test_indices, :]

#########

# # Choose the percentage for training and test sets
# percentage_training = 0.80
# percentage_test = 0.20

# # Calculate the number of rows for training and test sets
# num_rows_total = model2_df.shape[0]
# num_rows_training = int(percentage_training * num_rows_total)
# num_rows_test = int(percentage_test * num_rows_total)

# shuffled_indices = np.random.permutation(num_rows_total)


# # Split the indices into training and test sets
# training_indices = shuffled_indices[:num_rows_training]
# test_indices = shuffled_indices[num_rows_training:num_rows_training + num_rows_test]


# # # Create the training set
# training_set = model2_df.iloc[training_indices, :]

# # Define output column 
# wanted_column = -1

# # Filter the dataset to get rows with value 1 
# filteredvalue_1 = training_set.loc[training_set.iloc[:, wanted_column] == 1]

# # Filter the dataset to get rows with value 0 
# filteredvalue_0 = training_set.loc[training_set.iloc[:, wanted__column] == 0]

# # Calculate the number of samples needed for training 
# num_samples = num_rows_training // 2

# # Randomly select 50% of the rows with value 1 
# training_value_1 = np.random.choice(filteredvalue_1.shape[0], num_samples, replace=False)
# training_dataset_1 = filteredvalue_1.iloc[training_value_1, :]

# # Randomly select 50% of the rows with value 0 
# training_value_0 = np.random.choice(filteredvalue_0.shape[0], num_samples)
# training_dataset_0 = filtered_rows_value_0.iloc[training_value_0, :]

# # # Combine datasets to create final training data set
# train_df = pd.concat([training_dataset_1, training_dataset_0])

# # Create the test set
# test_df = model2_df.iloc[test_indices, :]

In [None]:
train_df.shape

In [None]:
test_df.shape

In [None]:
#train_df.to_csv('Data/Model2/model2_train.csv', index=False)
#test_df.to_csv('Data/Model2/model2_test.csv', index=False)

In [None]:
# The last column is the label (train() and test() read it as y['successful']);
# every other column is a feature. The -1: slice keeps y as a one-column
# DataFrame rather than a Series.
X_train = train_df.iloc[:,:-1]
y_train = train_df.iloc[:,-1:]

X_test = test_df.iloc[:,:-1]
y_test = test_df.iloc[:,-1:]

## Model Training

In [None]:
# Hyper-parameters passed to train().
alpha = 0.005  # learning rate (step size of each weight/bias update)
epochs = 50  # passes over the training set
# The next two are accepted by train() but not used by it as written: it sets
# the batch size to the full dataset and the loss-threshold switch is disabled.
initial_batch_size = 32
loss_threshold = 0.0001

In [None]:
W, b, loss_list = train(X_train, y_train, alpha, epochs, initial_batch_size, loss_threshold)

In [None]:
plot_training_loss(loss_list)

In [None]:
y_train

## Model Testing 

In [None]:
test(W, b, X_train, y_train,X_test, y_test)

In [None]:
# Baseline: test accuracy of 100 untrained models (small random weights, zero
# biases), to compare against the trained model's accuracy.
min_random_acc = 1
max_random_acc = 0

for i in range(100):
    # Size the weights from the data instead of hard-coding the feature count.
    random_W = np.random.randn(X_train.shape[1], 2) * 0.01
    random_b = np.zeros((1, 2))

    _, random_test_acc = test(random_W, random_b, X_train, y_train, X_test, y_test)

    max_random_acc = max(max_random_acc, random_test_acc)
    min_random_acc = min(min_random_acc, random_test_acc)

print(f"Random W, b accuracy in range: ({min_random_acc}, {max_random_acc})")