# setup

In [None]:
!pip install --upgrade accelerate
!pip install git+https://github.com/modAL-python/modAL.git

In [None]:
#@title import libraries
from google.colab import drive
import json
import os
import random
import warnings
warnings.filterwarnings('ignore')
import numpy as np
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import pdb
import modAL

from modAL.models import ActiveLearner
from modAL.uncertainty import uncertainty_sampling
from modAL.uncertainty import margin_sampling
from modAL.uncertainty import entropy_sampling
from skorch import NeuralNetClassifier
from skorch.callbacks import Callback
from torch.utils.data import DataLoader, TensorDataset
from torchvision.transforms import ToTensor
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_recall_fscore_support
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold


# Utility Functions

In [None]:
# Lookup table from a level-file tile character to a human-readable category
# name. Several characters share one category: '?' and 'Q' are both
# "question block", '<', '>', '[' and ']' are all "pipe", and 'B' and 'b'
# are both "cannon".
category_dict = {"X": "ground",
                 "S": "breakable",
                 "-": "empty",
                 "?": "question block",
                 "Q": "question block",
                 "E": "enemy",
                 "<": "pipe",
                 ">": "pipe",
                 "[": "pipe",
                 "]": "pipe",
                 "o": "coin",
                 "B": "cannon",
                 "b": "cannon"}

def merge_dict_keys(dict_in):
    """Collapse keys that map to the same category into a single entry.

    For every distinct value in ``dict_in`` exactly one key survives: the
    key that appears last in ``dict_in`` for that value. Entries are ordered
    by the first appearance of each value.

    Args:
        dict_in: Mapping from tile character to category name. The values
            must be hashable.

    Returns:
        A new dict holding one ``key: value`` pair per distinct value.
    """
    # Remember the most recent key seen for each value. Re-assigning an
    # existing dict entry keeps its original position, which gives the
    # "ordered by first appearance of the value" guarantee.
    # An earlier version tried to append to ``dict_out[key]`` for a key that
    # had never been inserted, which raised KeyError whenever a value
    # happened to equal an earlier key; that branch is gone.
    last_key_for_value = {}
    for key, value in dict_in.items():
        last_key_for_value[value] = key
    return {key: value for value, key in last_key_for_value.items()}

def create_matrix(string):
    """Turn a newline-separated level string into a list of character rows.

    The text is split on ``'\\n'``; empty lines (for example the one after a
    trailing newline) are dropped, and each remaining line becomes a list
    with one element per character.
    """
    return [list(line) for line in string.split('\n') if line]

def replace_characters(matrix):
    """Return a copy of ``matrix`` with equivalent tile characters unified.

    '?' becomes 'Q', the pipe pieces '<', '>' and ']' become '[', and 'b'
    becomes 'B'. Every other element is copied unchanged. The input matrix
    is not modified.
    """
    canonical = {'?': 'Q', '<': '[', '>': '[', ']': '[', 'b': 'B'}
    return [[canonical.get(tile, tile) for tile in row] for row in matrix]

def create_binary_matrices(matrix):
    """Build one binary occupancy plane per tile character.

    Args:
        matrix: List of rows, each a list of single tile characters. The
            width is taken from the first row.

    Returns:
        A dict mapping each of the eight tile characters
        ``'X', 'S', '-', 'Q', 'E', '[', 'B', 'o'`` (in that order) to a
        ``(num_rows, num_cols)`` float array holding 1 where the matrix
        contains that character and 0 elsewhere. An empty matrix yields
        ``(0, 0)`` arrays.
    """
    # A tuple, not a set: callers stack ``dict.values()`` into tensor
    # channels, and the iteration order of a set of strings changes between
    # interpreter sessions (hash randomization), which would silently
    # permute the channels of saved models and datasets.
    characters = ('X', 'S', '-', 'Q', 'E', '[', 'B', 'o')
    num_rows = len(matrix)
    num_cols = len(matrix[0]) if num_rows else 0
    binary_matrices = {}
    for character in characters:
        binary_matrix = np.zeros((num_rows, num_cols))
        for i, row in enumerate(matrix):
            for j in range(num_cols):
                if row[j] == character:
                    binary_matrix[i, j] = 1
        binary_matrices[character] = binary_matrix
    return binary_matrices

In [None]:
# Print the tile/category table after merge_dict_keys has reduced it to one
# entry per category.
merged_category_dict = merge_dict_keys(category_dict)
print(merged_category_dict)

# Mount Resources

In [None]:
# Load playability labels (fitness tables) and level layouts from Google Drive.
_LEVELS_ROOT = '/content/drive/MyDrive/Ghost Lab/levels/mario'


def _load_fitness(info_filename):
    """Return the ``'fitness'`` table (level filename -> fitness) of one info JSON."""
    with open(os.path.join(_LEVELS_ROOT, info_filename)) as f:
        fitness = dict(json.load(f)['fitness'])
    print(info_filename + " imported")
    return fitness


def _read_levels(folder, fitness_by_file):
    """Read the ``.txt`` levels of ``folder`` that have a fitness entry.

    Returns ``(playable, unplayable)`` lists of level strings. A level is
    playable when its fitness is non-zero. Files without a fitness entry
    are skipped.
    """
    playable, unplayable = [], []
    folder_path = os.path.join(_LEVELS_ROOT, folder)
    # os.listdir returns names in arbitrary order; sort so the dataset order
    # (and any seeded sampling that depends on it) is reproducible.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.txt') or filename not in fitness_by_file:
            continue
        with open(os.path.join(folder_path, filename), 'r') as file:
            file_contents = file.read()
        if fitness_by_file[filename] != 0:
            playable.append(file_contents)
        else:
            unplayable.append(file_contents)
    return playable, unplayable


# extracting labels
drive.mount('/content/drive')
gram_dict = _load_fitness('gram_elite_info.json')
map_dict = _load_fitness('map_elites_info.json')
ngram_dict = _load_fitness('ngram_info.json')

# extracting levels
playable_level_strings = []
unplayable_level_strings = []
playable_level_labels = []
unplayable_level_labels = []

for folder, fitness_by_file, done_message in (
        ('gram elite levels', gram_dict, "gram elite levels imported"),
        ('map elites levels', map_dict, "map elite levels imported"),
        ('n gram levels', ngram_dict, "n gram levels imported")):
    playable, unplayable = _read_levels(folder, fitness_by_file)
    playable_level_strings.extend(playable)
    playable_level_labels.extend([1] * len(playable))
    unplayable_level_strings.extend(unplayable)
    unplayable_level_labels.extend([0] * len(unplayable))
    print(done_message)

In [None]:
#@title balance the dataset
from imblearn.over_sampling import ADASYN
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

# Balance the two classes by random under-sampling: levels are removed at
# random from whichever class is larger until both classes have the same
# size. The string and label lists of a class are popped in lockstep so they
# stay aligned.
print("number of playable levels before balancing : %4d| number of unplayable levels before balancing : %4d|" % (len(playable_level_strings), len(unplayable_level_strings)))

n = len(unplayable_level_strings) - len(playable_level_strings)
if n >= 0:
    majority_strings, majority_labels = unplayable_level_strings, unplayable_level_labels
else:
    # Playable levels outnumber unplayable ones; trim the playable class.
    # (Trimming only the unplayable class would leave the data unbalanced.)
    majority_strings, majority_labels = playable_level_strings, playable_level_labels

for _ in range(abs(n)):
    index = random.randint(0, len(majority_strings) - 1)
    majority_strings.pop(index)
    majority_labels.pop(index)
print("number of playable levels after balancing : %4d| number of unplayable levels after balancing : %4d|" % (len(playable_level_strings), len(unplayable_level_strings)))


In [None]:
#@title convert level strings to one-hot matrices
INITIAL_TRAINING_SIZE = 5  # number of seed examples drawn from each class


def _encode_level(level_string):
    """Return the stacked per-tile binary planes for one level string."""
    binary = create_binary_matrices(replace_characters(create_matrix(level_string)))
    return np.array(list(binary.values()))


print(len(playable_level_strings))
print(len(unplayable_level_strings))
level_strings = []
level_strings.extend(playable_level_strings)
level_strings.extend(unplayable_level_strings)
labels = []
labels.extend(playable_level_labels)
labels.extend(unplayable_level_labels)
labels = np.array(labels)

# Seed set for the active learner: INITIAL_TRAINING_SIZE random levels per class.
playable_idx = np.random.choice(range(len(playable_level_strings)), size=INITIAL_TRAINING_SIZE, replace=False)
unplayable_idx = np.random.choice(range(len(unplayable_level_strings)), size=INITIAL_TRAINING_SIZE, replace=False)
initial_x = []
initial_y = []

for i in playable_idx:
    initial_x.append(_encode_level(playable_level_strings[i]))
    initial_y.append(1)

# The negative seed examples must come from the unplayable levels. Drawing
# them from the playable list would put playable levels into the seed set
# labelled 0.
for i in unplayable_idx:
    initial_x.append(_encode_level(unplayable_level_strings[i]))
    initial_y.append(0)

# Define the size of the level
# NOTE(review): width/height are not used below and do not match the
# (14, 25) plane size of `levels`; kept only in case other cells read them.
width = 29
height = 13
num_levels = len(level_strings)
# One (8, 14, 25) stack of binary planes per level.
levels = np.zeros((num_levels, 8,14,25))

for i, level_string in enumerate(level_strings):
    levels[i] = _encode_level(level_string)
print('the overal shape of X dataset: ' + str(levels.shape))
print('the overal shape of Y dataset: ' + str(labels.shape))

# Model

In [None]:
import torch
import torch.nn as nn

class Model(nn.Module):
    """Three-stage convolutional classifier with a softmax output.

    Takes 8-channel inputs and returns, per sample, two probabilities that
    sum to 1. ``fc1`` expects 640 flattened features, which is what an
    ``(N, 8, 14, 25)`` input produces after the convolution/pooling stack.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 8 -> 16 channels, spatial size halved by pooling.
        self.conv1 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        # Stage 2: 16 -> 32 channels, spatial size halved again.
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        # Stage 3: 32 -> 64 channels, unpadded 2x2 kernel, no pooling.
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=2, padding=0)
        self.relu3 = nn.ReLU()

        # Classifier head.
        self.fc1 = nn.Linear(in_features=640, out_features=32)
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=32, out_features=2)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class probabilities of shape ``(N, 2)`` for a batch ``x``."""
        features = self.pool1(self.relu1(self.conv1(x)))
        features = self.pool2(self.relu2(self.conv2(features)))
        features = self.relu3(self.conv3(features))
        hidden = self.relu4(self.fc1(features.flatten(1)))
        return self.softmax(self.fc2(hidden))

import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F
class Model3(nn.Module):
    """ResNet-18 adapted to ``num_channels`` input planes, with a softmax output.

    The torchvision ResNet-18 is built with its default arguments; its first
    convolution is replaced so it accepts ``num_channels`` input channels and
    its final linear layer is replaced so it emits ``num_classes`` scores.
    ``forward`` returns the softmax of those scores.
    """

    def __init__(self, num_channels=8, num_classes=2):
        super().__init__()
        backbone = models.resnet18()
        backbone.conv1 = nn.Conv2d(
            num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        backbone.fc = nn.Linear(512, num_classes)
        self.resnet = backbone
        # Not applied in forward(); kept so the module's attributes are unchanged.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-class probabilities (each row sums to 1)."""
        scores = self.resnet(x)
        return F.softmax(scores, dim=1)


In [None]:
import torch
import torch.nn as nn
import ipdb

class Model2(nn.Module):
    """Fully connected classifier over a flattened level plus a label vector.

    ``forward`` flattens ``x``, concatenates ``label`` to it and passes the
    result through three ReLU hidden layers and a 2-way softmax.

    NOTE(review): the label vector is part of the network input, so the
    model sees the target it is asked to predict. Confirm this conditioning
    is intended. ``latent_dim`` is accepted but not used.
    """

    def __init__(self, input_dim=(8, 14, 25), label_size=2, latent_dim=32):
        super().__init__()
        flat_features = input_dim[0] * input_dim[1] * input_dim[2]
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(flat_features + label_size, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 2)  # two output classes
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, label):
        """Return ``(N, 2)`` class probabilities for inputs ``x`` and ``label``."""
        hidden = torch.cat((self.flatten(x), label), dim=1)
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = torch.relu(layer(hidden))
        return self.softmax(self.fc4(hidden))


# passive learner with convolutional layers

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
import time
import pickle

def encode_labels(labels):
    """One-hot encode a 1-D float32 tensor of binary labels.

    Args:
        labels: 1-D ``torch.float32`` tensor containing only 0s and 1s.

    Returns:
        A ``(len(labels), 2)`` float32 tensor on the same device as
        ``labels``. Column 0 is 1 where the label is 0 and column 1 is 1
        where the label is 1.

    Raises:
        ValueError: If ``labels`` is not a float32 tensor, is not 1-D, or
            contains a value other than 0 or 1.
    """
    if not isinstance(labels, torch.Tensor) or labels.dtype != torch.float32:
        raise ValueError("Input must be a PyTorch tensor of float32 dtype.")
    if labels.ndim != 1 or not torch.all((labels == 0) | (labels == 1)):
        raise ValueError("Input must be a 1D PyTorch tensor of 0s and 1s.")
    # Derive the encoding from ``labels`` itself so the result lives on the
    # same device. A freshly allocated tensor would always be on the CPU and
    # could not be combined with CUDA model outputs in the loss.
    return torch.stack((1 - labels, labels), dim=1)

# Passive baseline: k-fold cross-validation of Model3 on the training split,
# followed by saving the last fold's weights, the hyper-parameters and the
# accuracy history to Google Drive.
X_tensor = torch.FloatTensor(levels)
y_tensor = torch.FloatTensor(labels)
device = "cuda" if torch.cuda.is_available() else "cpu"
dataset = torch.utils.data.TensorDataset(X_tensor.to(device), y_tensor.to(device))
train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=42)

print(len(test_data))
print(len(train_data))

num_folds = 5
num_epochs = 50
weight_decay=0.001
# Passed to Adam below and written to the parameters JSON. 1e-3 is Adam's
# default, i.e. the rate that was effectively used while this variable was
# not wired into the optimizer.
learning_rate = 1e-3

kf = KFold(n_splits=num_folds, shuffle=True)

all_true_labels = []
all_pred_labels = []
all_fold_acc_history = []
start_time = time.time()

# Folds are drawn from train_data and both subsets index train_data, so the
# fold indices always refer to the collection they were computed for.
# test_data stays untouched by this loop.
for fold, (train_indices, val_indices) in enumerate(kf.split(train_data)):
    # The batches live on `device`, so the model has to be moved there too.
    model = Model3().to(device)
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    train_fold_data = torch.utils.data.Subset(train_data, train_indices)
    val_fold_data = torch.utils.data.Subset(train_data, val_indices)
    train_fold_loader = torch.utils.data.DataLoader(train_fold_data, batch_size=32, shuffle=True)
    val_fold_loader = torch.utils.data.DataLoader(val_fold_data, batch_size=32, shuffle=False)

    fold_acc_history = []
    for epoch in range(num_epochs):
        # Re-enter training mode every epoch; the validation pass below
        # switches the model to eval mode.
        model.train()
        train_loss = 0
        # `batch_labels` (not `labels`) so the global label array that other
        # cells read is not overwritten by the last mini-batch.
        for inputs, batch_labels in train_fold_loader:
            optimizer.zero_grad()
            outputs = model(inputs)

            loss = criterion(outputs, encode_labels(batch_labels).to(outputs.device))
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        # Average over the number of training batches.
        train_loss /= len(train_fold_loader)
        print(f"Validation fold {fold + 1}, epoch {epoch + 1}: train_loss = {train_loss:.2f}")

        model.eval()
        val_loss = 0
        true_labels = []
        pred_labels = []
        with torch.no_grad():
            for inputs, batch_labels in val_fold_loader:
                outputs = model(inputs)
                loss = criterion(outputs, encode_labels(batch_labels).to(outputs.device))
                val_loss += loss.item()
                true_labels += batch_labels.tolist()
                pred_labels += outputs.argmax(dim=1).tolist()
        val_loss /= len(val_fold_loader)

        val_acc = accuracy_score(true_labels, pred_labels)
        fold_acc_history.append(val_acc)
        print(f"Fold {fold + 1}: val_loss = {val_loss:.2f}, val_acc = {val_acc:.2f}")
    all_fold_acc_history.append(fold_acc_history)
    # Only the final epoch's predictions of each fold enter the overall metrics.
    all_true_labels += true_labels
    all_pred_labels += pred_labels

# Compute the overall confusion matrix and accuracy
overall_cm = confusion_matrix(all_true_labels, all_pred_labels)
overall_acc = accuracy_score(all_true_labels, all_pred_labels)

# Print the overall confusion matrix and accuracy
print(f"Overall confusion matrix:\n{overall_cm}")
print(f"Overall accuracy: {overall_acc:.2f}")

# Compute the average accuracy over all folds for each epoch
mean_acc_history = [sum([fold_acc_history[i] for fold_acc_history in all_fold_acc_history])/num_folds for i in range(len(all_fold_acc_history[0]))]

# Plot the average accuracy over all folds over time
plt.plot(mean_acc_history)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Time')
plt.show()
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time of: {execution_time} seconds")

from datetime import datetime
timestamp = str(int(time.time()))  # Obtain the current timestamp
# NOTE: `model` is the model of the last fold only; the file holds a PyTorch
# state_dict despite the .h5 suffix.
torch.save(model.state_dict(), '/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/passive/model_'+timestamp+'_.h5')

json_object = json.dumps({
    'folds' : num_folds, 'epochs' : num_epochs,
    'weight_decay' : weight_decay, 'learning_rate': learning_rate,
    'execution_time' : execution_time}, indent=4)
with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/passive/parameters-'+timestamp+'.json', 'w') as outfile:
    outfile.write(json_object)

with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/passive/parameters-'+timestamp+'.pickle', 'wb') as handle:
    pickle.dump(
        {'all_fold_acc_history' : all_fold_acc_history,'confusion' : overall_cm}
        , handle)

# passive learner with fully connected layers

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
import time
import pickle
import torch.optim as optim
def encode_labels(labels):
    """One-hot encode a 1-D float32 tensor of binary labels.

    Args:
        labels: 1-D ``torch.float32`` tensor containing only 0s and 1s.

    Returns:
        A ``(len(labels), 2)`` float32 tensor on the same device as
        ``labels``. Column 0 is 1 where the label is 0 and column 1 is 1
        where the label is 1.

    Raises:
        ValueError: If ``labels`` is not a float32 tensor, is not 1-D, or
            contains a value other than 0 or 1.
    """
    if not isinstance(labels, torch.Tensor) or labels.dtype != torch.float32:
        raise ValueError("Input must be a PyTorch tensor of float32 dtype.")
    if labels.ndim != 1 or not torch.all((labels == 0) | (labels == 1)):
        raise ValueError("Input must be a 1D PyTorch tensor of 0s and 1s.")
    # Derive the encoding from ``labels`` itself so the result lives on the
    # same device. A freshly allocated tensor would always be on the CPU and
    # could not be combined with CUDA model outputs in the loss.
    return torch.stack((1 - labels, labels), dim=1)

# Passive baseline with the fully connected Model2, k-fold cross-validated
# on the training split.
# NOTE(review): Model2.forward receives the one-hot label as an input, so the
# accuracies produced here come from a model that is shown its own target.
# Confirm that this conditioning is intended before reporting these numbers.
X_tensor = torch.FloatTensor(levels)
y_tensor = torch.FloatTensor(labels)
device = "cuda" if torch.cuda.is_available() else "cpu"
dataset = torch.utils.data.TensorDataset(X_tensor.to(device), y_tensor.to(device))
train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=42)

print(len(test_data))
print(len(train_data))

# Hyper-parameters. num_epochs and learning_rate carry the values that were
# effective inside the fold loop (10 and 0.001).
num_folds = 5
num_epochs = 10
weight_decay=0.001
learning_rate = 0.001
input_dim = (8, 14, 25)
label_size = 2
latent_dim = 32
batch_size = 32

kf = KFold(n_splits=num_folds, shuffle=True)

all_fold_acc_history = []
start_time = time.time()

# Folds are drawn from train_data and both subsets index train_data, so the
# fold indices always refer to the collection they were computed for.
for fold, (train_indices, val_indices) in enumerate(kf.split(train_data)):
    # Create an instance of the model on the same device as the data
    model = Model2(input_dim, label_size, latent_dim).to(device)

    # Define the loss function and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    train_fold_data = torch.utils.data.Subset(train_data, train_indices)
    val_fold_data = torch.utils.data.Subset(train_data, val_indices)
    train_fold_loader = torch.utils.data.DataLoader(train_fold_data, batch_size=batch_size, shuffle=True)
    val_fold_loader = torch.utils.data.DataLoader(val_fold_data, batch_size=batch_size, shuffle=False)

    fold_acc_history = []
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0
        # `batch_labels` (not `labels`) so the global label array that other
        # cells read is not overwritten by the last mini-batch.
        for inputs, batch_labels in train_fold_loader:
            optimizer.zero_grad()
            targets = encode_labels(batch_labels).to(inputs.device)
            # Forward pass
            outputs = model(inputs, targets)

            loss = criterion(outputs, targets)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
        # Average over the number of training batches.
        train_loss /= len(train_fold_loader)
        print(f"Validation fold {fold + 1}, epoch {epoch + 1}: train_loss = {train_loss:.2f}")

        # Validation phase. Labels and predictions are collected here so the
        # accuracy below never depends on variables left over from another cell.
        model.eval()
        val_loss = 0
        true_labels = []
        pred_labels = []
        with torch.no_grad():
            for inputs, batch_labels in val_fold_loader:
                targets = encode_labels(batch_labels).to(inputs.device)
                outputs = model(inputs, targets)
                loss = criterion(outputs, targets)
                val_loss += loss.item()
                true_labels += batch_labels.tolist()
                pred_labels += outputs.argmax(dim=1).tolist()
        val_loss /= len(val_fold_loader)

        val_acc = accuracy_score(true_labels, pred_labels)
        fold_acc_history.append(val_acc)
        print(f"Fold {fold + 1}: val_loss = {val_loss:.2f}, val_acc = {val_acc:.2f}")
    all_fold_acc_history.append(fold_acc_history)

mean_acc_history = [sum([fold_acc_history[i] for fold_acc_history in all_fold_acc_history])/num_folds for i in range(len(all_fold_acc_history[0]))]

# Plot the average accuracy over all folds over time
plt.plot(mean_acc_history)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Time')
plt.show()
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time of: {execution_time} seconds")

# Active Learner

In [None]:
#@title active learner
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
import pdb
import math
import time
from datetime import datetime

#random_query_strategy
#margin_sampling
#entropy_sampling
#uncertainty_sampling

def train_active_learner(strategy, n_queries, n_instances, max_epochs, fraction):
    """Run pool-based active learning with 5-fold evaluation.

    Reads the notebook globals ``levels``/``labels`` (full dataset) and
    ``initial_x``/``initial_y`` (seed training set). For each fold the
    training part serves as the unlabeled pool and the held-out part as the
    test set. Each fold is repeated for 5 independent rounds, each starting
    from a fresh learner and the full pool. In every query a random
    ``fraction`` of the remaining pool is offered to ``strategy``, the
    selected samples are taught to the learner and removed from the pool,
    and test accuracy is recorded.

    Args:
        strategy: modAL query strategy used by the ``ActiveLearner``.
        n_queries: Number of query/teach steps per round.
        n_instances: Number of samples requested per query.
        max_epochs: ``max_epochs`` passed to the skorch classifier.
        fraction: Share of the remaining pool offered to the strategy at
            each query.

    Returns:
        Tuple ``(classifier, all_fold_acc_history, execution_time,
        all_fold_false_positives, all_fold_false_negatives)``: the classifier
        of the last round, per-fold accuracy curves averaged over rounds,
        wall-clock seconds, and per-fold round-averaged false positive /
        false negative counts (summed over all queries of a round).

    Side effects:
        Prints progress and pickles the last classifier of every fold to
        Google Drive.
    """
    import pickle  # otherwise only imported by other notebook cells

    num_folds = 5
    num_rounds = 5
    all_fold_acc_history = []
    all_fold_false_positives = []
    all_fold_false_negatives = []

    # modAL and skorch consume NumPy arrays, so convert directly.
    X = np.asarray(levels, dtype=np.float32)
    y = np.asarray(labels, dtype=np.int64)
    # initial training dataset
    X_0 = np.asarray(initial_x, dtype=np.float32)
    y_0 = np.asarray(initial_y, dtype=np.int64)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    kf = KFold(n_splits=num_folds, shuffle=True)

    start_time = time.time()
    for fold, (train_indices, val_indices) in enumerate(kf.split(X)):
        X_test = X[val_indices]
        y_test = y[val_indices]

        print(f"Fold no {fold+1}")
        # 5 trials
        rounds_accuracies = []
        rounds_fps = []
        rounds_fns = []
        for round_no in range(num_rounds):
            print(f"Fold no {fold+1} . Round no {round_no+1}")
            # Every round starts from the complete pool of this fold, so
            # samples consumed by an earlier round do not shrink later ones.
            X_pool = X[train_indices]
            y_pool = y[train_indices]
            this_round_accuracies = []
            this_round_fps = 0
            this_round_fns = 0
            # Model3 already ends in a softmax. skorch's NLLLoss handling
            # takes the log of those probabilities, whereas CrossEntropyLoss
            # expects raw logits and would apply a second softmax.
            classifier = NeuralNetClassifier(Model3, criterion=nn.NLLLoss,
                                 optimizer=torch.optim.Adam, optimizer__weight_decay=0.001,
                                 max_epochs = max_epochs, train_split=None,
                                 verbose=0, device=device, warm_start = True,)
            learner = ActiveLearner(estimator=classifier,X_training=X_0, y_training=y_0,query_strategy=strategy)
            y_pred = learner.predict(X_test)
            ini_acc = accuracy_score(y_test, y_pred)
            print(ini_acc)
            for idx in range(n_queries):
                candidate_idx = np.random.choice(range(len(X_pool)), size=math.floor(fraction*len(X_pool)), replace=False)
                query_idx, _ = learner.query(X_pool[candidate_idx], n_instances=n_instances)
                # query_idx indexes the candidate subset that was passed to
                # query(); map it back to positions in the full pool.
                pool_idx = candidate_idx[query_idx]
                learner.teach(X=X_pool[pool_idx], y=y_pool[pool_idx])
                y_pred = learner.predict(X_test)

                this_round_fps += int(np.sum((y_pred == 1) & (y_test == 0)))
                this_round_fns += int(np.sum((y_pred == 0) & (y_test == 1)))

                val_acc = accuracy_score(y_test, y_pred)
                this_round_accuracies.append(val_acc)

                # Labeled samples leave the pool.
                X_pool = np.delete(X_pool, pool_idx, axis=0)
                y_pool = np.delete(y_pool, pool_idx, axis=0)
                print(f"Fold no {fold+1} . Round no {round_no+1} . Query no {idx+1}: Acc = {val_acc:.2f}")
            rounds_accuracies.append(this_round_accuracies)
            rounds_fps.append(this_round_fps)
            rounds_fns.append(this_round_fns)
        all_rounds_mean_accuracy = np.mean(rounds_accuracies, axis=0)
        all_fold_acc_history.append(all_rounds_mean_accuracy)

        all_rounds_fps = np.mean(rounds_fps)
        all_rounds_fns = np.mean(rounds_fns)
        all_fold_false_positives.append(all_rounds_fps)
        all_fold_false_negatives.append(all_rounds_fns)

        print(f"Fold no {fold+1} . ALL ACC = {all_rounds_mean_accuracy}: total:{len(y_test)} : fps:{all_fold_false_positives} : nps:{all_fold_false_negatives}")
        timestamp = str(int(time.time()))  # Obtain the current timestamp
        with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/model_fold'+str(fold+1)+'-'+timestamp+'.pickle', 'wb') as handle:
            pickle.dump(classifier, handle)

    end_time = time.time()
    execution_time = end_time - start_time

    return  classifier, all_fold_acc_history , execution_time, all_fold_false_positives, all_fold_false_negatives



In [None]:
def random_query_strategy(classifier, X, n_instances=1):
    """Pick ``n_instances`` distinct positions of ``X`` uniformly at random.

    ``classifier`` is ignored; it is accepted so the function has the same
    signature as the other query strategies. Uses Python's ``random`` module
    and returns a list of indices into ``X``.
    """
    order = [*range(len(X))]
    random.shuffle(order)
    del order[n_instances:]
    return order

#random_query_strategy
#margin_sampling
#entropy_sampling
#uncertainty_sampling


In [None]:
#@title margin sampling
# Active-learning experiment with modAL's margin_sampling strategy: re-encode
# the levels, run train_active_learner, plot the accuracy curves and save the
# classifier, the parameters and the results to Google Drive.
import pickle

STRATEGY = margin_sampling
N_INSTANCES = 1   # samples requested per query
N_QUERIES = 100   # query/teach steps per round
MAX_EPOCHS = 50   # forwarded as max_epochs to the skorch classifier
FRACTION = 0.6    # share of the remaining pool offered to the strategy per query
np.random.seed(75)  # fixed seed for NumPy-based sampling

# Define the size of the level
# NOTE(review): width/height are not used below and do not match the
# (14, 25) plane size of `levels`.
width = 29
height = 13
num_levels = len(level_strings)
levels = np.zeros((num_levels, 8,14,25))

# Re-encode every level string into its stack of binary tile planes.
for i, level_string in enumerate(level_strings):
    matrix = create_matrix(level_string)
    replaced = replace_characters(matrix)
    binary = create_binary_matrices(replaced)
    levels[i] = np.array(list(binary.values()))
# Rebuild the label array in the same order as level_strings
# (playable first, then unplayable).
labels = []
labels.extend(playable_level_labels)
labels.extend(unplayable_level_labels)
labels = np.array(labels)
print('the overal shape of X dataset: ' + str(levels.shape))
print('the overal shape of Y dataset: ' + str(labels.shape))

classifier_1, all_fold_acc_history_1, execution_time_1, fps_1,fns_1= train_active_learner(STRATEGY, N_QUERIES, N_INSTANCES,MAX_EPOCHS, FRACTION)

print(f"Execution time: {execution_time_1} seconds")

print(fps_1)
print(fns_1)

# Calculate average accuracy and standard error across folds for each query
mean_accuracy_1 = np.mean(all_fold_acc_history_1, axis=0)
std_error_1 = np.std(all_fold_acc_history_1, axis=0) / np.sqrt(len(all_fold_acc_history_1))
std_dev_1 = np.std(all_fold_acc_history_1, axis=0)
# Plotting: x-axis is the 1-based query number
epochs_1 = range(1, len(mean_accuracy_1) + 1)
# Calculate upper and lower bounds for the fill region
lower_bound_1 = mean_accuracy_1 - std_dev_1
upper_bound_1 = mean_accuracy_1 + std_dev_1

plt.errorbar(epochs_1, mean_accuracy_1, yerr=std_error_1, capsize=3)
plt.xlabel('Queries')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Epoch with Standard Error')
plt.show()

plt.plot(epochs_1, mean_accuracy_1, label='Average Accuracy')
plt.fill_between(epochs_1, lower_bound_1, upper_bound_1, alpha=0.3, label='Standard Deviation')
plt.xlabel('Queries')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Queries with Standard Deviation')
plt.legend()
plt.show()

# Persist the classifier, the run parameters and the raw results under a
# shared timestamp. `time` is not imported in this cell; it must already be
# in scope from an earlier cell.
from datetime import datetime
timestamp = str(int(time.time()))  # Obtain the current timestamp
with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/model_'+timestamp+'.pickle', 'wb') as handle:
    pickle.dump(classifier_1, handle)

# 'weight_decay' and 'learning_rate' are written as literals here; they are
# not read from the classifier.
json_object = json.dumps({
    'STRATEGY' : 'margin_sampling', 'num_instances' : N_INSTANCES,
    'num_queries' : N_QUERIES, 'epochs' : MAX_EPOCHS,
    'fraction': FRACTION,
    'weight_decay' : 0.001, 'learning_rate': '?',
    'execution_time' : execution_time_1}, indent=4)
with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/parameters-'+timestamp+'.json', 'w') as outfile:
    outfile.write(json_object)

with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/results-'+timestamp+'.pickle', 'wb') as handle:
    pickle.dump({'all_fold_acc_history' : all_fold_acc_history_1,'fps' : fps_1, 'fns': fns_1}, handle)


In [None]:
#@title random sampling
# Active-learning baseline with random_query_strategy: re-encode the levels,
# run train_active_learner, plot the accuracy curves and save the classifier,
# the parameters and the results to Google Drive.
import pickle
import time

STRATEGY = random_query_strategy
N_INSTANCES = 1   # samples requested per query
N_QUERIES = 100   # query/teach steps per round
MAX_EPOCHS = 20   # forwarded as max_epochs to the skorch classifier
FRACTION = 0.55   # share of the remaining pool offered to the strategy per query
# Fixed seed for NumPy-based sampling. NOTE: random_query_strategy draws from
# Python's `random` module, which this seed does not control.
np.random.seed(30)

# Define the size of the level
# NOTE(review): width/height are not used below and do not match the
# (14, 25) plane size of `levels`.
width = 29
height = 13
num_levels = len(level_strings)
levels = np.zeros((num_levels, 8,14,25))

# Re-encode every level string into its stack of binary tile planes.
for i, level_string in enumerate(level_strings):
    matrix = create_matrix(level_string)
    replaced = replace_characters(matrix)
    binary = create_binary_matrices(replaced)
    levels[i] = np.array(list(binary.values()))
# Rebuild the label array in the same order as level_strings
# (playable first, then unplayable).
labels = []
labels.extend(playable_level_labels)
labels.extend(unplayable_level_labels)
labels = np.array(labels)
print('the overal shape of X dataset: ' + str(levels.shape))
print('the overal shape of Y dataset: ' + str(labels.shape))

classifier_2, all_fold_acc_history_2, execution_time_2, fps_2,fns_2= train_active_learner(STRATEGY, N_QUERIES, N_INSTANCES,MAX_EPOCHS, FRACTION)

# Report this run's own timing (execution_time_2, not the margin run's).
print(f"Execution time: {execution_time_2} seconds")

print(fps_2)
print(fns_2)

# Calculate average accuracy and standard error across folds for each query
mean_accuracy_2 = np.mean(all_fold_acc_history_2, axis=0)
std_error_2 = np.std(all_fold_acc_history_2, axis=0) / np.sqrt(len(all_fold_acc_history_2))
std_dev_2 = np.std(all_fold_acc_history_2, axis=0)
# Plotting: x-axis is the 1-based query number
epochs_2 = range(1, len(mean_accuracy_2) + 1)
# Calculate upper and lower bounds for the fill region
lower_bound_2 = mean_accuracy_2 - std_dev_2
upper_bound_2 = mean_accuracy_2 + std_dev_2

plt.errorbar(epochs_2, mean_accuracy_2, yerr=std_error_2, capsize=3)
plt.xlabel('Queries')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Epoch with Standard Error')
plt.show()

plt.plot(epochs_2, mean_accuracy_2, label='Average Accuracy')
plt.fill_between(epochs_2, lower_bound_2, upper_bound_2, alpha=0.3, label='Standard Deviation')
plt.xlabel('Queries')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Queries with Standard Deviation')
plt.legend()
plt.show()

# Persist the classifier, the run parameters and the raw results under a
# shared timestamp.
from datetime import datetime
timestamp = str(int(time.time()))  # Obtain the current timestamp
with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/model_'+timestamp+'.pickle', 'wb') as handle:
    pickle.dump(classifier_2, handle)

# 'weight_decay' and 'learning_rate' are written as literals here; they are
# not read from the classifier.
json_object = json.dumps({
    'STRATEGY' : 'random_sampling', 'num_instances' : N_INSTANCES,
    'num_queries' : N_QUERIES, 'epochs' : MAX_EPOCHS,
    'fraction': FRACTION,
    'weight_decay' : 0.001, 'learning_rate': '?',
    'execution_time' : execution_time_2}, indent=4)
with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/parameters-'+timestamp+'.json', 'w') as outfile:
    outfile.write(json_object)

with open('/content/drive/MyDrive/Ghost Lab/cog 2023/camera-ready/Mario-data/active/results-'+timestamp+'.pickle', 'wb') as handle:
    pickle.dump({'all_fold_acc_history' : all_fold_acc_history_2,'fps' : fps_2, 'fns': fns_2}, handle)

# Plots

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats



# Comparison plot: margin sampling (run 1) versus random sampling (run 2).
# Requires all_fold_acc_history_1 and all_fold_acc_history_2 from the two
# experiment cells.

# Calculate average accuracy and standard error across folds for each query
mean_accuracy_1 = np.mean(all_fold_acc_history_1, axis=0)
std_error_1 = np.std(all_fold_acc_history_1, axis=0) / np.sqrt(len(all_fold_acc_history_1))
std_dev_1 = np.std(all_fold_acc_history_1, axis=0)
# Plotting: the x-axis starts at 10 rather than 1.
# NOTE(review): presumably 10 is the size of the initial training set (5 per
# class) with one sample added per query. Confirm; the first recorded
# accuracy is measured after the first query has been taught.
epochs_1 = range(10, len(mean_accuracy_1) + 10)
# Calculate upper and lower bounds for the fill region (mean +/- one std dev)
lower_bound_1 = mean_accuracy_1 - std_dev_1
upper_bound_1 = mean_accuracy_1 + std_dev_1


# Calculate average accuracy and standard error across folds for each query
mean_accuracy_2 = np.mean(all_fold_acc_history_2, axis=0)

std_error_2 = np.std(all_fold_acc_history_2, axis=0) / np.sqrt(len(all_fold_acc_history_2))
std_dev_2 = np.std(all_fold_acc_history_2, axis=0)
# Plotting: same x-axis offset as for run 1
epochs_2 = range(10, len(mean_accuracy_2) + 10)
print(epochs_2)
# Calculate upper and lower bounds for the fill region (mean +/- one std dev)
lower_bound_2 = mean_accuracy_2 - std_dev_2
upper_bound_2 = mean_accuracy_2 + std_dev_2

# Both mean curves with their standard-deviation bands on shared axes.
plt.plot(epochs_1,  mean_accuracy_1, color='b', linestyle='-', label='Margin Sampling')
plt.fill_between(epochs_1, lower_bound_1, upper_bound_1, alpha=0.3)
plt.plot(epochs_2,  mean_accuracy_2,color='r', linestyle='-', label='Random Sampling')
plt.fill_between(epochs_2, lower_bound_2, upper_bound_2, alpha=0.3)
#plt.axhline(y=np.mean(all_fold_acc_history), color='g', linestyle='--', label='Passive Learner (2480 samples)')
# Fixed y-range; values outside 0.45-0.8 are clipped from view.
plt.ylim(0.45, 0.8)
#plt.xlim(10, 410)
#plt.xticks(np.arange(10,410, 50))
plt.xlabel('#Samples')
plt.ylabel('Accuracy')
plt.title('Average Accuracy over Samples')
plt.legend()
plt.show()