Skip to content

Commit

Permalink
Merge pull request #28 from fsteggink/nlextract_update
Browse files Browse the repository at this point in the history
Bedankt! Various Stetl updates for NLExtract
  • Loading branch information
justb4 committed Jan 18, 2016
2 parents 76fb0e8 + c1a273b commit ba02fc7
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 4 deletions.
2 changes: 1 addition & 1 deletion stetl/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, options_dict, args_dict=None):
sys.exit(1)

ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))
log.info("Config/working dir =%s" % ETL.CONFIG_DIR)
log.info("Config/working dir = %s" % ETL.CONFIG_DIR)

self.configdict = ConfigParser()

Expand Down
61 changes: 61 additions & 0 deletions stetl/filters/packetwriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
#
# Writes the payload of a packet as a string to a file.
# Based on outputs.fileoutput.FileOutput.
#
# Author: Frank Steggink
#
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT

import os

log = Util.get_log('packetwriter')


class PacketWriter(Filter):
"""
Writes the payload of a packet as a string to a file.
consumes=FORMAT.any, produces=FORMAT.string
"""

# Start attribute config meta
@Config(ptype=str, default=None, required=True)
def file_path(self):
"""
File path to write content to.
Required: True
Default: None
"""
pass

# End attribute config meta

# Constructor
def __init__(self, configdict, section):
Filter.__init__(self, configdict, section, consumes=FORMAT.any, produces=FORMAT.string)
log.info("working dir %s" % os.getcwd())

def invoke(self, packet):
if packet.data is None:
return packet

file_path = self.cfg.get('file_path')
return self.write_file(packet, file_path)

def write_file(self, packet, file_path):
log.info('writing to file %s' % file_path)
out_file = open(file_path, 'w')

out_file.write(packet.to_string())

out_file.close()
log.info("written to %s" % file_path)

packet.data = file_path
return packet
10 changes: 7 additions & 3 deletions stetl/filters/templatingfilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ class StringTemplatingFilter(TemplatingFilter):
contains the actual values to be substituted in the template string as a record (key/value pairs).
Output is a regular string.
consumes=FORMAT.record, produces=FORMAT.string
consumes=FORMAT.record or FORMAT.record_array, produces=FORMAT.string
"""

def __init__(self, configdict, section):
TemplatingFilter.__init__(self, configdict, section, consumes=FORMAT.record)
TemplatingFilter.__init__(self, configdict, section, consumes=[FORMAT.record, FORMAT.record_array])

def create_template(self):
# Init once
Expand All @@ -115,7 +115,11 @@ def create_template(self):
self.template = Template(self.template_string)

def render_template(self, packet):
packet.data = self.template.substitute(packet.data)
if type(packet.data) is list:
packet.data = [self.template.substitute(item) for item in packet.data]
else:
packet.data = self.template.substitute(packet.data)

return packet


Expand Down
63 changes: 63 additions & 0 deletions stetl/filters/zipfileextractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
#
# Extracts a file from a ZIP file, and saves it as the given file name.
#
# Author: Frank Steggink
#
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT

log = Util.get_log('zipfileextractor')


class ZipFileExtractor(Filter):
"""
Extracts a file from a ZIP file, and saves it as the given file name.
consumes=FORMAT.record, produces=FORMAT.string
"""

# Start attribute config meta
@Config(ptype=str, default=None, required=True)
def file_path(self):
"""
File name to write the extracted file to.
Required: True
Default: None
"""
pass

# End attribute config meta

# Constructor
def __init__(self, configdict, section):
Filter.__init__(self, configdict, section, consumes=FORMAT.record, produces=FORMAT.string)
self.cur_file_path = self.cfg.get('file_path')

def invoke(self, packet):
event = None

if packet.data is None:
log.info("No file name given")
return packet

import os
import zipfile

with zipfile.ZipFile(packet.data['file_path']) as z:
with open(self.cur_file_path, 'wb') as f:
f.write(z.read(packet.data['name']))

packet.data = self.cur_file_path
return packet

def after_chain_invoke(self, packet):
import os.path
if os.path.isfile(self.cur_file_path):
os.remove(self.cur_file_path)

return True
37 changes: 37 additions & 0 deletions stetl/inputs/fileinput.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ def read_file(self, file_path):

return file_data


class ApacheLogFileInput(FileInput):
"""
Parses Apache log files. Lines are converted into records based on the log format.
Expand Down Expand Up @@ -560,3 +561,39 @@ def read(self, packet):
return packet


class ZipFileInput(FileInput):
"""
Parse ZIP file from file system or URL into a stream of records containing file names.
produces=FORMAT.record
"""

def __init__(self, configdict, section):
FileInput.__init__(self, configdict, section, produces=FORMAT.record)
self.file_content = None

def read(self, packet):
# No more files left and done with current file ?
if not(self.file_content) and not len(self.file_list):
packet.set_end_of_stream()
log.info("EOF file list, all files done")
return packet

# Done with current file or first file ?
if self.file_content is None:
self.cur_file_path = self.file_list.pop(0)

# Read file names
import zipfile

zf = zipfile.ZipFile(self.cur_file_path, 'r')
self.file_content = [{'file_path': self.cur_file_path, 'name': name} for name in zf.namelist()]

log.info("zip file read : %s size=%d" % (self.cur_file_path, len(self.file_content)))

packet.data = self.file_content.pop(0)

if not len(self.file_content):
self.file_content = None

return packet

0 comments on commit ba02fc7

Please sign in to comment.