Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Demonstrate that get_reader and get_writer can defer.

  • Loading branch information...
commit 9d70905eb6f3e128d3d89e0358e4a0811bf92673 1 parent b13e7d6
Gavin Panella authored
Showing with 52 additions and 1 deletion.
  1. +52 −1 tftp/test/test_protocol.py
53 tftp/test/test_protocol.py
View
@@ -1,7 +1,7 @@
'''
@author: shylent
'''
-from tftp.backend import FilesystemSynchronousBackend
+from tftp.backend import FilesystemSynchronousBackend, IReader, IWriter
from tftp.bootstrap import RemoteOriginWriteSession, RemoteOriginReadSession
from tftp.datagram import (WRQDatagram, TFTPDatagramFactory, split_opcode,
ERR_ILLEGAL_OP, RRQDatagram, ERR_ACCESS_VIOLATION, ERR_FILE_EXISTS,
@@ -192,3 +192,54 @@ def cb(ign):
def tearDown(self):
self.tftp.transport.stopListening()
self.client.transport.stopListening()
+
+
+class FilesystemAsyncBackend(FilesystemSynchronousBackend):
+
+ def __init__(self, base_path, clock):
+ super(FilesystemAsyncBackend, self).__init__(
+ base_path, can_read=True, can_write=True)
+ self.clock = clock
+
+ def get_reader(self, file_name):
+ reader = super(FilesystemAsyncBackend, self).get_reader(file_name)
+ d = Deferred()
+ self.clock.callLater(0, d.callback, reader)
+ return d
+
+ def get_writer(self, file_name):
+ writer = super(FilesystemAsyncBackend, self).get_writer(file_name)
+ d = Deferred()
+ self.clock.callLater(0, d.callback, writer)
+ return d
+
+
+class SuccessfulAsyncDispatch(unittest.TestCase):
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ with FilePath(self.tmp_dir_path).child('nonempty').open('w') as fd:
+ fd.write('Something uninteresting')
+ self.backend = FilesystemAsyncBackend(self.tmp_dir_path, self.clock)
+ self.tftp = TFTP(self.backend, self.clock)
+
+ def test_get_reader_can_defer(self):
+ rrq_datagram = RRQDatagram('nonempty', 'NetASCiI', {})
+ rrq_addr = ('127.0.0.1', 1069)
+ rrq_mode = "octet"
+ d = self.tftp._startSession(rrq_datagram, rrq_addr, rrq_mode)
+ self.assertFalse(d.called)
+ self.clock.advance(1)
+ self.assertTrue(d.called)
+ self.assertTrue(IReader.providedBy(d.result.backend))
+
+ def test_get_writer_can_defer(self):
+ wrq_datagram = WRQDatagram('foobar', 'NetASCiI', {})
+ wrq_addr = ('127.0.0.1', 1069)
+ wrq_mode = "octet"
+ d = self.tftp._startSession(wrq_datagram, wrq_addr, wrq_mode)
+ self.assertFalse(d.called)
+ self.clock.advance(1)
+ self.assertTrue(d.called)
+ self.assertTrue(IWriter.providedBy(d.result.backend))
Please sign in to comment.
Something went wrong with that request. Please try again.