In [1]:
import numpy as np

In [2]:
def duplicit_mask(points, pass_additional_info=False):
    # Returns boolean mask:
    #   True if an element in his place for the first time (in the sequence)
    #   False - already encountered this element
    
    # data[ui] = up
    # up[ui] = data
    assert len(points.shape) == 2
    
    if points.shape[0] == 0:
        result = np.zeros((0,), dtype=np.bool)
        if pass_additional_info:
            return result, (
                np.zeros((0,points.shape[1]), dtype=points.dtype),
                np.zeros((0,), np.int64),
                np.zeros((0,), np.int64)
            )
        else:
            return result
    
    unique_points, unique_index, unique_inverse = \
            np.unique(points, axis=0, \
            return_index=True, return_inverse=True)
    
    assert(unique_points.shape[0] == unique_index.shape[0])
    assert(unique_inverse.shape[0] == points.shape[0])
    
    ira = np.arange(points.shape[0])
    first_occ_of_el = unique_index[unique_inverse]
    
    result = ira <= first_occ_of_el
    
    assert len(result.shape) == 1
    
    if pass_additional_info:
        info = (unique_points, unique_index, unique_inverse)
        return result, info
    else:
        return result

In [3]:
def duplicit_mask_with_hot_elements(points, hot):
    '''
        Same as duplicit_mask but only checks if 'hot' elements are previously encountered
        Check the test for more info
        
        points - (N,Dim) of np.* type
        hot - (N,) of np.bool type
    '''
    assert len(hot.shape) == 1
    assert len(points.shape) == 2
    assert hot.shape[0] == points.shape[0]
    
    ind = np.arange(points.shape[0])
    
    hot_points, hot_ind = points[hot, ...], ind[hot]
    nothot_points, nothot_ind = points[~hot, ...], ind[~hot]
    
    hot_mask, info = duplicit_mask(hot_points, pass_additional_info=True)
    (hot_unique, hot_index, hot_inverse) = info
    
    # SEARCH MATCHES: (NH_data, UH_data)
    hot_points_real_index = hot_ind[hot_index]
    
    matches = np.all(np.equal(
        hot_unique[np.newaxis, ...],
        nothot_points[:, np.newaxis, ...]
        ), axis=2)
    
    matches_positive = np.any(matches, axis=1)
    matches = matches[matches_positive, ...]
    
    if matches.shape[0]:
        matches = np.argmax(matches, axis=1)
        matches_first = hot_points_real_index[matches]

        matches_current = nothot_ind[matches_positive]

        matches_result = matches_first >= matches_current
    else:
        matches_result = np.zeros((0,), dtype=np.bool)
        matches_current = np.zeros((0,), dtype=np.int64)
    
    # PUTTING IT TOGETHER... 
    result = np.ones(shape=(points.shape[0],), dtype=np.bool)
    result[matches_current] = matches_result
    result[hot_ind] = hot_mask
    return result

In [4]:
def move_indices_given_boolean_mask(mask, indices):
    assert len(mask.shape) == 1
    assert len(indices.shape) == 1
    
    
    index_removed = np.arange(len(mask))[~mask]
    b = np.sum(np.less_equal(
            index_removed[np.newaxis, ...], indices[..., np.newaxis]
        ), axis=-1)
    
    return np.maximum(indices - b, 0)

In [5]:
if __name__ == '__main__':
    import unittest
    import random
    
    class TestMoveIndeces(unittest.TestCase):
        def test_move_indeces(self):
            mask    = np.array([0,1,0,1,1,0,0], dtype=np.bool)
            indices = np.array([0,1,2,3,4,5,6], dtype=np.int64)
            gt      = np.array([0,0,0,1,2,2,2], dtype=np.int64)
            
            new_indices = move_indices_given_boolean_mask(mask, indices)
            
            self.assertEqual(gt.shape, new_indices.shape)
            
            for t,n in zip(new_indices, gt):
                self.assertEqual(t,n)
                
            mask    = np.array([1,1,0,1,0,1,1], dtype=np.bool)
            indices = np.array([0,1,2,3,4,5,6], dtype=np.int64)
            gt      = np.array([0,1,1,2,2,3,4], dtype=np.int64)
            
            new_indices = move_indices_given_boolean_mask(mask, indices)
            
            self.assertEqual(gt.shape, new_indices.shape)
            
            for t,n in zip(new_indices, gt):
                self.assertEqual(t,n)

In [6]:
if __name__ == '__main__':
    class TestDuplicitMask(unittest.TestCase):
        def test_easy(self):
            data = np.array([
                [1, 2],
                [1, 4],
                [1, 2],
                [3, 3],
                [1, 2],
                [2, 2],
                [3, 3]
            ])
            vysledek = np.array([True, True, False, True, False, True, False])
            v = duplicit_mask(data)
            self.assertTrue(np.all(np.equal(v, vysledek)))

        def test_random(self):
            data = np.random.rand(100,3)
            g = duplicit_mask(data)
            self.assertTrue(np.all(g == True))

        def test_ones(self):
            data = np.ones((100,3))
            g = duplicit_mask(data)
            self.assertTrue(np.all(g[0] == True))
            self.assertTrue(np.all(g[1:] == False))

        def test_two_parts(self):
            data = np.random.rand(100,3)
            data_full = np.concatenate((data, data), axis=0)
            g = duplicit_mask(data)

            self.assertTrue(np.all(g[:100] == True))
            self.assertTrue(np.all(g[100:] == False))

        def test_two_parts_2(self):
            data = np.random.rand(100,3)
            data_full = np.concatenate((data, data), axis=0)
            s = np.arange(data_full.shape[0])
            np.random.shuffle(s)
            data_full = data_full[s, ...]

            g = duplicit_mask(data)
            self.assertEqual(np.sum(g), 100)

        def test_empty(self):
            data = np.random.random((0,3))
            g = duplicit_mask(data)
            self.assertTrue(g.shape[0] == 0)
            self.assertTrue(len(g.shape) == 1)


In [7]:
if __name__ == '__main__':
    class TestDuplicitMaskWithHotElements(unittest.TestCase):
        def test_only_hot(self):
            data = np.array([
                [1, 2], [1, 4], [1, 2], [3, 3], [1, 2], [2, 2], [3, 3]
            ])
            hotness = np.ones((data.shape[0],), dtype=np.bool)

            vysledek = np.array([True, True, False, True, False, True, False])
            v = duplicit_mask_with_hot_elements(data, hotness)
            self.assertTrue(np.all(np.equal(v, vysledek)))

        def test_only_cold(self):
            data = np.array([
                [1, 2], [1, 4], [1, 2], [3, 3], [1, 2], [2, 2], [3, 3]
            ])
            hotness = np.zeros((data.shape[0],), dtype=np.bool)

            v = duplicit_mask_with_hot_elements(data, hotness)
            self.assertTrue(np.all(v == True))

        def test_custom(self):
            # [1,2] - N T N T
            # [1,4] - N N
            # [3,3] - T T
            # [2,2] - T T
            data = np.array([
                [1, 2], [1, 4], [1, 2], [3, 3], [1, 2], [2, 2], [3, 3], [2, 2], [1, 2], [1, 4]
            ])
            hotness = np.array([
                False, False, True, True, False, True, True, True, True, False
            ])
            result = np.array([
                True, True, True, True, False, True, False, False, False, True
            ])
            v = duplicit_mask_with_hot_elements(data, hotness)
            self.assertTrue(np.all(v == result))


        def test_random(self):
            ELEMENTS = 1000
            DIMENSIONS = 3

            COLDS_BEFORE_MIN = 0
            COLDS_BEFORE_MAX = 2
            HOTS_MIN = 1 
            HOTS_MAX = 3 
            COLDS_AFTER_MIN = 0
            COLDS_AFTER_MAX = 2

            N = ELEMENTS*(COLDS_AFTER_MAX + COLDS_BEFORE_MAX + HOTS_MAX)
            uniq_elems = np.random.rand(ELEMENTS, DIMENSIONS)

            empty_indexes = list(range(N))
            random.shuffle(empty_indexes)

            used = np.zeros((N,), dtype=np.bool)
            data = np.empty((N,DIMENSIONS))
            hotness = np.zeros((N,), dtype=np.bool)
            result = np.ones((N,), dtype=np.bool)

            for elem in uniq_elems:
                c_before = random.randint(COLDS_BEFORE_MIN, COLDS_BEFORE_MAX)
                c_after = random.randint(COLDS_AFTER_MIN, COLDS_AFTER_MAX)
                hots = random.randint(HOTS_MIN, HOTS_MAX)
                size = c_before + c_after + hots

                indexes = empty_indexes[:size]
                empty_indexes = empty_indexes[size:]
                uniq_elems = uniq_elems[size:]
                #del uniq_elems[:size]

                indexes.sort()

                for cbi in indexes[:c_before]:
                    # colds
                    used[cbi] = True
                    data[cbi, ...] = elem
                    hotness[cbi] = False
                    result[cbi] = True
                indexes = indexes[c_before:]
                #del indexes[:c_before]

                # first_hot
                used[indexes[0]] = True
                data[indexes[0], ...] = elem
                hotness[indexes[0]] = True
                result[indexes[0]] = True

                #del indexes[0]
                indexes = indexes[1:]

                random.shuffle(indexes)
                assert len(indexes) == hots - 1 + c_after
                for caf in indexes[:c_after]:
                    # colds after
                    used[caf] = True
                    data[caf, ...] = elem
                    hotness[caf] = False
                    result[caf] = False
                    pass
                indexes = indexes[c_after:]
                #del indexes[:c_after]

                assert len(indexes) == hots - 1
                for other_hots in indexes:
                    # other hots
                    used[other_hots] = True
                    data[other_hots, ...] = elem
                    hotness[other_hots] = True
                    result[other_hots] = False
                    pass

            data = data[used, ...]
            hotness = hotness[used]
            result = result[used]

            tested_res = duplicit_mask_with_hot_elements(data, hotness)

            self.assertTrue( np.all(tested_res == result) )


In [8]:
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)   

test_easy (__main__.TestDuplicitMask) ... ok
test_empty (__main__.TestDuplicitMask) ... ok
test_ones (__main__.TestDuplicitMask) ... ok
test_random (__main__.TestDuplicitMask) ... ok
test_two_parts (__main__.TestDuplicitMask) ... ok
test_two_parts_2 (__main__.TestDuplicitMask) ... ok
test_custom (__main__.TestDuplicitMaskWithHotElements) ... ok
test_only_cold (__main__.TestDuplicitMaskWithHotElements) ... ok
test_only_hot (__main__.TestDuplicitMaskWithHotElements) ... ok
test_random (__main__.TestDuplicitMaskWithHotElements) ... ok
test_move_indeces (__main__.TestMoveIndeces) ... ok

----------------------------------------------------------------------
Ran 11 tests in 0.331s

OK
