forked from praw-dev/praw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_util.py
156 lines (129 loc) · 4.68 KB
/
test_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""Test praw.models.util."""
from collections import namedtuple
from praw.models.util import (
BoundedSet,
ExponentialCounter,
permissions_string,
stream_generator,
)
from .. import UnitTest
class TestBoundedSet(UnitTest):
    """Exercise the size bound and eviction order expected of ``BoundedSet``."""

    def test_bound(self):
        """Adding one item past ``max_items`` keeps the size at the bound."""
        bset = BoundedSet(max_items=10)
        # Plain loop rather than a comprehension: ``add`` is called only for
        # its side effect, so there is no list worth building.
        for i in range(11):
            bset.add(i)
        assert len(bset._set) == 10
        # The first item added is the one expected to have been dropped.
        assert 0 not in bset

    def test_contains(self):
        """An added item is reported as a member."""
        bset = BoundedSet(max_items=10)
        bset.add(1)
        assert 1 in bset

    def test_lru_add(self):
        """Re-adding an existing item spares it from the next eviction."""
        bset = BoundedSet(max_items=10)
        for i in range(10):
            bset.add(i)
        # Touch 0 again, then overflow the set by one item.
        bset.add(0)
        bset.add(10)
        assert 0 in bset
        assert 1 not in bset

    def test_lru_contains(self):
        """A membership check spares the item from the next eviction."""
        bset = BoundedSet(max_items=10)
        for i in range(10):
            bset.add(i)
        # This check must run before the overflow below: the test expects the
        # lookup itself to mark 0 as recently used.
        assert 0 in bset
        bset.add(10)
        assert 0 in bset
        assert 1 not in bset
class TestExponentialCounter(UnitTest):
    """Check the values handed out by ``ExponentialCounter``."""

    # Relative tolerance the tests allow around each expected value.
    MAX_DELTA = 1.0 / 32

    def test_exponential_counter__counter(self):
        """Successive values rise and stay close to consecutive powers of two."""
        tolerance = self.MAX_DELTA

        def assert_range(number, exponent):
            # ``number`` must lie within the tolerance band around 2**exponent.
            lower = 2**exponent * (1 - tolerance)
            upper = 2**exponent * (1 + tolerance)
            assert lower <= number <= upper

        counter = ExponentialCounter(1024)
        previous = counter.counter()
        assert_range(previous, 0)

        # The next nine values are expected near 2**1 through 2**9.
        for exponent in range(1, 10):
            current = counter.counter()
            assert_range(current, exponent)
            assert previous < current
            previous = current

    def test_exponential_counter__max_value(self):
        """No value exceeds the configured maximum plus the tolerance."""
        ceiling = 5 * (1 + self.MAX_DELTA)
        counter = ExponentialCounter(5)
        for _ in range(100):
            produced = counter.counter()
            assert produced <= ceiling

    def test_exponential_counter__reset(self):
        """After every ``reset`` the next value is back near one."""
        low = 1 - self.MAX_DELTA
        high = 1 + self.MAX_DELTA
        counter = ExponentialCounter(1024)
        for _ in range(100):
            first_value = counter.counter()
            assert low <= first_value <= high
            # Reset on every pass so each iteration observes a fresh start.
            counter.reset()
class TestPermissionsString(UnitTest):
    """Check the strings produced by ``permissions_string``."""

    # Known permissions shared by the cases below.
    PERMISSIONS = {"a", "b", "c"}

    def test_permissions_string__all_explicit(self):
        """Listing every known permission yields only additions, in given order."""
        result = permissions_string(
            known_permissions=self.PERMISSIONS, permissions=["b", "a", "c"]
        )
        assert result == "-all,+b,+a,+c"

    def test_permissions_string__empty_list(self):
        """An empty list removes ``all`` and every known permission."""
        without_known = permissions_string(known_permissions=set(), permissions=[])
        assert without_known == "-all"

        with_known = permissions_string(
            known_permissions=self.PERMISSIONS, permissions=[]
        )
        assert with_known == "-all,-a,-b,-c"

    def test_permissions_string__none(self):
        """``None`` grants ``all`` regardless of the known permissions."""
        without_known = permissions_string(known_permissions=set(), permissions=None)
        assert without_known == "+all"

        with_known = permissions_string(
            known_permissions=self.PERMISSIONS, permissions=None
        )
        assert with_known == "+all"

    def test_permissions_string__with_additional_permissions(self):
        """A permission outside the known set is still added at the end."""
        without_known = permissions_string(known_permissions=set(), permissions=["d"])
        assert without_known == "-all,+d"

        with_known = permissions_string(
            known_permissions=self.PERMISSIONS, permissions=["d"]
        )
        assert with_known == "-all,-a,-b,-c,+d"
class TestStream(UnitTest):
    """Drive ``stream_generator`` with fake listing functions."""

    def test_stream(self):
        """Four hundred items pulled from the stream are all distinct."""
        Thing = namedtuple("Thing", ["fullname"])
        # Fullnames 99 down to 0.
        base_listing = [Thing(n) for n in reversed(range(100))]
        calls = 99

        def generate(limit, **kwargs):
            # Odd-numbered calls put one new item in front and drop the last;
            # even-numbered calls return the unchanged base listing.
            nonlocal calls
            calls += 1
            if calls % 2 != 0:
                return [Thing(calls)] + base_listing[:-1]
            return base_listing

        stream = stream_generator(generate)
        already_seen = set()
        for _ in range(400):
            item = next(stream)
            assert item not in already_seen
            already_seen.add(item)

    def test_stream_start_after(self):
        """With ``start_after=49`` fullnames 50 to 99 arrive in ascending order."""
        Thing = namedtuple("Thing", ["fullname"])
        # Fullnames 99 down to 0.
        base_listing = [Thing(n) for n in reversed(range(100))]
        calls = 99

        def generate(limit, params=None, **kwargs):
            nonlocal calls
            calls += 1
            if params:
                # Keep only the items listed ahead of the one named by
                # ``params["before"]``.
                cutoff = next(
                    index
                    for index, candidate in enumerate(base_listing)
                    if candidate.fullname == params["before"]
                )
                listing = base_listing[:cutoff]
            else:
                listing = base_listing
            if calls % 2 != 0:
                return [Thing(calls)] + listing[:-1]
            return listing

        stream = stream_generator(generate, start_after=49)
        for expected_fullname in range(50, 100):
            item = next(stream)
            assert item.fullname == expected_fullname, item