Skip to content

Commit

Permalink
Merge pull request #33 from fsteggink/master
Browse files Browse the repository at this point in the history
ZipFileInput, ZipFileExtractor changes
  • Loading branch information
justb4 committed Feb 9, 2016
2 parents 04311d6 + d801014 commit 6b70d53
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
9 changes: 8 additions & 1 deletion stetl/filters/zipfileextractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

log = Util.get_log('zipfileextractor')

BUFFER_SIZE = 1024 * 1024 * 1024


class ZipFileExtractor(Filter):
"""
Expand Down Expand Up @@ -50,7 +52,12 @@ def invoke(self, packet):

with zipfile.ZipFile(packet.data['file_path']) as z:
with open(self.cur_file_path, 'wb') as f:
f.write(z.read(packet.data['name']))
with z.open(packet.data['name']) as zf:
while True:
buffer = zf.read(BUFFER_SIZE)
if not buffer:
break
f.write(buffer)

packet.data = self.cur_file_path
return packet
Expand Down
16 changes: 16 additions & 0 deletions stetl/inputs/fileinput.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from stetl.utils.apachelog import formats, parser
from stetl.packet import FORMAT
import csv
import re

log = Util.get_log('fileinput')

Expand Down Expand Up @@ -568,6 +569,17 @@ class ZipFileInput(FileInput):
produces=FORMAT.record
"""

@Config(ptype=str, default=None, required=False)
def name_filter(self):
"""
Regular expression which is used as a filter to the ZIP file names.
Required: False
Default: None
"""
pass

def __init__(self, configdict, section):
FileInput.__init__(self, configdict, section, produces=FORMAT.record)
self.file_content = None
Expand All @@ -589,6 +601,10 @@ def read(self, packet):
zf = zipfile.ZipFile(self.cur_file_path, 'r')
self.file_content = [{'file_path': self.cur_file_path, 'name': name} for name in zf.namelist()]

# Apply name filter
if self.name_filter:
self.file_content = [item for item in self.file_content if re.match(self.name_filter, item["name"])]

log.info("zip file read : %s size=%d" % (self.cur_file_path, len(self.file_content)))

packet.data = self.file_content.pop(0)
Expand Down

0 comments on commit 6b70d53

Please sign in to comment.