In [30]:
#--------------------------------------------------------------------------
# Copyright (c) 2022 USAF ACC/A29 Intel Data/Tech
# Futures Division
# All Rights Reserved.
#
# Dissemination of this information and/or reproduction
# and modification are restricted to other  government
# organizations.  Commercial use is strictly forbidden
# unless prior written permission is obtained from ACC/A29
# Intel Data/Tech Futures Division.
#
#--------------------------------------------------------------------------
# Module metadata (author, release, and contact).
__author__ = "Andres Davila"
__version__ = "0.1"
__email__ = "andres.davila-corujo@us.af.mil"

import copy
import os
import uuid
from datetime import datetime, timedelta, timezone

import pandas as pd
import tqdm
from elasticsearch7 import Elasticsearch, helpers


# Module-level placeholder; the functions below each bind their own local `data`.
data = pd.DataFrame()

# Connection settings come from the environment so no credential is stored in the
# notebook. ELASTIC_PASSWORD is required; URL and user fall back to the old values.
# NOTE(review): the previously hard-coded password remains in version history and
# should be rotated.
es = Elasticsearch([os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200")],
                   http_auth=(os.environ.get("ELASTIC_USER", "elastic"),
                              os.environ["ELASTIC_PASSWORD"]),
                   timeout=6000)


def read_data(file_path: str, delimiter: str) -> pd.DataFrame:
    """Load a delimited text file into a DataFrame.

    Args:
        file_path: path of the file to parse.
        delimiter: field separator handed to ``pandas.read_csv`` (for example a
            tab character for TSV files).

    Returns:
        The parsed DataFrame, with the first line used as the header.
    """
    return pd.read_csv(file_path, sep=delimiter)

def get_indices(es):
    """List the non-hidden index names known to the cluster.

    Args:
        es (Elasticsearch): connected client.

    Returns:
        list of str: keys of ``es.indices.get_alias("*")`` in response order,
        skipping any name that starts with a dot (hidden/system indices).
    """
    visible = []
    for name in es.indices.get_alias("*"):
        if name.startswith('.'):
            continue
        visible.append(name)
    return visible

def get_index_features(es, index_name):
    """Get the leaf field names defined in an index mapping.

    Object fields (mapping entries carrying a ``properties`` key) are expanded
    recursively into dotted names such as ``id.orig_h``; the object field itself
    is not reported. At every level, plain fields come first in mapping order,
    followed by the expansions of object fields.

    Args:
        es (Elasticsearch): Elasticsearch client
        index_name (str): index name, or an alias that resolves to exactly one index

    Return:
        a list of str (empty when the index has no mapped properties)

    Raises:
        KeyError: if the mapping response has no entry for ``index_name`` and
            does not consist of exactly one index.
    """
    index_mapping = es.indices.get_mapping(index=index_name)

    if index_name in index_mapping:
        index_body = index_mapping[index_name]
    elif len(index_mapping) == 1:
        # An alias is answered under the concrete index name, not the alias.
        index_body = next(iter(index_mapping.values()))
    else:
        raise KeyError(index_name)

    # An index with no mapped fields has no 'properties' key at all.
    properties = index_body.get('mappings', {}).get('properties', {})
    return _flatten_properties(properties)


def _flatten_properties(properties, prefix=''):
    """Return dotted leaf names: direct leaves first, then object-field expansions."""
    leaves = [prefix + name for name, spec in properties.items() if 'properties' not in spec]
    nested = []
    for name, spec in properties.items():
        if 'properties' in spec:
            nested.extend(_flatten_properties(spec['properties'], prefix + name + '.'))
    return leaves + nested



def filter_keys(document, common):
    """Project ``document`` onto the keys listed in ``common``.

    Args:
        document: mapping-like object (a dict, or a pandas row) indexable by key.
        common: iterable of keys to keep; each must exist in ``document``.

    Returns:
        dict: ``{key: document[key]}`` with keys in the order given by ``common``.

    Raises:
        KeyError: if a key from ``common`` is missing in ``document``.
    """
    projected = {}
    for key in common:
        projected[key] = document[key]
    return projected


def doc_generator(df, index, common):
    """Yield one Elasticsearch bulk action per DataFrame row.

    Args:
        df (pandas.DataFrame): rows to index.
        index (str): target index name.
        common (iterable of str): columns copied into each document's ``_source``.

    Yields:
        dict: a bulk action with a random UUID4 ``_id``.
    """
    # to_dict('records') keeps each column's own dtype; iterrows() would upcast
    # every row to one common dtype (int columns become float next to floats).
    for record in df.to_dict('records'):
        yield {
            '_index' : index,
            '_type'  : '_doc',
            '_id'    : f"{uuid.uuid4()}",
            "_source": filter_keys(record, common)
        }
    # Falling off the end finishes the generator. An explicit `raise StopIteration`
    # here is converted to RuntimeError by PEP 479 (Python 3.7+).


def add_data(hour, file_path, index_name, chunk_size=1000, client=None):
    """Bulk-index a tab-separated file, stamping every row with one UTC timestamp.

    Every row's ``@timestamp`` column is set to "now minus ``hour`` hours" in UTC.
    Repeated calls with different ``hour`` values therefore replay the same
    records at different points in time. Missing values are indexed as empty
    strings.

    Args:
        hour (int | float): hours subtracted from the current time; negative
            values produce timestamps in the future.
        file_path (str): path of a tab-delimited file, loaded with ``read_data``.
        index_name (str): target Elasticsearch index.
        chunk_size (int): documents sent per bulk request (default 1000).
        client (Elasticsearch, optional): client to use; defaults to the
            module-level ``es``.

    Raises:
        ValueError: if ``chunk_size`` is not a positive integer.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    if client is None:
        client = es

    data = read_data(file_path, '\t')

    # Compute the timestamp once, in UTC: the trailing 'Z' in the format declares
    # UTC, so formatting local time would shift events by the host's UTC offset.
    timestamp = (datetime.now(timezone.utc) - timedelta(hours=hour)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    data['@timestamp'] = timestamp
    data = data.fillna('')

    # to_dict('records') keeps per-column dtypes. Taking rows as Series would
    # upcast int columns to float when float columns are present. A header-only
    # file simply yields no actions.
    actions = ({'_index': index_name, '_source': record}
               for record in data.to_dict('records'))
    helpers.bulk(client, actions, chunk_size=chunk_size)

if __name__ == '__main__':
    # Replay the same Zeek sample ten times. add_data subtracts `hour` from now, so
    # offset -1 stamps the batch one hour in the future and 0 stamps it at now.
    for _repeat in tqdm.tqdm(range(10)):
        for hour_offset in (-1, 0):
            add_data(hour_offset, file_path='./data/zeek_AD.csv', index_name='zeek_test')

100%|██████████| 2/2 [00:04<00:00,  2.38s/it]
100%|██████████| 2/2 [00:04<00:00,  2.36s/it]
100%|██████████| 2/2 [00:04<00:00,  2.42s/it]
100%|██████████| 2/2 [00:04<00:00,  2.37s/it]
100%|██████████| 2/2 [00:04<00:00,  2.36s/it]
100%|██████████| 2/2 [00:04<00:00,  2.37s/it]
100%|██████████| 2/2 [00:04<00:00,  2.36s/it]
100%|██████████| 2/2 [00:04<00:00,  2.35s/it]
100%|██████████| 2/2 [00:04<00:00,  2.34s/it]
100%|██████████| 2/2 [00:04<00:00,  2.34s/it]


In [26]:
import sys
import json
from pprint import pprint
from elasticsearch7 import Elasticsearch, helpers


import os

# Same connection settings as the ingestion cell. The password is read from the
# environment rather than being committed with the notebook.
es = Elasticsearch([os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200")],
                   http_auth=(os.environ.get("ELASTIC_USER", "elastic"),
                              os.environ["ELASTIC_PASSWORD"]),
                   timeout=6000)

DNS_LOG_PATH = "./data/dns.log"
index = 'dns_test'


def iter_dns_actions(path, index_name):
    """Yield one bulk-index action per non-blank line of a JSON-lines log file.

    Each line becomes a document body, so it must parse as a JSON object.
    NOTE(review): this presumes ./data/dns.log is Zeek JSON output; TODO confirm.
    The 0-based line number is used as ``_id``, so re-running the cell overwrites
    the same documents instead of duplicating them.
    """
    with open(path, 'r') as log_file:
        for line_number, raw_line in enumerate(log_file):
            # Only trim the line ends; removing inner whitespace would corrupt
            # string values inside the JSON.
            line = raw_line.strip()
            if not line:
                continue
            yield {'_index': index_name, '_id': line_number, '_source': json.loads(line)}


helpers.bulk(es, iter_dns_actions(DNS_LOG_PATH, index))
