In [7]:
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import time
import pandas as pd
import sqlite3
from pathlib import Path
from sklearn.cluster import DBSCAN
import numpy as np
from numpy import unique
import matplotlib.pyplot as plt
import time
import _thread
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Disable warnings
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [8]:
def percentage(part, whole):
    return part/whole

In [9]:
def cluster(html_file, connection_id, connection_count, f, MINIMUM_POINTS_IN_CLUSTER, MINIMUM_LIKELIHOOD):
    # helpful for debugging - just use a couple of records
    # if f.sip.unique()[0] != "192.168.30.152" or f.dip.unique()[0] != "72.52.217.73":
    #     continue

    should_write = False

    print(f"Evaluation #{connection_count} ID: {connection_id}: {len(f)} original records...", end="\r")
    if len(f) <= 4:  # need at least n delta records to make it interesting
        return

    html = f"<div id='{connection_id}' class='connection'>\n"
    html += f.to_html(border=0, classes="table table-striped table-sm").replace("<tr>", "<tr style='text-align: right'>")

    X = f.iloc[:, [1, 7]]  # X = [connection_id, delta time in milliseconds]

    # here we DBSCAN cluster over several different epsilon values calculated from time_spans
    # we picked arbitrary spans of time in minutes, but they have served us well in the results
    spans = [[0, 5], [2, 15], [15, 35], [30, 60]]

    span_count = 0
    for span in spans:
        span_count += 1
        print(f"Evaluating {connection_id} span {span}...", end="\r")
        eps = ((span[1] - span[0]) / 2) * .10  # this is our eps calculation: take a span's difference, and halve it, then multiply by .10

        t_time = time.time()
        # run DBSCAN clustering model...
        dbscan = DBSCAN(eps=eps, min_samples=MINIMUM_POINTS_IN_CLUSTER).fit(X)
        core_samples_mask = np.zeros_like(dbscan.labels_, dtype=bool)
        core_samples_mask[dbscan.core_sample_indices_] = True

        (unique, counts) = np.unique(dbscan.labels_, return_counts=True)
        frequencies = np.asarray((unique, counts)).T

        dbscan_cluster_data = pd.DataFrame()
        dbscan_cluster_data['delta'] = f["delta"]
        dbscan_cluster_data['cluster'] = dbscan.labels_
        dbscan_cluster_data = dbscan_cluster_data.groupby(["cluster"])["cluster"].count().reset_index(name="count")

        # calculate likelihood of each cluster
        likelihood = percentage(dbscan_cluster_data['count'], dbscan_cluster_data['count'].sum())
        # // TODO: need to expand upon likelihood calculation
        # could we add this number of items in the cluster vs the largest cluster overall?
        # vs the largest cluster in this time span?
        dbscan_cluster_data["likelihood"] = likelihood

        cols = [col for col in dbscan_cluster_data.columns if col == 'likelihood']
        dbscan_cluster_data.loc[dbscan_cluster_data['cluster'] == -1, cols] = 0

        for index, row in dbscan_cluster_data.iterrows():
            print(f"Iterating row {index}", end="\r")
            if row['cluster'] > -1:
                if row["likelihood"] > MINIMUM_LIKELIHOOD:
                    TOP_TARGETS.extend([[f["connection_id"].iloc[0], f["sip"].iloc[0], f["dip"].iloc[0], row["likelihood"]]])
                    TOP_TARGET_DIPS.extend([f["dip"].iloc[0]])
                    should_write = True

        print(f"Evaluated {connection_id} span {span} in {(time.time()-t_time)} seconds", end="\r")

        t_time = time.time()
        # get cluster points
        unique_labels = set(dbscan.labels_)
        for k in unique_labels:
            print(f"Capturing points in and out of cluster {k}", end="\r")

            class_member_mask = (dbscan.labels_ == k)

            # points in cluster
            xy = X[class_member_mask & core_samples_mask].to_numpy()
            if len(xy[:, 0]) > 0 or len(xy[:, 1]) > 0:
                plt.scatter(xy[:, 0] + span_count, xy[:, 1], label=f"dbscan cluster {eps}")

            # points not in cluster
            xy = X[class_member_mask & ~core_samples_mask].to_numpy()
            if len(xy[:, 0]) > 0 or len(xy[:, 1]) > 0:
                plt.plot(xy[:, 0] + span_count, xy[:, 1], 'x', label=f"not in dbscan cluster {eps}")

        print(f"Captured points in and out of cluster {k} in {(time.time()-t_time)} seconds", end="\r")

        html += f"<h3>span {span[0]}-{span[1]} minutes. EPS: {eps} Minimum Samples: {MINIMUM_POINTS_IN_CLUSTER}</h3>\n"
        html += "<div class='col-4'>\n"
        html += dbscan_cluster_data.to_html(border=0, classes="table table-striped table-sm").replace("<tr>", "<tr style='text-align: right'>")
        html += "</div>\n"

    plt.grid()
    # plot original values
    plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c="green", label="original values")
    plt.title('Cluster evaluation for: %d' % connection_id)
    plt.legend(loc="best")
    # save plot
    plt.savefig(f"data/reports/i/{connection_id}-x.png")

    plt.clf()
    plt.cla()
    plt.close()

    # add plot to html
    html += f"<img src='i/{connection_id}-x.png'>\n"
    html += "<div class='topper'>\n<a href='#top_talkers'>Back to top</a>\n</div>\n"
    html += "</div>\n"

    if should_write:
        with open(html_file, "a") as fi:
            fi.write(html)


plt.switch_backend('Agg')

Path("data/chunks").mkdir(parents=True, exist_ok=True)
Path("data/deltas").mkdir(parents=True, exist_ok=True)
Path("data/in").mkdir(parents=True, exist_ok=True)
Path("data/reports/i").mkdir(parents=True, exist_ok=True)

es = Elasticsearch(['https://192.168.1.181:9200'], timeout=30, max_retries=10, retry_on_timeout=True,
ca_certs=False,verify_certs=False, http_auth=('jupyter','password'))

system_query_table_name = "query_since_ts"
sleep_time = 60

db = sqlite3.connect('app.db')
db.row_factory = sqlite3.Row
db.execute(f"create table if not exists {system_query_table_name} (id integer primary key autoincrement, data_source text, ts integer, created timestamp default current_timestamp)")
db.execute("create table if not exists deltas (id integer primary key autoincrement, connection_id integer, sip text, dip text, port integer, protocol text, datetime timestamp, delta float)")

# server info?
print(f"Connected to {es}: {es.info()}")

Connected to <Elasticsearch(['https://192.168.1.181:9200'])>: {'name': 'voodoo-onion', 'cluster_name': 'voodoo-onion', 'cluster_uuid': 'dP1VycCSTQ2EPeCVxQ25GA', 'version': {'number': '7.16.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '2b937c44140b6559905130a8650c64dbd0879cfb', 'build_date': '2021-12-18T19:42:46.604893745Z', 'build_snapshot': False, 'lucene_version': '8.10.1', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}


In [10]:
searchContext = Search(using=es, index='*:so-*', doc_type='doc')
index='*:so-*'

In [11]:
# get last date queried
query_since_ts = 0
cursor = db.execute(f"SELECT ts from {system_query_table_name} WHERE data_source='{index}' order by id desc limit 1")
for row in cursor:
    query_since_ts = row[0]
    break
print(f"Getting all records since {pd.to_datetime(query_since_ts, unit='s')} ({query_since_ts})")    

Getting all records since 1970-01-01 00:00:00 (0)


In [12]:
############################################################
############################################################
# get new records from elasticsearch
############################################################
############################################################

# get last date queried
query_since_ts = 0
cursor = db.execute(f"SELECT ts from {system_query_table_name} WHERE data_source='{index}' order by id desc limit 1")
for row in cursor:
    query_since_ts = row[0]
    break
    
    
# query elasticsearch
# '{"query":{"match_all":{}},"sort":[{"ts":{"order":"desc"}}]}'
# order by ts asc
body_filter = '{"query":{"bool":{"filter":[{"range":{@timestamp":{"gt":' + str(query_since_ts) + '}}}]}},"sort":[{"@timstamp":{"order":"asc"}}]}'
#s = searchContext.query('query_string', query='destination.port:*')
s = searchContext.query('query_string', query='network.protocol:*')
res = es.search(index=index, size=10000)
print(f"Received {res['hits']['total']['value']} records")
for hit in res['hits']['hits']:
    print(hit["_source"]["@timestamp"], end="\r")
response = s.execute()
df = pd.DataFrame(columns=['ts','source.ip','destination.ip','destination.port', 'network.protocol'])
if response.success():
    for d in s[:10000]:
        try:
            df = df.append({'ts' : d['@timestamp'], 'source.ip' : d['source']['ip'], 'destination.ip' : d['destination']['ip'], 'destination.port': d['destination']['port'], 'network.protocol' : d['network']['protocol']}, ignore_index=True)
        except KeyError:
            pass
display(df)

df.info()

src = 'source.ip'
dst = 'destination.ip'
prt = 'destination.port'

Received 10000 records
2021-12-30T14:25:22.929Z

Unnamed: 0,ts,source.ip,destination.ip,destination.port,network.protocol
0,2021-12-29T23:58:55.889Z,192.168.1.220,224.0.0.252,5355,dns
1,2021-12-29T23:59:00.463Z,192.168.1.220,224.0.0.252,5355,dns
2,2021-12-29T23:59:05.283Z,192.168.128.22,192.168.128.25,88,krb_tcp
3,2021-12-29T23:59:22.053Z,192.168.1.220,224.0.0.252,5355,dns
4,2021-12-29T23:59:26.848Z,192.168.1.220,224.0.0.252,5355,dns
...,...,...,...,...,...
9995,2021-12-30T09:00:10.832Z,192.168.1.154,52.167.249.196,443,ssl
9996,2021-12-30T09:00:10.800Z,192.168.128.25,13.107.222.240,53,dns
9997,2021-12-30T08:59:56.595Z,192.168.1.154,216.239.32.10,53,dns
9998,2021-12-30T08:59:46.760Z,192.168.1.154,65.55.44.109,443,ssl


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   ts                10000 non-null  object
 1   source.ip         10000 non-null  object
 2   destination.ip    10000 non-null  object
 3   destination.port  10000 non-null  object
 4   network.protocol  10000 non-null  object
dtypes: object(5)
memory usage: 390.8+ KB


In [18]:
############################################################
# filter records that do not cluster or that are irrelevant
############################################################
if src.startswith("192.168.28.25") or src.startswith("192.168.28.15") or dst.startswith("192.168.28.25") or dst.startswith("192.168.28.15"):  # exclude traffic to and from DCs
    is_valid_record = False
if src.startswith("10.") or dst.startswith("10."):  # filter internal traffic
    is_valid_record = False
if src.startswith("7.7.7.") and dst.startswith("7.7.7."):  # filter internal traffic
    is_valid_record = False
if src.startswith("192.168.") and (dst.startswith("10.") or dst.startswith("7.7.7.")):  # filter internal traffic
    is_valid_record = False
if prt == "icmp":
    is_valid_record = False

db.execute(
    f"INSERT INTO deltas (sip, dip, port, protocol, datetime) values ('source.ip', 'destination.ip', 'destination.port', 'network.protocol', '{pd.to_datetime(df['ts'],infer_datetime_format=True)}')")
query_since_ts = 'ts'

In [20]:
db.execute(f"INSERT INTO {system_query_table_name} (data_source,ts) values ('{index}', {query_since_ts})")
db.commit()

# first we create an empty array to store the delta results
processed = []
last_row = None
connection_id = 0

############################################################
############################################################
# calculate each connection's delta time
############################################################
############################################################
cursor = db.execute(f"SELECT id, sip, dip, port, protocol, datetime from deltas where delta is null order by dip, sip, port, protocol, datetime")
print(f"Processing {cursor.rowcount} deltas...")
for row in cursor:

    if (last_row is not None) and (row["sip"] == last_row["sip"] and (row["dip"] == last_row["dip"]) and (row["port"] == last_row["port"]) and (row["protocol"] == last_row["protocol"])):
        # if the new row matches the last row, we want to calculate its delta, so no action needed
        pass
    else:
        # this is a new connection! so we need to remove the last row and increase the delta count
        last_row = None
        connection_id += 1

    if last_row is None:
        delta = 0
    else:
        print(row["datetime"], last_row["datetime"], end="\r")
        # Compute the milliseconds in a timedelta as floating-point number between this row and the previous row
        delta = (pd.to_datetime(row["datetime"]) - pd.to_datetime(last_row["datetime"])).total_seconds() * 1e3

    db.execute(f"update deltas set connection_id={connection_id}, delta={delta} where id={row['id']}")

    last_row = row

db.commit()

OperationalError: no such column: ts

In [None]:
            ############################################################
            ############################################################
            # cluster the deltas
            ############################################################
            ############################################################
            TOP_TARGETS = []
            TOP_TARGET_DIPS = []
            MINIMUM_DELTA = 60000  # this is the minimum delta we will accept write to the delta file (short deltas are not useful and get found out by a "top talkers" report)

            ds = pd.read_sql_query("SELECT * FROM deltas", db)
            connection_ids = ds.connection_id.unique()
            print(f"{len(connection_ids)} unique connection_ids found ({(time.time()-start_time)} seconds)")
            ds["delta"] = ds["delta"] / 1000 / 60

            MINIMUM_POINTS_IN_CLUSTER = 4  # this is the minimum number of points we will accept in a cluster, TODO future work to dig deeper into optimum values here
            MINIMUM_LIKELIHOOD = .70

            id = time.strftime('%Y%m%d-%H%M%S')
            start_time = time.time()
            html_file = f"data/reports/report-{id}.html"
            connection_count = 0

            with open(html_file, "w") as fi:
                fi.write("")

            threads = []
            for connection_id in connection_ids:
                connection_count += 1

                # get all of the records from the delta file with this connection_id
                f = ds.query(f"connection_id == {connection_id}").copy()

                ############################################################
                ############################################################
                # _thread.start_new_thread(cluster, (html_file, connection_id, connection_count, f, MINIMUM_POINTS_IN_CLUSTER, MINIMUM_LIKELIHOOD))

                cluster(html_file, connection_id, connection_count, f, MINIMUM_POINTS_IN_CLUSTER, MINIMUM_LIKELIHOOD)

            for thread in threads:
                thread.join()

            print(f"\n{connection_count} clusters processed in ({(time.time()-start_time)} seconds)")

            TOP_TARGETS = sorted(set(map(tuple, TOP_TARGETS)), key=lambda x: x[3], reverse=True)
            TOP_TARGET_DIPS = sorted(list(set(TOP_TARGET_DIPS)), reverse=True)

            # write template file
            template_data = ""
            with open("templates/report.html", 'r+') as f:
                template_data = f.read()

            html = ""
            if(len(TOP_TARGETS) > 0):
                html += "<div id='top_talkers' class='row'>\n"
                html += "<div class='column'>\n"
                html += "<h1>Suspect destination IPs by cluster/likelihood</h1>\n"
                html += "<ul>\n"
                for row in TOP_TARGET_DIPS:
                    html += f"<li>{row}</li>\n"
                html += "</ul>\n"
                html += "</div>\n"
                html += "<div class='column'>\n"
                html += "<h3>Clusters of Note</h3>\n"
                html += "<ul>\n"
                for row in TOP_TARGETS:
                    html += f"<li><a href='#{row[0]}'>{row[1]} ==> {row[2]} ({round(row[3]*100)}%)</a></li>\n"
                html += "</ul>\n"
                html += "</div>\n"
                html += "</div>\n"

            # with open("data/chunks/summary.txt", "r") as f:
            #     raw = f.read().split("|")
            #     template_data = template_data.replace("{{start}}", str(datetime.datetime.fromtimestamp(float(raw[0]))))
            #     template_data = template_data.replace("{{end}}", str(datetime.datetime.fromtimestamp(float(raw[1]))))
            #     template_data = template_data.replace("{{file_count}}", "{:,d}".format(int(raw[2])))
            #     template_data = template_data.replace("{{total_records}}", "{:,d}".format(int(raw[3])))
            #     template_data = template_data.replace("{{filtered_records}}", "{:,d}".format(int(raw[4])))

            with open(html_file, 'r+') as f:
                content = f.read()
                f.seek(0, 0)

                template_data = template_data.replace("{{top_talkers}}", html)
                template_data = template_data.replace("{{report}}", content)
                template_data = template_data.replace("{{id}}", id)

                f.write(template_data)

            print(f"Found {len(TOP_TARGETS)} top targets in {time.time() - start_time}.")
            print(f"View the report here: {html_file}")

            ############################################################
            ############################################################
            # time to do it again!
            ############################################################
            ############################################################
            print(f"Sleeping for {sleep_time} seconds...", end="\r")
            time.sleep(sleep_time)