# Visual experiment source code
Used to gather quantitative data on visual pattern recognition

In [None]:
import numpy as np
import ipywidgets as wd
import matplotlib.pyplot as plt
import os
import pickle
from IPython.display import clear_output
from enum import Enum, unique
from datetime import datetime
from random import choice

## Shape enumeration class
Defines possible shapes

To add a new shape, add a new enumeration member and a corresponding method named `get_<value>` which returns an array with shape (2, \*) whose columns are the points outlining the shape

In [None]:
@unique
class Shape(Enum):
    """Shapes that can be hidden in a figure.

    The value of each member names its generator method ``get_<value>``.
    Generators return a float array with two rows and one column per
    outlining point (``NOTHING`` yields zero columns).
    """
    TRIANGLE = 'triangle'
    RECTANGLE = 'rectangle'
    NOTHING = 'nothing'

    def get_shape(self):
        """Return the generator method of this member (the callable, not its result)."""
        return getattr(self, f"get_{self.value}")

    def get_rectangle(self):
        """Return a (2, 4) array: a random base line plus both ends shifted orthogonally."""
        base, offset = self.get_orthogonal_vectors()
        shifted_start = (base[:, 0] + offset).reshape(-1, 1)
        shifted_end = (base[:, 1] + offset).reshape(-1, 1)
        # Column order walks around the outline: shifted start, start, end, shifted end
        return np.concatenate([shifted_start, base, shifted_end], axis=1)

    def get_triangle(self):
        """Return a (2, 3) array: a random base line plus an apex above its midpoint."""
        base, offset = self.get_orthogonal_vectors()
        apex = (base[:, 0] + base[:, 1]) / 2 + offset
        return np.append(base, apex.reshape(2, 1), axis=1)

    def get_orthogonal_vectors(self):
        """Return a random base line (2, 2) and an offset vector orthogonal to it.

        The offset has a random length drawn from [0.3, 0.8).
        """
        base = self.create_line()
        direction = self.get_orthogonal(base[:, 0] - base[:, 1])
        offset = direction * (np.random.rand(1) / 2 + 0.3)
        return base, offset

    # Create a random line used to build up other shapes
    @staticmethod
    def create_line():
        """Return two random points in the unit square (as columns) at least 0.3 apart."""
        line = np.random.rand(2, 2)
        while np.linalg.norm(line[:, 0] - line[:, 1]) < .3:
            line = np.random.rand(2, 2)
        return line

    @staticmethod
    def get_orthogonal(x):
        """Return a unit vector orthogonal to the 2-D vector ``x``.

        ``x`` is left untouched: the normalisation works on a copy, so the
        caller's array is not modified and integer arrays are accepted.
        """
        unit = np.asarray(x, dtype=float)
        unit = unit / np.linalg.norm(unit)  # normalize a copy of x
        y = np.random.rand(2)
        # Remove the component of y that is parallel to x
        y = y - np.dot(unit, y) * unit
        return y / np.linalg.norm(y)  # return orthonormal vector to x

    def get_nothing(self):
        """Return an empty outline with shape (2, 0)."""
        return np.asarray([[], []])

## ShapeType enumeration class

Used to modify the type of the shape in the figure. The current shapes offer these transformations:
* Single - plot single salient points outlining a shape
* Multiple - stochastically add points on the lines connecting outlining points
* Invisible - remove points in close vicinity to the lines connecting the outlining points

In [None]:
# Every <ShapeType>_shape_type method must return noise and shape_points
# If original points of the shape should not be plotted, shape_points
# should either not include them or return empty array with shape (2, 0)
@unique
class ShapeType(Enum):
    """Ways of presenting a shape's outline inside the random noise.

    The value of each member names its transformation method
    ``<value>_shape_type``. Every such method takes ``noise`` and ``points``
    (both arrays with one column per point) and returns ``(noise, shape_points)``.
    """
    SINGLE = 'single'
    MULTIPLE = 'multiple'
    INVISIBLE = 'invisible'

    # Refactor the points defining a shape into a specific type
    def transform_data(self, noise, points):
        """Dispatch to the transformation method of this member."""
        return getattr(self, f"{self.value}_shape_type")(noise, points)

    def single_shape_type(self, noise, points):
        """Return noise and outline points unchanged."""
        return noise, points

    def multiple_shape_type(self, noise, points):
        """Append randomly selected, jittered points along every outline edge."""
        size = points.shape[-1]
        extras = []
        for i in range(size):
            # Edge from point i to the next point, wrapping around at the end
            interpolated = self.interpolate(points[:, i], points[:, (i + 1) % size])
            extras.append(self.add_noise(self.select_points(interpolated)))
        return noise, np.concatenate([points, *extras], axis=1)

    def add_noise(self, points):
        """Return ``points`` jittered by Gaussian noise scaled by ``std``."""
        return points + np.random.randn(*points.shape) * self.std

    def select_points(self, line):
        """Keep each sample of ``line`` whose random draw exceeds ``acceptance``.

        ``line`` has one sample per row. The result always has shape (2, k),
        including k == 0, so it can be concatenated along axis 1 even when
        nothing was selected.
        """
        samples = np.asarray(line, dtype=float).reshape(-1, 2)
        keep = np.random.rand(len(samples)) > self.acceptance
        return samples[keep].T

    def invisible_shape_type(self, noise, points):
        """Remove noise near every outline edge and return no shape points."""
        size = points.shape[-1]
        for i in range(size):
            interpolated = self.interpolate(points[:, i], points[:, (i + 1) % size])
            noise = self.remove_points(noise, interpolated)
        return noise, np.asarray([[], []])

    def interpolate(self, x, y):
        """Return evenly spaced samples from ``x`` to ``y``, one per row.

        The sample count is the distance between the points times ``density``,
        truncated to an integer.
        """
        return np.linspace(x, y, int(np.linalg.norm(x - y) * self.density))

    def remove_points(self, noise, line):
        """Drop every noise column lying within ``margin`` of a sample of ``line``."""
        for local in line:
            noise = np.delete(noise, self.find_in_local(noise, local), axis=1)
        return noise

    # return indices of points for deletion
    def find_in_local(self, noise, local):
        """Return the column indices of ``noise`` inside the box around ``local``."""
        noise = np.asarray(noise)
        low_x, high_x = local[0] - self.margin, local[0] + self.margin
        low_y, high_y = local[1] - self.margin, local[1] + self.margin
        # Same strict comparisons as in_local, evaluated for all columns at once
        inside = (noise[0] > low_x) & (noise[0] < high_x) \
            & (noise[1] > low_y) & (noise[1] < high_y)
        return np.flatnonzero(inside).tolist()

    def in_local(self, point, local):
        """Return True if ``point`` lies strictly inside the box of half-width ``margin`` around ``local``."""
        return local[0] - self.margin < point[0] < local[0] + self.margin \
            and local[1] - self.margin < point[1] < local[1] + self.margin

# Tunables are attached after class creation: plain values bound inside an
# Enum body would become enumeration members.
ShapeType.std = 0.005
ShapeType.acceptance = 0.75
ShapeType.margin = 0.011
ShapeType.density = 100

## FigureManager class
Used to manage the generation of plots and figures

In [None]:
class FigureManager():
    """Generates random-noise scatter plots that may hide a shape.

    Parameters
    ----------
    shape_type : ShapeType used to transform the outline points and the noise.
    multiple : if True, between 2 and 4 shapes are generated per figure.
    shape : figure size passed to ``plt.subplots`` as ``figsize``.
    s : marker size used for the shape points.
    a : alpha used for the noise points.
    color : colour of all scattered points.
    size : number of random noise points.
    """
    MARGIN = 0.05
    FONT_SIZE = 24

    def __init__(self, shape_type=ShapeType.SINGLE, multiple=False, shape=(12,10), s=60, a=0.6, color='b', size=300):
        self.type = shape_type
        self.multiple = multiple
        self.figsize = shape
        self.s, self.a, self.color = s, a, color
        self.num_of_points = size
        # When True, plot_shape also draws the red outline of each shape
        self.show_lines = False

    # Method used to create a scatter plot of random points with hidden shape
    def create_figure(self):
        """Plot a figure for a randomly chosen Shape.

        Returns the record describing the figure (noise, shapes, plotted
        shape points, real answer, creation time, shape type) and the Shape.
        """
        shape = choice(list(Shape))
        noise, shapes, shape_points = self.create_shape(shape)
        current = {'noise': noise,
                   'shapes': shapes,
                   'shape_points': shape_points,
                   'real': shape.value,
                   'time': datetime.now(),
                   'type': self.type}
        self.plot_figure(noise, shapes, shape_points)
        plt.show()
        return current, shape

    def create_shape(self, shape):
        """Return noise, the list of shape outlines and the list of points to plot."""
        noise = np.random.rand(2, self.num_of_points)
        if self.multiple:
            return self.generate_multiple_points(shape, noise)
        outline = self.generate_points(shape.get_shape())
        noise, shape_points = self.type.transform_data(noise, outline)
        return noise, [outline], [shape_points]

    def generate_multiple_points(self, shape, noise):
        """Generate between 2 and 4 instances of ``shape`` sharing one noise array."""
        outlines, shape_points = [], []
        for _ in range(np.random.randint(2, 5)):
            outline = self.generate_points(shape.get_shape())
            # The noise is threaded through so every shape can alter it
            noise, points = self.type.transform_data(noise, outline)
            outlines.append(outline)
            shape_points.append(points)
        return noise, outlines, shape_points

    def generate_points(self, func):
        """Call ``func`` until all returned coordinates lie within [MARGIN, 1 - MARGIN]."""
        x = func()
        # While loop makes sure none of the points constituting
        # the shape are too close to the edge of the plot
        while np.any((x > 1 - self.MARGIN) | (x < self.MARGIN)):
            x = func()
        return x

    def plot_figure(self, noise, shapes, shape_points, fig=None):
        """Scatter the noise and every shape onto ``fig`` (an axes; created if None)."""
        if fig is None:
            _, fig = plt.subplots(1, 1, figsize=self.figsize)
        fig.scatter(*noise, color=self.color, alpha=self.a)
        for outline, points in zip(shapes, shape_points):
            self.plot_shape(fig, outline, points)

    def plot_shape(self, fig, shape, shape_points):
        """Scatter the shape points, optionally draw the outline, and hide the axes."""
        fig.scatter(*shape_points, color=self.color, s=self.s)
        if self.show_lines:
            self.draw_lines(fig, shape)
        fig.axis('off')

    # The lines connecting the 'hidden' points constituting the shape
    @staticmethod
    def draw_lines(fig, x):
        """Draw red segments between consecutive outline points, closing the loop."""
        size = x.shape[-1]
        for start in range(size):
            pair = (np.arange(2) + start) % size
            fig.plot(x[0, pair], x[1, pair], linewidth=2, color='r')

    def show_incorrect(self, axs, incorrect):
        """Plot each incorrect record twice: as guessed (no lines) and with the real outline.

        The previous ``show_lines`` setting is restored afterwards so that
        figures created later do not reveal their outline.
        """
        axs = axs.reshape(-1, 2)
        previous = self.show_lines
        try:
            self.show_lines = False
            self.show_figures(incorrect, axs[:,0], 'guess', template='Guessed: {}')
            self.show_lines = True
            self.show_figures(incorrect, axs[:,1], 'real', template='Real answer: {}')
        finally:
            self.show_lines = previous

    def show_figures(self, incorrect, axs, title_key, template="{}"):
        """Plot every record of ``incorrect`` on its axes, titled via ``template``."""
        for record, ax in zip(incorrect, axs):
            self.plot_figure(record['noise'], record['shapes'], shape_points=record['shape_points'], fig=ax)
            ax.set_title(template.format(record[title_key]), size=self.FONT_SIZE)

    def line_switch(self, b):
        """Observer callback: adopt the new value when the change carries a bool."""
        if isinstance(b['new'], bool):
            self.show_lines = b['new']

## WidgetManager class
Used to manage the creation and handling of widgets in an attempt to break apart the Experiment class

In [None]:
class WidgetManager:
    """Builds the experiment's widgets and controls their visibility."""

    def __init__(self):
        # Controls shown in the answer row by place_layout
        self.widgets = list()

    def create_begin_button(self):
        """Create and display the 'Begin' button together with a fresh output area."""
        begin_layout = wd.Layout(width='10cm', height='2cm')
        self.begin_button = wd.Button(description="Begin", layout=begin_layout)
        self.begin_button.on_click(self.begin_button_clicked)
        self.output = wd.Output()
        display(self.output, self.begin_button)
        return self.begin_button

    def begin_button_clicked(self, b):
        """Hide the 'Begin' button once it has been pressed."""
        with self.output:
            self.begin_button.layout.display = 'none'

    def create_buttons(self):
        """Create one answer button per Shape, register them and reset the output area."""
        answer_buttons = []
        for shape in Shape:
            caption = shape.value.title()
            button_layout = wd.Layout(width='6cm', height='2cm')
            answer_buttons.append(wd.Button(description=caption, layout=button_layout))
        self.output = wd.Output()
        self.widgets.extend(answer_buttons)
        return answer_buttons

    def create_checkbox(self):
        """Create the 'Lines' checkbox and register it with the other controls."""
        checkbox = wd.Checkbox(description="Lines", indent=False)
        self.widgets.append(checkbox)
        return checkbox

    def place_layout(self):
        """Display the output area followed by all registered controls in one row."""
        row = wd.HBox(self.widgets)
        display(self.output, row)

    def clear_widgets(self):
        """Hide every registered control."""
        for control in self.widgets:
            control.layout.display = 'none'

## ExperimentInitializer
Helper class to initialize some parameters of the experiment

In [None]:
class ExperimentInitializer:
    """Translates an experiment mode into FigureManager instances and run settings.

    Any mode not listed in MODES is handled by ``custom_init`` using the
    keyword arguments supplied by the caller.
    """
    DEFAULT_TEST_NUM = 30
    MODES = ['mixed', 'single', 'multiple', 'invisible', 'debug']
    # Keyword arguments of a custom experiment that are forwarded to FigureManager
    LEGAL_ARGS = ['shape_type', 'shape', 's', 'a', 'color', 'size']
    OPTIMIZED_SINGLE_DICT = [{'shape_type': ShapeType.SINGLE, 's': 60, 'a': 0.7}]
    OPTIMIZED_MULTIPLE_DICT = [{'shape_type': ShapeType.MULTIPLE, 's': None, 'a': None, 'size': 650}]
    OPTIMIZED_INVISIBLE_DICT = [{'shape_type': ShapeType.INVISIBLE, 's': None, 'a': None, 'size': 1000}]

    def initialze(self, mode, multiple, **kwargs):
        """Return ``(figure_managers, num_of_tests, line_checkbox)`` for ``mode``.

        The misspelled name is kept because existing callers use it;
        ``initialize`` is an alias.
        """
        self.line_checkbox = False
        self.num_of_tests = self.DEFAULT_TEST_NUM
        if mode in self.MODES:
            configs = getattr(self, f"{mode}_init")()
        else:
            configs = self.custom_init(**kwargs)
        fms = [FigureManager(multiple=multiple, **config) for config in configs]
        return fms, self.num_of_tests, self.line_checkbox

    # Correctly spelled alias of initialze
    initialize = initialze

    def single_init(self):
        """Return the tuned FigureManager arguments for ShapeType.SINGLE."""
        return self.OPTIMIZED_SINGLE_DICT

    def multiple_init(self):
        """Return the tuned FigureManager arguments for ShapeType.MULTIPLE."""
        return self.OPTIMIZED_MULTIPLE_DICT

    def invisible_init(self):
        """Return the tuned FigureManager arguments for ShapeType.INVISIBLE."""
        return self.OPTIMIZED_INVISIBLE_DICT

    def mixed_init(self):
        """Use every tuned configuration and run 50 tests."""
        self.num_of_tests = 50
        return self.create_all()

    def debug_init(self):
        """Use every tuned configuration, enable the line checkbox and run 999999 tests."""
        self.num_of_tests = 999999
        self.line_checkbox = True
        return self.create_all()

    # ---------ONLY PARTIALLY IMPLEMENTED---------
    def custom_init(self, **kwargs):
        """Return a single configuration built from the LEGAL_ARGS found in ``kwargs``.

        ``num_of_tests``, when given, overrides the default number of tests.
        All other keyword arguments are ignored.
        """
        if 'num_of_tests' in kwargs:
            self.num_of_tests = kwargs['num_of_tests']
        args = {key: kwargs[key] for key in self.LEGAL_ARGS if key in kwargs}
        return [args]

    # Returns the optimized attributes of all ShapeTypes
    def create_all(self):
        """Return one flat list with the configurations of every per-type ``*_init`` method."""
        configs = []
        for key in dir(self):
            if self.is_valid(key):
                # Each per-type initializer returns a list of configuration dicts
                configs.extend(getattr(self, key)())
        return configs

    @staticmethod
    def is_valid(key):
        """Return True for ``*_init`` names other than the mixed, debug and custom ones."""
        return key.endswith('_init') \
            and not key.startswith(('mixed', 'debug', 'custom'))

## Experiment class
Responsible for creating and running an experiment as well as calculating and displaying the results.

Different modes correspond to different ShapeTypes and parameters tuned for each ShapeType. The available modes are:
* single
* multiple
* invisible
* mixed
* debug (provides the ability to turn lines on and runs indefinitely)

If multiple is set to True, then between 2 and 4 shapes will be generated within one figure.

If none of the modes are satisfactory, parameters can be provided instead for a custom experiment

In [None]:
class Experiment:
    """Runs a shape-recognition experiment and reports its results.

    Parameters
    ----------
    mode : name of the experiment mode; forwarded to ExperimentInitializer.
    multiple : forwarded to every FigureManager.
    **kwargs : extra settings forwarded to ExperimentInitializer.
    """

    def __init__(self, mode='custom', multiple=False, **kwargs):
        self.save_args(mode, multiple, **kwargs)
        self.wm = WidgetManager()
        self.num_of_completed = 0
        self.line_checkbox = False
        # Record of the figure currently awaiting an answer
        self.current = None
        # Records of the figures that were answered incorrectly
        self.incorrect = []
        self.generated_shapes = np.asarray([])
        self.guessed_shapes = np.asarray([])
        self.response_times = np.asarray([])
        self.fms, self.num_of_tests, self.line_checkbox = ExperimentInitializer().initialze(mode, multiple, **kwargs)

    def save_args(self, mode, multiple, **kwargs):
        """Remember the constructor arguments so the experiment can be re-created."""
        self.args = kwargs
        self.args['mode'] = mode
        self.args['multiple'] = multiple

    def start(self):
        """Show the 'Begin' button; pressing it starts the experiment."""
        button = self.wm.create_begin_button()
        button.on_click(self.begin_experiment)

    def begin_experiment(self, b):
        """Create the answer controls, wire their callbacks and show the first figure."""
        with self.wm.output:
            buttons = self.wm.create_buttons()
            if self.line_checkbox:
                self.set_observe(self.wm.create_checkbox())
            self.wm.place_layout()

        for button, shape in zip(buttons, Shape):
            button.on_click(getattr(self, f"{shape.value}_button_clicked"))

        # Create a shape with the new output
        with self.wm.output:
            self.current, shape = self.create_figure()
        self.generated_shapes = np.append(self.generated_shapes, shape)

    def set_observe(self, checkbox):
        """Let the checkbox toggle outline drawing on every figure manager."""
        for fm in self.fms:
            checkbox.observe(fm.line_switch)

    def triangle_button_clicked(self, b):
        self.button_clicked(Shape.TRIANGLE)

    def rectangle_button_clicked(self, b):
        self.button_clicked(Shape.RECTANGLE)

    def nothing_button_clicked(self, b):
        self.button_clicked(Shape.NOTHING)

    def button_clicked(self, shape):
        """Record the answer, then clear the output and move to the next test."""
        self.add_answer(shape)
        self.num_of_completed += 1
        with self.wm.output:
            clear_output()
            self.next_test()

    def next_test(self):
        """Show the next figure, or hide the controls when all tests are done."""
        if self.tests_completed():
            self.wm.clear_widgets()
            print('Tests completed')
            return
        self.current, shape = self.create_figure()
        self.generated_shapes = np.append(self.generated_shapes, shape)

    def tests_completed(self):
        """Return True once the configured number of tests has been answered."""
        return self.num_of_completed >= self.num_of_tests

    def create_figure(self):
        """Create a figure with a randomly chosen figure manager."""
        return choice(self.fms).create_figure()

    def add_answer(self, answer):
        """Store the response time and the guess for the current figure."""
        # 'time' holds the creation timestamp until it is replaced by the elapsed seconds
        self.current['time'] = (datetime.now() - self.current['time']).total_seconds()
        # If incorrect, add to incorrect guess list
        if answer.value != self.current['real']:
            self.current['guess'] = answer.value
            self.incorrect.append(self.current)
        self.guessed_shapes = np.append(self.guessed_shapes, answer)
        self.response_times = np.append(self.response_times, self.current['time'])

    def results(self):
        """Print accuracy and response-time statistics and plot the incorrect guesses."""
        print(f"Accuracy: {self.get_score():.3f}")
        if len(self.response_times) > 0:
            print(f"Mean response time: {self.response_times.mean():.3f}")
            print(f"Response time standard deviation: {self.response_times.std():.3f}")

        if len(self.incorrect) > 0:
            self.show_incorrect()

    def get_score(self):
        """Return the fraction of correct guesses, or 0 when nothing was guessed."""
        answered = len(self.guessed_shapes)
        if answered == 0:
            return 0
        # A figure still awaiting its answer has no matching guess yet
        generated = self.generated_shapes[:answered]
        return np.sum(generated == self.guessed_shapes) / answered

    def show_incorrect(self):
        """Plot every incorrect guess next to the real answer; no-op without mistakes."""
        if not self.incorrect:
            # plt.subplots cannot build a grid with zero rows
            return
        _, axs = plt.subplots(len(self.incorrect), 2, figsize=(26, 10*len(self.incorrect)))
        axs = axs.reshape(-1, 2)
        if hasattr(self, 'fms'):
            self.show_incorrect_individually(axs)
        else:
            # NOTE(review): `fm` is never assigned in this class; this looks like a legacy fallback
            self.fm.show_incorrect(axs, self.incorrect)
        plt.show()

    def show_incorrect_individually(self, axs):
        """Plot each incorrect record with the figure manager of its shape type."""
        for row, record in zip(axs, self.incorrect):
            self.find_fm(record).show_incorrect(row, [record])

    def find_fm(self, incorrect):
        """Return the first figure manager whose type matches the record, else None."""
        for fm in self.fms:
            if incorrect['type'] == fm.type:
                return fm

    def clear_score(self):
        """Forget all answers; only the most recently generated shape is kept.

        Safe to call before any figure has been generated.
        """
        self.generated_shapes = np.asarray(self.generated_shapes[-1:])
        self.guessed_shapes = np.asarray([])
        self.response_times = np.asarray([])
        self.incorrect = []

    def pickle(self, path):
        """Write the experiment to ``<path>/<mode>.pkl`` (prefixed with 'multiple ' if set)."""
        os.makedirs(path, exist_ok=True)
        prefix = "multiple " if self.args['multiple'] else ""
        path = os.path.join(path, f"{prefix}{self.args['mode']}.pkl")
        with open(path, 'wb') as out:
            # The objects are written one after another; from_pickle reads them in the same order
            for item in (self.args, self.generated_shapes, self.guessed_shapes,
                         self.response_times, self.incorrect):
                pickle.dump(item, out, pickle.DEFAULT_PROTOCOL)

    @staticmethod
    def from_pickle(path):
        """Re-create an experiment written by ``pickle``.

        NOTE: unpickling executes code stored in the file; only load files
        from a trusted source.
        """
        with open(path, 'rb') as inp:
            kwargs = pickle.load(inp)
            e = Experiment(**kwargs)
            e.generated_shapes = pickle.load(inp)
            e.guessed_shapes = pickle.load(inp)
            e.response_times = pickle.load(inp)
            e.incorrect = pickle.load(inp)
        return e