Skip to content

Commit

Permalink
issue #21 - allow assembly into record arrays to handle millions of l…
Browse files Browse the repository at this point in the history
…og records
  • Loading branch information
justb4 committed Apr 23, 2015
1 parent 8fa96f7 commit 1d048c7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
12 changes: 11 additions & 1 deletion examples/basics/14_logfileinput/etl.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Transform apache log to records

[etl]
chains = input_apache_log|output_std
chains = input_apache_log|to_record_array|output_std

# Transform apache log to records
[input_apache_log]
Expand All @@ -11,5 +11,15 @@ key_map = {'%>s': 'status', '%D': 'deltat', '%{User-agent}i': 'agent', '%b': 'by
file_path = input
filename_pattern = *.log

# Assemble single records into record arrays of at most max_len records each
[to_record_array]
class = filters.formatconverter.FormatConverter
input_format = record
output_format = record_array
converter_args = {
'max_len': 10
}


[output_std]
class = outputs.standardoutput.StandardOutput
6 changes: 4 additions & 2 deletions stetl/filters/formatconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,17 @@ def record2struct(packet, converter_args=None):
return packet

@staticmethod
def record2record_array(packet):
def record2record_array(packet, converter_args=None):
if not hasattr(packet, 'arr'):
packet.arr = list()

if packet.data is not None:
packet.arr.append(packet.data)
packet.consume()

if packet.is_end_of_stream() is True:
# At end of stream or when max array size reached: close the array
if packet.is_end_of_stream() is True or \
(converter_args is not None and len(packet.arr) >= converter_args['max_len']):
# End of stream or max_len reached: emit the assembled record array
packet.data = packet.arr
packet.arr = list()
Expand Down
2 changes: 2 additions & 0 deletions stetl/utils/apachelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ def parse(self, line):
v = int(parse_date(v)[0])
except:
v = 0
elif v == '-':
v = None

# JvdB: elaborate request '%r' string
if k == '%r':
Expand Down

0 comments on commit 1d048c7

Please sign in to comment.