# New Class Definitions

<i>Version 3</i>

## Dependencies

In [1]:
from bitsets import bitset, bases
import json
import networkx as nx
import uuid
#from itertools import combinations, chain
import os

In [2]:
# Root directory of the 'qualreas' project, located via the PYPROJ environment variable.
# NOTE(review): os.getenv returns None when PYPROJ is unset, in which case
# os.path.join raises TypeError -- confirm PYPROJ is always set before running.
path = os.path.join(os.getenv('PYPROJ'), 'qualreas')

## Relation Sets

In [45]:
class RelSet(bases.BitSet):
    """Bitset of relations that renders as a '|'-delimited string and
    treats '+' as set intersection.
    """

    def __str__(self):
        """Return the member relations joined by '|' (for example 'B|M|O')."""
        separator = "|"
        return separator.join(self.members())

    def __add__(self, rs):
        """Relation-set addition: the intersection of this set with ``rs``."""
        common = self.intersection(rs)
        return common

## Abbreviations for Algebra Summary

In [46]:
# TODO: Don't embed the abbreviation dictionary in code; create a file for it
def abbrev(term_list):
    """Return the '|'-joined abbreviations of the type names in ``term_list``.

    Names with a known abbreviation are shortened (e.g. "Point" -> "Pt").
    A name that has no entry in the abbreviation table is passed through
    unchanged instead of raising KeyError, so algebras whose domain/range
    vocabulary is not covered by the table can still be summarized.

    :param term_list: iterable of type-name strings
    :return: a single string, e.g. "Pt|PInt" ('' for an empty iterable)
    """
    abbrev_dict = {"Point": "Pt",
                   "ProperInterval": "PInt",
                   "Interval": "Int"}
    return '|'.join(abbrev_dict.get(term, term) for term in term_list)

## Temporal Entities

In [47]:
# See https://www.w3.org/TR/owl-time/
# types: "Point", "ProperInterval", "Duration"
class TemporalEntity(object):
    """A temporal thing described by a collection of type names, with an
    optional name, start, end, and duration.
    """

    def __init__(self, types, name=None, start=None, end=None, dur=None):
        """Store the given values; ``dur`` is kept as the ``duration`` attribute."""
        self.types, self.name = types, name
        self.start, self.end = start, end
        self.duration = dur

    def __repr__(self):
        """Return '<TemporalEntity [name ]types>'; the name is left out when falsy."""
        label = f"{self.name} " if self.name else ""
        return f"<TemporalEntity {label}{self.types}>"

## Spatial Entities

In [48]:
# Don't have a good source yet for a spatial vocabulary,
# but see https://www.w3.org/2017/sdwig/bp/
class SpatialEntity(object):
    """A spatial thing described by a collection of type names and an
    optional name.
    """

    def __init__(self, types, name=None):
        """Store the entity's type names and optional name."""
        self.types = types
        self.name = name

    def __repr__(self):
        """Return '<SpatialEntity [name ]types>'; the name is left out when falsy.

        The label matches the class name, in the same way that
        TemporalEntity's repr does.
        """
        if self.name:
            return f"<SpatialEntity {self.name} {self.types}>"
        else:
            return f"<SpatialEntity {self.types}>"

## Algebras

In [51]:
class Algebra(object):
    """A relation algebra loaded from a JSON definition file.

    The JSON document is read for the keys "Name", "Description",
    "Relations" and "TransTable".  Each entry of "Relations" is read for
    the keys "Name", "Domain", "Range", "Reflexive", "Symmetric",
    "Transitive" and "Converse".
    """

    def __init__(self, filename):
        """An algebra is created from a JSON file containing the algebra's
        relation and transitivity table definitions.

        :param filename: path of the JSON file defining the algebra
        """
        with open(filename, 'r', encoding='utf-8') as f:
            self.algebra_dict = json.load(f)

        self.name = self.algebra_dict["Name"]

        self.description = self.algebra_dict["Description"]

        # TODO: For consistency, rename relations_dict to rel_dict
        self.relations_dict = self.algebra_dict["Relations"]

        # A bitset *class* whose domain is the algebra's relation symbols
        self.elements_bitset = bitset('relset', tuple(self.relations_dict.keys()), base=RelSet)

        # The relation set containing every relation of the algebra
        self.elements = self.elements_bitset.supremum

        # The equality relations of the algebra
        self.__equality_relations = [rel for rel in self.elements if self.rel_equality(rel)]

        # Populate a dictionary that allows equality relations to be looked-up based on their domain/range.
        self.equality_relations_dict = dict()
        for eqrel in self.__equality_relations:
            dom = self.rel_domain(eqrel)[0]  # Get the single item out of the eqrel's domain set.
            self.equality_relations_dict[dom] = eqrel

        # Setup the transitivity table used by Relation Set multiplication:
        # transitivity_table[rel1][rel2] is the relation set for rel1 * rel2.
        tabledefs = self.algebra_dict["TransTable"]
        self.transitivity_table = {
            rel1: {rel2: self.elements_bitset(tuple(members)) for rel2, members in row.items()}
            for rel1, row in tabledefs.items()
        }

    # Accessors for information about a given relation.

    def rel_name(self, rel):
        """Return the "Name" entry of relation ``rel``."""
        return self.relations_dict[rel]["Name"]

    def rel_domain(self, rel):
        """Return the "Domain" entry of relation ``rel``."""
        return self.relations_dict[rel]["Domain"]

    def rel_range(self, rel):
        """Return the "Range" entry of relation ``rel``."""
        return self.relations_dict[rel]["Range"]

    def rel_reflexive(self, rel):
        """Return the "Reflexive" entry of relation ``rel``."""
        return self.relations_dict[rel]["Reflexive"]

    def rel_symmetric(self, rel):
        """Return the "Symmetric" entry of relation ``rel``."""
        return self.relations_dict[rel]["Symmetric"]

    def rel_transitive(self, rel):
        """Return the "Transitive" entry of relation ``rel``."""
        return self.relations_dict[rel]["Transitive"]

    def rel_equality(self, rel):
        """Return whether ``rel`` is reflexive, symmetric and transitive."""
        return self.rel_reflexive(rel) and self.rel_symmetric(rel) and self.rel_transitive(rel)

    def converse(self, rel_or_relset):
        """Return the converse of a relation (str) or relation set (bitset)."""
        if isinstance(rel_or_relset, str):
            return self.relations_dict[rel_or_relset]["Converse"]
        else:
            return self.elements_bitset((self.converse(r) for r in rel_or_relset.members()))

    def __str__(self):
        """Return a string representation of the Algebra."""
        return f"<{self.name}: {self.description}>"

    @property
    def equality_relations(self):
        """The list of relations for which rel_equality() holds."""
        return self.__equality_relations

    def equality_relation(self, domain_or_range):
        """Return the equality relation registered for the given domain/range name.

        :raises KeyError: if no equality relation is registered under that name
        """
        return self.equality_relations_dict[domain_or_range]

    def relset(self, relations):
        """Return a relation set (bitset) for the given relations."""
        return self.elements_bitset(relations)

    def string_to_relset(self, string, delimiter='|'):
        """Take a string like 'B|M|O' and turn it into a relation set."""
        return self.relset(string.split(delimiter))

    def mult(self, relset1, relset2):
        """Multiplication is done, element-by-element, on the cross-product
        of the two sets using the algebra's transitivity table, and
        then reducing those results to a single relation set using set
        union.
        """
        result = self.elements_bitset.infimum  # the empty relation set
        for r1 in relset1:
            for r2 in relset2:
                result = result.union(self.transitivity_table[r1][r2])
        return result

    def check_multiplication_identity(self, verbose=False):
        """Check, for every ordered pair of singleton relation sets (r, s),
        that ``r * s`` equals the converse of ``converse(s) * converse(r)``.

        :param verbose: Print out the details of each failing pair and a
            final pass/fail line
        :return: True if the identity holds for every pair, otherwise False
        """
        count = 0
        result = True
        rels = self.elements
        for r in rels:
            r_rs = self.relset((r,))
            for s in rels:
                count += 1
                s_rs = self.relset((s,))
                prod1 = self.mult(r_rs, s_rs)
                prod2 = self.converse(self.mult(self.converse(s_rs), self.converse(r_rs)))
                if prod1 != prod2:
                    if verbose:
                        print("FAIL:")
                        print(f"      r    = {r_rs}")
                        print(f"      s    = {s_rs}")
                        print(f"( r *  s)  = {prod1}")
                        print(f"(si * ri)i = {prod2}")
                        print(f"{prod1} != {prod2}")
                    result = False
        if verbose:
            print(f"\n{self.name} -- Multiplication Identity Check:")
            if result:
                print(f"PASSED . {count} products tested.")
            else:
                print("FAILED. See FAILURE output above.")
        return result

    def summary(self):
        """Print out a summary of this algebra and its elements."""
        print(f"  Algebra Name: {self.name}")
        print(f"   Description: {self.description}")
        print(f" Equality Rels: {self.equality_relations}")
        print("     Relations:")
        print("{:>25s} {:>25s} {:>10s} {:>10s} {:>10s} {:>8s} {:>12s}".format("NAME (ABBREV)", "CONVERSE (ABBREV)",
                                                                              "REFLEXIVE", "SYMMETRIC", "TRANSITIVE",
                                                                              "DOMAIN", "RANGE"))
        # TODO: Vary spacing between columns based on max word lengths
        # For syntax used below see https://docs.python.org/3/library/string.html#format-string-syntax
        for r in self.elements:
            print(f"{self.rel_name(r):>19s} ({r:>3s}) "
                  f"{self.rel_name(self.converse(r)):>19s} ({self.converse(r):>3s}) "
                  f"{self.rel_reflexive(r)!s:>8} {self.rel_symmetric(r)!s:>10} {self.rel_transitive(r)!s:>10}"
                  f"{abbrev(self.rel_domain(r))!s:>11} {abbrev(self.rel_range(r))!s:>13}")
        # TODO: Don't hardcode the legend below; make it depend on an abbreviations file (JSON)
        print("\nDomain & Range Abbreviations:")
        print("   Pt = Point")
        print(" PInt = Proper Interval")

    def is_associative(self, verbose=False):
        """Check that relation-set multiplication is associative.

        For every ordered triple (a, b, c) of single relations, compare
        ``(a * b) * c`` with ``a * (b * c)`` using mult().  A triple is
        skipped when the range of a shares nothing with the domain of b,
        or the range of b shares nothing with the domain of c.  A one-line
        test summary is always printed.

        :param verbose: Print the outcome of every individual triple
        :return: True if no tested triple fails, otherwise False
        """
        result = True
        countskipped = 0
        countok = 0
        countfailed = 0
        rels = self.elements
        for _a in rels:
            for _b in rels:
                for _c in rels:
                    if verbose:
                        print(f"{_a} x {_b} x {_c} :")
                    if not (set(self.rel_range(_a)) & set(self.rel_domain(_b))):
                        if verbose:
                            print(f"  Skipping check due to a x b: {self.rel_range(_a)}::{self.rel_domain(_b)}")
                        countskipped += 1
                        continue
                    if not (set(self.rel_range(_b)) & set(self.rel_domain(_c))):
                        if verbose:
                            print(
                                f"  Skipping check due to b x c: {self.rel_range(_b)}::{self.rel_domain(_c)}")
                        countskipped += 1
                        continue
                    a_rs = self.relset((_a,))
                    b_rs = self.relset((_b,))
                    c_rs = self.relset((_c,))
                    # Use the algebra's own multiplication; the '*' operator on
                    # relation sets is not relation composition.
                    prod_ab_c = self.mult(self.mult(a_rs, b_rs), c_rs)
                    prod_a_bc = self.mult(a_rs, self.mult(b_rs, c_rs))
                    if prod_ab_c == prod_a_bc:
                        if verbose:
                            print("  Associativity OK")
                        countok += 1
                    else:
                        if verbose:
                            print(f"  Associativity fails for a = {a_rs}, b = {b_rs}, c = {c_rs}")
                            print(f"    associativity check: {self.rel_range(_a)}::{self.rel_domain(_b)} "
                                  f"{self.rel_range(_b)}::{self.rel_domain(_c)}")
                            print(f"    (a * b) * c = {prod_ab_c}")
                            print(f"    a * (b * c) = {prod_a_bc}")
                        countfailed += 1
                        result = False
        counttotal = countok + countskipped + countfailed
        print(f"\nTEST SUMMARY: {countok} OK, {countskipped} Skipped, {countfailed} Failed ({counttotal} Total)")
        numrels = len(rels)
        totaltests = numrels * numrels * numrels
        if counttotal != totaltests:
            print(f"Test counts do not add up; Total should be {totaltests}")
        return result

## Instantiate An Algebra

In [52]:
# Load one algebra definition from the project's Algebras directory.
# Exactly one of the lines below should be left uncommented.
#alg = Algebra(os.path.join(path, "Algebras/IntervalAlgebra.json"))  # Allen's algebra of proper time intervals
#alg = Algebra(os.path.join(path, "Algebras/LeftBranchingIntervalAndPointAlgebra.json"))
alg = Algebra(os.path.join(path, "Algebras/RightBranchingIntervalAndPointAlgebra.json"))
#alg = Algebra(os.path.join(path, "Algebras/rcc8Algebra.json"))

# Algebra.__str__ renders as "<name: description>"
print(alg)

<RightBranchingTimeAlgebra: Reich's right-branching extension to Allen's time interval algebra (see TIME-94 paper)>


## Algebraic Elements

In [53]:
# The relation set containing every relation of the algebra (the bitset supremum)
alg.elements

relset(['B', 'BI', 'D', 'DI', 'E', 'F', 'FI', 'M', 'MI', 'O', 'OI', 'PE', 'PF', 'PFI', 'PS', 'PSI', 'RB', 'RBI', 'RO', 'ROI', 'RS', 'R~', 'S', 'SI'])

In [54]:
# Singleton relation set holding only the relation 'B'
before = alg.relset(['B'])
before

relset(['B'])

In [55]:
# Singleton relation set holding only the relation 'D'
during = alg.relset(['D'])
during

relset(['D'])

In [56]:
# Look up the "Name" entry recorded for the relation symbol 'B'
alg.rel_name('B')

'Before'

In [57]:
# A relation set can be built from any sequence of relation symbols (here a tuple)
bmoi = alg.relset(('B','M','OI'))
bmoi

relset(['B', 'M', 'OI'])

In [58]:
# Iterating over a relation set yields its member relation symbols
for rel in bmoi:
    print(rel)

B
M
OI


In [59]:
# Relations flagged as reflexive, symmetric and transitive in the algebra definition
alg.equality_relations

['E', 'PE']

### relsets to strings & vice versa

In [60]:
# str() of a relation set joins its members with '|'
bmo = str(alg.relset(['B','M','O']))
bmo

'B|M|O'

In [61]:
# Round trip: split the '|'-delimited string back into a relation set
bmo_copy = alg.string_to_relset(bmo)
bmo_copy

relset(['B', 'M', 'O'])

## Converses

In [62]:
# Given a relation symbol (str), converse() returns the symbol of its converse
alg.converse('B')

'BI'

In [63]:
# Given a relation set, converse() returns the relation set of the members' converses
alg.converse(before)

relset(['BI'])

In [64]:
# Converse of a multi-member relation set, taken member by member
alg.converse(bmoi)

relset(['BI', 'MI', 'O'])

## Multiplication

In [65]:
# Product of two relation sets: the union of the transitivity-table entries
# for every pair (r1, r2) with r1 in the first set and r2 in the second
BxD = alg.mult(before, during)
BxD

relset(['B', 'D', 'M', 'O', 'PS', 'S'])

## Addition

In [66]:
# NOTE(review): despite its name, this set holds D, M, F and SI (there is no 'B' in it).
bmfsi = alg.relset(['D','M','F','SI'])
print(bmfsi.members())
# '+' on relation sets is intersection (see RelSet.__add__)
BxD + bmfsi

('D', 'F', 'M', 'SI')


relset(['D', 'M'])

## Complements

In [67]:
# Members of the complement of {'B'} with respect to the algebra's relations
print(before.complement().members())

('BI', 'D', 'DI', 'E', 'F', 'FI', 'M', 'MI', 'O', 'OI', 'PE', 'PF', 'PFI', 'PS', 'PSI', 'RB', 'RBI', 'RO', 'ROI', 'RS', 'R~', 'S', 'SI')


In [68]:
# The same complement rendered as a '|'-delimited string
str(before.complement())

'BI|D|DI|E|F|FI|M|MI|O|OI|PE|PF|PFI|PS|PSI|RB|RBI|RO|ROI|RS|R~|S|SI'

In [69]:
# Members of the complement of the product computed above
print(BxD.complement().members())

('BI', 'DI', 'E', 'F', 'FI', 'MI', 'OI', 'PE', 'PF', 'PFI', 'PSI', 'RB', 'RBI', 'RO', 'ROI', 'RS', 'R~', 'SI')


## Check Multiplication Identity

In [70]:
# For every pair of single relations, compare r * s with the converse of (converse(s) * converse(r))
alg.check_multiplication_identity()

True

## Check Associativity

In [71]:
# Run the associativity check over all triples of single relations; a test summary is always printed
alg.is_associative()


TEST SUMMARY: 9772 OK, 4052 Skipped, 0 Failed (13824 Total)


True

## Summary of Algebra

In [72]:
# Print the algebra's name, description, equality relations and a per-relation property table
alg.summary()

  Algebra Name: RightBranchingTimeAlgebra
   Description: Reich's right-branching extension to Allen's time interval algebra (see TIME-94 paper)
 Equality Rels: ['E', 'PE']
     Relations:
            NAME (ABBREV)         CONVERSE (ABBREV)  REFLEXIVE  SYMMETRIC TRANSITIVE   DOMAIN        RANGE
             Before (  B)               After ( BI)    False      False       True    Pt|PInt       Pt|PInt
              After ( BI)              Before (  B)    False      False       True    Pt|PInt       Pt|PInt
             During (  D)            Contains ( DI)    False      False       True    Pt|PInt          PInt
           Contains ( DI)              During (  D)    False      False       True       PInt       Pt|PInt
             Equals (  E)              Equals (  E)     True       True       True       PInt          PInt
           Finishes (  F)         Finished-by ( FI)    False      False       True       PInt          PInt
        Finished-by ( FI)            Finishes (  F)    F

## Networks

In [73]:
class Network(nx.DiGraph):
    """A constraint network over an Algebra.

    Nodes are entities (objects with ``types`` and ``name`` attributes) and
    every edge carries a 'constraint' attribute holding a relation set of
    the network's algebra.
    """

    def __init__(self, algebra, name=None):
        """Create an empty network.

        :param algebra: the Algebra whose relation sets label the edges
        :param name: optional name of the network (stored by the graph)
        """
        self.algebra = algebra
        super().__init__(name=name)

    def __str__(self):
        """Return a string naming the network and its algebra."""
        return f"<Network--{self.name}--{self.algebra.name}>"

    def _equality_relset(self, entity):
        """Return the relation set of the equality relations for each of the entity's types."""
        return self.algebra.relset(
            [self.algebra.equality_relation(entity_type) for entity_type in entity.types])

    def remove_constraint(self, entity1, entity2):
        """Remove the edges between the two entities, in both directions, if present."""
        if self.has_edge(entity1, entity2):
            self.remove_edge(entity1, entity2)
        if self.has_edge(entity2, entity1):
            self.remove_edge(entity2, entity1)

    def set_equality_constraint(self, entity, equality_rels):
        """Set the entity's self-loop constraint, overriding any previous setting."""
        self.remove_constraint(entity, entity)
        self.add_edge(entity, entity, constraint=equality_rels)

    def add_constraint(self, entity1, entity2, relation_set=None):
        """Same as add_edge, except that two edges are added with converse constraints.

        Each entity also gets a self-loop constrained to the equality
        relation(s) of its types, stored as a relation set like every other
        constraint.  When ``relation_set`` is omitted (or empty) the pair is
        left unconstrained, i.e. both edges carry every relation of the algebra.

        :raises KeyError: if an entity type has no equality relation in the algebra
        """
        # Each entity must equal itself
        self.set_equality_constraint(entity1, self._equality_relset(entity1))
        self.set_equality_constraint(entity2, self._equality_relset(entity2))

        # Override any previously set constraints on this pair of entities
        self.remove_constraint(entity1, entity2)

        if relation_set:
            self.add_edge(entity1, entity2, constraint=relation_set)
            self.add_edge(entity2, entity1, constraint=self.algebra.converse(relation_set))
        else:
            self.add_edge(entity1, entity2, constraint=self.algebra.elements)
            self.add_edge(entity2, entity1, constraint=self.algebra.elements)

    @property
    def constraints(self):
        """The edges of the network."""
        return self.edges

    @property
    def entities(self):
        """The nodes of the network."""
        return self.nodes

    def set_unconstrained_values(self):
        """Give every ordered pair of nodes that has no edge a constraint.

        A missing self-loop is set to the entity's equality relation(s);
        any other missing pair is set to the full relation set.
        """
        for ent1 in self.nodes():
            for ent2 in self.nodes():
                if self.has_edge(ent1, ent2):
                    continue
                if ent1 == ent2:
                    # An entity can only be related to itself by equality
                    self.set_equality_constraint(ent1, self._equality_relset(ent1))
                else:
                    self.add_constraint(ent1, ent2, self.algebra.elements)

    def propagate(self, verbose=False):
        """Propagate constraints in the network.

        After filling in missing constraints, repeatedly replace each
        constraint c12 with the intersection, over every entity 3, of
        ``c13 * c32`` (multiplication in the network's algebra), until a
        full pass changes nothing.

        @param verbose: Print number of loops as constraints are propagated.
        """
        loop_count = 0
        self.set_unconstrained_values()
        something_changed = True  # Start off with this True so we'll loop at least once
        while something_changed:
            something_changed = False  # Immediately set to False; if nothing changes, we'll only loop once
            loop_count += 1
            for ent1 in self.nodes:
                for ent2 in self.nodes:
                    c12 = self.edges[ent1, ent2]['constraint']
                    prod = self.algebra.elements  # the full set is the identity for intersection
                    for ent3 in self.nodes:
                        c13 = self.edges[ent1, ent3]['constraint']
                        c32 = self.edges[ent3, ent2]['constraint']
                        prod = prod.intersection(self.algebra.mult(c13, c32))
                    if prod != c12:
                        something_changed = True  # Need to continue top-level propagation loop
                    self.edges[ent1, ent2]['constraint'] = prod
        if verbose:
            print(f"Number of propagation loops: {loop_count}")

    def summary(self):
        """Print out a summary of this network and its nodes, edges, and constraints."""
        print(f"{self.name}: {len(self.nodes)} nodes, {len(self.edges)} edges")
        # print("  Head")
        # print("    Tail: (Constraints)")
        for head in self.nodes:
            print(f"  {head.name}:")
            for tail in self.neighbors(head):
                print(f"    => {tail.name}: {list(self.edges[head, tail]['constraint'].members())}")