Skip to content

Commit

Permalink
Minimally working Zipfiles.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmiller committed Feb 13, 2014
1 parent 9e67394 commit fc3bcd7
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 4 deletions.
10 changes: 10 additions & 0 deletions examples/zippy.py
@@ -0,0 +1,10 @@
import ffs
from ffs.contrib import archive

# with ffs.Path.temp() as tmp:
zipfile = archive.ZipPath('/tmp/my.zip')
content1 = zipfile/'content.txt'
content1 << "These are some contents"

zipfile/'my.csv' << "number,letter\n1,a\n2,b"
zipfile << ('second.csv', "number,letter\n3,c\n4,d")
143 changes: 142 additions & 1 deletion ffs/contrib/archive.py
Expand Up @@ -3,8 +3,11 @@
they were untarred, transparently.
"""
import tarfile
import zipfile

from ffs import exceptions, nix, Path
import six

from ffs import exceptions, nix, path
from ffs.filesystem import BaseFilesystem

class TarFilesystem(BaseFilesystem):
Expand Down Expand Up @@ -225,3 +228,141 @@ def getwd(self):

def cd(self, target):
raise exceptions.InappropriateError("Can't cd() on an Archive filesystem")


class ZipFilesystem(BaseFilesystem):
"""
Zip archive based filesystem.
"""
sep = '/'

def __init__(self, archive_path):
"""
Get ourself a handle to the archive, or raise NotAZipFileError
"""
if not zipfile.is_zipfile(str(archive_path)):
raise exceptions.NotAZipFileError(
'{0} is not a Zip file Larry :('.format(archive_path))
self.archive_path = archive_path
self.zipfile = zipfile.ZipFile(archive_path)


class ZipPath(path.LeafBranchPath):
"""
Top level entrypoint for working with Zipfiles ffs.
"""
fsflavour = ZipFilesystem

def __init__(self, archive_path):
"""
Create our filesystem, store value.
"""
try:
self.fs = self.fsflavour(archive_path)
except exceptions.NotAZipFileError:
with zipfile.ZipFile(archive_path, 'a') as z:
z.writestr('.ffs', 'Created by ffs Python')
self.fs = self.fsflavour(archive_path)
self._value = archive_path

def __add__(self, other):
"""
Add something to ourself, returning a new path object.
arguments:
- `other`: *
return: path
"""
return ZipContentsPath((self._value, other))

def __lshift__(self, contents):
"""
we overload the << operator to allow us easy file writing according to the
following rules:
if contents is a tuple of strings, treat the first string as
the path of the archive contents, and the second as the file
content.
otherwise, raise TypeError.
note::
if components of the path leading to self do not exist,
they will be created. it is assumed that the user knows their
own mind.
arguments:
- `contents`: stringtype
return: None
exceptions: TypeError
"""
if not isinstance(contents, tuple):
raise TypeError("You have to write with tuples Larry...")
self/contents[0] << contents[1]
return


class ZipContentsPath(path.LeafBranchPath):
"""
Path for the contents of a Zip archive
"""
fsflavour = ZipFilesystem

def __init__(self, *args):
"""
Create our filesystem
"""
archive_path, content = args[0]
try:
self.fs = self.fsflavour(archive_path)
except exceptions.NotAZipFileError:
with zipfile.ZipFile(archive_path, 'a') as z:
z.writestr('.ffs', 'Created by ffs Python')
self.fs = self.fsflavour(archive_path)
self._archive = archive_path
self._inner_value = content
self._value = self.fs.sep.join([archive_path, content])


def __lshift__(self, contents):
"""
we overload the << operator to allow us easy file writing according to the
following rules:
if contents is not a stringtype, raise TypeError.
otherwise, treat self like a file and append contents to it.
note::
if components of the path leading to self do not exist,
they will be created. it is assumed that the user knows their
own mind.
arguments:
- `contents`: stringtype
return: None
exceptions: TypeError
"""
if not isinstance(contents, six.string_types):
raise TypeError("you have to write with a stringtype larry... ")
temp = path.Path.newdir()
FOUND = False

with zipfile.ZipFile(self._archive, 'r') as rz:
with zipfile.ZipFile(temp/'tmp.zip', 'w') as wz:
for info in rz.infolist():
content = rz.open(info).read()
if info.filename == self._inner_value:
FOUND = True
content += "\n"+contents
wz.writestr(info, content)
if not FOUND:
wz.writestr(self._inner_value, contents)
nix.mv(temp/'tmp.zip', self._archive)
nix.rmdir(temp)
return
3 changes: 3 additions & 0 deletions ffs/exceptions.py
Expand Up @@ -26,3 +26,6 @@ class BadParentingError(Error):

class NotATarFileError(Error):
"This was supposed to be a tarfile. It is not."

class NotAZipFileError(Error):
"This was supposed to be a zipfile. It is not."
4 changes: 2 additions & 2 deletions ffs/path.py
Expand Up @@ -635,9 +635,9 @@ def __lshift__(self, contents):
exceptions: TypeError
"""
if self.is_dir:
raise TypeError("you can't write to a directory larry... ")
raise TypeError("you can't write to a directory Larry... ")
if not isinstance(contents, six.string_types):
raise TypeError("you have to write with a stringtype larry... ")
raise TypeError("you have to write with a stringtype Larry... ")
with self.open('a') as fh:
fh.write(contents)
return
Expand Down
Binary file added test/contrib/fixtures/simple.zip
Binary file not shown.
100 changes: 99 additions & 1 deletion test/contrib/test_archive.py
Expand Up @@ -3,13 +3,15 @@
"""
import sys
import tarfile
import tempfile
import unittest
import zipfile

from mock import MagicMock, patch

if sys.version_info < (2, 7): import unittest2 as unittest

from ffs import exceptions, Path
from ffs import exceptions, Path, rm, touch
from ffs.contrib import archive

HERE = Path.here()
Expand Down Expand Up @@ -106,5 +108,101 @@ def test_cd_raises(self):
with self.assertRaises(exceptions.InappropriateError):
self.fs.cd('foo')


class ZipFilesystemTestCase(unittest.TestCase):
def setUp(self):
self.fs = archive.ZipFilesystem(FIXTURES/'simple.zip')

def tearDown(self):
pass

def test_sep(self):
self.assertEqual('/', self.fs.sep)

def test_not_a_zipfile_raises(self):
"Should raise"
with Path.tempfile() as temp:
with self.assertRaises(exceptions.NotAZipFileError):
fs = archive.ZipFilesystem(temp)


class ZipPathTestCase(unittest.TestCase):
def setUp(self):
self.zp = archive.ZipPath(FIXTURES/'simple.zip')
tmpath = self.tmpath = tempfile.mktemp()
self.tdir = tempfile.mkdtemp()
touch(tmpath)
self.tzp = archive.ZipPath(self.tdir + '/simple.zip')

def test_init(self):
"Should create the filesystem."
self.assertIsInstance(self.zp.fs, archive.ZipFilesystem)

def test_creates_zipfile(self):
"create a zip archive if one doesn't exist."
with Path.temp() as tmp:
created = archive.ZipPath(tmp/'my.zip')

self.assertTrue(zipfile.is_zipfile(str(tmp/'my.zip')))

def test_add(self):
"Should return a zipcontentspath"
contents = self.zp + 'other.file'
self.assertIsInstance(contents, archive.ZipContentsPath)

def test_lshift_nottuple(self):
"Should raise TypeError. Can only write tuples"
cases = [123, 12.3, {'hai': 'bai'}, object()]
for case in cases:
with self.assertRaises(TypeError):
self.tzp << case

def test_lshift_write(self):
"Should append to the file"
self.tzp << ('some.file', 'Hello Beautiful')
zf = zipfile.ZipFile(str(self.tzp))
contents = zf.read('some.file')
self.assertEqual("Hello Beautiful", contents)


class ZipContentsPathTestCase(unittest.TestCase):
def setUp(self):
tmpath = self.tmpath = tempfile.mktemp()
self.tdir = tempfile.mkdtemp()
touch(tmpath)
self.zp = archive.ZipPath(self.tdir + '/simple.zip')
self.zcp = self.zp + 'some.file'

def tearDown(self):
rm(self.tmpath)
rm(self.tdir, force=True, recursive=True)

def test_init(self):
"Should initialize"
zcp = archive.ZipContentsPath((FIXTURES/'simple.zip', 'some.file'))
self.assertIsInstance(zcp.fs, archive.ZipFilesystem)

def test_lshift_notstring(self):
"Should raise TypeError. Can only write strings"
cases = [123, 12.3, {'hai': 'bai'}, object()]
for case in cases:
with self.assertRaises(TypeError):
self.zcp << case

def test_lshift_write(self):
"Should append to the file"
self.zcp << 'Hello Beautiful'
zf = zipfile.ZipFile(str(self.zp))
contents = zf.read('some.file')
self.assertEqual("Hello Beautiful", contents)

def test_lshift_append(self):
"Should be able to append to a file in an archive."
self.zcp << 'Hello Beautiful'
self.zcp << 'Help Beautiful'
zf = zipfile.ZipFile(str(self.zp))
contents = zf.read('some.file')
self.assertEqual("Hello Beautiful\nHelp Beautiful", contents)

if __name__ == '__main__':
unittest.main()

0 comments on commit fc3bcd7

Please sign in to comment.