<a href="https://colab.research.google.com/github/MathMayhem/4644_final/blob/main/DeepGeneticAlgorithm/Keyboard_Optimization_Genetic_Algorithm.ipynb" target="_parent">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [None]:
# Setup the API Server

# Installing Dependencies
# apt packages: the build toolchain, libmicrohttpd and json-c (dev packages), and xz-utils (provides unxz used below)
!sudo apt-get update -qq
!sudo apt-get install -y build-essential libmicrohttpd-dev libjson-c-dev xz-utils
!pip install requests -q

# Installing API Server
# Download the v0.1 source archive of SVOBODA, unpack it, and build it with make
!wget https://github.com/RusDoomer/SVOBODA/archive/refs/tags/v0.1.tar.gz -O svoboda.tar.gz -q
!tar -xzf svoboda.tar.gz
!cd SVOBODA-0.1 && make
# Download the corpus, decompress it, and move it into the server's English corpora directory as shai.txt
!wget https://colemak.com/pub/corpus/iweb-corpus-samples-cleaned.txt.xz -q
!unxz iweb-corpus-samples-cleaned.txt.xz
!mv iweb-corpus-samples-cleaned.txt SVOBODA-0.1/data/english/corpora/shai.txt

# Starting API Server
import subprocess
import os
import time
import requests
import signal

# --- Configuration ---
server_directory = 'SVOBODA-0.1'
executable_name = './svoboda'
api_url = "http://localhost:8888/"
startup_timeout_seconds = 60
# The server's stderr is written to this file instead of a pipe. A pipe that is
# never drained can fill up and block a long-running server once it has written
# enough output; a file cannot.
server_log_path = os.path.join(server_directory, 'server_stderr.log')

# --- Process Handling ---
server_process = None

if not os.path.exists(os.path.join(server_directory, executable_name)):
    print("Error: Server executable not found. Please run the setup cell first.")
else:
    print("Starting C server in the background...")
    # Launch the server in the background.
    # stdout was never read by this notebook, so it is discarded outright.
    # The parent's handle on the log file is closed as soon as the child has
    # inherited it; the child keeps writing through its own descriptor.
    with open(server_log_path, 'w') as server_log:
        server_process = subprocess.Popen(
            [executable_name],
            cwd=server_directory,
            stdout=subprocess.DEVNULL,
            stderr=server_log
        )

    # --- Verification Loop ---
    # Poll once per iteration until the server answers, exits, or the timeout elapses.
    server_ready = False
    start_time = time.time()
    while time.time() - start_time < startup_timeout_seconds:
        # Check if the process has already terminated
        exit_code = server_process.poll()
        if exit_code is not None:
            print(f"Error: Server process terminated unexpectedly with exit code {exit_code}.")
            # Print stderr for debugging
            with open(server_log_path) as server_log:
                for line in server_log:
                    print(f"[SERVER_STDERR] {line.strip()}")
            break

        # Attempt to connect
        try:
            response = requests.get(api_url, timeout=1)
            print(f"\nSuccess: Server is responsive at {api_url}.")
            server_ready = True
            break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Timeout is caught as well: with timeout=1 a server that accepts the
            # connection but answers slowly raises ReadTimeout, which is not a
            # ConnectionError and would otherwise abort the cell.
            print(".", end="")
            time.sleep(1)

    if not server_ready and server_process.poll() is None:
        print("\nError: Server did not become responsive within the timeout period.")
        print("The process is still running but may be stuck. Terminating.")
        server_process.terminate()
    elif server_ready:
        print(f"Server is running with PID: {server_process.pid}")

In [117]:
# Required Imports for the Neural Network

import random
import copy
import numpy as np
import time
import pickle
import os
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers, optimizers

# The symbols whose keyboard positions we are optimizing (the 26 letters plus ; , . / for 30 symbols in total)
# Layouts will be represented as lists of indices from this symbol set
symbols = "abcdefghijklmnopqrstuvwxyz;,./"

In [2]:
# Mount your Google Drive to use our data files or generate your own
# The drive is mounted at /content/drive; this only works inside Google Colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# The building block of our Neural Network: a fully connected layer followed by a Leaky ReLU,
# wrapped in a skip connection whenever the input and output widths agree.
class CustomLayer(layers.Layer):
    """
    Dense -> Leaky ReLU block with an optional residual (skip) connection.

    The layer input is added back onto the activated output only when the
    last dimension of the input equals `units`; otherwise the block is a
    plain Dense + Leaky ReLU.
    """
    def __init__(self, units, negative_slope=0.5, l2_reg=0.0, **kwargs):
        """
        Creates the Dense and Leaky ReLU sub-layers.

        Args:
            units (int): Number of output units of the Dense sub-layer.
            negative_slope (float): Negative slope coefficient of the Leaky ReLU.
            l2_reg (float): L2 regularization factor applied to the Dense kernel.
            **kwargs: Forwarded unchanged to the base Layer constructor.
        """
        super().__init__(**kwargs)
        self.units = units
        self.negative_slope = negative_slope
        self.l2_reg = l2_reg

        # Sub-layers are created here; the Dense weights themselves are created in build()
        self.dense_layer = layers.Dense(
            self.units,
            kernel_regularizer=regularizers.l2(self.l2_reg),
            name="dense_part",
        )
        self.leaky_relu_activation = layers.LeakyReLU(
            negative_slope=self.negative_slope,
            name="leaky_relu_part",
        )

    def build(self, input_shape):
        """
        Builds the Dense sub-layer and records whether a skip connection is possible.

        Args:
            input_shape: Shape of the incoming tensor; only its last
                         (feature) dimension is inspected here.
        """
        # Build the Dense sub-layer explicitly so its weights exist before the first call
        self.dense_layer.build(input_shape)

        # A skip connection needs the feature width of the input to equal the Dense width
        input_width = input_shape[-1]
        self.can_skip_connect = (input_width == self.units)
        if not self.can_skip_connect:
            print(f"Warning: Skip connection not possible for layer '{self.name}'.")
            print(f"Input last dimension ({input_width}) does not match Dense units ({self.units}).")
            print("The layer will function as a standard Dense + Leaky ReLU.")

        super().build(input_shape)

    def call(self, inputs):
        """
        Forward pass of the block.

        Args:
            inputs (tf.Tensor): The tensor fed into the layer.

        Returns:
            tf.Tensor: LeakyReLU(Dense(inputs)), plus `inputs` itself when the
                       feature dimension of `inputs` equals `units`.
        """
        activated_output = self.leaky_relu_activation(self.dense_layer(inputs))

        # Widths differ: no residual can be added, so this is a plain Dense + Leaky ReLU
        if inputs.shape[-1] != self.units:
            return activated_output

        # Widths agree: add the residual
        return activated_output + inputs

    def get_config(self):
        """
        Returns the serializable configuration of the layer, i.e. the base
        layer config extended with this layer's constructor arguments.
        """
        return {
            **super().get_config(),
            "units": self.units,
            "negative_slope": self.negative_slope,
            "l2_reg": self.l2_reg,
        }

In [9]:
# Genetic Algorithm Methods

def crossover(a, b, n):
    """
    Builds a child layout of length n from the parent layouts a and b.

    The child is filled one cycle at a time. For each cycle a donor parent is
    picked at random, a random still-empty position is chosen as the start,
    and the donor's values are copied into every position on the cycle that
    leads back to the start (the next position being where the other parent
    holds the value just copied).
    """
    child = [None] * n
    while any(slot is None for slot in child):
        # Randomly decide which parent donates the values for this cycle
        donor, other = random.choice([(a, b), (b, a)])

        # Draw random positions until one is found that has not been filled yet
        start = random.randint(0, n - 1)
        while child[start] is not None:
            start = random.randint(0, n - 1)

        # Walk the cycle, copying the donor's value at every position visited
        child[start] = donor[start]
        position = other.index(donor[start])
        while position != start:
            child[position] = donor[position]
            position = other.index(donor[position])

    return child


def mutate(a, max_swaps, n):
    """
    Returns a deep copy of layout a after a random number of position swaps.

    The number of swaps is drawn uniformly from 0..max_swaps (inclusive); each
    swap exchanges the entries at two positions drawn uniformly from 0..n-1
    (the two positions may coincide). The input layout is left untouched.
    """
    mutated = copy.deepcopy(a)

    for _ in range(random.randint(0, max_swaps)):
        first = random.randint(0, n - 1)
        second = random.randint(0, n - 1)
        mutated[first], mutated[second] = mutated[second], mutated[first]

    return mutated

# Calls the API Server to evaluate a batch of layouts which utilize the provided string of symbols
# If metric weights is set to None, then this function will return each of the five individual metrics for each of the input layouts
# Otherwise, it will return the weighted sum of these metrics using the provided metric weights
def true_evaluation(batch, metric_weights, symbols, api_url="http://localhost:8888/"):
  """
  Evaluates a batch of layouts by posting them to the API server.

  Args:
    batch: Sequence of layouts, each a sequence of indices into `symbols`.
    metric_weights: Dict with the keys "sfb", "sfs", "lsb", "alt" and "rolls",
      or None to request the individual metric values instead of a score.
    symbols: String of symbols that the layout indices refer to.
    api_url: URL of the evaluation server (defaults to the local server).

  Returns:
    An empty list for an empty batch (the server is not contacted).
    If metric_weights is None, a list with one [sfb, sfs, lsb, alt, rolls]
    list per layout, taken from the "stat_values" field of each response entry.
    Otherwise a list with the "score" field of each response entry.

  Raises:
    RuntimeError: If the server cannot be reached or answers with a status
      code other than 200. Previously these cases printed a message and
      returned None, which made every caller fail later with an unrelated
      TypeError.
  """
  # Nothing to evaluate, so skip the round trip to the server
  if len(batch) == 0:
    return []

  # Translate every layout from a list of indices into its string of symbols
  layouts = ["".join(symbols[index] for index in a) for a in batch]

  # All-zero weights are sent when only the individual metric values are wanted
  weights = {"sfb": 0.0, "sfs": 0.0, "lsb": 0.0, "alt": 0.0, "rolls": 0.0} if metric_weights is None else metric_weights
  payload = [{"layout": layout, "weights": weights} for layout in layouts]

  try:
    response = requests.post(api_url, json=payload)
  except requests.exceptions.ConnectionError as e:
    raise RuntimeError(
      "Connection Error: Could not connect to the C server. "
      "Please ensure the server is running correctly from the previous cell."
    ) from e

  if response.status_code != 200:
    raise RuntimeError(
      f"Error: Server responded with status code {response.status_code}. "
      f"Response text: {response.text}"
    )

  results = response.json()
  if metric_weights is None:
    return [[r["stat_values"]["sfb"], r["stat_values"]["sfs"], r["stat_values"]["lsb"], r["stat_values"]["alt"], r["stat_values"]["rolls"]] for r in results]
  return [r["score"] for r in results]


# Similar to MKLOGA genetic algorithm, but with some small design differences and no fine-tuning of the model
# Note: copy rate should be at least the crossover cutoff and the random rate should be at most the pop minus the copy rate
def evolve(pop, epochs, random_rate, copy_rate, evaluation_rate, crossover_cutoff, mutation_rate, delete_duplicates, mode, model, metric_weights, symbols):
  """
  Runs the genetic algorithm and returns the best layout found together with its score.

  Args:
    pop: Number of layouts in each generation.
    epochs: Number of generations to run.
    random_rate: Number of fresh random layouts added to every next generation.
    copy_rate: Total number of layouts carried over unchanged to the next generation
      (all kept true-scored layouts, topped up with the best estimated-score layouts).
    evaluation_rate: In the default mode, how many of the top estimated-score layouts
      are sent for a true evaluation each epoch.
    crossover_cutoff: How many of the top true-scored layouts are kept and used as parents.
    mutation_rate: Passed to mutate() as the maximum number of swaps per child.
    delete_duplicates: If true, adjacent identical layouts in the sorted true-score list are removed.
    mode: 1 = every layout gets a true evaluation and the model is ignored;
      2 = only the model is used and its scores are stored as the "true scores";
      any other value = the model pre-screens layouts and only the top ones get a true evaluation.
    model: Callable applied to an array of layouts; its output is multiplied by the
      5x1 weight column built from metric_weights.
      NOTE(review): presumably returns one row of five metric predictions per layout - confirm.
    metric_weights: Dict with the keys "sfb", "sfs", "lsb", "alt" and "rolls".
    symbols: String of symbols being arranged; its length is the layout length.

  Returns:
    The first entry of the sorted true-score list: a (layout, score) tuple with the
    highest score. In mode 2 that score is the model's estimate, not a true evaluation.
  """
  # Column vector of the metric weights, used to turn the model's per-metric outputs into a single score
  w = np.array([[metric_weights["sfb"]], [metric_weights["sfs"]], [metric_weights["lsb"]], [metric_weights["alt"]], [metric_weights["rolls"]]])
  n = len(symbols)

  # Layouts in the generation are split between those whose scores have been estimated by the model, those who have received a true evaluation, and those with neither of these
  # This makes the method more efficient by avoiding repeated evaluations
  current_generation = {"no scores": [], "estimated scores": [], "true scores": []}

  # The first generation is created randomly
  a = [i for i in range(n)]
  for i in range(pop):
    random.shuffle(a)
    current_generation["no scores"].append(copy.deepcopy(a))

  for epoch in range(epochs):
    ## ESTIMATED/TRUE EVALUATIONS PERFORMED ##

    if mode == 1:
      # In this mode the passed in model is ignored and every layout is given a true evaluation
      # The crossover cutoff controls how many of the top layouts given true evaluations are used in the crossover/mutation phase
      # Since layouts past this cutoff will no longer affect the algorithm, they are deleted at this stage
      new_scores = true_evaluation(current_generation["no scores"], metric_weights, symbols)
      current_generation["true scores"] += list(zip(current_generation["no scores"], new_scores))
      current_generation["no scores"] = []
      current_generation["true scores"] = sorted(current_generation["true scores"], key = lambda x : x[1], reverse = True)[:crossover_cutoff]
    elif mode == 2:
      # In this mode true evaluations are not used and the model is treated as the ground truth
      new_scores = list(np.matmul(model(np.array(current_generation["no scores"])), w).squeeze(-1))
      current_generation["true scores"] += list(zip(current_generation["no scores"], new_scores))
      current_generation["no scores"] = []
      current_generation["true scores"] = sorted(current_generation["true scores"], key = lambda x : x[1], reverse = True)[:crossover_cutoff]
    else:
      # If a model is given, score estimations are done using the model for all layouts which currently have no scores at all
      # Notice, w has the user provided weights to place on each metric and the weighted sums are taken here
      new_estimated_scores = list(np.matmul(model(np.array(current_generation["no scores"])), w).squeeze(-1))
      current_generation["estimated scores"] += list(zip(current_generation["no scores"], new_estimated_scores))
      current_generation["no scores"] = []
      current_generation["estimated scores"] = sorted(current_generation["estimated scores"], key = lambda x : x[1], reverse = True)

      # The evaluation rate controls how many of the top layouts with only estimated scores receive a full evaluation
      # The crossover cutoff controls how many of the top layouts given true evaluations are used in the crossover/mutation phase
      # Since layouts past this cutoff will no longer affect the algorithm, they are deleted at this stage
      layouts_to_evaluate = [a for a, s in current_generation["estimated scores"][:evaluation_rate]]
      new_scores = true_evaluation(layouts_to_evaluate, metric_weights, symbols)
      current_generation["true scores"] += list(zip(layouts_to_evaluate, new_scores))
      current_generation["true scores"] = sorted(current_generation["true scores"], key = lambda x : x[1], reverse = True)[:crossover_cutoff]
      # The layouts that just received a true evaluation no longer belong in the estimated list
      del current_generation["estimated scores"][:evaluation_rate]

    if delete_duplicates:
      # The list is sorted by score, so only neighbouring entries are compared
      # Iterating from the back keeps the remaining indices valid while deleting
      for i in range(len(current_generation["true scores"]) - 1, 0, -1):
        if current_generation["true scores"][i - 1][0] == current_generation["true scores"][i][0]:
          del current_generation["true scores"][i]

    ## COPIES FROM CURRENT GENERATION ADDED TO NEXT GENERATION ##

    # All top layouts with true scores from the current generation are carried over to the next
    # The copy rate controls the total number of layouts carried over to the next generation
    # The rest of this quota is filled by the top layouts with high estimated scores
    next_generation = {"no scores": [], "estimated scores": current_generation["estimated scores"][:copy_rate - len(current_generation["true scores"])], "true scores": current_generation["true scores"]}

    ## ADDITIONAL RANDOM LAYOUTS ADDED TO NEXT GENERATION ##

    # The random rate controls the number of random layouts added to the next generation
    a = [i for i in range(n)]
    for i in range(random_rate):
      random.shuffle(a)
      next_generation["no scores"].append(copy.deepcopy(a))

    ## CROSSOVER/MUTATED LAYOUTS FROM CURRENT GENERATION ADDED TO NEXT GENERATION ##

    # The pop variable controls the constant population of each generation
    # The remaining quota is filled at this point by crossing over two layouts from the current generation and mutating the result
    for i in range(pop - random_rate - len(next_generation["estimated scores"]) - len(next_generation["true scores"])):
      # MKLOGA code shows that they picked the crossovers using a beta distribution with these parameters
      # Note that this draws from np.random, which is not affected by random.seed
      index_a , index_b = np.floor(min(crossover_cutoff, len(current_generation["true scores"]))*np.random.beta(a = 0.5, b = 2, size = 2)).astype(int)

      a, b = current_generation["true scores"][index_a][0], current_generation["true scores"][index_b][0]
      next_generation["no scores"].append(copy.deepcopy(mutate(crossover(a, b, n), mutation_rate, n)))

    current_generation = copy.deepcopy(next_generation)

  # The true-score list is kept sorted in descending order, so the first entry is the best layout found
  return current_generation["true scores"][0]

In [10]:
# Creates our Neural Network and its Optimizer
# For the sake of efficiency, the network assumes you have already created the one-hot encodings of the layouts
# Eight identical 900-unit CustomLayer blocks followed by a 5-unit Dense output layer
architecture = [CustomLayer(900) for _ in range(8)]
architecture.append(layers.Dense(5))

network = models.Sequential(architecture)
network.compile(loss="mse")
network.build(input_shape = (None, 900))

# These are the means (offsets) and mean absolute deviations (scalars) of each of the five metrics
# These were calculated from a set of 10,000 uniformly random layouts
scalars = np.array([2.37272293, 1.77984022, 1.84450506, 2.34851446, 2.97247169])
offsets = np.array([10.53548093, 10.47626445,  3.99833419, 17.61765303, 35.32753762])

# Preprocessing applied in front of the network: one-hot encode every entry of a layout over 30 tokens, then flatten
L1 = layers.CategoryEncoding(num_tokens = 30, output_mode = "one_hot")
L2 = layers.Flatten()

def model(layouts):
  """
  Runs the network on a batch of index-encoded layouts.

  The layouts are one-hot encoded and flattened, passed through the network,
  and the five outputs are rescaled with `scalars` and shifted by `offsets`.
  """
  encoded_layouts = L2(L1(layouts))
  return scalars*network(encoded_layouts) + offsets

In [11]:
# Running this will require having your Google Drive attached!
# Loads the network's pretrained weights from a pickle file stored in the Drive folder below
# NOTE(review): pickle.load can execute arbitrary code contained in the file, so only load a weights file from a source you trust
save_path = '/content/drive/My Drive/colab_data/'
with open(os.path.join(save_path, 'eight_hidden_layer_network_weights.pkl'), 'rb') as f:
  network.set_weights(pickle.load(f))

In [74]:
# The tests we ran on the Genetic Algorithm using an A100 GPU
# The random seed will often be set at the top of each cell to ensure you can reproduce the results found in our paper
# These seed values were themselves chosen randomly with no cherry picking
# Uncomment exactly one set of weights together with one mode/seed pair before running this cell
# NOTE(review): evolve also draws from np.random (when picking crossover parents), which random.seed does not control

## Test 1 weights ##
# metric_weights = {"sfb": -1.5, "sfs": -0.7, "lsb": -0.8, "alt": 0.2, "rolls": 0.3}

# mode = 0
# random.seed(795)

# mode = 1
# random.seed(345)

# mode = 2
# random.seed(679)



## Test 2 weights ##
# metric_weights = {"sfb": -1.0, "sfs": -0.2, "lsb": -0.2, "alt": 0.5, "rolls": 0.5}

# mode = 0
# random.seed(268)

# mode = 1
# random.seed(837)

# mode = 2
#random.seed(132)

# Number of independent runs the statistics below are taken over
num_runs = 10

worse_time = 0.0
average_time = 0.0
best_time = float('infinity')

best_layout = []
worse_score = float('infinity')
average_score = 0.0
best_score = -float('infinity')

# Runs the Genetic Algorithm num_runs times with this same set of weights and mode, and reports back the worse, average, and best scores/runtimes
for i in range(num_runs):
  print(i)
  # Wall-clock time is measured with perf_counter. process_time only counts CPU time
  # of this Python process, so the time spent waiting for the separate C server to
  # evaluate layouts would not be included in the reported runtime.
  start_time = time.perf_counter()
  layout, score = evolve(5000, 30, 1000, 250, 250, 100, 5, True, mode, model, metric_weights, symbols)
  end_time = time.perf_counter()
  duration = end_time - start_time

  # In mode 2 the returned score is only the model's estimate, so the winning layout
  # is given a true evaluation (outside of the timed section) before it is recorded
  if mode == 2:
    score = true_evaluation([layout], metric_weights, symbols)[0]

  worse_time = max(worse_time, duration)
  average_time += duration/num_runs
  best_time = min(best_time, duration)

  # Must run before best_score is updated below
  if score > best_score:
    best_layout = copy.deepcopy(layout)

  worse_score = min(worse_score, score)
  average_score += score/num_runs
  best_score = max(best_score, score)

0
1
2
3
4
5
6
7
8
9


In [81]:
# Random Search tests we ran on an A100 GPU
# Uncomment exactly one set of weights together with its seed before running this cell

# Test 1 Random Search
# metric_weights = {"sfb": -1.5, "sfs": -0.7, "lsb": -0.8, "alt": 0.2, "rolls": 0.3}
# random.seed(223)



# Test 2 Random Search
# metric_weights = {"sfb": -1.0, "sfs": -0.2, "lsb": -0.2, "alt": 0.5, "rolls": 0.5}
# random.seed(869)

# Number of independent runs the statistics below are taken over
num_runs = 10
# Number of uniformly random layouts evaluated in every run
layouts_per_run = 150000

worse_time = 0.0
average_time = 0.0
best_time = float('infinity')

worse_score = float('infinity')
average_score = 0.0
best_score = -float('infinity')

# Runs the Random Search num_runs times with this same set of weights, and reports back the worse, average, and best scores/runtimes
for i in range(num_runs):
  print(i)
  # Wall-clock time is measured with perf_counter. process_time only counts CPU time
  # of this Python process, so the time the separate C server spends evaluating the
  # layouts (which is nearly all of the work here) would not be included.
  start_time = time.perf_counter()
  layouts = [list(range(len(symbols))) for _ in range(layouts_per_run)]
  for a in layouts:
    random.shuffle(a)
  scores = true_evaluation(layouts, metric_weights, symbols)
  score = max(scores)
  end_time = time.perf_counter()
  duration = end_time - start_time

  worse_time = max(worse_time, duration)
  average_time += duration/num_runs
  best_time = min(best_time, duration)

  worse_score = min(worse_score, score)
  average_score += score/num_runs
  best_score = max(best_score, score)

0
1
2
3
4
5
6
7
8
9
