# Sample Datasets

In [1]:
import os, requests, zipfile, json, time, xmltodict, datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk, streaming_bulk
import sys, logging, ndjson
import numpy as np
import pandas as pd
import yaml
from yaml import Loader
from copy import deepcopy
import warnings
from IPython.display import JSON
warnings.filterwarnings("ignore")

In [2]:
container_host = 'localhost'
elastic_user = 'elastic'
elastic_password = 'elastic_password'
es1_url = f'https://{container_host}:9200'

In [3]:
def get_es_instance():
    es = Elasticsearch(es1_url, 
                       basic_auth=(elastic_user, elastic_password), 
                       verify_certs=False)
    return es

In [4]:
es = get_es_instance()

# Data Files Load

## JSON Data

In [1]:
data_path = './elasticsearch/data'

In [1]:
data_files = ['recipes.json', 'movies.json', 'chicago-taxi-data.csv', 'web.log', 'firewall.log', 
              'webapp.csv", webapp-tagged.csv', 'trips.csv', 'webapp-password-spraying-attemps.csv',
              'webapp-new-data.csv'
             ]

In [7]:
for file in data_files:
    if file.endswith('json'):
        print(f'loading {file}')
        with open(os.path.join(data_path, file), 'r') as f:
            file_errors = 0
            data = ndjson.loads(f.read())
            index_name = file.split('.')[0]
            for item in data:
                if '_id' in item:
                    item.pop('_id')
                try:
                    es.index(index=index_name, pipeline=None, document=item)
                except:
                    file_errors += 1
            print(f'Ingest errors: {file_errors}')

loading recipes.json
Ingest errors: 0
loading movies.json
Ingest errors: 34307


# CSV Files

In [58]:
csv_files = [x for x in data_files if x.split('.')[-1] == 'csv']

In [59]:
csv_files

['chicago-taxi-data.csv',
 'webapp.csv", webapp-tagged.csv',
 'trips.csv',
 'webapp-password-spraying-attemps.csvwebapp-new-data.csv']

In [60]:
csv_files = [
    'chicago-taxi-data.csv',
    'webapp.csv', 
    'webapp-tagged.csv',
    'webapp-password-spraying-attemps.csv',
    'webapp-new-data.csv',
    'trips.csv'
]

In [46]:
taxis = pd.read_csv(os.path.join(data_path, 'chicago-taxi-data.csv')).T.to_dict()
for record in taxis:
    es.index(index='chicago_taxis', document=taxis[record])

In [61]:
webaapp_files = [x for x in csv_files if x.startswith('webapp')]

In [62]:
webaapp_files

['webapp.csv',
 'webapp-tagged.csv',
 'webapp-password-spraying-attemps.csv',
 'webapp-new-data.csv']

In [63]:
webapp = pd.read_csv(os.path.join(data_path, webaapp_files[0])).T.to_dict()
for record in webapp:
    es.index(index='webapp', document=webapp[record])

In [64]:
webapp = pd.read_csv(os.path.join(data_path, webaapp_files[1])).T.to_dict()
for record in webapp:
    es.index(index='webapp-tagged', document=webapp[record])

In [65]:
webapp = pd.read_csv(os.path.join(data_path, webaapp_files[2])).T.to_dict()
for record in webapp:
    es.index(index='webapp-password-spraying-attempts', document=webapp[record])

In [66]:
webapp = pd.read_csv(os.path.join(data_path, webaapp_files[3])).T.to_dict()
for record in webapp:
    es.index(index='webapp-new-data', document=webapp[record])

In [67]:
trips = pd.read_csv(os.path.join(data_path, 'trips.csv')).T.to_dict()
for record in trips:
    es.index(index='trips', document=trips[record])

# Log Files

In [70]:
with open(os.path.join(data_path, 'web.log'), 'r') as f:
    data = f.readlines()

In [74]:
for line in data:
    es.index(index='raw_web_log', document={'message':line})

In [75]:
with open(os.path.join(data_path, 'firewall.log'), 'r') as f:
    data = f.readlines()

In [78]:
for line in data:
    es.index(index='firewall_log', document=json.loads(line))

# NVD Load

In [10]:
os.mkdir('demo')
os.mkdir('demo/data')
os.mkdir('demo/data/db')

In [5]:
from NVD_Loader import NVDLoader

In [6]:
nvd_loader = NVDLoader()

In [6]:
nvd_loader.download_files(nvd_loader.historic_database, output_path=os.path.join(os.curdir, 'demo', 'data'))

In [7]:
nvd_loader.extract_archives(data_path='./demo/data/', verbose=False)

In [9]:
# Actual Document Count
file_list = sorted(os.listdir(os.path.join(os.curdir, 'demo', 'data','db')))
total_documents_to_be_ingested = nvd_loader.document_total_for_directory(file_list, data_path=os.path.join(os.curdir, 'demo', 'data', 'db'), verbose=False)
print(total_documents_to_be_ingested)

{'total_read_documents': 187216, 'unique_cve_ids': 187216}


In [10]:
# Create dataset with singleton (slow)
nvd_loader.ingest_bulk_json_dataset(file_list, target_index='nvd', data_path=os.path.join(os.curdir, 'demo', 'data', 'db'), verbose=True, ingest_method='singleton')

round: 1: Now ingesting nvdcve-1.1-2002.json from ./demo/data/db to nvd
round: 2: Now ingesting nvdcve-1.1-2003.json from ./demo/data/db to nvd
The ingest process is %9.52 complete
round: 3: Now ingesting nvdcve-1.1-2004.json from ./demo/data/db to nvd
round: 4: Now ingesting nvdcve-1.1-2005.json from ./demo/data/db to nvd
The ingest process is %19.05 complete
round: 5: Now ingesting nvdcve-1.1-2006.json from ./demo/data/db to nvd
round: 6: Now ingesting nvdcve-1.1-2007.json from ./demo/data/db to nvd
The ingest process is %28.57 complete
round: 7: Now ingesting nvdcve-1.1-2008.json from ./demo/data/db to nvd
round: 8: Now ingesting nvdcve-1.1-2009.json from ./demo/data/db to nvd
The ingest process is %38.1 complete
round: 9: Now ingesting nvdcve-1.1-2010.json from ./demo/data/db to nvd
round: 10: Now ingesting nvdcve-1.1-2011.json from ./demo/data/db to nvd
The ingest process is %47.62 complete
round: 11: Now ingesting nvdcve-1.1-2012.json from ./demo/data/db to nvd
round: 12: Now ing

'187216 documents sent to elasticsearch'

In [11]:
# Create and Populate the nvd_recent index
nvd_loader.create_nvd_recent_index()

round: 1: Now ingesting nvdcve-1.1-recent.json from demo/data/db to nvd_recent
Now removing nvdcve-1.1-recent.json.zip from demo/data


True

In [12]:
# Load the CPE Match Data Set
nvd_loader.create_cpe_match_index(target_index='cpe_match')

Now removing nvdcpematch-1.0.json.zip from ./demo/data


In [14]:
#Create the CPE Dictionary Dataset
nvd_loader.download_files(nvd_loader.cpe_dictionary_feed, output_path=os.path.join(os.curdir, 'demo', 'data','dictionary'))
nvd_loader.extract_archives(data_path=os.path.join(os.curdir, 'demo', 'data','dictionary'), verbose=False)
target_file = [x for x in os.listdir(os.path.join(os.curdir, 'demo', 'data','db')) if x.endswith('.xml')][0]
nvd_loader.load_cpe_dictionary(target_file=os.path.join(os.curdir, 'demo', 'data','db', target_file), target_index='cpe_dictionary', ingest_method='bulk')

{'health': 'yellow',
 'status': 'open',
 'index': 'cpe_dictionary',
 'uuid': 'Cr2VALr2S-qSNLcYF3f4Kw',
 'pri': '1',
 'rep': '1',
 'docs.count': '620856',
 'docs.deleted': '0',
 'store.size': '118.3mb',
 'pri.store.size': '118.3mb'}

In [12]:
# Load the CCE Data Set
#nvd_loader.download_files(nvd_loader.cce_database_dictionary, output_path=os.path.join(os.curdir, 'demo', 'data', 'cce'), verbose=False)
files = os.listdir(os.path.join(os.curdir, 'demo', 'data', 'cce'))
nvd_loader.load_cce_data(file_list=files, data_path=os.path.join(os.curdir, 'demo', 'data', 'cce'), verbose=False)

In [13]:
# update dataset
nvd_loader.update_cve_data(nvd_loader.two_hour_stream_feeds)

round: 1: Now ingesting nvdcve-1.1-modified.json from demo/data/db to nvd
Now removing nvdcve-1.1-modified.json.zip from demo/data


'2147 documents sent to elasticsearch'

In [14]:
# Test Aggregations
# Perform some Aggregations
# https://qualysguard.qg2.apps.qualys.com/qwebhelp/fo_portal/setup/cvss_vector_strings.htm
# Pipeline 1 (High Priority Network Attacks)
remote_root_cves_requiring_network_signatures = {
    "bool": {
      "must":[
        {"match_phrase": {"impact.baseMetricV3.cvssV3.vectorString": "/AV:N/"}},
        {"match_phrase": {"impact.baseMetricV3.cvssV3.vectorString": "/PR:N/"}},
        {"match_phrase": {"impact.baseMetricV3.cvssV3.vectorString": "/UI:N/"}},
        {"match_phrase": {"impact.baseMetricV3.cvssV3.vectorString": "/I:H/"}},
        {"match_phrase": {"impact.baseMetricV2.obtainAllPrivilege": "true"}}
      ],
      "filter": [
        {"range": {"lastModifiedDate": {"gte": "2008-01-01"}}}
        ]
    }
}
high_priority_signature_targets = nvd_loader.client.search(index='nvd', query=remote_root_cves_requiring_network_signatures)

# Elastic Common Schema Generator Templates Load

## Host Records

In [5]:
from elastic_common_schema_generator import ECSRecords

In [6]:
record_generator = ECSRecords()

In [7]:
specification = record_generator.pull_schema()

In [8]:
processor_list, code_start = record_generator.generate_processor_list(schema=specification, object_type='host')

In [9]:
demo_data_set = record_generator.create_records({'document_type':'demo_dataset'}, ['host', 'cve'], count=1000)

In [10]:
for document in demo_data_set:
    document['@timestamp'] = datetime.datetime.now()
    es.index(document=document, index='demo_machines')

In [12]:
es.search(size=1, index='demo_machines').raw

{'took': 1,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1000, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'demo_machines',
    '_id': 'Hof254ABnDNUNBHfihCf',
    '_score': 1.0,
    '_source': {'document_type': 'demo_dataset',
     'host.architecture': 'arm64',
     'host.boot.id': '2d5da9c6-a6f5-4318-819f-69853dcbe11f',
     'host.cpu.usage': '',
     'host.disk.read.bytes': '',
     'host.disk.write.bytes': '',
     'host.domain': 'thefirm.remote',
     'host.geo.city_name': 'New York',
     'host.geo.continent_code': 'NA',
     'host.geo.continent_name': 'North America',
     'host.geo.country_iso_code': 'US',
     'host.geo.country_name': 'United States',
     'host.geo.location': {'lat': 40.75464371951375,
      'lon': -73.98552474566057},
     'host.geo.name': '',
     'host.geo.postal_code': '',
     'host.geo.region_iso_code': '',
     'host.geo.region_name': '',
     'host.geo.timezone'

# HR Data Generator Load

In [11]:
%%writefile hr_generator.py
import os, requests, zipfile, json, random, datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk, streaming_bulk
from faker import Faker
import yaml
from yaml import Loader
from copy import deepcopy
import numpy as np


class HRGenerator:
    
    def __init__(self):
        self.container_host = 'localhost'
        self.elastic_user = 'elastic'
        self.elastic_password = 'elastic_playground'
        self.es1_url = f'https://{self.container_host}:9200'
        self.es = Elasticsearch(self.es1_url, 
                       basic_auth=(self.elastic_user, self.elastic_password), 
                       verify_certs=False)
        self.faker = Faker()
    
    def hr_data_generator(self, document_count):
        faker = self.faker
        documents = []
        for i in range(document_count):
            document = {}
            document['@timestamp'] = datetime.datetime.now().isoformat()
            document['first_name'] = faker.first_name()
            document['last_name'] = faker.last_name()
            document['middle_name'] = faker.random_element([faker.first_name(), faker.last_name()])
            document['telephone_number'] = faker.phone_number()
            document['email'] = faker.email()
            document['employee-id'] = faker.uuid4()
            document['bio'] = faker.text(max_nb_chars=200)
            document['ip_address'] = faker.ipv4_public()
            document['hired_date'] = faker.date()
            document['paid_amount'] = random.choice(range(10000,1000000))
            document['days_in_service'] = (datetime.datetime.now() - datetime.datetime.strptime(document['hired_date'], '%Y-%m-%d')).days
            document['skills'] = self.skills_list(faker)
            documents.append(document)
        return documents
    
    def skills_list(self, faker):
        path_list = [
            "technical_drawing",
            "programming",
            "photography",
            "linguistics",
            "psychology",
            "chemistry",
            "medical_research",
            "medic",
            "sports",
            "martial arts",
            "photographic memory",
            "empathic",
            "polygraph",
            "forensics",
            "land survey",
            "aeronautics",
            "pilot",
            "mechanic",
            "trader",
            "historian",
            "veterenarian",
            "agriculture",
        ]
        personal_list = faker.random_elements(path_list, unique=True, length=random.choice(range(1,4)))
        output = []
        for item in personal_list:
            doc = {'skill_name':item, 'skill_level':random.choice(range(100))}
            output.append(doc)
        return output

Overwriting hr_generator.py


In [5]:
from hr_generator import HRGenerator

In [6]:
hr_generator = HRGenerator()

In [7]:
data = hr_generator.hr_data_generator(document_count=1000)

In [9]:
for record in data:
    es.index(index='persons', document=record)

In [10]:
es.search(index='persons', size=1).raw

{'took': 1,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1000, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'persons',
    '_id': '6Yf-54ABnDNUNBHfWn_c',
    '_score': 1.0,
    '_source': {'@timestamp': '2022-05-21T14:59:55.805653',
     'first_name': 'Lori',
     'last_name': 'Walker',
     'middle_name': 'Roberta',
     'telephone_number': '610-609-0481x214',
     'email': 'ellispatrick@example.net',
     'employee-id': 'da414e4b-26b1-4e30-8476-3d2e27f3a78b',
     'bio': 'Leave her himself reflect. Western street drug store return future reflect bad. Current determine color control.',
     'ip_address': '166.213.219.36',
     'hired_date': '1971-09-28',
     'paid_amount': 732927,
     'days_in_service': 18498,
     'skills': [{'skill_name': 'linguistics', 'skill_level': 4}]}}]}}