In [6]:
import time
from threading import RLock
import logging

In [7]:
class Lww:
    '''
    LWW class for CRDTs
    '''
    def __init__(self):
        self.add_set = {}
        self.remove_set = {}
        self.add_lock = RLock()
        self.remove_lock = RLock()

In [8]:
def add(self, element):
        '''
        This method adds the element into the add_set dictionary
        of the Lww, where key is the element and value is the current unix timestamp
        :param element: Element to be add into LWW
        :return: None
        '''

        with self.add_lock:
            try:
                if self.add_set.get(element, 0) < time.time():
                    self.add_set[element] = time.time()
            except TypeError as error:
                logging.error(str(error))

In [9]:
def lookup(self, element):
        '''
        This method check whether a given element is in LWW
        :param element: Element whose presence is the checked in LWW
        :return: Boolean
        '''

        with self.add_lock, self.remove_lock:
            if element not in self.add_set:
                # Element not in add_set
                return False

            if element not in self.remove_set:
                # Element in add_set and not in remove_set
                return True

            if self.remove_set[element] < self.add_set[element]:
                # Element in both add_set and remove_set, but addition is after removal
                return True

            # Element in both add_set and remove_set, but addition is before removal
            return False

In [11]:
def remove(self, element):
        '''
        This method removes the element from the LWW
        :param element: Element to be removed
        :return: None
        '''

        with self.remove_lock:
            try:
                if self.remove_set.get(element, 0) < time.time():
                    self.remove_set[element] = time.time()
            except TypeError as error:
                logging.error(str(error))

In [12]:
def compare(self, other):
        '''
        This method checks whether the LWW is subset of the given LWW
        :param other: LWW object to be compared with
        :return: Boolean
        '''

        with self.add_lock, self.remove_lock, other.add_lock, other.remove_lock:

            # Check add_set is subset of other.add_set
            add_subset = set(self.add_set.keys()).issubset(other.add_set.keys())

            # Check remove_set is subset of other.remove_set
            remove_subset = set(self.remove_set.keys()).issubset(other.remove_set.keys())


        return add_subset and remove_subset

In [13]:
def merge(self, other):
        '''
        This method merge the LWW with the given LWW and returns a new LWW
        without affecting the original LWW
        :param other:
        :return: Lww
        '''

        lww = Lww()

        with self.add_lock, self.remove_lock, other.add_lock, other.remove_lock:

            # Merge add_set
            lww.add_set = {**self.add_set, **other.add_set}

            # Merge remove_set
            lww.remove_set = {**self.remove_set, **other.remove_set}

            # Update lww with latest timestamp in add_set
            for element, timestamp in self.add_set.items():
                lww.add_set[element] = max(lww.add_set[element], timestamp)

            # Update lww with latest timestamp in remove_set
            for element, timestamp in self.remove_set.items():
                lww.remove_set[element] = max(lww.remove_set[element], timestamp)


        return lww

In [18]:
def test_add():
    '''
    This functions test the successful addition of the element
    into the LWW
    :return: None
    '''

    lww = Lww()

    lww.add(1)
    lww.add("test_element")

    assert lww.lookup(1)
    assert lww.lookup("test_element")
    assert not lww.lookup("test")

In [19]:
def test_remove():
    '''
    This function test the successful removal of the element
    from the LWW
    :return: None
    '''

    lww = Lww()

    lww.remove(1)
    assert not lww.lookup(1)

    lww.add(1)
    assert lww.lookup(1)

In [20]:
def test_compare():
    '''
    This function test the compare method of Lww
    :return: None
    '''

    lww1 = Lww()
    lww2 = Lww()

    lww1.add(1)
    lww1.add(2)

    lww2.add(1)
    lww2.add(2)
    lww2.add(3)

    lww1.remove(1)
    lww1.remove(2)

    lww2.remove(1)
    lww2.remove(2)
    lww2.remove(3)

    assert lww1.compare(lww2)

    assert not lww2.compare(lww1)

In [21]:
def test_merge():
    '''
    This function test the merge method of Lww
    :return: None
    '''

    lww1 = Lww()
    lww2 = Lww()

    lww1.add(1)
    lww1.add(2)
    lww1.remove(1)

    lww2.add(1)
    lww2.add(3)

    lww1.remove(3)

    lww2.remove(1)

    lww = lww1.merge(lww2)

    assert {1, 2, 3}.issubset(lww.add_set.keys())
    assert {1, 3}.issubset(lww.remove_set.keys())
    assert lww.add_set[1] == lww2.add_set[1]
    assert lww.add_set[1] > lww1.add_set[1]
    assert lww.add_set[3] < lww1.remove_set[3]
    assert lww.remove_set[1] == lww2.remove_set[1]

In [22]:
def test_add_exception(caplog):
    '''
    This function test the exception handling of the add
    :param caplog: pytest fixture
    :return: None
    '''

    lww = Lww()
    lww.add([1, 2, 3])

    assert "unhashable type: 'list'" in caplog.text

In [23]:
def test_remove_exception(caplog):
    '''
    This function test the exception handling of the add
    :param caplog: pytest fixture
    :return: None
    '''

    lww = Lww()
    lww.remove({})

    assert "unhashable type: 'dict'" in caplog.text

In [24]:
def test_key_internal():
    '''
    This function validates the state of the elements in Lww
    :return: None
    '''
    lww = Lww()

    lww.add(1)
    lww.add(2)
    lww.add(3)
    lww.remove(4)
    lww.remove(2)

    assert {1, 2, 3}.issubset(lww.add_set.keys())
    assert {2, 4}.issubset(lww.remove_set.keys())

    assert 4 not in lww.add_set.keys()
    assert 1 not in lww.remove_set.keys()
    assert 3 not in lww.remove_set.keys()

In [25]:
def test_value_internal():
    '''
    This function validates internal element timestamps
    :return: None
    '''
    lww = Lww()

    lww.add(1)
    lww.add(2)
    lww.add(3)
    lww.remove(4)
    lww.remove(2)

    assert lww.remove_set[2] > lww.add_set[2]
    assert lww.add_set[3] > lww.add_set[2]
    assert lww.remove_set[4] > lww.add_set[3]
    assert lww.remove_set[2] > lww.remove_set[4]

In [26]:
def test_re_add():
    '''
    This function validates repeated addtition of same element
    :return: None
    '''
    lww = Lww()

    lww.add(1)
    lww.remove(1)
    lww.add(1)

    assert lww.lookup(1)

In [27]:
def test_re_remove():
    '''
    This function validates repeated removal of same element
    :return: None
    '''
    lww = Lww()

    lww.add(1)
    lww.remove(1)
    lww.add(1)
    lww.remove(1)

    assert not lww.lookup(1)

In [28]:
def test_remove_add():
    '''
    This function validates remove element followed by addition
    :return: None
    '''
    lww = Lww()

    lww.remove(1)
    lww.add(1)

    assert lww.lookup(1)


In [29]:
def test_rep_remove_add():
    '''
    This function validates repeated remove of element
    :return: None
    '''
    lww = Lww()

    lww.remove(1)
    lww.add(1)
    lww.remove(1)

    assert not lww.lookup(1)


In [30]:
def test_empty_lookup():
    '''
    This function validates empty lookup
    :return: None
    '''
    lww = Lww()

    assert not lww.lookup(1)