-
-
Notifications
You must be signed in to change notification settings - Fork 109
/
bro_log_reader.py
226 lines (187 loc) · 8.89 KB
/
bro_log_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""BroLogReader: This class reads in various Zeek logs. The class inherits from
the FileTailer class so it supports the following use cases:
- Read contents of a Zeek log file (tail=False)
- Read contents + 'tail -f' Zeek log file (tail=True)
Args:
filepath (str): The full path to the file (/full/path/to/the/file.txt)
delimiter (str): The delimiter in the Zeek logs (default='\t')
tail (bool): Do a dynamic tail on the file (i.e. tail -f) (default=False)
"""
from __future__ import print_function
import os
import time
import datetime
# Local Imports
from zat.utils import file_tailer, file_utils
class BroLogReader(file_tailer.FileTailer):
    """BroLogReader: This class reads in various Zeek logs. The class inherits from
       the FileTailer class so it supports the following use cases:
         - Read contents of a Zeek log file        (tail=False)
         - Read contents + 'tail -f' Zeek log file (tail=True)

       Args:
            filepath (str): The full path to the file (/full/path/to/the/file.txt)
            delimiter (str): The delimiter in the Zeek logs (default='\t')
            tail (bool): Do a dynamic tail on the file (i.e. tail -f) (default=False)
            strict (bool): Raise an exception on conversions errors (default=False)
    """

    def __init__(self, filepath, delimiter='\t', tail=False, strict=False):
        """Initialization for the BroLogReader Class

        Raises:
            IOError: If filepath does not exist or is not readable.
        """

        # First check if the file exists and is readable
        if not os.access(filepath, os.R_OK):
            raise IOError('Could not read/access zeek log file: {:s}'.format(filepath))

        # Setup some class instance vars
        self._filepath = filepath
        self._delimiter = delimiter
        self._tail = tail
        self._strict = strict

        # Header info; populated by _readrows() each time the header is parsed
        self.field_names = []
        self.field_types = []
        self.type_converters = []

        # Setup the Zeek to Python Type mapper. Any Zeek type that is not listed
        # here falls back to the 'unknown' converter (value kept as a string).
        self.type_mapper = {'bool': lambda x: x == 'T',
                            'count': int,
                            'int': int,
                            'double': float,
                            'time': lambda x: datetime.datetime.fromtimestamp(float(x)),
                            'interval': lambda x: datetime.timedelta(seconds=float(x)),
                            'string': lambda x: x,
                            'enum': lambda x: x,
                            'port': int,
                            'unknown': lambda x: x}

        # Replacement values used when a field holds '-' (the unset marker).
        # Types that are not listed here keep the literal '-'.
        # The 'time' placeholder is epoch + 86400 seconds (one day).
        self.dash_mapper = {'bool': False, 'count': 0, 'int': 0, 'port': 0, 'double': 0.0,
                            'time': datetime.datetime.fromtimestamp(86400),
                            'interval': datetime.timedelta(seconds=0),
                            'string': '-', 'enum': '-', 'unknown': '-'}

        # Initialize the Parent Class
        super(BroLogReader, self).__init__(self._filepath, full_read=True, tail=self._tail)

    def readrows(self):
        """The readrows method reads in the header of the Zeek log and
           then uses the parent class to yield each row of the log file
           as a dictionary of {key:value, ...} based on Zeek header.

           When tail=True the file is re-opened (after a 5 second pause) whenever
           it is closed or cannot be read, since that might just be a log rotation.
           When tail=False an IOError simply ends the iteration.

           Yields:
               dict: One {field_name: converted_value} dictionary per log row.
        """
        # Calling the internal _readrows so we can catch issues/log rotations
        reconnecting = True
        while True:
            # Yield the rows from the internal reader
            try:
                for row in self._readrows():
                    if reconnecting:
                        print('Successfully monitoring {:s}...'.format(self._filepath))
                        reconnecting = False
                    yield row
            except IOError:
                retry_message = 'Could not open file {:s} Retrying...'
            else:
                retry_message = 'File closed {:s} Retrying...'

            # Without the tail option we are done once the reader stops
            if not self._tail:
                break

            # If the tail option is set then we do a retry (might just be a log rotation)
            print(retry_message.format(self._filepath))
            reconnecting = True
            time.sleep(5)

    def _readrows(self):
        """Internal method _readrows, see readrows() for description"""

        # Read in the Zeek Headers
        offset, self.field_names, self.field_types, self.type_converters = self._parse_bro_header(self._filepath)

        # Use parent class to yield each row as a dictionary
        for line in self.readlines(offset=offset):

            # Check for #close
            if line.startswith('#close'):
                return

            # Yield the line as a dict
            yield self.make_dict(line.strip().split(self._delimiter))

    def _parse_bro_header(self, bro_log):
        """Parse the Zeek log header section.

            Format example:
                #separator \\x09
                #set_separator	,
                #empty_field	(empty)
                #unset_field	-
                #path	httpheader_recon
                #fields	ts	origin	useragent	header_events_json
                #types	time	string	string	string

            Args:
                bro_log (str): Path of the Zeek log file to parse.

            Returns:
                tuple: (offset, field_names, field_types, type_converters) where
                       offset is the file position just past the #types line.

            Raises:
                IOError: If the file cannot be opened or has no #fields line.
        """

        # Open the Zeek logfile
        with open(bro_log, 'r') as bro_file:

            # Skip until you find the #fields line
            _line = bro_file.readline()
            while not _line.startswith('#fields'):
                # readline() returns '' only at end of file; without this check
                # an empty or header-less file would loop here forever
                if not _line:
                    raise IOError('Could not find #fields header in zeek log file: {:s}'.format(bro_log))
                _line = bro_file.readline()

            # Read in the field names (first token is the '#fields' tag itself)
            field_names = _line.strip().split(self._delimiter)[1:]

            # Read in the types (first token is the '#types' tag itself)
            _line = bro_file.readline()
            field_types = _line.strip().split(self._delimiter)[1:]

            # Setup the type converters
            unknown_converter = self.type_mapper['unknown']
            type_converters = [self.type_mapper.get(field_type, unknown_converter)
                               for field_type in field_types]

            # Keep the header offset
            offset = bro_file.tell()

        # Return the header info
        return offset, field_names, field_types, type_converters

    def make_dict(self, field_values):
        """Internal method that makes sure any dictionary elements
           are properly cast into the correct types.

           Args:
               field_values (list): The raw values of one log row, in header order.

           Returns:
               dict: {field_name: converted_value}; a value that fails conversion
                     is kept unconverted.

           Raises:
               ValueError: On a conversion failure when strict=True.
        """
        data_dict = {}
        for key, value, field_type, converter in zip(self.field_names, field_values,
                                                     self.field_types, self.type_converters):
            try:
                # We have to deal with the '-' based on the field_type
                data_dict[key] = self.dash_mapper.get(field_type, '-') if value == '-' else converter(value)
            except ValueError as exc:
                print('Conversion Issue for key:{:s} value:{:s}\n{:s}'.format(key, str(value), str(exc)))
                data_dict[key] = value
                if self._strict:
                    raise

        return data_dict
def test():
    """Test for BroLogReader Python Class"""
    import pytest

    # Directory holding the sample Zeek logs
    data_path = file_utils.relative_dir(__file__, '../data')

    # Read every sample log from start to finish, first with no tailing
    log_names = ['app_stats.log', 'conn.log', 'dhcp.log', 'dns.log', 'files.log', 'ftp.log',
                 'http.log', 'notice.log', 'smtp.log', 'ssl.log', 'weird.log', 'x509.log']
    for log_name in log_names:
        test_path = os.path.join(data_path, log_name)
        print('Opening Data File: {:s}'.format(test_path))
        for row in BroLogReader(test_path, tail=False).readrows():
            print(row)
    print('Read with NoTail Test successful!')

    # An empty log (a log with header/close but no data rows)
    test_path = os.path.join(data_path, 'http_empty.log')
    reader = BroLogReader(test_path)
    for row in reader.readrows():
        print(row)

    # Exercise the conversion-error handling with a value float() cannot parse
    reader.field_names = ['good', 'error']
    reader.type_converters = [int, lambda x: datetime.datetime.fromtimestamp(float(x))]
    reader.make_dict([5, '0, .5, .5'])

    # A path that does not exist must be rejected by the constructor
    with pytest.raises(IOError):
        BroLogReader('nowhere.log')

    # Now include tailing (note: as an automated test this needs to timeout quickly)
    try:
        from interruptingcow import timeout

        # Spin up the class in tail mode
        tailer = BroLogReader(test_path, tail=True)

        # Tail the file for 2 seconds and then quit
        try:
            with timeout(2, exception=RuntimeError):
                for row in tailer.readrows():
                    print(row)
        except RuntimeError:  # InterruptingCow raises a RuntimeError on timeout
            print('Tailing Test successful!')

    except ImportError:
        print('Tailing Test not run, need interruptcow module...')
if __name__ == '__main__':
    # Executed directly as a script: run the module's test for quick debugging
    test()