In [32]:
import json
import re
from collections import OrderedDict

In [34]:
# Define Helper functions
def sanitize_key(key):
    return re.sub('[,;/]', '_', key)


def parseLine2(line):
    message = OrderedDict()
    firstTag = None
    continuing = None
    for seg in line.split(' '):
        if continuing:
            continuing.append(seg)
            if seg[-1:] == "'":
                message[sanitize_key(firstTag)] = parseLine2(' '.join(continuing)[1:-1])
                continuing = None
        else:
            ind = seg.find('=')
            if ind != -1:
                left = seg[:ind]
                right = seg[ind+1:]
                #(left, right) = seg.split('=')
                if left == 'msg' and right.startswith('audit'):
                    message['timestamp'] =  float(re.search('audit\(([0-9]+.[0-9]+)', seg).group(1))
                elif right.startswith("'"):
                    firstTag = left
                    continuing = [right]
                else:
                    message[sanitize_key(left)] = right
            else:
                message[sanitize_key(seg)] = None


    return message

In [30]:
logs = sc.textFile("hdfs:///user/ytesfaye/lab41_logs_small.log.gz").repartition(10)

In [35]:
log_map = logs.map(parseLine2).map(lambda x: json.dumps(x))
log_map_df = sqlCtx.jsonRDD(log_map)
#log_map_df.saveAsParquetFile('hdfs:///user/ytesfaye/lab41_logs_small_parquet')

# Use Parquet Representation

In [5]:
# If we read from a text file then save it as a parquet RDD
log_map_df = sqlCtx.read.parquet('hdfs:///user/ytesfaye/lab41_logs_small_parquet').repartition(4)
log_map_df.registerTempTable('logs2')

In [3]:
# View the schema that was generated for our parquet data
log_map_df.printSchema()

root
 |-- a0: string (nullable = true)
 |-- a1: string (nullable = true)
 |-- a2: string (nullable = true)
 |-- a3: string (nullable = true)
 |-- arch: string (nullable = true)
 |-- audit_backlog_limit: string (nullable = true)
 |-- auditd: string (nullable = true)
 |-- auid: string (nullable = true)
 |-- comm: string (nullable = true)
 |-- egid: string (nullable = true)
 |-- entries: string (nullable = true)
 |-- euid: string (nullable = true)
 |-- exe: string (nullable = true)
 |-- exit: string (nullable = true)
 |-- family: string (nullable = true)
 |-- format: string (nullable = true)
 |-- fsgid: string (nullable = true)
 |-- fsuid: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- halt_: string (nullable = true)
 |-- items: string (nullable = true)
 |-- kernel: string (nullable = true)
 |-- key: string (nullable = true)
 |-- msg: struct (nullable = true)
 |    |-- _etc_group: string (nullable = true)
 |    |-- _etc_gshadow: string (nullable = true)
 |    |-- acct: s

In [None]:
# See what UIDs exist in logs
for row in sqlCtx.sql("select distinct(uid) from logs2").collect():
    print row

In [None]:
# See what user accounts exist in logs
for acct in sqlCtx.sql('select distinct(msg.acct) from logs2 where msg.acct is not null').collect():
    print type(acct.acct), acct.acct

In [7]:
# Some user account names are in hex, decode
import binascii
acct_name = '28696E76616C6964207573657229'

print binascii.unhexlify(acct_name)

(invalid user)


In [8]:
# Look at values in the type field
lookup_code = {}
counter = 0
for row in sqlCtx.sql('select distinct(type) from logs2').collect():
    lookup_code[row.type] = counter
    counter += 1
print lookup_code

{u'CONFIG_CHANGE': 24, u'USER_START': 16, u'CRED_ACQ': 20, u'LOGIN': 19, u'USER_CHAUTHTOK': 21, u'SYSCALL': 5, u'CRED_REFR': 17, u'DAEMON_START': 7, u'CRYPTO_SESSION': 14, u'ADD_USER': 11, u'DAEMON_END': 8, u'USER_CMD': 3, u'SYSTEM_SHUTDOWN': 18, u'USER_AUTH': 10, u'CRYPTO_KEY_USER': 6, u'ANOM_ABEND': 0, u'CRED_DISP': 13, u'USER_ACCT': 9, u'USER_ERR': 12, u'USER_END': 23, u'ADD_GROUP': 4, u'USER_LOGOUT': 15, u'SYSTEM_RUNLEVEL': 1, u'USER_LOGIN': 2, u'NETFILTER_CFG': 22}
