In [5]:
import yaml
import re
import enum
from pprint import pprint, pformat
import itertools
import math
import time

from dataclasses import dataclass
from typing import List, Tuple, Set
from copy import copy


def parse_spec(file_path):
  with open(file_path, 'r') as file_handle:
    spec = yaml.safe_load(file_handle)
  return spec

In [6]:
ex_spec = parse_spec('video-editor.yaml')

## Comparing Objects

In [7]:
def process_listable(target, func):
  if target is None:
    return

  if type(target) == list:
    for target_item in target:
      func(target_item)
  else:
    func(target)

def strip_type(obj):
  return re.sub(r'\((\w|-)*\) ', '', obj)


In [8]:
class ObjectRegistry(object):
  registry: dict[str, Set[str]] = {}

  def __init__(self):
    self.registry = {}

  def __str__(self):
    return pformat(self.registry)

  def register_object(self, obj):
    # -> means mapto
    # . means subset
    # / means component  # TODO: actually do this!!
    assert(type(obj) is str)
    obj = strip_type(obj)

    arrow_array = obj.split('->')
    for arrow_idx, arrow_term in enumerate(arrow_array):
      
      dot_array = arrow_term.split('.')
      for dot_idx, dot_term in enumerate(dot_array):
        subject = '.'.join(dot_array[:dot_idx + 1]) # join up to idx
        if not self.registry.get(subject):
          self.registry[subject] = set()
        
        if dot_idx > 0:
          # every sequence maps to the previous element eg. a.b.c => {a.b.c: {a.b}, a.b: {a}}
          previous = '.'.join(dot_array[:dot_idx])
          self.registry[subject].add(previous)
      
      if arrow_idx > 0:
        # a pair of arrows lhs->rhs => {lhs: {rhs}}
        lhs = self.registry[arrow_array[arrow_idx - 1]]
        rhs = arrow_array[arrow_idx]
        lhs.add(rhs)

  def register_struct_objects(self, struct, process_func):
    if struct.get('type') == 'group':
      # Groups also behave as objects, so register them
      name = struct.get('name')
      if name is None:
        print('Warning! No name provided for group. Using `NO NAME PROVIDED` instead. TODO: generate ID.')
        name = 'NO NAME PROVIDED'

      if self.registry.get(name) is None:
        self.registry[name] = set()
      
      # TODO: do groups map to their elements? Really, does the transitive property apply? My hunch is no, but need to think more

    process_listable(struct.get('affects'), process_func)
    process_listable(struct.get('covered-by'), process_func)
    # TODO: relate all of the objects affected/covered by a structure. 

    for derivative in struct.get('structures', []):
      self.register_struct_objects(derivative, registry=self.registry, process_func=process_func)


  def register_repr_objects(self, repr, process_func):
    repr_type = repr.get('type', '')
    # print(repr_type)
    for repr_item in repr.get('objects', []):
      assert(len(repr_item.values()) == 1)
      repr_obj, target_objs = list(repr_item.items())[0]

      process_listable(target_objs, process_func)

      # Map every item to the associated representational object
      # eg. {message: {textbox}, author: {textbox}}
      def register_repr_maps(target):
        # note: this is a bit backwards compared to the syntax!
        if self.registry.get(target) is None:
          self.registry[target] = set()
        self.registry[target].add(repr_obj)
        # objects[target].add(repr_type + '/' + repr_obj)  # when we prefix the core stuff
      process_listable(target_objs, register_repr_maps)

  def register_spec_objects(self, spec): # {object: [mapto-targets]}
    # TODO: deal with `objects` block
    def register_object_here(target):
      self.register_object(target)
    
    process_listable(spec.get('objects'), register_object_here)

    for struct in spec.get('structures', []):
      self.register_struct_objects(struct, register_object_here)
      
    for repr in spec.get('representations', []):
      self.register_repr_objects(repr, register_object_here)


In [9]:
# Test in Object Registry
print('testing: a')
test = ObjectRegistry()
print('expected:')
pprint({'a': set()})
print('got:')
test.register_object('a')
print(test)

print()

print('testing: a.b.c')
print('expected:')
pprint({'a': set(), 'a.b': {'a'}, 'a.b.c': {'a.b'}})
print('got:')
test = ObjectRegistry()
test.register_object('a.b.c')
print(test)

print()

print('testing: a->b->c')
print('expected:')
pprint({'a': {'b'}, 'b': {'c'}, 'c': set()})

print('got:')
test = ObjectRegistry()
test.register_object('a->b->c')
print(test)

print()

print('testing: a.b->x->z.w')
print('expected:')
print({'a': set(), 'a.b': {'x', 'a'}, 'x': {'z.w'}, 'z': set(), 'z.w': {'z'}})

print('got:')
test = ObjectRegistry()
test.register_object('a.b->x->z.w')
print(test)

testing: a
expected:
{'a': set()}
got:
{'a': set()}

testing: a.b.c
expected:
{'a': set(), 'a.b': {'a'}, 'a.b.c': {'a.b'}}
got:
{'a': set(), 'a.b': {'a'}, 'a.b.c': {'a.b'}}

testing: a->b->c
expected:
{'a': {'b'}, 'b': {'c'}, 'c': set()}
got:
{'a': {'b'}, 'b': {'c'}, 'c': set()}

testing: a.b->x->z.w
expected:
{'a': set(), 'a.b': {'a', 'x'}, 'x': {'z.w'}, 'z': set(), 'z.w': {'z'}}
got:
{'a': set(), 'a.b': {'a', 'x'}, 'x': {'z.w'}, 'z': set(), 'z.w': {'z'}}


In [10]:
ex_obj_registry = ObjectRegistry()
ex_obj_registry.register_spec_objects(ex_spec)
print(ex_obj_registry)

{'playhead': {'vlines', 'videos.in-editor/images'},
 'playhead->videos.in-editor/images': {'rects'},
 'timestamps': {'vlines'},
 'tracks': {'regions'},
 'videos': set(),
 'videos.in-editor': {'videos', 'rects'},
 'videos.in-editor/images': {'videos'},
 'videos/first-frame': {'regions'}}


In [11]:
core_spec = parse_spec('core.yaml')
core_objs = ObjectRegistry()

# TODO: probably need to consider prefixing gui on these
for repr_type in core_spec.get('representation-types', []):
  # print(repr_type['name'])
  core_objs.register_spec_objects(repr_type)

# register_spec_objects(core_spec, core_objs)
print(core_objs)

{'hlines': {'lines'},
 'icons': {'regions'},
 'lines': {'regions'},
 'points': set(),
 'rects': {'regions'},
 'regions': set(),
 'vlines': {'lines'}}


In [12]:
# NOTE: This is general! Assuming your relations/registry is shaped properly
def get_node_join(left, right, relations, visited=set()):
  assert(type(left) is str)
  assert(type(right) is str)

  # Trace print
  # print(f"{left:<8} {right:<}")

  if left in visited or right in visited:
    # To avoid cycles, abort if we've already visited a node
    # NOTE: Not like 100% sure this is clean
    print("WARNING! Cycle detected in the relations.")
    return None

  if left == right:
    return left
  
  for next_left in relations[left]:
    left_res = get_node_join(next_left, right, relations, visited=visited)
    if left_res is None:
      # When you reach the leaf of a spanning tree, pop back up to recurse
      for next_right in relations[right]:
        return get_node_join(left, next_right, relations, visited=visited)
    else:
      return left_res

  if len(relations[right]) == 0 and len(relations[left]) == 0:
    return None

print()
print('vlines, hlines => ' + str(get_node_join('vlines', 'hlines', core_objs.registry)))
print('rects, hlines => ' + str(get_node_join('rects', 'hlines', core_objs.registry)))
print('points, hlines => ' + str(get_node_join('points', 'hlines', core_objs.registry)))


vlines, hlines => lines
rects, hlines => regions
points, hlines => None


## Comparing Structures
- need to show that two structures are "the same" based on their types
- do structures map to other structures

In [13]:
class StructRegistry(object):
  # Keep track of the type of each structure
  # TODO: I guess you only ever have the one type huh? keeping it a set for now
  # so that I can use get_node_join() without changing it.
  typeRegistry: dict[str, Set[str]] = {}

  # Keep track of which structure maps to which
  mapRegistry: dict[str, Set[str]] = {}

  def __init__(self):
    typeRegistry = {}
    mapRegistry = {}

  def register_struct(self, struct):
    name = struct.get('name')
    assert(name is not None)

    # Register type
    if self.typeRegistry.get(name) is None:
      self.typeRegistry[name] = set()
    
    if struct_type := struct.get('type'):
      self.typeRegistry[struct_type] 

    # Register map
    if self.mapRegistry.get(name) is None:
      self.mapRegistry[name] = set()
    
    if struct_maps := struct.get('mapto'):
      def process_struct_map(target):
        self.mapRegistry[name].add(target)
      
      process_listable(struct_maps, process_struct_map)
    
    # Registrer connections for objects
    # TODO
  
  def register_spec(self, spec):
    for struct in spec.get('structures', []):
      self.register_struct(struct)
      

## Construct Graph for Pattern Matching

How does this thing even work?  

Nodes are structures.  
A directed edge between nodes if S1 mapsto S2.
An undirected edge between nodes if they refer to 


In [14]:
__next_id = 0
def new_id():
  global __next_id
  current = __next_id
  __next_id += 1
  return current

def reset_id_gen():
  global __next_id
  print("WARNING: Resetting ID Gen, any previously generated ids are no longer meaningful.")
  __next_id = 0

In [15]:
class NodeKind(enum.Enum):
  Structure = enum.auto()

class EdgeKind(enum.Enum):
  # Directed
  StructMap = enum.auto()

  # Undirected
  ObjRef = enum.auto()


@dataclass
class Node:
  id: int
  kind: NodeKind
  def __str__(self):
    return f'{self.kind}: {self.id}'

@dataclass
class Edge:
  id: int
  kind: EdgeKind
  label: str
  source: int
  target: int

  def __str__(self):
    return f'{self.kind}: ({self.source}-{self.target})'

@dataclass
class Graph:
  nodes: List[Node]
  edges: List[Edge]

  def __str__(self):
    res = 'Nodes:\n'
    res += ', '.join([str(n) for n in self.nodes])
    res += '\n'
    res += 'Edges: \n'
    res += '\n'.join([str(e) for e in self.edges])
    return res

In [16]:
def register_id(name: str, registry: dict[str, int]) -> int:
  if existing_id := registry.get(name) is None:
    return existing_id
  
  new_id = new_id()
  registry[name] = new_id
  return new_id
  

def make_graph(spec_file: str) -> Graph:
  graph = Graph([],[])
  spec = parse_spec(spec_file)  
  
  obj_reg = ObjectRegistry()
  obj_reg.register_spec_objects(spec)

  struct_reg = StructRegistry()
  struct_reg.register_spec(spec)
  
  ids: dict[str, int] = {} # { spec_name: id }
  for struct in spec.get('structures', []):
    struct_name = struct.get('name')
    assert(struct_name is not None)

    struct_node = Node(register_id(struct_name), NodeKind.Structure)
    graph.nodes.append(struct_node)

    
    


## Error Tolerant Graph Matching

In [17]:
# Returns
# [
#   [first pairing],
#   [second pairing],
#   ...
# ]
def list_pairings(s1: Set, s2: Set):
  assert(len(s1) <= len(s2))
  l1 = list(s1)
  mappings = []

  for subset_choice in itertools.combinations(s2, len(s1)):
    # Pick a subset that you want to look at
    for permutation in itertools.permutations(subset_choice):
      # try all permutations of s2 against (static) s1
      mappings.append(list(zip(l1, permutation)))
  
  return mappings

# def count_list_pairings(s1: Set, s2: Set):
#   assert(len(s1) <= len(s2))
#   return math.comb(len(s2), len(s1)) * math.perm(len(s1))

In [18]:
def subset_pairings(s1: Set, s2: Set, size):
  l1 = list(s1)
  l2 = list(s2)

  assert(size <= len(l1) <= len(l2))

  res = []
  for subset1 in itertools.combinations(s1, size):
    for subset2 in itertools.combinations(s2, size):
      # Pick out a subset for s1 and s2, and list their pairings
      # print(len(list_pairings(subset1, subset2)))
      res += list_pairings(subset1, subset2)

  return res

# def count_subset_pairings(s1, s2, size: int):
#   assert(size <= len(s1) <= len(s2))
#   new_s1_size = len(s1) - size
#   new_s2_size = len(s2) - size
#   print('new size', new_s1_size)
#   n1 = list(range(new_s1_size))
#   n2 = list(range(new_s2_size))
#   print(math.comb(len(s1), size))
#   print(math.comb(len(s2), size))
#   print(count_list_pairings(n1, n2))
#   print('TODO: I think this is wrong')
#   # return math.comb(len(s1), size) * math.comb(len(s2), size) * count_list_pairings(n1, n2)


In [19]:
s1 = set({1,2,3})
s2 = set({'a','b', 'c'})

subset_pairings(s1, s2, 2)

[[(1, 'c'), (2, 'a')],
 [(1, 'a'), (2, 'c')],
 [(1, 'c'), (2, 'b')],
 [(1, 'b'), (2, 'c')],
 [(1, 'a'), (2, 'b')],
 [(1, 'b'), (2, 'a')],
 [(1, 'c'), (3, 'a')],
 [(1, 'a'), (3, 'c')],
 [(1, 'c'), (3, 'b')],
 [(1, 'b'), (3, 'c')],
 [(1, 'a'), (3, 'b')],
 [(1, 'b'), (3, 'a')],
 [(2, 'c'), (3, 'a')],
 [(2, 'a'), (3, 'c')],
 [(2, 'c'), (3, 'b')],
 [(2, 'b'), (3, 'c')],
 [(2, 'a'), (3, 'b')],
 [(2, 'b'), (3, 'a')]]

In [20]:
def all_subset_pairings(s1: Set, s2: Set, verbose=False):
  acc = []
  for i in range(min(len(s1), len(s2))):
    size = i + 1
    inter_res = subset_pairings(s1, s2, size)
    acc += inter_res

    if verbose:
      num_res = len(inter_res)
      print(f'size = {size}')
      print(inter_res)
      print(f'len = {num_res}')
      print()
  return acc

# def count_all_subset_pairings(s1, s2, verbose=False):
#   count = 0
#   max_size = min(len(s1), len(s2))
#   if verbose:
#     print(f'Going up to size {max_size}')
#   for i in range(max_size):
#     size = i + 1
#     inter_res = len(subset_pairings(s1, s2, size))
#     count += inter_res

#     if verbose:
#       print(f'size = {size}')
#       print(f'len = {inter_res}')
#       print()
#   return count

# count_all_subset_pairings(g1.nodes, g2.nodes, True)

print('total: ' + str(len(all_subset_pairings(s1, s2, verbose=True))))

size = 1
[[(1, 'c')], [(1, 'a')], [(1, 'b')], [(2, 'c')], [(2, 'a')], [(2, 'b')], [(3, 'c')], [(3, 'a')], [(3, 'b')]]
len = 9

size = 2
[[(1, 'c'), (2, 'a')], [(1, 'a'), (2, 'c')], [(1, 'c'), (2, 'b')], [(1, 'b'), (2, 'c')], [(1, 'a'), (2, 'b')], [(1, 'b'), (2, 'a')], [(1, 'c'), (3, 'a')], [(1, 'a'), (3, 'c')], [(1, 'c'), (3, 'b')], [(1, 'b'), (3, 'c')], [(1, 'a'), (3, 'b')], [(1, 'b'), (3, 'a')], [(2, 'c'), (3, 'a')], [(2, 'a'), (3, 'c')], [(2, 'c'), (3, 'b')], [(2, 'b'), (3, 'c')], [(2, 'a'), (3, 'b')], [(2, 'b'), (3, 'a')]]
len = 18

size = 3
[[(1, 'c'), (2, 'a'), (3, 'b')], [(1, 'c'), (2, 'b'), (3, 'a')], [(1, 'a'), (2, 'c'), (3, 'b')], [(1, 'a'), (2, 'b'), (3, 'c')], [(1, 'b'), (2, 'c'), (3, 'a')], [(1, 'b'), (2, 'a'), (3, 'c')]]
len = 6

total: 33


In [None]:
last_time = time.time()
for k in range(10):
  s1 = set(range(k))
  print('subset size =', k)
  print(len(all_subset_pairings(s1, s1)))
  new_time = time.time()
  print(f"time taken: {new_time - last_time} seconds\n")
  last_time = new_time


len = 0
0
time taken: 0.0005543231964111328 seconds

len = 1
1
time taken: 4.673004150390625e-05 seconds

len = 2
6
time taken: 4.982948303222656e-05 seconds

len = 3
33
time taken: 8.034706115722656e-05 seconds

len = 4
208
time taken: 0.0005238056182861328 seconds

len = 5
1545
time taken: 0.0016562938690185547 seconds

len = 6
13326
time taken: 0.02014780044555664 seconds

len = 7
130921
time taken: 0.41059303283691406 seconds

len = 8
1441728
time taken: 3.935447931289673 seconds

len = 9


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x1076a0450>>
Traceback (most recent call last):
  File "/Users/mattbl/dev/computable-interface-schema/venv/lib/python3.11/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 
