# RedisTimeseriesManager Tests
Each test can be run on its own by uncommenting the call on the line right after its definition, or all tests can be run together from the last cell.

In [1]:

from _path import add_root
add_root(up_levels=1)

from src.redis_timeseries_manager import RedisTimeseriesManager

class RedisTimeseriesManagerTests(RedisTimeseriesManager):
    """Manager subclass used as the fixture for the tests in this notebook.

    Declares two lines (``l1`` and ``l2``) and three timeframes (``raw``,
    ``5s`` and ``10s``), and picks the aggregation applied per line when a
    rule is set between two keys.
    """

    _name = 'test1'
    _lines = ['l1', 'l2']
    _timeframes = {
        # NOTE(review): this entry used to be labelled "unlimited", yet the
        # value is a finite 100000 -- confirm which one is intended.
        'raw': {'retention_secs': 100000},
        '5s': {'retention_secs': 10000, 'duration': 5},
        '10s': {'retention_secs': 10000, 'duration': 10},
    }

    def _create_rule(self, c1: str, c2: str, line: str, timeframe_name: str, timeframe_specs: dict, source_key: str, dest_key: str):
        """Set the rule that feeds ``dest_key`` from ``source_key``.

        Line ``l1`` is aggregated with ``sum``; every other line with ``avg``.
        The bucket size is read from ``timeframe_specs['duration']``.

        Args:
            c1: First classifier (not used by this implementation).
            c2: Second classifier (not used by this implementation).
            line: Line name; selects the aggregation type.
            timeframe_name: Timeframe name (not used by this implementation).
            timeframe_specs: Mapping describing the timeframe; must contain
                a ``duration`` entry.
            source_key: Key passed to ``_set_rule`` as the source.
            dest_key: Key passed to ``_set_rule`` as the destination.

        Raises:
            KeyError: If ``timeframe_specs`` has no ``duration`` entry (as is
                the case for ``raw``) -- presumably the base class never
                calls this hook for that timeframe; verify.
        """
        aggregation_type = 'sum' if line == 'l1' else 'avg'
        bucket_size_secs = timeframe_specs['duration']
        self._set_rule(source_key, dest_key, aggregation_type, bucket_size_secs)


# Connection parameters used when no local override is available.
default_test_settings = {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
    'password': None,
}
try:
    # Optional override: a local `settings` module exposing `test_settings`.
    from settings import test_settings
except ImportError:
    # Only a missing module or missing name means "no override". Anything
    # else (e.g. a syntax error inside settings.py) should surface instead of
    # being silently replaced by the defaults.
    test_settings = None
# An absent (or empty) override falls back to the defaults above.
test_settings = test_settings or default_test_settings

# Shared manager instance that every test below operates on.
tl = RedisTimeseriesManagerTests(**test_settings)

In [2]:
# set/reset test data
def reset_test_data():
    """Delete every key reported by the index, then insert the fixture rows.

    Two batches are inserted through ``tl.insert``: one under ``c1='crypto'``
    (btc, eth) carrying the extra label ``iscrypto=yes``, and one under
    ``c1='irx'`` (usd, eur, plus 100 generated jpy rows).
    """
    # wipe whatever is currently indexed
    stale_keys = tl.query_index(return_key_names=True)[1]
    for stale_key in stale_keys:
        tl.client.delete(stale_key)

    crypto_rows = [
        ['btc', 100, 1, 2],
        ['btc', 101, 3, 4],
        ['btc', 102, 5, 6],
        ['eth', 101, 7, 8],
        ['eth', 102, 9, 10],
        ['eth', 103, 11, 12],
        ['eth', 104, 13, 14],
    ]
    irx_rows = [
        ['usd', 100, 50, 51],
        ['usd', 101, 52, 53],
        ['usd', 102, 54, 55],
        ['eur', 101, 56, 57],
        ['eur', 102, 58, 59],
        ['eur', 103, 60, 61],
    ]
    # jpy: timestamps 100..199, first value starting at 500 and growing by 2
    # per row, second value always one above the first
    irx_rows.extend(
        ['jpy', timestamp, 500 + 2 * step, 501 + 2 * step]
        for step, timestamp in enumerate(range(100, 200))
    )

    tl.insert(
        data=crypto_rows,
        c1='crypto',
        c2_position=0,
        create_inplace=True,
        extra_labels={'iscrypto': 'yes'},
    )
    tl.insert(
        data=irx_rows,
        c1='irx',
        c2_position=0,
        create_inplace=True,
    )

In [3]:
# test add/remove line
def test_add_remove_line():
    """Check that adding a line creates the expected keys and deleting it removes them.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: If the number of keys added by ``add_line`` is not
            the expected one, or if ``delete_line`` does not restore the
            initial key count.
    """
    def count_keys():
        # number of key names currently reported by the index
        return len(tl.query_index(return_key_names=True)[1])

    reset_test_data()
    key_count0 = count_keys()
    # The computation assumes one key per (symbol, timeframe, line)
    # combination. Integer division keeps the counts integral so the failure
    # message reads "15 keys", not "15.0 keys".
    syms_count = key_count0 // (len(tl._timeframes) * len(tl._lines))
    keys_per_line = syms_count * len(tl._timeframes)
    tl.add_line(
        line='l3',
        default_value=3,
    )
    key_count1 = count_keys()
    assert key_count1 - key_count0 == keys_per_line, f'{keys_per_line} keys must be added'
    tl.delete_line(
        line='l3',
    )
    key_count2 = count_keys()
    assert key_count2 == key_count0, 'final key count must be equal to initial key count'
    return True
#test_add_remove_line()

In [4]:
# test labels
def test_labels():
    """Check the labels on an existing line and on a freshly added line.

    Looks at the crypto/btc keys of the ``5s`` timeframe: line ``l2`` must
    carry the core ``c1`` label and the ``iscrypto`` label given at insert
    time; the new line ``l3`` must carry its own extra labels plus ``c1``.
    """
    def btc_5s_labels(line_name):
        # labels reported by stats() for crypto/btc in the 5s timeframe
        return tl.stats(c1='crypto', c2='btc', timeframe='5s', line=line_name).labels

    reset_test_data()
    new_line_labels = {'foo': 'bar', 'test': 'ok'}
    tl.add_line(line='l3', default_value=3, extra_labels=new_line_labels)

    existing_line = btc_5s_labels('l2')
    assert existing_line['c1'] == 'crypto', 'core label was not set'
    assert existing_line['iscrypto'] == 'yes', 'extra label utilizing insert() with inplace was not set'

    added_line = btc_5s_labels('l3')
    assert added_line['foo'] == 'bar', 'extra label for new line was not set'
    assert added_line['test'] == 'ok'
    assert added_line['c1'] == 'crypto'
    return True
# test_labels()

In [5]:
# test rules
def test_rules():
    """Check that every non-base key is fed from its counterpart in the first timeframe.

    After adding line ``l3``, each indexed key whose timeframe is not the
    first entry of ``tl._timeframes`` must report, through ``tl.ts.info``, a
    ``source_key`` equal to the key with the same c1/c2/line in that first
    timeframe.
    """
    reset_test_data()
    # the extra line must get its rules too
    tl.add_line(line='l3', default_value=3)

    base_timeframe = list(tl._timeframes)[0]
    for key in tl.query_index(return_key_names=True)[1]:
        parts = tl._extract_key_name(key)
        if parts['timeframe'] != base_timeframe:
            expected_source = tl._get_key_name(
                parts['c1'], parts['c2'], base_timeframe, parts['line']
            )
            info = tl.ts.info(key)
            assert info.source_key.decode() == expected_source, f'{key} must have source_key set to {expected_source}'
    return True
#test_rules()

In [6]:
def test_last_n_records():
    """Check that ``read_last_n_records`` returns the tail of the full read.

    For crypto/btc the full read must hold 3 records, and asking for the last
    1 and the last 2 records must give exactly the matching tail slices.
    """
    reset_test_data()
    selector = dict(c1="crypto", c2="btc")
    records = tl.read(**selector)[1]
    assert len(records) == 3, '3 records must be returned'
    for n in (1, 2):
        tail = tl.read_last_n_records(**selector, minimum_timestamp=0, n=n)[2]
        assert len(tail) == n and tail == records[-n:], f'{n} last record must be returned'
    return True
# test_last_n_records()

In [7]:
def test_last_record():
    """Check that ``read_last`` returns the final record of the full read.

    Uses the crypto/btc fixture, whose full read must hold 3 records.
    """
    reset_test_data()
    selector = dict(c1="crypto", c2="btc")

    records = tl.read(**selector)[1]
    assert len(records) == 3, '3 records must be returned'

    newest = tl.read_last(**selector)[1]
    assert newest == records[-1], 'last record must be returned'
    return True
#test_last_record()

In [8]:
def test_last_nth_record():
    """Check that ``read_last_nth_record`` counts from the end of the full read.

    For crypto/btc, ``n=1`` must give the last record and ``n=2`` the one
    before it.
    """
    reset_test_data()
    selector = dict(c1="crypto", c2="btc")
    records = tl.read(**selector)[1]
    assert len(records) == 3, '3 records must be returned'

    expectations = (
        (1, 'last record must be returned'),
        (2, '2nd last record must be returned'),
    )
    for n, failure_message in expectations:
        nth_from_end = tl.read_last_nth_record(**selector, minimum_timestamp=0, n=n)[1]
        assert nth_from_end == records[-n], failure_message
    return True
# test_last_nth_record()

In [9]:
def test_delete_classifier_keys():
    """Check that ``delete_classifier_keys`` removes the crypto/btc keys and nothing else.

    The total key count must drop by exactly the number of crypto/btc keys
    that existed beforehand, and no crypto/btc key may remain.
    """
    def count_keys(**filters):
        # number of key names the index reports for the given filters
        return len(tl.query_index(**filters, return_key_names=True)[1])

    reset_test_data()
    selector = dict(c1="crypto", c2="btc")

    total_before = count_keys()
    selected_before = count_keys(**selector)
    tl.delete_classifier_keys(**selector)
    selected_after = count_keys(**selector)
    total_after = count_keys()

    assert selected_after == 0, 'all c12 keys must be deleted'
    assert total_before - total_after == selected_before, 'only c12 keys must be deleted'
    return True
#test_delete_classifier_keys()

In [10]:
def test_clear_data():
    """Check that ``clear_data`` removes the requested range and leaves the rest intact.

    The crypto/eth fixture holds timestamps 101..104; clearing 102..103 must
    leave exactly the first and the last of the original records.

    Returns:
        True when the assertion holds.

    Raises:
        AssertionError: If the records left after clearing are not exactly
            the original first and last records.
    """
    reset_test_data()
    c12 = {
        "c1": "crypto",
        "c2": "eth",
    }
    before = tl.read(**c12)[1]
    tl.clear_data(**c12, from_timestamp=102, to_timestamp=103)
    after = tl.read(**c12)[1]
    # Compare against the ORIGINAL last record. Using after[-1] here would
    # compare the surviving tail with itself and accept any last record.
    assert [before[0], before[-1]] == after, 'all inside data must be cleared'
    return True
#test_clear_data()

In [11]:
# test update values
def test_update():
    """Check that ``update`` changes only the requested line at a timestamp.

    Reads crypto/eth at timestamp 103, sets ``l2`` to 120 there, reads again
    and verifies that ``l1`` is unchanged while ``l2`` holds the new value.
    """
    target = dict(c1="crypto", c2="eth")
    timestamp = 103

    def read_at_timestamp():
        # single-timestamp read, returned in the 'dict' format
        return tl.read(
            **target,
            from_timestamp=timestamp,
            to_timestamp=timestamp,
            from_timestamp_inclusive=True,
            return_as='dict',
        )

    reset_test_data()

    success, result1 = read_at_timestamp()
    assert success is True, result1

    success, updated = tl.update(**target, timestamp=timestamp, values={'l2': 120})
    assert success is True, updated

    success, result2 = read_at_timestamp()
    assert success is True, result2

    assert result1['l1'][0] == result2['l1'][0], f"`l1` has been changed to {result2['l1'][0]}"
    assert result2['l2'][0] == 120, "`l2` must be 120"
    return True
# test_update()

In [12]:
def test_all():
    """Run every test of this notebook in order and print a success message.

    Raises:
        AssertionError: Propagated from the first failing test, or raised
            here, naming the test, if a test does not return ``True``.
    """
    suite = (
        test_add_remove_line,
        test_labels,
        test_rules,
        test_last_n_records,
        test_last_record,
        test_last_nth_record,
        test_delete_classifier_keys,
        test_clear_data,
        test_update,
    )
    for test in suite:
        # each test returns True on success; name the culprit otherwise
        assert test() is True, f'{test.__name__} did not return True'
    print("All tests have been passed.")

In [13]:
# Run the whole suite; prints a confirmation message once every test has passed.
test_all()

All tests have been passed.
