Permalink
Browse files

Seek support for read operations

CloudFilesFD seek implementation using byte range HTTP GET requests.

When a seek call is performed, we stop current transfer (if any) and
prepare the headers to start a new request using a Range header. This
allows REST requests (resume downloads).

Write operations aren't supported.
  • Loading branch information...
1 parent 253cab7 commit c889280270142077cfffe68901dda4cb416a827b @reidrac reidrac committed Oct 29, 2012
Showing with 121 additions and 6 deletions.
  1. +29 −6 ftpcloudfs/fs.py
  2. +92 −0 tests/test_fs.py
View
@@ -115,6 +115,7 @@ def __init__(self, cffs, container, obj, mode):
self.closed = False
self.total_size = 0
self.stream = None
+ self.headers = dict()
if not all([container, obj]):
self.closed = True
@@ -145,11 +146,11 @@ def close(self):
def read(self, size=65536):
'''Read data from the object.
- We can use just one request because 'seek' is not supported.
+ We can use just one request because 'seek' is not fully supported.
NB: It uses the size passed into the first call for all subsequent calls'''
if not self.stream:
- self.stream = self.obj.stream(size)
+ self.stream = self.obj.stream(size, hdrs=self.headers)
logging.debug("read size=%r, total_size=%r, obj.size=%r" % (size, self.total_size, self.obj.size))
try:
@@ -160,10 +161,32 @@ def read(self, size=65536):
else:
return buff
- def seek(self, *kargs, **kwargs):
- '''Seek in the object: FIXME doesn't work and raises an error'''
- logging.debug("seek args=%s, kargs=%s" % (str(kargs), str(kwargs)))
- raise IOSError(EPERM, "Seek not implemented")
+ def seek(self, offset, whence=None):
+ '''Seek in the object.
+
+ It's supported only for read operations because of the object storage limitations.'''
+ logging.debug("seek offset=%s, whence=%s" % (str(offset), str(whence)))
+ if 'r' in self.mode:
+ if not whence:
+ offs = offset
+ elif whence == 1:
+ offs = self.total_size + offset
+ elif whence == 2:
+ offs = self.obj.size - offset
+ else:
+ raise IOSError(EPERM, "Invalid file offset")
+
+ if offs < 0 or offs > self.obj.size:
+ raise IOSError(EPERM, "Invalid file offset")
+
+ # we need to start over after a seek call
+ if self.stream:
+ self.stream = None
+ self.obj = self.container.get_object(self.name)
+ self.headers['Range'] = "bytes=%s-" % offs
+ self.total_size = offs
+ else:
+ raise IOSError(EPERM, "Seek not available for write operations")
class ListDirCache(object):
'''
View
@@ -7,6 +7,7 @@
from datetime import datetime
import cloudfiles
from ftpcloudfs.fs import CloudFilesFS, ListDirCache
+from ftpcloudfs.errors import IOSError
import logging
#logging.getLogger().setLevel(logging.DEBUG)
@@ -369,6 +370,97 @@ def test_listdir_manifest(self):
for i in range(1, 5):
self.cnx.remove("testfile.part/%d" % i)
+ def test_seek_set_resume(self):
+ ''' seek/resume functionality (seek_set) '''
+ content_string = "This is a chunk of data"*1024
+ self.create_file("testfile.txt", content_string)
+ self.assertEquals(self.cnx.getsize("testfile.txt"), len(content_string))
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ contents = fd.read(1024)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ fd.seek(1024)
+ contents += fd.read(512)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ fd.seek(1024+512)
+ contents += fd.read()
+ fd.close()
+
+ self.assertEqual(contents, content_string)
+ self.cnx.remove("testfile.txt")
+
+ def test_seek_end_resume(self):
+ ''' seek/resume functionality (seek_end) '''
+ content_string = "This is another chunk of data"*1024
+ self.create_file("testfile.txt", content_string)
+ self.assertEquals(self.cnx.getsize("testfile.txt"), len(content_string))
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ contents = fd.read(len(content_string)-1024)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ fd.seek(1024, 2)
+ contents += fd.read()
+ fd.close()
+
+ self.assertEqual(contents, content_string)
+ self.cnx.remove("testfile.txt")
+
+ def test_seek_cur_resume(self):
+ ''' seek/resume functionality (seek_cur) '''
+ content_string = "This is another chunk of data"*1024
+ self.create_file("testfile.txt", content_string)
+ self.assertEquals(self.cnx.getsize("testfile.txt"), len(content_string))
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ contents = fd.read(len(content_string)-1024)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ fd.seek(1024)
+ fd.read(512)
+ fd.seek(len(content_string)-1024-512-1024, 1)
+ contents += fd.read()
+ fd.close()
+
+ self.assertEqual(contents, content_string)
+ self.cnx.remove("testfile.txt")
+
+ def test_seek_invalid_offset(self):
+ ''' seek functionality, invalid offset '''
+ content_string = "0"*1024
+ self.create_file("testfile.txt", content_string)
+ self.assertEquals(self.cnx.getsize("testfile.txt"), len(content_string))
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ self.assertRaises(IOSError, fd.seek, 1025)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ self.assertRaises(IOSError, fd.seek, -1)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ self.assertRaises(IOSError, fd.seek, -1, 2)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ self.assertRaises(IOSError, fd.seek, 1025, 2)
+ fd.close()
+
+ fd = self.cnx.open("testfile.txt", "rb")
+ fd.read(512)
+ self.assertRaises(IOSError, fd.seek, 513, 1)
+ self.assertRaises(IOSError, fd.seek, -513, 1)
+ fd.close()
+
+ self.cnx.remove("testfile.txt")
+
def tearDown(self):
# Delete eveything from the container using the API
fails = self.container.list_objects()

0 comments on commit c889280

Please sign in to comment.