### Notebook for concept detection in neural network

In [1]:
%load_ext autoreload
%autoreload 2

In [None]:
import numpy as np
import os
import sys
from tqdm import tqdm
import json
import random
import string

sys.path.append(os.path.abspath(os.path.join(os.path.pardir, 'src')))

from concepts import static_concepts, linear_regression
import env
from policy import ConvNet

model_name = "net"
session_name = "endeavor"
board_size = 5
board_name = f'{board_size}x{board_size}'

agents_to_sample = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

full_model_path = f"../models/saved_sessions/board_size_{board_size}/{session_name}/"

CONCEPT_FUNC = static_concepts.play_center_in_opening

CONCEPT_NAME = static_concepts.play_center_in_opening.__name__

CASES_TO_SAMPLE = 2500 # 25000


In [None]:
def load_model(full_name, model_name, epoch):
    model_path = full_name + model_name + "_" + str(epoch) + ".keras"
    model = ConvNet(board_size, model_path)
    return model

agents = [load_model(full_model_path, model_name, epoch) for epoch in agents_to_sample]

In [None]:
def play_match(agents, board_size, concept_function):
    go_env = env.GoEnv(board_size)
    go_env.reset()

    player_to_start = 1 if np.random.random() > 0.5 else 0
    current_player = player_to_start

    total_moves = board_size * board_size * 4
    moves = 0
    random_moves = False

    positive_cases = []
    negative_cases = []

    sample_ratio = 0.8
    alpha = 1

    game_over = False

    prev_turn_state = np.zeros((board_size, board_size))
    temp_prev_turn_state = np.zeros((board_size, board_size))
    prev_opposing_state = np.zeros((board_size, board_size))

    while not game_over:
        if moves == total_moves:
            break

        state = go_env.canonical_state()

        valid_moves = go_env.valid_moves()

        if current_player == 0:
            state_copy = np.array([state[0], prev_turn_state, state[1], prev_opposing_state, np.zeros((board_size, board_size))])
        else:
            state_copy = np.array([state[0], prev_turn_state, state[1], prev_opposing_state, np.ones((board_size, board_size))])
    
        if np.random.random() < sample_ratio:
            pos = concept_function(state)
            if pos:
                positive_cases.append(state_copy)
            elif not pos:
                negative_cases.append(state_copy)

        if random_moves:
            action = go_env.uniform_random_action()
        else:
            action = agents[current_player].best_action(state_copy, valid_moves, alpha=alpha)

        _, _, game_over, _ = go_env.step(action)

        moves += 1

        current_player = 1 - current_player
        # Update the previous state
        prev_turn_state = temp_prev_turn_state
        prev_opposing_state = state[0]
        temp_prev_turn_state = prev_opposing_state
    
    return positive_cases, negative_cases

positive_cases = []
negative_cases = []

positive_bar = tqdm(total=CASES_TO_SAMPLE, desc="Positive cases")

while len(positive_cases) < CASES_TO_SAMPLE or len(negative_cases) < CASES_TO_SAMPLE:
    for i in range(len(agents)):
        for j in range(i + 1, len(agents)):
            pos, neg = play_match([agents[i], agents[j]], board_size, CONCEPT_FUNC)

            positive_cases.extend(pos)
            negative_cases.extend(neg)

            positive_bar.update(len(pos))

positive_cases = positive_cases[:CASES_TO_SAMPLE]
negative_cases = negative_cases[:CASES_TO_SAMPLE]

# Create numpy arrays
positive_cases = np.array(positive_cases)
negative_cases = np.array(negative_cases)

In [None]:
# Print shapes
print("Positive cases: ", positive_cases.shape)
print("Negative cases: ", negative_cases.shape)

In [6]:
# Test if there are any duplicates accross the two sets
for i in range(len(positive_cases)):
    for j in range(len(negative_cases)):
        if np.array_equal(positive_cases[i], negative_cases[j]):
            print("Duplicate found!")
            print(positive_cases[i])
            print(negative_cases[j])
            break

In [None]:
# Find the number of any duplicates within the positive cases
duplicate_count = 0
for i in range(len(positive_cases)):
    for j in range(i + 1, len(positive_cases)):
        if np.array_equal(positive_cases[i], positive_cases[j]):
            duplicate_count += 1
            break

print("Duplicate count in positive cases: ", duplicate_count)

# Find the number of any duplicates within the negative cases
duplicate_count = 0
for i in range(len(negative_cases)):
    for j in range(i + 1, len(negative_cases)):
        if np.array_equal(negative_cases[i], negative_cases[j]):
            duplicate_count += 1
            break

print("Duplicate count in negative: ", duplicate_count)

In [None]:
# Print all the positive cases
for i in range(len(positive_cases)):
    print(f"Positive case {i}:")
    print(positive_cases[i])
    print()

In [None]:
# Positions to consider are 80% of the total positions
POSITIONS_TO_CONSIDER = int(0.8 * positive_cases.shape[0] * 2)
print(f"Positions to consider: {POSITIONS_TO_CONSIDER}")
#POSITIONS_TO_CONSIDER = 4000 #40000
VALIDATION_POSITIONS = 10000 #10000

BINARY = True

In [None]:
# First test if the concept can be regressed form the inputs
name = "input"
all_cases = np.concatenate([positive_cases, negative_cases])
all_labels = [1] * positive_cases.shape[0] + [0] * negative_cases.shape[0]
all_labels = np.array(all_labels)
shuffled_indices = np.arange(all_labels.shape[0])

np.random.shuffle(shuffled_indices)

all_cases = all_cases[shuffled_indices]
all_labels = all_labels[shuffled_indices]

points = all_cases.reshape(all_cases.shape[0], -1)

# Use the regression
score = linear_regression.perform_regression(
    points=points[:POSITIONS_TO_CONSIDER], 
    targets=all_labels[:POSITIONS_TO_CONSIDER], 
    validation_points=points[POSITIONS_TO_CONSIDER:], 
    validation_targets=all_labels[POSITIONS_TO_CONSIDER:], 
    is_binary=BINARY,
    epochs=10
)

print("Regression score: ", score)

# Remove the files if they exist
if os.path.exists("../concept_presences/static/{}/{}/{}/".format(board_name, session_name, CONCEPT_NAME)):
    # Test if epoch folder exists
    if os.path.exists("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, name)):
        # Remove all files in the epoch folder
        # Fist save the 
        for file in os.listdir("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, name)):
            os.remove("../concept_presences/static/{}/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, name, file))

os.makedirs("../concept_presences", exist_ok=True)
os.makedirs("../concept_presences/static", exist_ok=True)
os.makedirs("../concept_presences/static/{}".format(board_name), exist_ok=True)
os.makedirs("../concept_presences/static/{}/{}".format(board_name, session_name), exist_ok=True)
os.makedirs("../concept_presences/static/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME), exist_ok=True)
os.makedirs("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, name), exist_ok=True)

random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))

# Save concept presences in json file
with open("../concept_presences/static/{}/{}/{}/{}/{}.json".format(board_name, session_name, CONCEPT_NAME, name, random_suffix), "w") as f:
    json.dump(score, f)

In [None]:
epochs_to_look_at = agents_to_sample

for epoch in epochs_to_look_at:
    path = full_model_path + model_name + "_" + str(epoch) + ".keras"
    model = ConvNet(board_size, path)

    # Will have a length equal to the sum of the numer of rows in the positive and negative cases arrays
    # And will contain 1s for positive cases and 0s for negative cases
    # Is used as labels/targets for the regression
    all_cases = np.concatenate([positive_cases, negative_cases])
    all_labels = [1] * positive_cases.shape[0] + [0] * negative_cases.shape[0]
    all_labels = np.array(all_labels)
    shuffled_indices = np.arange(all_labels.shape[0])

    np.random.shuffle(shuffled_indices)

    all_cases = all_cases[shuffled_indices]
    all_labels = all_labels[shuffled_indices]

    concept_presences = {}
    
    outputs = model.get_all_activation_values(all_cases)

    # Merge outputs
    merged_outputs = []
    for output_batch in outputs:
        for i, output_layer in enumerate(output_batch):
            if len(merged_outputs) <= i:
                merged_outputs.append([])
            merged_outputs[i].extend(output_layer)

    for i, layer_output in enumerate(merged_outputs):
        merged_outputs[i] = np.array(merged_outputs[i])
    
    outputs = merged_outputs

    # Perform regression
    concept_presence_per_layer = []
    for (i, output) in enumerate(outputs):
        points = output.reshape((output.shape[0], np.prod(output.shape[1:])))
        # So one has (n, k) samples where n is the number of positions, and k is the total number of activation values in layer i.
        print(f"Performing regression for layer {i}")
        score = linear_regression.perform_regression(
            points=points[:POSITIONS_TO_CONSIDER], 
            targets=all_labels[:POSITIONS_TO_CONSIDER], 
            validation_points=points[POSITIONS_TO_CONSIDER:], 
            validation_targets=all_labels[POSITIONS_TO_CONSIDER:], 
            is_binary=BINARY,
            epochs=10
        )
        concept_presence_per_layer.append(score)

        print(f"The presence of {CONCEPT_NAME} in resblock {i} is {score}")
    concept_presences[CONCEPT_NAME] = concept_presence_per_layer

    # Remove the files if they exist
    if os.path.exists("../concept_presences/static/{}/{}/{}/".format(board_name, session_name, CONCEPT_NAME)):
        # Test if epoch folder exists
        if os.path.exists("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, epoch)):
            # Remove all files in the epoch folder
            # Fist save the 
            for file in os.listdir("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, epoch)):
                os.remove("../concept_presences/static/{}/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, epoch, file))

    os.makedirs("../concept_presences", exist_ok=True)
    os.makedirs("../concept_presences/static", exist_ok=True)
    os.makedirs("../concept_presences/static/{}".format(board_name), exist_ok=True)
    os.makedirs("../concept_presences/static/{}/{}".format(board_name, session_name), exist_ok=True)
    os.makedirs("../concept_presences/static/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME), exist_ok=True)
    os.makedirs("../concept_presences/static/{}/{}/{}/{}".format(board_name, session_name, CONCEPT_NAME, epoch), exist_ok=True)

    random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))

    # Save concept presences in json file
    with open("../concept_presences/{}/{}/{}/{}/{}.json".format(board_name, session_name, CONCEPT_NAME, epoch, random_suffix), "w") as f:
        json.dump(concept_presences[CONCEPT_NAME], f)
