In [None]:
import pandas as pd
from datetime import timedelta
import json
import requests
import time
import json
from geoip2 import database
reader = database.Reader('./geoip/GeoLite2-City_20190611/GeoLite2-City.mmdb')

HOSTNAME = "100.88.37.43"
PORT = 9200
ADDR = f"http://{HOSTNAME}:{PORT}"

headers = {'Content-type': 'application/json'}

def create_url(index_name, end_point="", i=""):
    return f"{ADDR}/{index_name}/{end_point}/{i}"

def bulk_send_dataframe(df, index):
    """
    index: elastic search index that you want to send to
    Example: 
    index="netflow":
    or index="features"
    Returns
        json message
    """
    action = { "index" : { "_index" : index}}
    action = json.dumps(action)
    # convert dataframe to list of dict
    to_send = [row.to_dict() for i, row in df.iterrows()]
    data_to_post = f"{action}\n" + f"\n{action}\n".join(json.dumps(d) for d in to_send)+ "\n"
    r = requests.post(url=f"{ADDR}/_bulk?pretty", data=data_to_post, headers=headers)
    return r.json()
    
def geo_helper(ip):
    """
    Extract the following from ip (String)
    - Latitude
    - Longitude
    - Country
    Returns
        ["{Latitude},{Longitude}", Country] or [None, None]
    """
    try:
        response = reader.city(ip)
        lat = response.location.latitude
        lon = response.location.longitude
        geostring = f"{round(lat,2)},{round(lon,2)}"
        country = response.country.names["en"]
        return [geostring, country]
    except:
        return [None, None]
    
def add_geoinformation(df, dest_col, src_col=None):
    """
    https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
    
    Read from df[dest_col] (and df[src_col] if given src_col)
    for each ip in each col:
    Apply geo_helper on each ip to create 2 new columns: 
    df["dest_location"] and df["dest_country"] (and src_location and src_country if given src_col)
    Returns:
        Dataframe with additional columns
    """
    dest_addr = df[dest_col]
    if src_col is not None:
        src_addr = df[src_col]
        addr = pd.concat([src_addr, dest_addr])
    else:
        addr = dest_addr
        
    geo_map = {ip: geo_helper(ip) for ip in addr.unique()}
    
    if src_col is not None:
        src_result = list(zip(*src_addr.map(geo_map)))
        df["src_location"], df["src_country"] = src_result
    
    dest_result = list(zip(*dest_addr.map(geo_map)))
    df["dest_location"], df["dest_country"] = dest_result
        
    return df

def add_geoinformation_and_send_dataframe(df, index):
    if index=="netflow":
        src_col="IPV4_SRC_ADDR"
    if index=="features":
        src_col=None
    df = add_geoinformation(df, dest_col='IPV4_DST_ADDR', src_col=src_col)
    response_message = bulk_send_dataframe(df, index)
    return response_message

In [2]:
def test_success(df, response_result):
    N, d = df.shape
    n_success = len(response_result["items"])
    print(f"Sent {n_success} out of {N} successful")

In [2]:
requests.delete(create_url("features"))
requests.delete(create_url("netflow"))

# define the mapping
features_schema = { 
        "IPV4_DST_ADDR": {"type": "ip"},
        "dest_location": { "type" : "geo_point" }, 
        "record_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
                       }
}

netflow_schema = { 
        "Attack":      { "type": "integer" },
        "IPV4_SRC_ADDR": {"type": "ip"},
        "IPV4_DST_ADDR": {"type": "ip"},
        "EXPORTER_IPV4_ADDRESS": {"type": "ip"},
        "src_location": { "type" : "geo_point" },
        "dest_location": { "type" : "geo_point" }, 
        "record_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
                       }
}

url = create_url("features")
r = requests.put(url=url, 
                 json={"mappings": {"properties": features_schema}},
                 headers=headers)
print(r.json())

url = create_url("netflow")
r = requests.put(url=url, 
                 json={"mappings": {"properties": netflow_schema}},
                 headers=headers)
print(r.json())

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'features'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'netflow'}


In [3]:
netflow = pd.read_csv("data.csv")
features = pd.read_csv("20-min-features.csv")

In [None]:
# netflow
# unique_time = netflow.record_time.unique()
# unique_time = pd.to_datetime(unique_time)
# unique_time
# new_times = (unique_time + timedelta(minutes=20))
# time_map = {unique_time[i].strftime("%Y-%m-%d %H:%M:00"): new_times[i].strftime("%Y-%m-%d %H:%M:00") \
#             for i in range(len(new_times))}
# time_map
# netflow["record_time"] = netflow.record_time.map(time_map)
# features["record_time"] = features.record_time.map(time_map)

In [4]:
# from datetime import datetime
# current_time = datetime.now()
# current_time.strftime("%Y-%m-%d %H:%M:%S")
unique_time = features.record_time.unique()
unique_time

array(['2018-12-01 00:00:00', '2018-12-01 00:01:00',
       '2018-12-01 00:02:00', '2018-12-01 00:03:00',
       '2018-12-01 00:04:00', '2018-12-01 00:05:00',
       '2018-12-01 00:06:00', '2018-12-01 00:07:00',
       '2018-12-01 00:08:00', '2018-12-01 00:09:00',
       '2018-12-01 00:10:00', '2018-12-01 00:11:00',
       '2018-12-01 00:12:00', '2018-12-01 00:13:00',
       '2018-12-01 00:14:00', '2018-12-01 00:15:00',
       '2018-12-01 00:16:00', '2018-12-01 00:17:00',
       '2018-12-01 00:18:00', '2018-12-01 00:19:00'], dtype=object)

In [None]:
from datetime import datetime

In [15]:
current_time_string

'2019-07-17 04:17:17'

In [16]:
for r_time in unique_time:
#     print(r_time)
    df = features[features["record_time"] == r_time]
    df = add_geoinformation(df, dest_col='IPV4_DST_ADDR')
    current_time_string = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    print(current_time_string)
    df["record_time"] = current_time_string
    start = time.time()
    response_result = bulk_send_dataframe(df, 'features')
    test_success(df, response_result)
    print(f"Time taken: {time.time() - start}")
    
    time.sleep(30)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  import sys


2019-07-17 04:19:16
Sent 22 out of 22 successful
Time taken: 0.705718994140625
2019-07-17 04:19:46
Sent 22 out of 22 successful
Time taken: 0.6540226936340332
2019-07-17 04:20:17
Sent 23 out of 23 successful
Time taken: 0.7681765556335449
2019-07-17 04:20:48
Sent 23 out of 23 successful
Time taken: 0.7407703399658203
2019-07-17 04:21:19
Sent 21 out of 21 successful
Time taken: 0.6633322238922119
2019-07-17 04:21:50
Sent 23 out of 23 successful
Time taken: 0.7512097358703613
2019-07-17 04:22:20
Sent 20 out of 20 successful
Time taken: 0.654097318649292
2019-07-17 04:22:51
Sent 19 out of 19 successful
Time taken: 0.6461508274078369
2019-07-17 04:23:22
Sent 21 out of 21 successful
Time taken: 0.6468310356140137
2019-07-17 04:23:53
Sent 22 out of 22 successful
Time taken: 0.6633543968200684
2019-07-17 04:24:23
Sent 21 out of 21 successful
Time taken: 0.6991755962371826
2019-07-17 04:24:54
Sent 21 out of 21 successful
Time taken: 0.6846144199371338
2019-07-17 04:25:25
Sent 21 out of 21 succ

In [None]:
unique_time = netflow.record_time.unique()

for r_time in unique_time:
    if r_time in unique_time[:4]:
        continue
    print(r_time)
    df = netflow[netflow["record_time"] == r_time]
    df = add_geoinformation(df, dest_col='IPV4_DST_ADDR', src_col= 'IPV4_SRC_ADDR')
    start = time.time()
    response_result = bulk_send_dataframe(df, 'netflow')
    test_success(df, response_result)
    print(f"Time taken: {time.time() - start}")

In [None]:
# read table csv? dataframe
# everyone minute
# send to dashboard
def 

In [None]:
for r_time in unique_time:
    print(r_time)
    df = netflow[netflow["record_time"] == r_time]
    df = add_geoinformation(df, dest_col='IPV4_DST_ADDR', src_col= 'IPV4_SRC_ADDR')
    
    start = time.time()
    response_result = bulk_send_dataframe(df, 'netflow')
    test_success(df, response_result)
    print(f"Time taken: {time.time() - start}")