A while ago I had an idea for an optimization algorithm that combines a "local" search heuristic with a "global" one and switches between them automatically, depending on how long it has been since each heuristic last produced an improvement. That way we don't waste time exploring new points with random search (RS) when we've clearly found a good region that we should exploit with gradient descent, and we don't let gradient descent settle into a local optimum and then sit there doing nothing when we could be exploring with RS.

The heuristic of "switch to an algorithm with probability proportional to its rate of improvement" is good enough but probably not optimal (win-stay-lose-switch performs well on multi-armed bandits, so maybe it could work here too). In this code, though, I'm curious about human control with multithreading. The human has access to a GUI that shows a live graph of the best objective value over time. At any point the human can enter the command "S" for "switch", and the algorithm will switch to the other search heuristic. Similarly, they can enter "L" for the "left" heuristic (random search) and "R" for the "right" one (gradient descent). The question is: does a human-controlled two-handed optimizer beat the current switching heuristic? And how does it compare to win-stay-lose-switch?

We're assuming that the bottleneck here is *objective function calls* rather than time itself. Maybe the objective function is just really expensive to call (in terms of time, money, or some other important resource). We have to slow the human-controlled two-handed optimizer (HCTH) down to give the human enough time to respond, so it will perform worse in terms of runtime but possibly better in terms of objective function calls.
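
Since the budget is objective function calls rather than wall-clock time, it can help to count those calls explicitly. A minimal sketch, assuming a hypothetical wrapper class `CountingObjective` and a stand-in `toy_objective` (neither is part of this notebook):

```python
class CountingObjective:
    """Wrap an objective function and count how often it is called.

    Hypothetical helper for illustration; not part of the original notebook.
    """

    def __init__(self, function):
        self.function = function
        self.calls = 0

    def __call__(self, x):
        self.calls += 1
        return self.function(x)


def toy_objective(x):
    # Stand-in objective: a concave bowl with its maximum at the origin.
    return -sum(value ** 2 for value in x)


counted = CountingObjective(toy_objective)
counted([0.5, 0.5])  # one evaluation, e.g. a single random-search sample
counted([0.0, 0.0])  # a second evaluation
print(counted.calls)
```

Passing a wrapped objective to an `optimize` method would report the true number of calls afterwards. This matters because one iteration is not one call: a random-search step costs a single evaluation, while a finite-difference gradient step costs on the order of one evaluation per dimension.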

#### Imports

In [1]:
import numpy as np
import random as r
import matplotlib.pyplot as plt
import threading
import time

#For GUI construction
%matplotlib tk

In [2]:
#How many iterations the algorithms get
numIt = 250

#### Objective Function

Michalewicz is an interesting test function for optimization. It has a large number of local minima in which algorithms can become trapped, and it can be defined on any number of dimensions, which makes it useful for high-dimensional optimization problems.

Reference: https://www.sfu.ca/~ssurjano/michal.html

In this task we'll maximize -1 times Michalewicz.
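
For reference, the $d$-dimensional Michalewicz function as given on the page linked above, with steepness parameter $m$ (`trigPower` in the code) and each $x_i \in [0, \pi]$, can be written as:

```latex
f(\mathbf{x}) = -\sum_{i=1}^{d} \sin(x_i)\,
    \sin^{2m}\!\left(\frac{i\,x_i^{2}}{\pi}\right),
\qquad
\text{objective}(\mathbf{x}) = -f(\mathbf{x})
    = \sum_{i=1}^{d} \sin(x_i)\,
      \sin^{2m}\!\left(\frac{i\,x_i^{2}}{\pi}\right).
```

The index $i$ starts at 1 in the definition, which is why the Python loop uses `i + 1`.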

In [3]:
lowBound = 0
highBound = np.pi
numDimensions = 10 #Usually 10+, but 2 is better for plotting
trigPower = 10 #"m" in the definition
ranges = []
for i in range(numDimensions):
    ranges.append([lowBound, highBound])
#Note: global minima values are known for some specific values of trigPower

def michalewicz(x):
    output = 0
    for i in range(len(x)):
        output -= np.sin(x[i]) * (np.sin((i + 1) * (x[i] ** 2) / np.pi)) ** (2 * trigPower)
    return(output)

def objective(x):
    return(-michalewicz(x))
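
A vectorized NumPy version can serve as a cross-check on the loop above. This is only a sketch: `michalewicz_vectorized` is a hypothetical name, and the check point (2.20, 1.57), with a value of about -1.8013 for two dimensions and m = 10, is the rounded global minimum listed on the reference page.

```python
import numpy as np


def michalewicz_vectorized(x, m=10):
    """Michalewicz function for a 1-D array-like x (hypothetical helper)."""
    x = np.asarray(x, dtype=float)
    i = np.arange(1, x.size + 1)  # 1-based dimension index from the definition
    return -np.sum(np.sin(x) * np.sin(i * x ** 2 / np.pi) ** (2 * m))


# Rounded location of the 2-D global minimum, as listed on the reference page.
value_2d = michalewicz_vectorized([2.20, 1.57])
print(value_2d)
```

Because the coordinates are rounded to two decimals, the printed value should agree with -1.8013 only to about three significant figures.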

# Algorithms

### Automated Two-Hand

In [4]:
class TwoHanded:
    def __init__(
        self,
        ranges,
        leftPoints = 1,
        rightPoints = 1
    ):
        self.ranges = ranges
        self.leftPoints = leftPoints
        self.rightPoints = rightPoints

        #GUI Graph
        self.bestHistory = []
        self.fig, self.ax = plt.subplots()
        self.ax.set_xlabel("Iteration")
        self.ax.set_ylabel("Best objective value so far")
        self.line, = self.ax.plot([], [], "b-")
        plt.ion()
        plt.show()

    def _updateGraph(self):
        self.line.set_xdata(list(range(len(self.bestHistory))))
        self.line.set_ydata(self.bestHistory)
        self.ax.relim()
        self.ax.autoscale_view()
        plt.pause(0.0001)

    def selectLeftOrRight(self):
        probLeft = self.leftPoints / (self.leftPoints + self.rightPoints)
        if r.uniform(0, 1) < probLeft:
            return("L")
        else:
            return("R")

    #Random search's hypothesis selector.
    def generateRandomHypothesis(self):
        output = []
        for x in self.ranges:
            output.append(r.uniform(x[0], x[1]))
        return(output)

    #One iteration of gradient ascent (we are maximizing), using forward differences
    def gradientStep(self, objectiveFunction, currentX, dx = 10 ** (-10),
                     learnRate = 0.01):
        gradientVector = []
        #Evaluate the base point once; it is the same for every dimension
        Y1 = objectiveFunction(currentX)
        for i in range(len(currentX)):
            cloneX = list(currentX)
            cloneX[i] += dx
            Y2 = objectiveFunction(cloneX)
            gradientVector.append((Y2 - Y1)/dx)
        outputX = [x for x in currentX]
        for i in range(len(outputX)):
            outputX[i] += gradientVector[i] * learnRate
        return(outputX)

    def optimize(self, objectiveFunction, numIterations = numIt):
        currentX = [np.mean(x) for x in self.ranges]
        currentY = objectiveFunction(currentX)

        for i in range(numIterations):
            leftRightDecision = self.selectLeftOrRight()

            if leftRightDecision == "L":
                newX = self.generateRandomHypothesis()
                newY = objectiveFunction(newX)

                if newY > currentY:
                    currentY = newY
                    currentX = newX
                    self.leftPoints += 1
                else:
                    self.rightPoints += 1

            else:
                if currentX is None:
                    currentX = [np.mean(x) for x in self.ranges]

                newX = self.gradientStep(objectiveFunction, currentX)
                newY = objectiveFunction(newX)

                if newY > currentY:
                    currentY = newY
                    currentX = newX
                    self.rightPoints += 1
                else:
                    self.leftPoints += 1

            #Update graph
            self.bestHistory.append(currentY)
            self._updateGraph()

        return(currentX)
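
The gradient step in the class above estimates each partial derivative with a forward difference. A minimal standalone sketch of the same idea, assuming a hypothetical function name `forward_difference_gradient` and a default step near the square root of machine epsilon, which is a common rule of thumb for forward differences (the notebook itself uses `dx = 10 ** (-10)`). The base point is evaluated once, so the cost is one call plus one call per dimension:

```python
import numpy as np


def forward_difference_gradient(function, x, dx=None):
    """Estimate the gradient of `function` at x with forward differences."""
    x = np.asarray(x, dtype=float)
    if dx is None:
        # Assumed default: sqrt(machine epsilon), roughly 1.5e-8 for float64.
        dx = np.sqrt(np.finfo(float).eps)
    baseline = function(x)  # evaluated once and reused for every dimension
    gradient = np.empty_like(x)
    for i in range(x.size):
        shifted = x.copy()
        shifted[i] += dx
        gradient[i] = (function(shifted) - baseline) / dx
    return gradient


def toy_objective(x):
    # Smooth stand-in objective whose exact gradient is -2 * x.
    return -np.sum(np.asarray(x) ** 2)


estimate = forward_difference_gradient(toy_objective, [1.0, -2.0])
print(np.round(estimate, 4))
```

For the toy objective the estimate should be close to the exact gradient `[-2, 4]`. Much smaller steps make the subtraction in the numerator lose precision, and much larger ones add truncation error.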

### Human-Controlled Two-Hand

In [5]:
class HumanControlledTwoHanded:
    def __init__(
        self,
        ranges,
        leftPoints = 1,
        rightPoints = 1
    ):
        self.ranges = ranges
        self.leftPoints = leftPoints
        self.rightPoints = rightPoints

        self.bestHistory = []
        self.fig, self.ax = plt.subplots()
        self.ax.set_xlabel("Iteration")
        self.ax.set_ylabel("Best objective value so far")
        self.line, = self.ax.plot([], [], "b-")
        plt.ion()
        plt.show()

        self.humanCommand = None
        self.running = True
        self.currentChoice = "L"

        self.speed = 1

        self.inputThread = threading.Thread(target=self._listenForInput)
        self.inputThread.daemon = True
        self.inputThread.start()

    def _listenForInput(self):
        while self.running:
            cmd = input().strip().lower()
            if cmd in ["s", "l", "r", "x"]:
                self.humanCommand = cmd
            elif len(cmd) == 1 and cmd.isdigit():
                self.humanCommand = cmd

    def _updateGraph(self):
        self.line.set_xdata(list(range(len(self.bestHistory))))
        self.line.set_ydata(self.bestHistory)
        self.ax.relim()
        self.ax.autoscale_view()
        plt.pause(0.0001)

    def _applyHumanCommand(self):
        if self.humanCommand is None:
            return

        if self.humanCommand == "x":
            self.running = False

        elif self.humanCommand == "s":
            if self.currentChoice == "L":
                self.currentChoice = "R"
            else:
                self.currentChoice = "L"

        elif self.humanCommand == "l":
            self.currentChoice = "L"

        elif self.humanCommand == "r":
            self.currentChoice = "R"

        elif self.humanCommand.isdigit():
            self.speed = int(self.humanCommand)

        self.humanCommand = None

    def generateRandomHypothesis(self):
        output = []
        for x in self.ranges:
            output.append(r.uniform(x[0], x[1]))
        return(output)

    def gradientStep(self, objectiveFunction, currentX, dx = 10 ** (-10),
                     learnRate = 0.01):
        gradientVector = []
        #Evaluate the base point once; it is the same for every dimension
        Y1 = objectiveFunction(currentX)
        for i in range(len(currentX)):
            cloneX = list(currentX)
            cloneX[i] += dx
            Y2 = objectiveFunction(cloneX)
            gradientVector.append((Y2 - Y1)/dx)
        outputX = [x for x in currentX]
        for i in range(len(outputX)):
            outputX[i] += gradientVector[i] * learnRate
        return(outputX)

    def optimize(self, objectiveFunction, numIterations = numIt):
        currentX = [np.mean(x) for x in self.ranges]
        currentY = objectiveFunction(currentX)

        for i in range(numIterations):
            if not self.running:
                break

            self._applyHumanCommand()

            if self.currentChoice == "L":
                newX = self.generateRandomHypothesis()
                newY = objectiveFunction(newX)

                if newY > currentY:
                    currentY = newY
                    currentX = newX

            else:
                newX = self.gradientStep(objectiveFunction, currentX)
                newY = objectiveFunction(newX)

                if newY > currentY:
                    currentY = newY
                    currentX = newX

            self.bestHistory.append(currentY)
            self._updateGraph()

            time.sleep(max(0, 1 - 0.1 * self.speed))

        self.running = False
        return(currentX)
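
The class above hands commands from the input thread to the optimization loop through a single shared attribute, so a command typed while another is still pending overwrites it. A sketch of one alternative, assuming the standard-library `queue.Queue` as the hand-off. `command_listener` and `drain_commands` are hypothetical names, and a plain list stands in for repeated `input()` calls so the example runs without a terminal:

```python
import queue
import threading

VALID_COMMANDS = {"s", "l", "r", "x"}


def command_listener(lines, commands):
    """Push every valid command from an iterable of lines onto the queue."""
    for line in lines:
        cmd = line.strip().lower()
        if cmd in VALID_COMMANDS or (len(cmd) == 1 and cmd.isdigit()):
            commands.put(cmd)


def drain_commands(commands):
    """Return all pending commands in arrival order without blocking."""
    pending = []
    while True:
        try:
            pending.append(commands.get_nowait())
        except queue.Empty:
            return pending


commands = queue.Queue()
# A list stands in for the human's keyboard input in this sketch.
listener = threading.Thread(
    target=command_listener,
    args=([" w", " l", " s", " 9"], commands),
    daemon=True,
)
listener.start()
listener.join()

received = drain_commands(commands)
print(received)
```

In the optimizer, the loop would call something like `drain_commands` once per iteration and apply each command in order, so that a quick "l" followed by "9" changes both the heuristic and the speed.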

### Win-Stay-Lose-Switch

In [6]:
class TwoHandedWSLS:
    def __init__(
        self,
        ranges,
        leftPoints = 1,
        rightPoints = 1
    ):
        self.ranges = ranges
        self.leftPoints = leftPoints
        self.rightPoints = rightPoints

        self.bestHistory = []
        self.fig, self.ax = plt.subplots()
        self.ax.set_xlabel("Iteration")
        self.ax.set_ylabel("Best objective value so far")
        self.line, = self.ax.plot([], [], "b-")
        plt.ion()
        plt.show()

        self.currentChoice = "L"

    def _updateGraph(self):
        self.line.set_xdata(list(range(len(self.bestHistory))))
        self.line.set_ydata(self.bestHistory)
        self.ax.relim()
        self.ax.autoscale_view()
        plt.pause(0.0001)

    def generateRandomHypothesis(self):
        output = []
        for x in self.ranges:
            output.append(r.uniform(x[0], x[1]))
        return(output)

    def gradientStep(self, objectiveFunction, currentX, dx = 10 ** (-10),
                     learnRate = 0.01):
        gradientVector = []
        #Evaluate the base point once; it is the same for every dimension
        Y1 = objectiveFunction(currentX)
        for i in range(len(currentX)):
            cloneX = list(currentX)
            cloneX[i] += dx
            Y2 = objectiveFunction(cloneX)
            gradientVector.append((Y2 - Y1)/dx)
        outputX = [x for x in currentX]
        for i in range(len(outputX)):
            outputX[i] += gradientVector[i] * learnRate
        return(outputX)

    def optimize(self, objectiveFunction, numIterations = numIt):
        currentX = [np.mean(x) for x in self.ranges]
        currentY = objectiveFunction(currentX)

        for i in range(numIterations):
            if self.currentChoice == "L":
                newX = self.generateRandomHypothesis()
                newY = objectiveFunction(newX)
            else:
                newX = self.gradientStep(objectiveFunction, currentX)
                newY = objectiveFunction(newX)

            if newY > currentY:
                currentY = newY
                currentX = newX
            else:
                if self.currentChoice == "L":
                    self.currentChoice = "R"
                else:
                    self.currentChoice = "L"

            self.bestHistory.append(currentY)
            self._updateGraph()

        return(currentX)
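
Stripped of the plotting and the search steps, the switching rule in `TwoHandedWSLS.optimize` reduces to a one-line state update. A small sketch, with `wsls_next` and `wsls_trace` as hypothetical names, that traces which hand is used for a given sequence of improvement outcomes:

```python
def wsls_next(choice, improved):
    """Win-stay-lose-switch: keep the hand after a win, swap after a loss."""
    if improved:
        return choice
    return "R" if choice == "L" else "L"


def wsls_trace(start, outcomes):
    """List the hand used on each iteration for a sequence of win/loss outcomes."""
    used = []
    choice = start
    for improved in outcomes:
        used.append(choice)
        choice = wsls_next(choice, improved)
    return used


# True means the iteration improved on the best value found so far.
history = wsls_trace("L", [True, False, False, False, True])
print(history)
```

On a plateau where neither hand improves, this rule alternates on every iteration, so the two heuristics split the remaining iterations evenly.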

In [None]:
print("Welcome!")
print("The commands are:")
print("l: switch to random search")
print("r: switch to gradient descent")
print("s: switch to whichever algorithm is not being used")
print("0-9: set the speed")
print("x: end the optimization process early")
print("Press Enter to continue!")
user_responsive = input()
th_human = HumanControlledTwoHanded(ranges)
print(th_human.optimize(objective))

th = TwoHanded(ranges)
print(th.optimize(objective))

th_wsls = TwoHandedWSLS(ranges)
print(th_wsls.optimize(objective))

Welcome!
The commands are:
l: switch to random search
r: switch to gradient descent
s: switch to whichever algorithm is not being used
0-9: set the speed
x: end the optimization process early
Press Enter to continue!


 w
 l
 s
 9
 l
 r
 l
 0
