In [8]:
import findspark
findspark.init()

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [10]:
import pandas as pd

In [11]:
import re

In [12]:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField


In [13]:
from collections import OrderedDict


In [14]:
spark = SparkSession.builder\
        .appName("LogMining")\
        .master("local")\
        .config("spark.executor.cores","6")\
        .config("spark.executor.memory","2g")\
        .getOrCreate()

In [15]:
#StructField("EventId", StringType(), True),
#StructField("EventTemplate", StringType(), True)
# file = open(,"r")

In [161]:
from pyspark.sql import Row


In [162]:
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) (\S+) (\S+) (.*)'


In [163]:
logline = "081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906"

In [164]:
type(logline)

str

In [165]:
match = re.findall(APACHE_ACCESS_LOG_PATTERN, logline)


In [166]:
match[0]

('081109',
 '203518',
 '35',
 'INFO',
 'dfs.FSNamesystem:',
 'BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906')

In [167]:
def parse_hdfs_log_line(logline):
    match = re.findall(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Error("Invalid logline: %s" % logline)
    return Row(
        Date =  match[0][0],
        Time    = match[0][1],
        Pid =   match[0][2],
        Level =  match[0][3],
        Component        = match[0][4],
        Content      = match[0][5])

In [168]:
abc = parse_hdfs_log_line(logline)

In [169]:
s = "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.73.220:50010 is added to blk_7128370237687728475 size 67108864"
s = re.sub("[\d.-]+", "<*>", s)
print(s)

BLOCK* NameSystem<*>addStoredBlock: blockMap updated: <*>:<*> is added to blk_<*> size <*>


In [170]:
# fo = open("abc.txt", "r")
# file_line = fo.readlines()


In [171]:
# file_line

In [172]:
# file_line[1]

In [173]:
customSchema = StructType([
    StructField("Date", IntegerType(), True),
    StructField("Time", IntegerType(), True),
    StructField("Pid", IntegerType(), True),
    StructField("Pid", IntegerType(), True),
    StructField("Component", StringType(), True),
    StructField("Content", StringType(), True),
#     StructField("EventId", StringType(), True),
#     StructField("EventTemplate", StringType(), True)
    
])

In [174]:
import hashlib

In [175]:
def parse_hdfs_file(file):
    ab = []
    file_line = file.readlines()
    for line in range(len(file_line)):
        match = re.findall(APACHE_ACCESS_LOG_PATTERN, file_line[line])
        for mat in range(len(match)):
            bd = re.sub("[\d.-]+", "<*>", match[mat][5])
            eventid = hashlib.md5(' '.join(bd).encode('utf-8')).hexdigest()[0:8]
            ab.append(Row(Date =  match[mat][0],Time = match[mat][1],Pid =match[mat][2],Level =  match[mat][3],Component   = match[mat][4],Content  = match[mat][5], EventTemplate=bd, EventId =eventid ))
    return ab 
    

In [176]:
fo = open("abc.txt", "r")


In [177]:
orkuh = parse_hdfs_file(fo)

In [178]:
orkuh

[Row(Component='dfs.DataNode$DataXceiver:', Content='Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010', Date='081109', EventId='2ea739e7', EventTemplate='Receiving block blk_<*> src: /<*>:<*> dest: /<*>:<*>', Level='INFO', Pid='143', Time='203518'),
 Row(Component='dfs.FSNamesystem:', Content='BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906', Date='081109', EventId='8b9dee11', EventTemplate='BLOCK* NameSystem<*>allocateBlock: /mnt/hadoop/mapred/system/job_<*>_<*>/job<*>jar<*> blk_<*>', Level='INFO', Pid='35', Time='203518'),
 Row(Component='dfs.DataNode$DataXceiver:', Content='Receiving block blk_-1608999687919862906 src: /10.250.10.6:40524 dest: /10.250.10.6:50010', Date='081109', EventId='2ea739e7', EventTemplate='Receiving block blk_<*> src: /<*>:<*> dest: /<*>:<*>', Level='INFO', Pid='143', Time='203519'),
 Row(Component='dfs.DataNode$DataXceiver:', Content='Receiving blo

In [179]:
orkuh[1][1]

'BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906'

In [180]:
# output = []
# for x in orkuh[]:
#     if x not in output:
#         output.append(x)
# print (output)

In [181]:
customSchema = StructType([
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Pid", StringType(), True),
    StructField("Pid", StringType(), True),
    StructField("Component", StringType(), True),
    StructField("Content", StringType(), True),
    StructField("EventTemplate", StringType(), True),
    StructField("EventId", StringType(), True)
    
])

In [182]:
df_train = spark.createDataFrame(orkuh,schema = customSchema)

In [183]:
df_train.show(5)

+------+------+---+---+--------------------+--------------------+--------------------+--------+
|  Date|  Time|Pid|Pid|           Component|             Content|       EventTemplate| EventId|
+------+------+---+---+--------------------+--------------------+--------------------+--------+
|081109|203518|143|143|dfs.DataNode$Data...|Receiving block b...|Receiving block b...|2ea739e7|
|081109|203518| 35| 35|   dfs.FSNamesystem:|BLOCK* NameSystem...|BLOCK* NameSystem...|8b9dee11|
|081109|203519|143|143|dfs.DataNode$Data...|Receiving block b...|Receiving block b...|2ea739e7|
|081109|203519|145|145|dfs.DataNode$Data...|Receiving block b...|Receiving block b...|2ea739e7|
|081109|203519|145|145|dfs.DataNode$Pack...|PacketResponder 1...|PacketResponder <...|fd96e398|
+------+------+---+---+--------------------+--------------------+--------------------+--------+
only showing top 5 rows



In [184]:
df_train.toPandas()['EventId'].unique()

array(['2ea739e7', '8b9dee11', 'fd96e398', 'a7fb7c2e', '1defe93d',
       '66a23a40', '1cb650a2', '6c9db9d3', '0193e112', '6bcaef6a',
       '62adf7b3', '0ec49ccc', '159ab19e'], dtype=object)

In [185]:
# def append_event(read_df):
    
#     for i in len(df_train):
#         orkuch[i]['EventTemplate'] = re.sub("[\d.-]+", "<*>", orkuh[i][1])
        
        

In [186]:
# s = "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.73.220:50010 is added to blk_7128370237687728475 size 67108864"
# s = re.sub("[\d.-]+", "<*>", s)
# print(s)

In [187]:
# content = "Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010"


In [188]:
#  def generate_logformat_regex(logformat):
#     ''' 
#     Function to generate regular expression to split log messages
#     '''
#     headers = []
#     splitters = re.split(r'(<[^<>]+>)', logformat)
#     regex = ''
#     for k in range(len(splitters)):
#         if k % 2 == 0:
#             splitter = re.sub(' +', '\s+', splitters[k])
#             regex += splitter
#         else:
#             header = splitters[k].strip('<').strip('>')
#             regex += '(?P<%s>.*?)' % header
#             headers.append(header)
#     regex = re.compile('^' + regex + '$')
#     return headers, regex

In [189]:
# names = df_train.schema.Content


In [190]:
# rex = r'blk_-?\d+', r'(\d+\.){3}\d+(:\d+)?' 
# lil = re.sub(rex,'<*>', content)


In [191]:
# generate_logformat_regex(content)

In [192]:
 def read_file(data):
    #struct_log = spark.read.format("csv").option("header", "true").load(file)
    data_dict = OrderedDict()
    for row in data.collect():  
        blkId_list = re.findall(r'(blk_-?\d+)', row['Content'])
        blkId_set = set(blkId_list)
        for blk_Id in blkId_set:
            if not blk_Id in data_dict:
                data_dict[blk_Id] = []
            data_dict[blk_Id].append(row['EventId'])
    data_df = spark.createDataFrame(list(data_dict.items()), schema=['BlockId', 'EventSequence'])
    
    return data_df

LineId,Date,Time,Pid,Level,Component,Content,EventId,EventTemplate


In [193]:
df_train_append = read_file(df_train)

In [194]:
df_train_append.show(10)

+--------------------+--------------------+
|             BlockId|       EventSequence|
+--------------------+--------------------+
|blk_-160899968791...|[2ea739e7, 8b9dee...|
|blk_7503483334202...|[2ea739e7, 2ea739...|
|blk_-354458337728...|[2ea739e7, 62adf7...|
|blk_-907399258668...|[2ea739e7, 159ab1...|
|blk_7854771516489...|[2ea739e7, 2ea739e7]|
+--------------------+--------------------+



In [None]:
# abc_log = spark.read.format("csv").option("header", "false").schema(customSchema).load("log.csv")


In [None]:
# abc_log.show(5)

In [None]:
# ab = read_file("log.csv")

In [None]:
# type(ab)

In [None]:
# ab.head(10)

In [None]:
schema 

In [None]:
ab = read_file("log.csv")

In [None]:
type(ab)

In [None]:
ab.show(10)

In [None]:
label_csv = spark.read.format("csv").option("header", "true").load("anomaly_label.csv")


In [None]:
label_csv.show(10)

In [None]:
# label_data = label_csv.set_index('BlockId')
# label_dict = label_data['Label'].to_dict()


In [None]:
# ab['Label'] = ab[ab['BlockId']].apply(lambda x: 1 if x == 'Anomaly' else 0)


In [None]:
# (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values, data_df['Label'].values, train_ratio, split_type)

In [None]:
# label_data = pd.read_csv("anomaly_label.csv", engine='c', na_filter=False, memory_map=True)
# label_data = label_data.set_index('BlockId')
# label_dict = label_data['Label'].to_dict()
# ab.select[ab['BlockId'].rdd.flatmap(lambda x: 1 if label_dict[x] == 'Anomaly' else 0).toDF()
# # df.select("_c0").rdd.flatMap(lambda x: x + ("anything", )).toDF()


# # # Split trgiain and test data
# # (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values, data_df['Label'].values, train_ratio, split_type)

In [None]:
train_x, test_x = ab.randomSplit([0.8, 0.2], seed=12345)

In [None]:
train_y, test_y = label_csv.randomSplit([0.8, 0.2], seed=12345)

In [None]:
train_x.show(10)

In [None]:
# from pyspark.mllib.feature import HashingTF
# from pyspark.mllib.feature import IDF

In [None]:
# hashingTF = HashingTF(100000)
# tf = hashingTF.transform(documents)

In [None]:
# tf.cache()
# idf = IDF(minDocFreq=1).fit(tf)

In [None]:
# from pyspark.ml import Pipeline
# from pyspark.ml.classification import DecisionTreeClassifier
# from pyspark.ml.feature import StringIndexer, VectorIndexer
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.mllib.util import MLUtils

In [None]:
# labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# # Automatically identify categorical features, and index them.
# # We specify maxCategories so features with > 4 distinct values are treated as continuous.
# featureIndexer =\
#     VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# # Split the data into training and test sets (30% held out for testing)
# (trainingData, testData) = data.randomSplit([0.7, 0.3])

# # Train a DecisionTree model.
# dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# # Chain indexers and tree in a Pipeline
# pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# # Train model.  This also runs the indexers.
# model = pipeline.fit(trainingData)

# # Make predictions.
# predictions = model.transform(testData)

# # Select example rows to display.
# predictions.select("prediction", "indexedLabel", "features").show(5)

# # Select (prediction, true label) and compute test error
# evaluator = MulticlassClassificationEvaluator(
#     labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
# accuracy = evaluator.evaluate(predictions)
# print "Test Error = %g" % (1.0 - accuracy)

# treeModel = model.stages[2]