# Prelude



[**Resonators**](https://github.com/0xmaddie/0xmaddie/blob/main/resonators)
is a Python library that uses data races and particle swarm dynamics
to approximate functions. You can train a resonator like a transformer
but it uses Python objects instead of vectors.

You can do program evolution with a stack of "genetic transformer" blocks, each containing fitness, crossover, and mutation operations. Attach weighted coins to race all of the operations against each other, and train the result with a particle swarm on data showing the evolutionary paths you want.

You can increase representational capacity by expressing conditional dependence between the weights. Per block there are F fitness weights, F×C crossover weights, and F×C×M mutation weights, each representing the probability that the operation will be next in line during its race, along with 1 weight for a type of residual connection.

The crossover operations combine several states into one, and the mutation operations transform one state into several. This acts as a type of information bottleneck and provides an analogy to continuity, in that it "smooths out" the details between states.


In [None]:
# Mount Google Drive in the Colab runtime, copy the private `secret.py` module
# from Drive into the working directory, and import it.
# NOTE(review): in the visible code `secret` is only referenced from
# commented-out `cache_dir=secret.home` arguments.
import google
google.colab.drive.mount('/content/drive')
!cp /content/drive/MyDrive/secret.py .
import secret

In [None]:
# Install extra packages into the notebook runtime.
# NOTE(review): presumably required by `load_in_8bit=True` and
# `device_map='auto'` in the model loading code below -- confirm.
!pip install bitsandbytes accelerate

In [None]:
# Authenticate with the Hugging Face Hub before downloading model files.
import huggingface_hub
huggingface_hub.login()

# Llama

In [None]:
import transformers
import torch

class Llama:
  """Sampling wrapper around the Meta-Llama-3-8B-Instruct chat model.

  A conversation ("state") is a list of strings whose even indices are user
  turns and odd indices are assistant turns.  `get` samples one reply and
  `reduce` keeps sampling until enough replies parse successfully.

  Attributes:
    tokenizer: tokenizer used to build prompts and decode replies.
    model: causal language model used for generation.
    system_prompt: content of the system message prepended to every prompt.
    temperature: sampling temperature passed to `generate`.
    max_new_tokens: generation length limit passed to `generate`.
    terminators: token ids that end generation (EOS and `<|eot_id|>`).
    purity: number of successfully parsed samples `reduce` must collect.
    quota: maximum number of generations `reduce` may attempt.
  """
  tokenizer: transformers.AutoTokenizer
  model: transformers.AutoModelForCausalLM
  system_prompt: str
  temperature: float
  max_new_tokens: int
  terminators: list
  purity: int
  quota: int

  def __init__(
    self,
    system_prompt: str = 'You are a helpful assistant.',
    temperature: float = 0.7,
    max_new_tokens: int = 4096,
    purity: int = 100,
    quota: int = 1000,
    tokenizer = None,
    model = None,
  ):
    """Load the tokenizer and model unless already-loaded ones are supplied.

    Args:
      system_prompt: system message for every conversation.
      temperature: sampling temperature.
      max_new_tokens: maximum number of tokens generated per reply.
      purity: number of parsed samples `reduce` collects.
      quota: maximum number of attempts `reduce` makes.
      tokenizer: optional pre-loaded tokenizer to reuse.
      model: optional pre-loaded model to reuse.
    """
    model_name = 'meta-llama/Meta-Llama-3-8B-Instruct'
    # Compare against None explicitly so a supplied object is never replaced
    # because of how its truthiness happens to be defined.
    if tokenizer is None:
      tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_name,
        #cache_dir=secret.home,
        #local_files_only=True,
      )
    if model is None:
      model = transformers.AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map='auto',
        load_in_8bit=True,
        #cache_dir=secret.home,
        #local_files_only=True,
      )
    self.tokenizer = tokenizer
    self.model     = model
    self.terminators = [
      self.tokenizer.eos_token_id,
      self.tokenizer.convert_tokens_to_ids('<|eot_id|>')
    ]
    self.system_prompt  = system_prompt
    self.temperature    = temperature
    self.max_new_tokens = max_new_tokens
    self.purity         = purity
    self.quota          = quota

  def __apply_chat_template(self, state):
    """Turn a conversation into model input ids on the model's device."""
    prompt = [{ 'role': 'system', 'content': self.system_prompt }]
    for index, content in enumerate(state):
      # Turns alternate, starting with the user.
      role = 'user' if index%2 == 0 else 'assistant'
      prompt.append({ 'role': role, 'content': content })
    print(f'Llama.prompt={prompt}')
    input_ids = self.tokenizer.apply_chat_template(
      prompt,
      #tokenize=True,
      add_generation_prompt=True,
      return_tensors='pt',
    ).to(self.model.device)

    return input_ids

  def get(self, state):
    """Sample one assistant reply for the conversation `state`.

    Returns:
      The decoded reply text without the prompt and without special tokens.
    """
    source_ids = self.__apply_chat_template(state)
    targets    = self.model.generate(
      source_ids,
      max_new_tokens=self.max_new_tokens,
      do_sample=True,
      temperature=self.temperature,
      eos_token_id=self.terminators,
    )
    # Drop the prompt tokens; only the newly generated tail is the reply.
    target_ids = targets[0][source_ids.shape[-1]:]
    target     = self.tokenizer.decode(target_ids, skip_special_tokens=True)
    print(f'Llama.get target={target}')
    return target

  def reduce(self, input, map, reduce):
    """Collect `purity` parsed samples and fold them into one result.

    Each attempt samples a reply for `input` and passes it to `map`.  An
    attempt that raises `ValueError` is discarded and costs one unit of quota.

    Args:
      input: conversation passed to `get`.
      map: callable parsing one reply; raises `ValueError` to reject it.
      reduce: callable receiving the list of parsed samples.

    Returns:
      Whatever `reduce` returns for the collected samples.

    Raises:
      ValueError: if fewer than `purity` samples were collected within
        `quota` attempts.
    """
    quota   = self.quota
    samples = []
    while quota > 0 and len(samples) < self.purity:
      quota -= 1
      try:
        value = map(self.get(input))
      except ValueError:
        # Rejected reply: keep the samples gathered so far and try again.
        continue
      print(f'Llama.reduce value={value}')
      samples.append(value)
    if len(samples) < self.purity:
      raise ValueError(f'Llama.reduce quota consumed')
    output = reduce(samples)
    return output

In [None]:
# Build the shared `Llama` instance used by the rest of the notebook.
# NOTE(review): the commented-out lines look like a way to reuse an
# already-loaded model and tokenizer when this cell is re-run (stash them
# from the previous `llm`, then pass them back in) -- confirm before enabling.
#_model     = llm.model
#_tokenizer = llm.tokenizer
llm = Llama(
  system_prompt=f'''
You are a helpful assistant.
'''.strip(),
  temperature=0.7,
  max_new_tokens=1024,
  # A single parsed sample is enough, with at most three attempts to get it.
  purity=1,
  quota=3,
  #model=_model,
  #tokenizer=_tokenizer,
)

# Lisp

In [None]:
from typing import Callable
import dataclasses

class Object:
  """Base class of every value the interpreter manipulates."""
  pass

class State:
  """Base class of every interpreter machine state."""
  pass

# A continuation: receives the value just produced and returns the next state.
Context = Callable[[Object], State]

@dataclasses.dataclass(frozen=True)
class Nil(Object):
  """The empty list, which also terminates every proper list."""
  pass

@dataclasses.dataclass(frozen=True)
class Pair(Object):
  """A cons cell; proper lists are chains of pairs ending in `Nil`."""
  fst: Object  # first element (head)
  snd: Object  # remainder (tail)

@dataclasses.dataclass(frozen=True)
class Constant(Object):
  """A symbol resolved through the interpreter's constants table."""
  name: str

@dataclasses.dataclass(frozen=True)
class Variable(Object):
  """A symbol resolved through the chain of environments."""
  name: str

@dataclasses.dataclass(frozen=True)
class Boolean(Object):
  """A boolean value."""
  value: bool

@dataclasses.dataclass(frozen=True)
class Number(Object):
  """A numeric value stored as a float."""
  value: float

@dataclasses.dataclass(frozen=True)
class String(Object):
  """A string value."""
  value: str

class Environment(Object):
  """A mutable scope: a name-to-value table plus an optional enclosing scope.

  Attributes:
    body: bindings defined directly in this scope.
    next: enclosing scope consulted when a name is not in `body`, or None.
  """
  body: dict[str, Object]
  next: 'Environment | None'

  def __init__(self, body=None, next=None):
    """Create a scope.

    Args:
      body: initial bindings; a fresh empty dict is used when omitted.
      next: enclosing scope, or None for a root scope.
    """
    # Only substitute a new dict when no dict was given, so that a caller
    # passing its own (possibly empty) dict keeps sharing it with the scope.
    self.body = {} if body is None else body
    self.next = next

@dataclasses.dataclass(frozen=True)
class Lift(Object):
  """A procedure implemented in Python.

  The interpreter calls `body(arguments, environment, continuation, interpreter)`
  and uses the returned state as the next step.
  """
  body: Callable[[Object, Object, Context, 'Interpreter'], State]

@dataclasses.dataclass(frozen=True)
class Abstract(Object):
  """A procedure defined in Lisp whose operands are bound without evaluation."""
  head: Object     # list of parameter variables
  body: Object     # list of expressions run when the procedure is applied
  dynamic: Object  # variable bound to the caller's environment
  lexical: Object  # environment captured where the procedure was created

@dataclasses.dataclass(frozen=True)
class Wrap(Object):
  """A procedure that evaluates its operands before applying `body` to them."""
  body: Object

@dataclasses.dataclass(frozen=True)
class Ok(State):
  """Terminal state carrying the final value."""
  object: Object

@dataclasses.dataclass(frozen=True)
class Eval(State):
  """Pending evaluation of a single expression."""
  object: Object       # expression to evaluate
  environment: Object  # environment to evaluate it in
  ok: Context          # continuation receiving the value

@dataclasses.dataclass(frozen=True)
class Evlis(State):
  """Pending evaluation of every element of a list, yielding a list of values."""
  object: Object       # list of expressions
  environment: Object  # environment to evaluate them in
  ok: Context          # continuation receiving the list of values

@dataclasses.dataclass(frozen=True)
class Exec(State):
  """Pending evaluation of a list of expressions as a sequence."""
  object: Object       # list of expressions
  environment: Object  # environment to evaluate them in
  ok: Context          # continuation receiving the sequence's value

@dataclasses.dataclass(frozen=True)
class Apply(State):
  """Pending application of a procedure to an operand list."""
  procedure: Object    # procedure being applied
  arguments: Object    # operand list
  environment: Object  # caller's environment
  ok: Context          # continuation receiving the result

def __pr_list(args, env, ok, ctx):
  """Primitive body that hands its argument list straight to the continuation.

  `env` and `ctx` are accepted to match the primitive calling convention but
  are not used.
  """
  result = ok(args)
  return result

def __pr_vau(args, env, ok, ctx):
  """Primitive body that builds an `Abstract` procedure.

  The operands are laid out as `(head dynamic . body)`: the parameter list,
  the variable that will name the caller's environment, and the remaining
  expressions forming the body.  The current environment `env` is captured as
  the procedure's lexical environment.

  Args:
    args: operand list.
    env: environment in which the procedure is being created.
    ok: continuation receiving the new procedure.
    ctx: interpreter providing `is_pair` and `abstract`.

  Raises:
    ValueError: if fewer than two operands are supplied.
  """
  # Check the shape up front so malformed input fails with the interpreter's
  # usual ValueError instead of an AttributeError on `.fst` / `.snd`.
  if not (ctx.is_pair(args) and ctx.is_pair(args.snd)):
    msg = f'Expected a parameter list and an environment name, but got {args}'
    raise ValueError(msg)
  head    = args.fst
  dynamic = args.snd.fst
  body    = args.snd.snd
  lexical = env
  target  = ctx.abstract(head, body, dynamic, lexical)
  return ok(target)

class Lisp:
  constants: dict[str, Object]

  def __init__(self):
    self.constants = {}

  def next(self, state: State) -> State:
    match state:
      case Ok(_):
        return state
      case Eval(value, env, ok):
        match value:
          case Variable(_) | Constant(_):
            if self.has(value, env):
              return ok(self.get(value, env))
            msg = f'{value} is undefined'
            raise ValueError(msg)
          case Pair(proc, args):
            def proc_ok(proc):
              return self.apply(proc, args, env, ok)
            return self.eval(proc, env, proc_ok)
          case _:
            return ok(value)
      case Evlis(value, env, ok):
        match value:
          case Nil():
            return ok(value)
          case Pair(fst, snd):
            def fst_ok(fst):
              def snd_ok(snd):
                return ok(self.pair(fst, snd))
              return self.evlis(snd, env, snd_ok)
            return self.eval(fst, env, fst_ok)
          case _:
            msg = f'Expected a list, but got {value}'
            raise ValueError(msg)
      case Exec(value, env, ok):
        match value:
          case Nil():
            return ok(value)
          case Pair(fst, snd):
            def fst_ok(fst):
              def snd_ok(snd):
                if not self.is_nil(snd):
                  return ok(snd)
                return ok(fst)
              return self.exec(snd, env, snd_ok)
            return self.eval(fst, env, fst_ok)
          case _:
            msg = f'Expected a list, but got {value}'
            raise ValueError(msg)
      case Apply(proc, args, env, ok):
        match proc:
          case Lift(body):
            return body(args, env, ok, self)
          case Abstract(head, body, dynamic, lexical):
            head_len = self.len(head)
            args_len = self.len(args)
            if head_len != args_len:
              msg = f'Expected {head_len} arguments, but got {args_len}'
              raise ValueError(msg)
            local = self.environment(lexical)
            while not self.is_nil(head):
              if not self.is_variable(head.fst):
                msg = f'Expected a variable, but got {head.fst}'
                raise ValueError(msg)
              if self.has(head.fst, local):
                msg = f'{head.fst} is already defined'
                raise ValueError(msg)
              self.put(head.fst, args.fst, local)
              head = head.snd
              args = args.snd
            if self.has(dynamic, local):
              msg = f'{dynamic} is already defined.'
              raise ValueError(msg)
            self.put(dynamic, env, local)
            return self.exec(body, local, ok)
          case Wrap(proc):
            def args_ok(args):
              return self.apply(proc, args, env, ok)
            return self.evlis(args, env, args_ok)
          case _:
            msg = f'Expected a procedure, but got {proc}.'
            raise ValueError(msg)

  @property
  def nil(self) -> Object:
    return Nil()

  def pair(self, fst, snd) -> Object:
    return Pair(fst, snd)

  def list(self, *args) -> Object:
    return self.from_list(args)

  def constant(self, name) -> Object:
    return Constant(name)

  def variable(self, name) -> Object:
    return Variable(name)

  def boolean(self, value) -> Object:
    return Boolean(value)

  def number(self, value) -> Object:
    return Number(value)

  def string(self, value) -> Object:
    return String(value)

  def environment(self, parent = None) -> Object:
    return Environment(parent)

  def standard_environment(self) -> Object:
    body = {}

    body['True']  = self.boolean(True)
    body['False'] = self.boolean(False)

    return Environment(body=body)

  def lift(self, body) -> Object:
    return Lift(body)

  def abstract(self, head, body, dynamic, lexical) -> Object:
    assert self.is_list(head)
    assert self.is_list(body)
    assert self.is_variable(dynamic)
    assert self.is_environment(lexical)
    return Abstract(head, body, dynamic, lexical)

  def wrap(self, body) -> Object:
    assert self.is_procedure(body)
    return Wrap(body)

  def ok(self, object) -> State:
    return Ok(object)

  def eval(self, value, env=None, ok=None) -> State:
    env = env or self.standard_environment()
    ok  = ok  or (lambda x: self.ok(x))
    assert self.is_environment(env)
    return Eval(value, env, ok)

  def evlis(self, value, env, ok) -> State:
    env = env or self.standard_environment()
    ok  = ok  or (lambda x: self.ok(x))
    assert self.is_list(value)
    assert self.is_environment(env)
    return Evlis(value, env, ok)

  def exec(self, value, env, ok) -> State:
    env = env or self.standard_environment()
    ok  = ok  or (lambda x: self.ok(x))
    assert self.is_list(value)
    assert self.is_environment(env)
    return Exec(value, env, ok)

  def apply(self, proc, args, env, ok) -> State:
    env = env or self.standard_environment()
    ok  = ok  or (lambda x: self.ok(x))
    assert self.is_procedure(proc)
    assert self.is_list(args)
    assert self.is_environment(env)
    return Apply(proc, args, env, ok)

  def is_nil(self, obj) -> bool:
    match obj:
      case Nil():
        return True
      case _:
        return False

  def is_pair(self, obj) -> bool:
    match obj:
      case Pair(_, _):
        return True
      case _:
        return False

  def is_list(self, obj) -> bool:
    match obj:
      case Nil():
        return True
      case Pair(fst, snd):
        return self.is_list(snd)
      case _:
        return False

  def is_constant(self, obj) -> bool:
    match obj:
      case Constant(_):
        return True
      case _:
        return False

  def is_variable(self, obj) -> bool:
    match obj:
      case Variable(_):
        return True
      case _:
        return False

  def is_number(self, obj) -> bool:
    match obj:
      case Number(_):
        return True
      case _:
        return False

  def is_string(self, obj) -> bool:
    match obj:
      case String(_):
        return True
      case _:
        return False

  def is_environment(self, obj) -> bool:
    match obj:
      case Environment():
        return True
      case _:
        return False

  def is_procedure(self, obj) -> bool:
    match obj:
      case Lift() | Wrap() | Abstract():
        return True
      case _:
        return False

  def is_ok(self, state) -> bool:
    match state:
      case Ok():
        return True
      case _:
        return False

  def is_halt(self, state) -> bool:
    match state:
      case Ok():
        return True
      case _:
        return False

  def has(self, key, env) -> bool:
    match key:
      case Constant(name):
        return name in self.constants
      case Variable(name):
        while env is not None:
          if name in env.body:
            return True
          env = env.next
        return False
      case str(name):
        if name[0].isupper():
          name = self.constant(name)
        else:
          name = self.variable(name)
        return self.has(env, name)
      case _:
        return False

  def get(self, key, env) -> Object:
    match key:
      case Constant(name):
        if name in self.constants:
          return self.constants[name]
        msg = f'{name} is undefined.'
        raise ValueError(msg)
      case Variable(name):
        while env is not None:
          if name in env.body:
            return env.body[name]
          env = env.next
        msg = f'{name} is undefined.'
        raise ValueError(msg)
      case str(name):
        if name[0].isupper():
          name = self.constant(name)
        else:
          name = self.variable(name)
        return self.get(env, name)

  def put(self, key, value, env):
    match key:
      case Constant(name):
        msg = f'{name} is a constant and cannot be redefined.'
        raise ValueError(msg)
      case Variable(name):
        if name in env.body:
          msg = f'{name} already has a value and cannot be redefined.'
          raise ValueError(msg)
        env.body[name] = value
      case str(name):
        if name[0].isupper():
          name = self.constant(name)
        else:
          name = self.variable(name)
        return self.put(env, name, value)

  def read(self, source: str):
    stack = []
    build = []
    index = 0

    def seek_while(fn):
      nonlocal source
      nonlocal index
      while index < len(source) and fn(source[index]):
        index += 1

    def seek_until(fn):
      nonlocal source
      nonlocal index
      while index < len(source) and not fn(source[index]):
        index += 1

    is_lparen     = lambda x: x == '('
    is_rparen     = lambda x: x == ')'
    is_paren      = lambda x: x in ['(', ')']
    is_quote      = lambda x: x == '"'
    is_whitespace = lambda x: x in [' ', '\t', '\r', '\n']
    is_separator  = lambda x: is_paren(x) or is_quote(x) or is_whitespace(x)
    is_unreadable = lambda x: x.startswith('#<')
    is_constant   = lambda x: x[0].isupper()

    while index < len(source):
      if is_lparen(source[index]):
        stack.append(build)
        build = []
        index += 1
      elif is_rparen(source[index]):
        if len(stack) == 0:
          msg = f'Unbalanced parentheses.'
          raise ValueError(msg)
        xs = self.from_list(build)
        build = stack.pop()
        build.append(xs)
        index += 1
      elif is_quote(source[index]):
        index += 1
        start  = index
        seek_until(is_quote)
        body = source[start:index]
        build.append(self.string(body))
        index += 1
      elif is_whitespace(source[index]):
        seek_while(is_whitespace)
      else:
        start = index
        seek_until(is_separator)
        body = source[start:index]
        if is_unreadable(body):
          msg = f'{body} is unreadable.'
          raise ValueError(msg)
        try:
          value = float(body)
          build.append(self.number(value))
        except ValueError:
          if is_constant(body):
            build.append(self.constant(body))
          else:
            build.append(self.variable(body))
    return build

  def print(self, obj) -> str:
    match obj:
      case Nil():
        return '()'
      case Pair(fst, snd):
        if self.is_list(obj):
          buf = []
          while not self.is_nil(obj):
            hidden = self.print(obj.fst)
            buf.append(hidden)
            obj = obj.snd
          body = ' '.join(buf)
          return f'({body})'
        else:
          fst_ = self.print(fst)
          snd_ = self.print(snd)
          return f'(Pair {fst_} {snd_})'
      case Constant(name):
        return name
      case Variable(name):
        return name
      case Boolean(value):
        return str(value)
      case Number(value):
        return str(value)
      case String(value):
        # todo: escape quotes
        return f'"{value}"'
      case Environment():
        return '#<environment>'
      case Lift() | Wrap() | Abstract():
        return '#<procedure>'
      case _:
        msg = f'unexpected object: {obj}'
        raise ValueError(msg)

  def from_list(self, xs) -> Object:
    state = self.nil
    for child in reversed(xs):
      state = self.pair(child, state)
    return state

  def from_dict(self, xs) -> Object:
    buf = []
    for key, value in sorted(xs.items()):
      buf.append(self.pair(self.string(key), value))
    return self.from_list(buf)

import unittest

class SanityTest(unittest.TestCase):
  """Smoke tests for the Lisp reader and printer."""

  def test_read(self):
    """Each sample must print back exactly as it was read."""
    lisp = Lisp()
    for text in (
      'foo',
      'Bar',
      '3.14',
      'True',
      'False',
      '"Hello, world."',
      '(+ 1.0 2.0 3.0 4.0)',
    ):
      parsed = lisp.read(text)[0]
      self.assertEqual(text, lisp.print(parsed))

  # The evaluation test below is disabled by keeping it inside a string.
  '''
  def test_eval(self):
    ctx = Lisp()
    obj = ctx.read('(+ 1 2 3 4)')[0]
    print(ctx.print(obj))
    gas   = 100
    state = ctx.eval(obj)
    print(f'--- {gas} ---\n{state}')
    while gas > 0 and not ctx.is_halt(state):
      gas  -= 1
      state = ctx.next(state)
      print(f'--- {gas} ---\n{state}')
    self.assertTrue(ctx.is_ok(state))
    print(state.object)
  '''

In [None]:
# Run the tests inside the notebook: a dummy argv keeps unittest from parsing
# the kernel's command line, and exit=False keeps it from stopping the kernel.
unittest.main(argv=[''], exit=False)

# Resonator

## Decoder

In [None]:
import functools
import numpy
import scipy
from typing import Optional

class Resonator:
  """A stack of layers, each racing fitness, crossover and mutation operations.

  Weight layout, per layer, for F fitness, C crossover and M mutation
  operations (F + F*C + F*C*M + 1 weights in total):

    [ F fitness weights
    | F blocks of C crossover weights, one block per fitness operation
    | F*C blocks of M mutation weights, one block per (fitness, crossover)
    | 1 residual weight ]

  Attributes:
    fitness: predicates taking a state and returning a truthy/falsy value.
    crossover: binary operations folding several states into one; an
      operation may raise ValueError to signal that it does not apply.
    mutation: unary operations producing a new state; an operation may raise
      ValueError to signal that it does not apply.
    weights: flat weight vector covering every layer.
    state_capacity: number of states produced by each layer.
    state_bottleneck: number of states selected by a fitness operation and
      number of states produced from one crossover result.
  """
  fitness: list[callable]
  crossover: list[callable]
  mutation: list[callable]
  weights: numpy.ndarray
  state_capacity: int
  state_bottleneck: int

  def __init__(
    self,
    fitness: list[callable],
    crossover: list[callable],
    mutation: list[callable],
    weights: numpy.ndarray,
    state_capacity: int,
    state_bottleneck: int,
  ):
    self.fitness          = fitness
    self.crossover        = crossover
    self.mutation         = mutation
    self.weights          = weights
    self.state_capacity   = state_capacity
    self.state_bottleneck = state_bottleneck

    # The weight vector must hold a whole number of layers.
    assert len(weights) == self.layers*self.weights_per_layer

  @staticmethod
  def num_weights_for_components(fitness, crossover, mutation, layers):
    """Return the weight-vector length needed for `layers` layers."""
    fitl   = len(fitness)
    crossl = len(fitness)*len(crossover)
    mutl   = len(fitness)*len(crossover)*len(mutation)
    return layers*(fitl+crossl+mutl+1)

  @property
  def weights_per_layer(self):
    """Number of weights used by one layer, including the residual weight."""
    return self.fitness_weights_per_layer+self.crossover_weights_per_layer+self.mutation_weights_per_layer+1

  @property
  def layers(self):
    """Number of whole layers described by the weight vector."""
    return len(self.weights)//self.weights_per_layer

  @property
  def fitness_weights_per_layer(self):
    return len(self.fitness)

  @property
  def crossover_weights_per_layer(self):
    return len(self.fitness)*len(self.crossover)

  @property
  def mutation_weights_per_layer(self):
    return len(self.fitness)*len(self.crossover)*len(self.mutation)

  def fitness_weights(self, layer):
    """Return the F fitness weights of `layer`."""
    lhs = layer*self.weights_per_layer
    return self.weights[lhs:lhs+self.fitness_weights_per_layer]

  def crossover_weights(self, layer, fitness):
    """Return the C crossover weights of `layer` for fitness operation `fitness`."""
    width = len(self.crossover)
    lhs   = layer*self.weights_per_layer+self.fitness_weights_per_layer
    # Select the block conditioned on the chosen fitness operation, so the
    # slice has exactly one weight per crossover operation.
    lhs  += fitness*width
    return self.weights[lhs:lhs+width]

  def mutation_weights(self, layer, fitness, crossover):
    """Return the M mutation weights of `layer` for the given fitness/crossover pair."""
    width = len(self.mutation)
    lhs   = layer*self.weights_per_layer+self.fitness_weights_per_layer+self.crossover_weights_per_layer
    # Blocks are ordered by fitness operation first, then crossover operation.
    lhs  += (fitness*len(self.crossover)+crossover)*width
    return self.weights[lhs:lhs+width]

  def residual_weights(self, layer):
    """Return the residual weight of `layer` (the last weight of the layer)."""
    return self.weights[(layer+1)*self.weights_per_layer-1]

  def race(self, components, weights):
    """Return `components` in a random order biased by softmax(`weights`)."""
    sorted_components = numpy.random.choice(
      components,
      size=len(components),
      replace=False,
      p=self.__softmax(weights),
    )
    return sorted_components

  def __race_order(self, count, weights):
    """Like `race`, but return operation indices so the winner's identity is known."""
    return numpy.random.choice(
      count,
      size=count,
      replace=False,
      p=self.__softmax(weights),
    )

  def __softmax(self, array):
    return scipy.special.softmax(array)

  def __coin(self, weight):
    """Return True with probability `weight` (clamped to [0, 1] by the comparison)."""
    return numpy.random.random() < weight

  def __choice(self, components):
    """Pick one element uniformly, returning the original Python object."""
    # Index instead of numpy.random.choice(components): the latter converts
    # the sequence to an array, which mangles states that are themselves
    # sequences.
    return components[numpy.random.randint(len(components))]

  def __select_fitness(self, layer, states):
    """Race the fitness operations; return (operation index, accepted states)."""
    order = self.__race_order(len(self.fitness), self.fitness_weights(layer))
    for rank, fitness_id in enumerate(order):
      local_fitness  = self.fitness[fitness_id]
      fitness_hidden = []
      for state in states:
        if local_fitness(state):
          print(f'Decoder current_fitness_id {rank} {state} True')
          fitness_hidden.append(state)
          if len(fitness_hidden) >= self.state_bottleneck:
            return int(fitness_id), fitness_hidden
        else:
          print(f'Decoder current_fitness_id {rank} {state} False')
    raise ValueError(f'No fitness operation available')

  def __select_crossover(self, layer, fitness_id, fitness_hidden):
    """Race the crossover operations; return (operation index, combined state)."""
    order = self.__race_order(
      len(self.crossover),
      self.crossover_weights(layer, fitness_id),
    )
    for rank, crossover_id in enumerate(order):
      try:
        combined = functools.reduce(self.crossover[crossover_id], fitness_hidden)
      except ValueError:
        print(f'Decoder.current_crossover_id {rank} Err')
        continue
      return int(crossover_id), combined
    raise ValueError('No crossover operation available')

  def __select_mutation(self, layer, fitness_id, crossover_id, crossover_hidden):
    """Race the mutation operations; return `state_bottleneck` mutated states."""
    order = self.__race_order(
      len(self.mutation),
      self.mutation_weights(layer, fitness_id, crossover_id),
    )
    for rank, mutation_id in enumerate(order):
      local_mutation = self.mutation[mutation_id]
      try:
        return [local_mutation(crossover_hidden) for _ in range(self.state_bottleneck)]
      except ValueError:
        print(f'Decoder.current_mutation_id {rank} Err')
    raise ValueError('No mutation operator available')

  def __call__(self, states):
    """Run `states` through every layer and return the final states.

    For each layer, `state_capacity` rounds of fitness -> crossover ->
    mutation fill a pool of candidate states.  The layer's output is then
    drawn state by state: with probability given by the residual weight from
    the layer's input, otherwise from the candidate pool.

    Raises:
      ValueError: if, in some round, no fitness operation accepts enough
        states, or every crossover or every mutation operation fails.
    """
    for layer in range(self.layers):
      hidden_states = []
      for _ in range(self.state_capacity):
        fitness_id, fitness_hidden = self.__select_fitness(layer, states)
        assert len(fitness_hidden) == self.state_bottleneck
        print(f'Decoder.fitness_hidden = {fitness_hidden} Ok')
        crossover_id, crossover_hidden = self.__select_crossover(
          layer, fitness_id, fitness_hidden,
        )
        assert crossover_hidden is not None
        print(f'Decoder.crossover_hidden = {crossover_hidden} Ok')
        mutation_hidden = self.__select_mutation(
          layer, fitness_id, crossover_id, crossover_hidden,
        )
        assert len(mutation_hidden) == self.state_bottleneck
        print(f'Decoder.mutation_hidden {mutation_hidden}')
        hidden_states += mutation_hidden
      target_states = []
      residual      = self.residual_weights(layer)
      for _ in range(self.state_capacity):
        if self.__coin(residual):
          state = self.__choice(states)
          print(f'Decoder residual state {state}')
        else:
          state = self.__choice(hidden_states)
          print(f'Decoder hidden state {state}')
        target_states.append(state)
      print(f'Decoder.target_states {target_states}')
      states = target_states
    return states

## Tuning

In [None]:
@dataclasses.dataclass(frozen=True)
class Optimizer:
  """Sign-based update rule with an interpolated accumulator.

  Attributes:
    facc: interpolation factor used for the accumulator that is carried over.
    fres: interpolation factor used for the value whose sign drives the step.
    rate: step size applied to the mixed direction.
    decay: factor applied to the current position inside the step.
  """
  facc: float
  fres: float
  rate: float
  decay: float

  @staticmethod
  def interp(lhs, rhs, factor):
    """Linearly interpolate from `lhs` (factor 0) to `rhs` (factor 1).

    NOTE(review): the original code called `self.interp` without defining it;
    plain linear interpolation is assumed here -- confirm the intended
    direction of the blend.
    """
    return lhs+(rhs-lhs)*factor

  def __call__(self, pos, acc, grad):
    """Compute one update.

    Args:
      pos: current position.
      acc: current accumulator.
      grad: current gradient estimate.

    Returns:
      A `(delta, hacc)` tuple: the step to add to `pos` and the new
      accumulator.
    """
    hacc  = self.interp(grad, acc, self.facc)
    res   = self.interp(grad, acc, self.fres)
    # Only the sign of the blended gradient is used, so every coordinate
    # moves by the same magnitude before decay is mixed in.
    mix   = pos*self.decay+numpy.sign(res)
    delta = mix*self.rate
    return delta, hacc

class Tuner:
  """Particle-swarm style tuner for a position (weight) vector.

  Attributes:
    quota: number of swarm iterations.
    purity: number of data samples averaged per loss measurement.
    density: number of particles in the swarm.
    update: callable `(pos, acc, gradient) -> (velocity, new_acc)`.

  NOTE(review): `__call__` also reads `self.average`, `self.coherence` and
  `self.noise`, none of which are defined in the visible code; they must be
  supplied on the instance or by a subclass.
  """
  quota: int
  purity: int
  density: int
  update: callable

  def measure(self, machine, data):
    """Return the mean energy of `machine` over `purity` samples from `data`.

    Each call to `data()` must return an `(init, eval)` pair: the input given
    to the machine and a callable scoring the machine's output.
    """
    energy = 0
    for _ in range(self.purity):
      (source, score) = data()
      energy += score(machine(source))
    return energy/self.purity

  @staticmethod
  def __spawn(init):
    """Create one starting position as a private float array.

    NOTE(review): the original never used `init`; it is assumed to be either
    a callable returning a position or a position itself -- confirm.
    """
    seed = init() if callable(init) else init
    return numpy.array(seed, dtype=float)

  def __call__(self, init, cons, data):
    """Tune a position vector and return the swarm's final average.

    Args:
      init: starting position, or a callable producing one per particle.
      cons: callable building a machine from a position.
      data: sample source passed to `measure`.

    Returns:
      `self.average(pos, loss)` for the final positions; `loss` holds the
      values measured at the start of the last iteration.
    """
    # One position, accumulator and loss slot per particle.
    pos  = [self.__spawn(init) for _ in range(self.density)]
    acc  = [numpy.zeros_like(point) for point in pos]
    loss = [0.0]*self.density
    for _ in range(self.quota):
      for i in range(self.density):
        machine = cons(pos[i])
        loss[i] = self.measure(machine, data)
      center  = self.average(pos, loss)
      teacher = cons(center)
      cutoff  = self.measure(teacher, data)
      for i in range(self.density):
        # Particles doing at least as well as the swarm centre stay put.
        if loss[i] <= cutoff:
          continue
        compass  = pos[i]-center
        sort     = compass*self.coherence
        search   = (compass@compass)*self.noise()
        gradient = sort+search
        # Keep the new accumulator in its own name: rebinding `acc` here
        # would replace the per-particle list.
        vel, hacc = self.update(pos[i], acc[i], gradient)
        pos[i]    = pos[i]+vel
        acc[i]    = hacc
    return self.average(pos, loss)

## Examples

In [None]:
from bs4 import BeautifulSoup

def get_tag_body(document, tag_name):
  """Return the text inside the single `<tag_name>...</tag_name>` element.

  The custom tag is first rewritten to `<div class="tag_name">` so that the
  HTML parser can locate it by class.

  Raises:
    ValueError: if the document does not contain exactly one such element.
  """
  opening = f"<{tag_name}>"
  closing = f"</{tag_name}>"
  # Swap the custom tag for a div carrying the tag name as its class.
  rewritten = document.replace(opening, f'<div class="{tag_name}">')
  rewritten = rewritten.replace(closing, "</div>")
  matches = BeautifulSoup(rewritten, 'html.parser').find_all('div', class_=tag_name)
  if len(matches) == 1:
    return matches[0].get_text()
  raise ValueError(f"Expected exactly one <div class='{tag_name}'> tag, but found {len(matches)}.")

class Repository:
  llm: Llama

  def __init__(self, llm):
    self.llm = llm

  def get_fitness(self, operation):
    def map_state(state):
      value = get_tag_body(state, 'value')
      match value:
        case 'True':
          return 1
        case 'False':
          return 0
        case _:
          raise ValueError(f'Expected a bool, but got {state}')
      return value
    def reduce_states(states):
      average = sum(states)/len(states)
      return average > 0.5
    def hidden(state):
      prompt = f'''
Consider the following generative text-to-image prompt:

<prompt>{state}</prompt>

{operation} Think carefully, then respond with either True or False
within <value></value> tags.
'''.strip()
      target = self.llm.reduce([prompt], map_state, reduce_states)
      return target
    return hidden

  def get_crossover(self, operation):
    def map_state(state):
      prompt = get_tag_body(state, 'prompt')
      return prompt
    def reduce_states(states):
      return states[0]
    def hidden(fst, snd):
      prompt = f'''
Consider these two generative text-to-image prompts:

<prompt>{fst}</prompt>
<prompt>{snd}</prompt>

{operation} Think carefully, then respond with the final
prompt within <prompt></prompt> tags.
'''.strip()
      target = self.llm.reduce([prompt], map_state, reduce_states)
      return target
    return hidden

  def get_mutation(self, operation):
    def map_state(state):
      prompt = get_tag_body(state, 'prompt')
      return prompt
    def reduce_states(states):
      return states[0]
    def hidden(state):
      prompt = f'''
Consider the following generative text-to-image prompt:

<prompt>{state}</prompt>

{operation} Think carefully, then respond with the final
prompt within <prompt></prompt> tags.
'''.strip()
      target = self.llm.reduce([prompt], map_state, reduce_states)
      return target
    return hidden

In [None]:
# Operation factory bound to the shared language model.
repo = Repository(llm)

# One fitness predicate per yes/no question about a text-to-image prompt.
fitness = [
  repo.fitness(question)
  for question in (
    'Does this prompt mention anything related to anime?',
    'Does this prompt mention any famous artists?',
    'Does this prompt mention a famous location?',
    'Does this prompt mention flowers?',
    'Does this prompt mention animals?',
    'Would this prompt be popular on Reddit?',
    'Do you think images generated by this prompt would have people in them?',
    'Do you think images generated by this prompt would have animals in them?',
    'Does this prompt mention any specific historical art styles?',
    'Does this prompt include a famous artist from before 1912?',
    'Does this prompt involve a modern artist or art movement?',
    'Does this prompt specify an artistic medium, like oil painting or sculpture?',
    'Does this prompt mention a specific time period for the art style?',
    'Does this prompt refer to a well-known historical event?',
    'Does this prompt mention a specific location known for its artistic heritage?',
    'Does this prompt involve mythological figures or stories?',
    'Does this prompt feature any fictional characters?',
    'Does this prompt mention specific colors that should dominate the artwork?',
    'Does this prompt include specific elements of nature like mountains or rivers?',
    'Does this prompt mention urban settings or cityscapes?',
    'Does this prompt feature musical elements or musicians?',
    'Does this prompt include elements that are surreal or abstract?',
    'Does this prompt reference specific cultural symbols or national heritage?',
    'Is the prompt likely to generate interest among art historians?',
    'Does this prompt mention elements that would appeal to children?',
    'Does this prompt specify a mood or atmosphere, like melancholy or joy?',
    'Does this prompt suggest a narrative or scene with action?',
    'Does this prompt include technological elements or futuristic themes?',
    'Does this prompt have potential educational value?',
    'Would images generated from this prompt likely be controversial?',
    'Is the imagery likely to be peaceful or calming?',
    'Would this prompt be considered unique or rare?',
    'Does this prompt involve seasonal themes like winter or autumn?',
    'Does this prompt involve water elements like oceans or lakes?',
    'Does this prompt encourage exploring dark or horror themes?',
    'Does this prompt involve celestial elements like stars or planets?',
    'Is this prompt likely to engage amateur artists?',
    'Does this prompt involve famous fictional stories or books?',
  )
]

# Crossover components, one per natural-language instruction below.
# Each instruction asks for several prompts to be merged into one; the
# instruction text is handed to `repo.crossover`, and whatever that call
# returns is what gets stored (presumably a callable -- confirm against
# the class of `repo`).
crossover = [
  repo.crossover(instruction)
  for instruction in (
    'Combine these prompts in the most natural way possible.',
    'Combine these prompts in a surprising way.',
    'Think of another prompt that is a logical consequence of these prompts.',
    'Combine these prompts to create a harmonious blend of their themes.',
    'Merge these prompts while emphasizing any historical elements present in both.',
    'Create a new prompt that juxtaposes the primary subjects of these prompts in an unexpected way.',
    'Integrate the artistic styles mentioned in these prompts to form a new hybrid style.',
    'Combine these prompts in a way that tells a story or creates a narrative.',
    'Fuse these prompts while focusing on a dominant element from each.',
    'Combine these prompts to highlight contrast, such as modern versus ancient or natural versus man-made.',
    'Synthesize these prompts into a single prompt that would appeal to a specific audience or age group.',
    'Create a crossover that results in an artwork focusing on the interaction of elements from each prompt.',
    'Develop a prompt that would result in an artwork featuring a clash of the cultures or themes mentioned.',
    'Construct a new prompt that would feature elements of fantasy or surrealism from the original prompts.',
    'Merge these prompts to enhance the visual complexity of the resulting image.',
    'Combine these prompts to emphasize environmental or natural elements present in both.',
    'Formulate a new prompt that integrates the emotional or atmospheric tones of both prompts.',
    'Design a crossover that results in an educational or informative artwork combining elements of both prompts.',
    'Create a prompt that combines technological and historical themes from the original prompts.',
    'Construct a prompt that merges the specific locations mentioned in the original prompts into a single scene.',
    'Combine these prompts to create an artwork that could serve as a metaphor or allegory.',
    'Synthesize these prompts in a way that would result in an abstract or minimalistic artwork.',
    'Develop a prompt that balances elements of action and calm from the original prompts.',
  )
]

# Mutation components, one per natural-language instruction below.
# Each instruction describes a rewrite of a single prompt; the text is
# handed to `repo.mutation`, and whatever that call returns is what gets
# stored (presumably a callable -- confirm against the class of `repo`).
mutation = [
  repo.mutation(instruction)
  for instruction in (
    'Make the theme of this prompt anime.',
    'Shorten this prompt to the absolute bare minimum while keeping the same meaning.',
    'Make the theme of this prompt ancient Greece.',
    'Make the theme of this prompt ancient Rome.',
    'Reinterpret this prompt as an oil painting.',
    'Reinterpret this prompt as a watercolor painting.',
    'Reinterpret this prompt as experimental photography.',
    'Transform this prompt into a futuristic or sci-fi theme.',
    'Convert this prompt into a children’s book illustration style.',
    'Adapt this prompt to the style of a famous artist not previously mentioned.',
    'Modify this prompt to include surreal or abstract elements.',
    'Change this prompt to focus on a specific season like winter or summer.',
    'Expand this prompt to include a detailed background setting.',
    'Alter this prompt to include a specific animal or creature.',
    'Make this prompt evoke a specific emotion like joy or sadness.',
    'Add a mythical or legendary component to this prompt.',
    'Shift this prompt’s setting from day to night.',
    'Enhance this prompt with a focus on dynamic movement or action.',
    'Incorporate a moral or philosophical question into this prompt.',
    'Adjust this prompt to emphasize architectural elements.',
  )
]

In [None]:
# Seed prompts for the demo run: one text-to-image prompt per state.
initial_states = [
  "A digital painting of a bustling cityscape in the Renaissance era, bustling with traders and artists.",
  "A charcoal drawing of a serene garden filled with various types of flowers and butterflies.",
  "A surrealistic scene featuring mythical creatures interacting in a futuristic urban setting.",
  "An abstract oil painting depicting the emotional intensity of a violin player in a dimly lit room.",
  "A watercolor painting of a quiet village by the sea during the golden hour, reflecting soft, warm lights.",
  "A graphite sketch of famous landmarks from ancient Rome, including the Colosseum and Roman Forum.",
  "A series of experimental photographs capturing the bustling nightlife of Tokyo.",
  "A large-scale mural design incorporating elements of Native American folklore and landscape.",
  "An ink drawing of a medieval battle scene, illustrating detailed armor and weaponry of the era.",
  "A digital collage representing a fusion of various modern art styles seen in New York’s contemporary galleries.",
  "A minimalist pastel drawing of a snowy landscape with subtle hints of wildlife presence.",
  "A vibrant anime-style poster featuring a heroic character poised in a dynamic action pose.",
  "A classical portrait of a famous historical figure from the Enlightenment period in a detailed study room.",
  "An experimental mixed media piece that combines classical Greek sculpture with modern abstract forms.",
  "A Victorian-era street scene depicted in the style of Impressionism, focusing on light and shadow effects."
]
# Shuffle the seed prompts. This previously read the undefined name
# `initial_state`, which raised NameError before the decoder was built.
# Note that `numpy.random.permutation` returns a NumPy array, not a list.
initial_states = numpy.random.permutation(initial_states)
layers         = 1
# The weight vector is sized for `layers` layers of the given components.
num_weights    = Decoder.num_weights_for_components(fitness, crossover, mutation, layers)
# Random weights: this run exercises an untrained decoder.
# NOTE(review): `state_capacity` / `state_bottleneck` semantics are defined by
# the Decoder class outside this cell -- confirm there.
decoder        = Decoder(
  fitness=fitness,
  crossover=crossover,
  mutation=mutation,
  weights=numpy.random.random(num_weights),
  state_capacity=len(initial_states),
  state_bottleneck=2,
)
final_states = decoder(initial_states)
print(final_states)

# Junk

In [None]:
import numpy as np
import scipy

class Decoder:
  """A stack of layers in which weighted components race to transform a swarm.

  Each layer runs `purity` rounds. In a round the input predicates are tried
  in a weighted random order until one accepts `threshold` states from the
  swarm; the output operations (weighted by the winning input) are then tried
  in a weighted random order until one turns that sample into a new state.
  Finally the new states are mixed with the incoming swarm according to the
  layer's residual weight.

  Weight layout, per layer (`layer_capacity` entries, layers stored back to
  back in `weights`):

    [ len(inputs) input weights
    | len(inputs) * len(outputs) output weights, one row per input
    | 1 residual weight ]

  Parameters:
    inputs:    sequence of callables `state -> truthy/falsy`.
    outputs:   sequence of callables `sample -> state`; an output signals
               that it cannot handle a sample by raising ValueError.
    weights:   flat sequence of weights laid out as described above.
    purity:    rounds per layer, and the size of the swarm a layer returns.
    width:     stored but not read anywhere in this class.
    threshold: number of accepted states an input needs to win its race.
  """

  def __init__(self, inputs, outputs, weights, purity=100, width=12, threshold=4):
    self.inputs    = inputs
    self.outputs   = outputs
    self.weights   = weights
    self.purity    = purity
    self.width     = width
    self.threshold = threshold

  @property
  def density(self):
    """Number of (input, output) pairs."""
    return len(self.inputs)*len(self.outputs)

  # The number of weights per layer.
  @property
  def layer_capacity(self):
    return len(self.inputs) + len(self.inputs) * len(self.outputs) + 1

  @property
  def layers(self):
    """Number of complete layers that fit in the weight vector."""
    return len(self.weights)//self.layer_capacity

  @property
  def capacity(self):
    """Total number of weights across all layers."""
    return len(self.weights)

  def _layer_offset(self, layer):
    """Index of the first weight belonging to `layer`."""
    # Layers are `layer_capacity` apart. Using the total `capacity` here
    # would place every layer after the first beyond the end of `weights`.
    return layer * self.layer_capacity

  # The weights associated with the inputs for a certain layer.
  def win(self, layer):
    lhs = self._layer_offset(layer)
    return self.weights[lhs:lhs+len(self.inputs)]

  # The weights associated with the outputs for a certain layer.
  def wout(self, layer, input_index):
    output_lhs = self._layer_offset(layer) + len(self.inputs)
    # One row of len(outputs) weights per input; pick the winner's row.
    target_lhs = output_lhs + input_index * len(self.outputs)
    return self.weights[target_lhs:target_lhs+len(self.outputs)]

  # The weights associated with the residual for a certain layer.
  def wres(self, layer):
    residual_idx = (
      self._layer_offset(layer)
      + len(self.inputs)
      + len(self.inputs) * len(self.outputs)
    )
    return self.weights[residual_idx]

  def sort(self, components, weights):
    """Return the indices of `components` in a weighted random race order.

    The softmax of each weight is the probability that the corresponding
    component will be next in line. Indices are drawn without replacement,
    so the result is a permutation of `range(len(components))`.
    """
    # Size the draw by the components actually being ordered, so the same
    # method works for the outputs as well as the inputs.
    count = len(components)
    return np.random.choice(
      count,
      size=count,
      replace=False,
      p=scipy.special.softmax(weights),
    )

  def __call__(self, swarm):
    """Run every layer over `swarm` and return the resulting swarm.

    Raises:
      ValueError: if no input accepts `threshold` states in some round, or
        if every output raises ValueError for the winning sample.
    """
    for layer in range(self.layers):
      inputw    = self.win(layer)
      residualw = self.wres(layer)
      hidden    = []
      for i in range(self.purity):
        print(f'Decoder.__call__ iteration # {i}')
        # The inputs race to provide a measurement.
        sample    = []
        inputbest = None
        inputid   = None
        # Reversed so that pop() hands out candidates in race order.
        inputlist = list(reversed(self.sort(self.inputs, inputw)))
        while inputbest is None and len(inputlist) > 0:
          inputid   = inputlist.pop()
          predicate = self.inputs[inputid]
          sample.clear()
          for state in swarm:
            if predicate(state):
              print(f'Decoder.__call__ inputid={inputid} {state} True')
              sample.append(state)
            else:
              print(f'Decoder.__call__ inputid={inputid} {state} False')
            if len(sample) >= self.threshold:
              inputbest = inputid
              break
        # If no inputs were able to provide a large enough measurement
        # then an exception is raised.
        if inputbest is None:
          print(f'Decoder.__call__ inputid={inputid} Err')
          raise ValueError(
            f'no input accepted {self.threshold} states from the swarm'
          )
        assert len(sample) == self.threshold
        print(f'Decoder.__call__ inputbest={inputbest} Ok')
        activity = False
        outputid = None
        # The outputs condition on the winner of the race
        # and perform their own race to provide a state.
        outputw = self.wout(layer, inputbest)
        print(f'\n\n------\n\nDecoder.__call__ OUTPUT\n\n-----\n\n')
        for outputid in self.sort(self.outputs, outputw):
          output = self.outputs[outputid]
          try:
            # The output is applied to *all* of the sample values.
            # It's important to mix states across the swarm to allow
            # the analogy to continuity.
            point    = output(sample)
            activity = True
            hidden.append(point)
            print(f'Decoder.__call__ outputid={outputid} {point} Ok')
            break
          except ValueError:
            # This output declined the sample; let the next one try.
            print(f'Decoder.__call__ outputid={outputid} False')
        if not activity:
          print(f'Decoder.__call__ outputid={outputid} Err')
          raise ValueError('no output produced a state for the sample')
      # The final states race with the initial states.
      target = []
      for i in range(self.purity):
        if np.random.random() < residualw:
          state = np.random.choice(hidden)
          print(f'Decoder.__call__ residual hidden {state}')
        else:
          state = np.random.choice(swarm)
          print(f'Decoder.__call__ residual initial {state}')
        target.append(state)
      swarm = target
    return swarm

In [None]:
class Repository:
  """Builds LLM-backed input predicates and output operations.

  `input(question)` and `output(operation)` each return a closure that
  formats a prompt, sends it through `self.llm.reduce`, and returns the
  reduced answer.

  NOTE(review): `llm.reduce(prompts, map_fn, reduce_fn)` is presumably
  expected to apply `map_fn` to each completion and `reduce_fn` to the
  mapped list -- confirm against `Llama.reduce`.
  """
  # String forward reference: the annotation is informational only and must
  # not require `Llama` to be defined when this class statement runs.
  llm: 'Llama'

  def __init__(self, llm):
    self.llm = llm

  def input(self, input):
    """Return a predicate `state -> bool` asking the yes/no question `input`.

    The predicate raises ValueError if a completion's <value> tag is neither
    'True' nor 'False', or if there are no answers to vote on.
    """
    def map_state(x):
      # Parse one completion into a boolean vote.
      value = get_tag_body(x, 'value')
      print(f'input/map_state: value={value}')
      if value == 'True':
        return True
      if value == 'False':
        return False
      raise ValueError(f'Repository.input unknown value {value}')

    def reduce_state(xs):
      # Majority vote; a tie counts as False because the test is strict.
      if not xs:
        raise ValueError('Repository.input received no values to reduce')
      average = sum(1 if x else 0 for x in xs)/len(xs)
      print(f'input/reduce_state: xs={xs} average={average}')
      return average > 0.5

    def body(state):
      get_state = f'''
Consider the following text-to-image prompt:

```
{state}
```

{input}

Think about it carefully, then respond with either True or False
in <value> tags, like this: <value>True</value> or <value>False</value>.
'''.strip()
      state = self.llm.reduce([get_state], map_state, reduce_state)
      print(f'input/state={state}')
      return state
    return body

  def output(self, output):
    """Return an operation `(input, sample) -> state` applying `output`.

    `input` is the text of the condition the sample prompts satisfy and
    `sample` is the collection of prompts to combine. The operation raises
    ValueError when there are no candidates to choose from.
    """
    def map_state(x):
      # Extract the candidate prompt from one completion.
      value = get_tag_body(x, 'value')
      print(f'output/map_state: x={x}')
      return value

    def reduce_state(xs):
      # Keep the first candidate.
      print(f'output/reduce_state: xs={xs}')
      if not xs:
        raise ValueError('Repository.output received no values to reduce')
      return xs[0]

    def body(input, sample):
      get_state = f'''
Consider the following text-to-image prompts:

```
{sample}
```

These prompts have something in common: they all satisfy the
following condition:

```
{input}
```

and they are going to be transformed according to the following
operation:

```
{output}
```

Think of an interesting way to combine all of these prompts in
to a single prompt based on this information. When you come up
with a prompt, put it in <value></value> tags. Keep your prompt
short and terse while including as much visual detail as
possible.
'''.strip()
      state = self.llm.reduce([get_state], map_state, reduce_state)
      print(f'output/state = {state}')
      return state
    return body

In [None]:
# Wire the component factory to the shared model handle.
repo = Repository(llm)

# Yes/no questions, each wrapped by `repo.input`. Entries that are commented
# out are disabled and kept only for reference.
inputs = [
    repo.input(question)
    for question in (
        # 'Does this prompt mention men other than the artist?',
        # 'Does this prompt mention women other than the artist?',
        'Does this prompt mention a famous artist?',
        'Does this prompt mention people other than the artist?',
        'Does this prompt not mention people other than the artist?',
        # 'Is this a classical painting?',
        'Is this an anime illustration?',
        # 'Does this artwork belong to the Renaissance period?',
        'Is this a modern abstract artwork?',
        'Is this piece in the style of Impressionism?',
        # 'Does this resemble a Baroque painting?',
        # 'Is this artwork in the style of Cubism?',
        # 'Is the artist associated with the Surrealism movement?',
        'Does this image use techniques typical of watercolor painting?',
        'Is this artwork a digital creation?',
        'Does this prompt mention Pixiv?',
        'Does this prompt mention a man?',
        'Does this prompt mention a woman?',
        'Does this prompt mention "illustration"?',
        # 'Does this represent a form of street art?',
        # 'Is this a piece of Gothic art?',
        # 'Is this image in the style of Art Nouveau?',
        # 'Is this artwork part of the Pop Art movement?',
        'Does this belong to the Abstract Expressionism style?',
        # 'Is this a Realism artwork?',
        # 'Does this represent Minimalism?',
        # 'Is this artwork done in the Art Deco style?',
        # 'Does this piece use techniques of Neo-Impressionism?',
        # 'Is this artwork influenced by Futurism?',
    )
]

# Style-transfer instructions, each wrapped by `repo.output`.
outputs = [
    repo.output(instruction)
    for instruction in (
        'Make this an anime illustration.',
        'Make this a classical painting.',
        'Transform this image to reflect the Renaissance art style.',
        'Create this image in a modern abstract style.',
        'Convert this to an Impressionist style painting.',
        'Reimagine this as a Baroque period painting.',
        'Adapt this image to the Cubism art style.',
        'Render this image in a Surrealist style.',
        'Use watercolor techniques for this image.',
        'Create this artwork using digital media.',
        'Style this image as street art.',
        'Transform this into a Gothic art piece.',
        'Create this image in the style of Art Nouveau.',
        'Convert this to the Pop Art style.',
        'Adapt this image to Abstract Expressionism.',
        'Render this image as a Realism artwork.',
        'Style this artwork with Minimalist techniques.',
        'Create this in the Art Deco style.',
        'Use Neo-Impressionist techniques for this image.',
        'Reimagine this artwork in the style of Futurism.',
    )
]

In [None]:
# Smoke test: evaluate one input predicate on two hand-written prompts.
# NOTE(review): `inputs[0]` is currently the "Does this prompt mention a
# famous artist?" question, because the "Is this a classical painting?" entry
# is commented out in `inputs`. The name `is_classical` therefore looks stale
# -- confirm which question this check is meant to exercise.
is_classical = inputs[0]
fst = 'A digital art illustration by Ilya Kuvshinov.'
snd = 'An oil painting by John Singer Sargent'
print(f'{fst} = {is_classical(fst)}')
print(f'{snd} = {is_classical(snd)}')

# Smoke test: run the first output operation ("Make this an anime
# illustration.") directly. The callable returned by `Repository.output`
# takes the condition text first and the list of sample prompts second.
make_anime = outputs[0]
response = make_anime(
  'Is this a classical painting?',
  [
    'An oil painting of a woman holding a parasol by John Singer Sargent',
    'A still life of fruits on a table by Joaquin Sorolla',
    'A scene from Greek mythology by Leonardo da Vinci',
  ],
)