-
Notifications
You must be signed in to change notification settings - Fork 54
/
test-extra.py
121 lines (77 loc) · 2.84 KB
/
test-extra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
import contextlib
import nose
import aioftp
from common import *
class MyClient(aioftp.Client):
async def collect(self, count):
collected = []
async with self.get_stream("COLL " + str(count), "1xx") as stream:
async for line in stream.iter_by_line():
collected.append(line)
return collected
class MyServer(aioftp.Server):
@aioftp.ConnectionConditions(
aioftp.ConnectionConditions.login_required,
aioftp.ConnectionConditions.passive_server_started)
async def coll(self, connection, rest):
@aioftp.ConnectionConditions(
aioftp.ConnectionConditions.data_connection_made,
wait=True,
fail_code="425",
fail_info="Can't open data connection")
@aioftp.server.worker
async def coll_worker(self, connection, rest):
stream = connection.data_connection
del connection.data_connection
async with stream:
for i in range(count):
await stream.write(str.encode(str(i) + "\n"))
connection.response("200", "coll transfer done")
return True
count = int(rest)
coro = coll_worker(self, connection, rest)
task = connection.loop.create_task(coro)
connection.extra_workers.add(task)
connection.response("150", "coll transfer started")
return True
def test_stream_iter_by_line():
async def worker():
server = MyServer(loop=loop)
client = MyClient(loop=loop)
await server.start("127.0.0.1", 8021)
await client.connect("127.0.0.1", 8021)
await client.login()
count = 20
expect = list(map(lambda i: str.encode(str(i) + "\n"), range(count)))
collected = await client.collect(count)
nose.tools.eq_(collected, expect)
await client.quit()
client.close()
await server.close()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
loop.run_until_complete(worker())
class CustomException(Exception):
pass
@aioftp_setup()
@with_connection
async def test_stream_close_without_finish(loop, client, server):
def fake_finish(*args, **kwargs):
raise Exception("Finish called")
await client.login()
with contextlib.suppress(CustomException):
async with client.get_stream() as stream:
stream.finish = fake_finish
raise CustomException()
@nose.tools.raises(ConnectionRefusedError)
@aioftp_setup()
async def test_no_server(loop, *args, **kwargs):
async with aioftp.ClientSession("127.0.0.1", loop=loop):
pass
@aioftp_setup()
@with_connection
async def test_syst_command(loop, client, server):
await client.login()
code, info = await client.command("syst", "215")
nose.tools.eq_(info, [" UNIX Type: L8"])