In [2]:
# -*- coding: utf-8 -*-
#!/usr/bin/python
import matplotlib.pyplot as plt
import pandas as pd
import dateparser
import re

In [3]:
def log_reader(fin):
    """
    Read a stoRM log file and return its lines as a list of strings.

    Parameters
    ----------
    fin : str
        Path to the unstructured log file.

    Returns
    -------
    list of str
        One element per line of the file, with leading and trailing
        whitespace (including the newline) stripped. An empty file
        yields an empty list.
    """
    # The context manager closes the file even if reading raises.
    with open(fin, "r") as input_file:
        return [line.strip() for line in input_file]

In [4]:
def log_reader_small(fin,start,end):
    """
    Read a slice of a stoRM log file and return it as a list of strings.

    Parameters
    ----------
    fin : str
        Path to the unstructured log file.
    start : int
        0-based index of the first line to keep.
    end : int
        0-based index of the first line to leave out (exclusive bound).

    Returns
    -------
    list of str
        The stripped lines whose index is in [start, end). The list is
        empty when start >= end or when start is past the end of the file.

    Notes
    -----
    With start == 0 the result is the first `end` lines, as before.
    Previously `start` only seeded the line counter, so reading always
    began at the first line of the file.
    """
    listed_log = []

    # The context manager closes the file even if reading raises.
    with open(fin, "r") as input_file:
        for index, line in enumerate(input_file):
            if index >= end:
                # Nothing past `end` is wanted: stop without reading the rest.
                break
            if index >= start:
                listed_log.append(line.strip())

    return listed_log

In [5]:
def log_tabler(listed_log):
    """
    Split raw stoRM log lines into columns.

    Parameters
    ----------
    listed_log : iterable of str
        The log lines, one string per line (any iterable works, not only
        a list).

    Returns
    -------
    dict
        Keys 'date', 'timestamp', 'thread', 'type', 'request_id' and
        'message', each mapped to a list with one entry per input line:

        * date       -- the first 18 characters of the line
        * timestamp  -- dateparser.storm_dtpars applied to those characters
        * thread     -- 4th space-separated field
        * type       -- 7th space-separated field
        * request_id -- text between the first '[' and the following ']'
        * message    -- text after the third ':', stripped of whitespace

    Raises
    ------
    IndexError
        If a line has too few spaces, no '[' or fewer than three ':'.
    """
    date, time_stamp, thread, tipe, token, message = [], [], [], [], [], []

    for line in listed_log:
        stamp = line[:18]
        date.append(stamp)
        time_stamp.append(dateparser.storm_dtpars(stamp))

        # One split covers both positional fields: indexes 3 and 6 are the
        # same whether the line is split 4 or 7 times.
        fields = line.split(" ", 7)
        thread.append(fields[3])
        tipe.append(fields[6])

        token.append(line.split("[", 1)[1].split("]", 1)[0])
        message.append(line.split(":", 3)[3].strip())

    log_table = {'date':date, 'timestamp':time_stamp, 'thread':thread,
                 'type':tipe, 'request_id':token, 'message':message}
    return log_table

In [6]:
def dframer(fin):
    """
    Load a structured log .csv into a pandas DataFrame.

    fin is the path of the .csv file; no column is used as the index.
    """
    return pd.read_csv(fin, index_col=None)

In [7]:
def csver(fin,fout):
    """
    Convert a raw stoRM log file into a structured .csv file.

    fin  : path of the raw log file (read with log_reader, split into
           columns with log_tabler)
    fout : output path WITHOUT extension; '.csv' is appended

    Returns None. The file is written without the index column and with
    the columns in this order:
    date, timestamp, type, thread, request_id, message.
    """
    column_order = ['date', 'timestamp', 'type','thread', 'request_id','message']

    raw_lines = log_reader(fin)
    frame = pd.DataFrame.from_dict(log_tabler(raw_lines))

    # Select the columns explicitly so the output order is fixed.
    frame[column_order].to_csv(fout + '.csv', index=False)

In [8]:
def csver_small(fin,fout,start, end):
    """
    Convert a slice of a raw stoRM log file into a structured .csv file.

    fin        : path of the raw log file
    fout       : output path WITHOUT extension; '.csv' is appended
    start, end : forwarded unchanged to log_reader_small, which selects
                 the lines to convert

    Returns None. The file is written without the index column and with
    the columns in this order:
    date, timestamp, type, thread, request_id, message.
    """
    column_order = ['date', 'timestamp', 'type','thread', 'request_id','message']

    raw_lines = log_reader_small(fin, start, end)
    frame = pd.DataFrame.from_dict(log_tabler(raw_lines))

    # Select the columns explicitly so the output order is fixed.
    frame[column_order].to_csv(fout + '.csv', index=False)

In [9]:
def msger(fin,fout):
    """
    Convert a raw stoRM log file into a msgpack file.

    fin  : path of the raw log file (read with log_reader, split into
           columns with log_tabler)
    fout : output path WITHOUT extension; '.msg' is appended

    Returns None. Columns are stored in this order:
    date, timestamp, type, thread, request_id, message.
    """
    column_order = ['date', 'timestamp', 'type','thread', 'request_id','message']

    raw_lines = log_reader(fin)
    frame = pd.DataFrame.from_dict(log_tabler(raw_lines))

    # NOTE(review): DataFrame.to_msgpack was removed in pandas 1.0, so this
    # call only works with older pandas releases.
    frame[column_order].to_msgpack(fout + '.msg')

In [10]:
def msger_small(fin,fout,start,end):
    """
    Convert a slice of a raw stoRM log file into a msgpack file.

    fin        : path of the raw log file
    fout       : output path WITHOUT extension; '_small.msg' is appended
    start, end : forwarded unchanged to log_reader_small, which selects
                 the lines to convert

    Returns None. Columns are stored in this order:
    date, timestamp, type, thread, request_id, message.
    """
    column_order = ['date', 'timestamp', 'type','thread', 'request_id','message']

    raw_lines = log_reader_small(fin, start, end)
    frame = pd.DataFrame.from_dict(log_tabler(raw_lines))

    # NOTE(review): DataFrame.to_msgpack was removed in pandas 1.0, so this
    # call only works with older pandas releases.
    frame[column_order].to_msgpack(fout + '_small' + '.msg')

In [11]:
def csv_kibana_parser(fin, fout):
    """
    Re-shape a log .csv produced by Kibana into the notebook's column layout.

    fin  : path of the Kibana .csv; it must contain the columns
           '@timestamp', 'thread', 'status', 'token' and 'text'
    fout : output path WITHOUT extension; '.csv' is appended

    A 'timestamp' column is added holding dateparser.kibana_dtpars applied
    to every '@timestamp' value (seconds, according to the original note).
    The Kibana columns are then renamed and written, without the index, in
    this order:
    date ISO-8601, timestamp, type, thread, request_id, message.
    """
    new_names = {'@timestamp':'date ISO-8601','thread':'thread','status':'type','token':'request_id','text':'message'}
    column_order = ['date ISO-8601', 'timestamp', 'type','thread', 'request_id','message']

    kibana_df = pd.read_csv(fin)

    converted = [dateparser.kibana_dtpars(stamp)
                 for stamp in list(kibana_df['@timestamp'].values)]
    kibana_df['timestamp'] = pd.Series(converted).values

    # index=str turns the row labels into strings; they are not written out.
    kibana_df = kibana_df.rename(index=str, columns=new_names)
    kibana_df[column_order].to_csv(fout + '.csv', index=False)

In [12]:
def funzb(line):
    """
    Extract structured fields from one stoRM log message.

    author: Luca Giommi

    Parameters
    ----------
    line : str
        The message text of a single log line.

    Returns
    -------
    dict
        Always has these keys; a field that is not found is '':

        * 'Request'         -- text captured after "request '", "Request: "
                               or "process_request : "
        * 'DN'              -- text captured after "Client DN=" / "Client DN: "
        * 'requested_token' -- text captured by the token patterns
        * 'num'             -- text captured after "' on '" or "# Requested '"
        * 'surl'            -- list of whitespace-separated items captured
                               after "surl(s): " (single quotes removed),
                               cut to at most four items
        * 'result'          -- text captured after "is '" or
                               "__process_file_request<> : "; the whole line
                               when it mentions rpcResponseHandler_AbortFiles
        * 'ip'              -- first IPv6 / IPv4-mapped address found
        * 'startup'         -- the whole line when it contains
                               "logConfiguration : ", "initSoap : " or
                               "main : "

    All patterns are applied case-insensitively.
    """
    flags = re.M|re.I

    request = re.search(r'((?<=request \')|(?<=Request \')|(?<=REQUEST \')|(?<=Request: )|(?<=process_request : )).*?(?=\'|\.| from)', line, flags)
    DN = re.search(r'(((?<=Client DN=\').*?(?=\'))|((?<=Client DN: ).*?(?= surl\(s\)))|((?<=Client DN: ).*?(?=\.)))', line, flags)
    ip = re.search(r'(::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?)', line, flags)
    token = re.search(r'(((?<=# Requested token \').*?(?=\'))|((?<=# Produced request token: \').*?(?=\'))|(((?<=token: )|(?<=token description: )).*))', line, flags)
    num = re.search(r'((?<=\' on \').*?(?=\')|(?<=# Requested \').*?(?=\'))', line, flags)
    surl = re.search(r'(((?<=SURL\(s\): \').*?((?=\')|(.*)))|((?<=surl\(s\): ).*?(?= token))|((?<=surl\(s\): ).*))', line, flags)
    outcome = re.search(r'(((?<=is \').*?(?=\'))|((?<=__process_file_request\<\> : ).*))', line, flags)

    # Some lines are kept whole instead of being dissected.
    startup = ''
    if re.search(r'((logConfiguration : )|(initSoap : )|(main : ))', line, flags):
        startup = line
    response_handler = ''
    if re.search(r'(rpcResponseHandler_AbortFiles)', line, flags):
        response_handler = line

    # Build the output in a fresh dict rather than rewriting, in place, the
    # dict being iterated; .items() works on both Python 2 and Python 3.
    matches = {'Request':request,'DN':DN,'requested_token':token,'num':num,'surl':surl,'result':outcome, 'ip':ip}
    parsed = {}
    for key, match in matches.items():
        parsed[key] = match.group() if match else ''
    parsed['startup'] = startup

    # An AbortFiles response-handler line overrides whatever 'result' matched.
    if response_handler != '':
        parsed['result'] = response_handler

    if surl:
        surl_items = parsed['surl'].replace("\'","").split()
        if len(surl_items)>4:
            # Keep four items: the fifth one replaces the fourth and
            # anything after it is dropped.
            surl_items[3] = surl_items[4]
            surl_items = surl_items[:4]
        parsed['surl'] = surl_items

    return parsed

In [13]:
def message_parser(storm_df):
    """
    Parse the 'message' column of a stoRM DataFrame with funzb.

    Parameters
    ----------
    storm_df : pandas.DataFrame
        Log table with a 'message' column holding the unparsed text.

    Returns
    -------
    pandas.DataFrame
        A new frame: the columns of storm_df followed by one column per
        funzb field (DN, Request, ip, num, requested_token, result,
        startup, surl). storm_df itself is not modified.

    Notes
    -----
    Takes about 15 min to parse a 4 million line log.
    """
    fields = ['DN', 'Request', 'ip', 'num', 'requested_token', 'result', 'startup', 'surl']

    # Parse each message exactly once and keep the results in a real list,
    # so they can be walked once per field (a Python 3 map object could not).
    parsed_rows = [funzb(message) for message in storm_df['message']]
    parsed_columns = dict((field, [row[field] for row in parsed_rows])
                          for field in fields)

    # Reuse the caller's index so that concat pairs every parsed row with
    # the row it came from, even when storm_df is not indexed 0..n-1.
    parsed_df = pd.DataFrame(parsed_columns, columns=fields, index=storm_df.index)
    return pd.concat([storm_df, parsed_df], axis=1)

#### ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [17]:
# Load the Kibana-parsed log csv into DF; %time reports how long the read takes.
%time DF = dframer('stoRM_FE_kibana.csv')

CPU times: user 22.6 s, sys: 1.3 s, total: 23.9 s
Wall time: 24.9 s


In [18]:
# Show the column names of DF.
list(DF)

['date ISO-8601', 'timestamp', 'type', 'thread', 'request_id', 'message']

In [19]:
# Number of rows (log lines) in DF.
len(DF)

4585705

In [20]:
# Parse every message in DF and write the enriched table, without the index, to stoRM_FE_kibana_parsed.csv.
message_parser(DF).to_csv('stoRM_FE_kibana_parsed'+ '.csv', index=False)

In [13]:
# Show the column names of DF again.
list(DF)

['date ISO-8601', 'timestamp', 'type', 'thread', 'request_id', 'message']

-----------------------------------------------------------------------------------------------------------

-----------------------------------------------------------------------------------------------------------

-----------------------------------------------------------------------------------------------------------

In [25]:
# Template dict: the eight keys returned by funzb, each mapped to an empty string.
inizio={'DN': '', 'Request': '', 'ip': '', 'num': '', 'requested_token': '', 'result': '', 'startup': '', 'surl': ''}

In [20]:
# Time funzb applied to every element of `lista`.
# NOTE(review): `lista` is not assigned in any cell shown here -- presumably list(DF['message']); confirm before re-running.
%time listaa = map(funzb,lista)

CPU times: user 15min 53s, sys: 5.52 s, total: 15min 59s
Wall time: 16min 3s


Interessante: utilizzando map, il tempo che il sistema impiega ad applicare funzb a tutti i messaggi è di 16 minuti per 4585705 righe di log. Sfruttando quindi map potremmo riuscire ad accelerare il parsing di tutti questi messaggi. Rimane da capire come, una volta ottenuti tutti questi dizionari, possiamo riunirli al DF iniziale.

In [57]:
def message_parser_compressed(storm_df):
    """
    Compact variant of message_parser: parse the 'message' column with funzb.

    Parameters
    ----------
    storm_df : pandas.DataFrame
        Log table with a 'message' column holding the unparsed text.

    Returns
    -------
    pandas.DataFrame
        A new frame: the columns of storm_df followed by one column per
        funzb field (DN, Request, ip, num, requested_token, result,
        startup, surl). storm_df itself is not modified.
    """
    fields = ['DN', 'Request', 'ip', 'num', 'requested_token', 'result', 'startup', 'surl']

    # Run funzb once per message, outside the per-field loop; re-running it
    # for every field would repeat the whole parse eight times.
    parsed_rows = [funzb(message) for message in storm_df['message']]
    parsed_columns = dict((field, [row[field] for row in parsed_rows])
                          for field in fields)

    # Reuse the caller's index so that concat pairs every parsed row with
    # the row it came from, even when storm_df is not indexed 0..n-1.
    return pd.concat(
        [storm_df, pd.DataFrame(parsed_columns, columns=fields, index=storm_df.index)],
        axis=1)

In [22]:
# Clear every variable from the interactive namespace (asks for confirmation first).
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])? y
