/
test_ohneio.py
140 lines (104 loc) · 3.25 KB
/
test_ohneio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import pytest
import ohneio
BUFFER_SIZES = [1, 2, 3, 5, 7, 16, 100, 210, 256]
@ohneio.protocol
def echo_n_bytes(nbytes):
while True:
data = yield from ohneio.read(nbytes)
yield from ohneio.write(data)
@pytest.mark.parametrize('read_len', BUFFER_SIZES)
@pytest.mark.parametrize('write_len', BUFFER_SIZES)
def test_buffer(read_len, write_len):
buf = ohneio.Buffer()
data = bytes(write_len)
buf.write(data)
assert len(buf.read(read_len)) == min(write_len, read_len)
assert len(buf.read(read_len)) == min(max(write_len - read_len, 0), read_len)
def test_buffer_read_same_segment_multiple_times():
buf = ohneio.Buffer()
data = [b'Hello', b'world']
for segment in data:
buf.write(segment)
for b in b''.join(data):
assert buf.read(1) == bytes([b])
def test_buffer_read_chunks_over_different_segments():
buf = ohneio.Buffer()
for segment in [b'Hello', b'World', b'No?']:
buf.write(segment)
assert buf.read(3) == b'Hel'
assert buf.read(3) == b'loW'
assert buf.read(3) == b'orl'
assert buf.read(4) == b'dNo?'
@pytest.mark.parametrize('nbytes', BUFFER_SIZES)
@pytest.mark.parametrize('data_len', BUFFER_SIZES)
def test_echo_n_bytes(nbytes, data_len):
conn = echo_n_bytes(nbytes)
data = b'\x00' * data_len
sent = 0
while sent < nbytes:
assert len(conn.read()) == 0
conn.send(data)
sent += data_len
assert len(conn.read(nbytes)) == nbytes
def wait_for(s):
while True:
data = yield from ohneio.peek()
pos = data.find(s)
if pos >= 0:
return pos
yield from ohneio.wait()
def read_until(s):
pos = yield from wait_for(LINE_SEPARATOR)
if pos > 0:
data = yield from ohneio.read(pos)
else:
data = b''
return data
LINE_SEPARATOR = b'\n'
@ohneio.protocol
def line_reader():
line = yield from read_until(LINE_SEPARATOR)
return line
@pytest.mark.parametrize('segment_len', BUFFER_SIZES)
@pytest.mark.parametrize('input_,expected', [
(b'\nhello', b''),
(b'hello\n', b'hello'),
(b'hello\nhello', b'hello'),
])
def test_line_reader(segment_len, input_, expected):
conn = line_reader()
for start in range(0, len(input_) + 1, segment_len):
end = start + segment_len
segment = input_[start:end]
conn.send(segment)
assert conn.has_result
assert conn.get_result() == expected
def test_line_reader_no_result():
conn = line_reader()
conn.send(b'hello')
assert not conn.has_result
@ohneio.protocol
def echo():
while True:
line = yield from read_until(LINE_SEPARATOR)
yield from ohneio.read(len(LINE_SEPARATOR))
yield from ohneio.write(line)
yield from ohneio.write(LINE_SEPARATOR)
def test_echo():
conn = echo()
conn.send(b'hello')
assert conn.read() == b''
conn.send(b'\nworld')
assert conn.read() == b'hello\n'
conn.send(b'\nand the rest\n')
assert conn.read() == b'world\nand the rest\n'
@ohneio.protocol
def hello():
yield from ohneio.write(b"Hello")
return "Hello"
def test_get_result():
conn = hello()
with pytest.raises(ohneio.NoResult):
conn.get_result()
assert conn.read(5) == b"Hello"
assert conn.get_result() == "Hello"