#  Iterate Destination Hosts and associated Ports, over time periods, to create multiple Analytical Data Sets &nbsp;&nbsp; (in Big Query)

In [14]:
# use print only as a function
from __future__ import print_function
import sys
sys.version

'2.7.9 (default, Jun 29 2016, 13:08:31) \n[GCC 4.9.2]'

## Connect to data and read into dataframe

In [15]:
__author__ = 'swe03'

import numpy as np
import pandas as pd
import pandas.io.gbq as pdg
import matplotlib.pylab as plt
import matplotlib.pyplot as pplt
from decimal import *
import re 

%matplotlib inline
from matplotlib.pylab import rcParams
rcParams['figure.figsize'] = 15, 6

from scipy import stats

from datetime import datetime, timedelta

desired_width = 250
pd.set_option('display.width',desired_width)
pd.set_option('display.max_rows', 500)

###### Install the SQL package if not already installed 

In [16]:
#!pip install pandasql

In [17]:
from pandasql import PandaSQL 
pdsql = PandaSQL()

##### Use an existing table but change the timestamps to create a contiguous distribution.

In [18]:
def Get_recs_w_date_range(date_s,date_e, addr):
    """Iterate through various date ranges to create the a timeframe sample for later aggregation"""
    global dfx2   # Otherwise, dfx2 is considered Local and will not be global scope of the dataframe created above
    query = """SELECT Timestamp, src_addr, src_port, dst_addr, dst_port,
                 cast(duration_ms as integer) as duration_ms,
                 cast(bytes as integer) as bytes,
                 protocol, flow_direction, tcp_flags 
               FROM ipfix.ipfix                
               WHERE Timestamp BETWEEN timestamp('{}') AND timestamp('{}')
                 AND dst_addr in ({})                  # single quotes unnecessary since the ip's are quoted in string 
               LIMIT 10 """.format(date_s,date_e,addr)
               
    #print('The value of local var date_s is: {}'.format(date_s))
    #print('The value of local var date_e is: {}'.format(date_e))
    print('The value of local var addr is: {}'.format(addr))
    dfx1 = pd.read_gbq(query, project_id="network-sec-analytics")
    dfx2 = dfx2.append(dfx1)                                       # Append onto the dfx2 dataframe
    return           

In [19]:
def Get_recs_w_date_range_lowgrain(date_s,date_e):
    """Iterate through various date ranges to create the a timeframe sample for later aggregation"""
    global dfx2   # Otherwise, dfx2 is considered Local and will not be global scope of the dataframe created above
    query = """SELECT Timestamp, src_addr, src_port, dst_addr, dst_port,
                 cast(duration_ms as integer) as duration_ms,
                 cast(bytes as integer) as bytes,
                 protocol, flow_direction, tcp_flags 
               FROM ipfix.ipfix                
               WHERE Timestamp BETWEEN timestamp('{}') AND timestamp('{}')
                 AND dst_addr in ('165.130.217.229') 
                 AND dst_port in ('443')
                 AND src_addr in ('172.29.236.82')""".format(date_s,date_e)
               
    #print('The value of local var date_s is: {}'.format(date_s))
    #print('The value of local var date_e is: {}'.format(date_e))
    #print('The value of local var addr is: {}'.format(addr))
    dfx1 = pd.read_gbq(query, project_id="network-sec-analytics")
    dfx2 = dfx2.append(dfx1)                                       # Append onto the dfx2 dataframe
    return       

In [20]:
def Write_to_gbq(addr_port_s):
  #model_addr_port = "'" + 'prod.' + addr_port_s + "'"
  model_addr_port = 'prod.' + addr_port_s  
  parm_str = pd.Series([model_addr_port]) 
  print("This is the parm str:",parm_str)
  
  pdg.to_gbq(dfx2, model_addr_port, "network-sec-analytics", verbose=True, reauth=False, 
   if_exists='replace', private_key=None)

  #dfx2.to_gbq(parm_str, "network-sec-analytics", verbose=True, reauth=False, 
   #if_exists='replace', private_key=None)
  return

In [32]:
def Initialize_and_Iterate():
  date_start = pd.to_datetime('2017-02-01 00:00:00')           # '2017-02-01 00:00:00'
  date_end_interval   = pd.to_datetime('2017-02-01 01:00:00')  # '2017-02-01 10:59:59'
  
  while date_start <= pd.to_datetime(date_end_interval):
    date_end = date_start + timedelta(minutes=59,seconds=59)   # Set the datetime end of the hour interval
    print('For get_recs function date_start=',date_start)
    print('For get_recs function date_end=',date_end)
    
    Get_recs_w_date_range(date_start,date_end,out_str) # Extract the query Limit above within the specified hour  
    
    date_start = date_end + timedelta(seconds=1)           # Add a second to the nn:59:59 end date to start the next # hour on nn:00:00 start time
  
  return      

In [30]:
def Initialize_and_Iterate_lowgrain():
  date_start = pd.to_datetime('2017-02-01 00:00:00')           # '2017-02-01 00:00:00'
  date_end_interval   = pd.to_datetime('2017-02-01 01:00:00')  # '2017-02-01 10:59:59'
  
  while date_start <= pd.to_datetime(date_end_interval):
    date_end = date_start + timedelta(minutes=59,seconds=59)   # Set the datetime end of the hour interval
    print('For get_recs function date_start=',date_start)
    print('For get_recs function date_end=',date_end)
    
    Get_recs_w_date_range_lowgrain(date_start,date_end) # Extract the query Limit above within the specified hour  
    
    date_start = date_end + timedelta(seconds=1)            
                                                            
  return      

In [23]:
def Create_date_hour():
  global dfx2
  dfx2['duration_ms'].fillna(0, inplace=True)
  dfx2['date_hour'] = dfx2.Timestamp.dt.strftime('%Y-%m-%d-%H')  # This works and creates a Series with Date and Hour
  dfx2['date_hour'] = pd.to_datetime(dfx2['date_hour'] )
  dfx2.reset_index(drop=True, inplace=True)
  return 

### Initiate the Process

##### Read the specific Host Segments reference table from BQ

In [24]:
## This function accomplishes both: 1. Creating an enumerated list of Host Segments to be sent to the Get_recs... function and
##                                  2. Parsing the text strings to enable generating the enumerated list
def read_in_host_segments():
  global out
  host_segs = pd.read_gbq("select Host_Range_Start, Host_Range_End from reference.Host_Segments", project_id="network-sec-analytics")
  host_start = host_segs.Host_Range_Start.str.split('.',expand=True).astype(int)  # This creates a DF.  type(host_start)
  host_end = host_segs.Host_Range_End.str.split('.',expand=True).astype(int)  # This creates a DF.  type(host_end)
  host_start2 = host_segs.join(host_start[3])   # Join the host_seg table with just the last component of the address as int
  host_start2 = host_start2.join(host_end[3],rsuffix='E') # Join to get the host range end value
  host_start2.rename(columns={"3":"start","3E":"end"},inplace=True) 
  host_start2.replace(to_replace='\w+$', value=' ', inplace=True, limit=None, regex=True, method='pad', axis=None) # place a blank in the last addr component
  
  temp_list = []
  end_str = ''
  for index, row in host_start2.iterrows(): 
    counter = host_start2['start'][index]  # necessary to include index or will get "truth value of a Series is ambiguous"
    last = host_start2['end'][index]
  while counter <= last:
    end_str = counter.astype(str)   ## counter retains int dtype
    temp_dict = {'host': row.Host_Range_Start.strip() + end_str}
    temp_list.append(temp_dict)
    counter = counter + 1
    
  out = pd.DataFrame(temp_list)   ## Currently, this will overwrite each df and only retain the last record's df

  return


read_in_host_segments()

Requesting query... ok.
Query running...
Query done.
Processed: 98.0 b

Retrieving results...
Got 3 rows.

Total time taken 1.09 s.
Finished at 2017-04-14 19:19:19.


In [25]:
out

Unnamed: 0,host
0,165.130.217.225
1,165.130.217.226
2,165.130.217.227
3,165.130.217.228
4,165.130.217.229
5,165.130.217.230
6,165.130.217.231
7,165.130.217.232
8,165.130.217.233
9,165.130.217.234


In [26]:
out_array=out['host'].values
#out_array
for i in range(0, len(out_array)):
  if i == 0:
    out_str = "'" + out_array[i] + "'" + ","
  else:
    if i == (len(out_array)-1):
      out_str += "'" + out_array[i] + "'" 
    else:
      out_str += "'" + out_array[i] + "',"

In [27]:
#print(out_str)
#out_str = out_str + ',' + '165.130.1.9'
print(out_str)

'165.130.217.225','165.130.217.226','165.130.217.227','165.130.217.228','165.130.217.229','165.130.217.230','165.130.217.231','165.130.217.232','165.130.217.233','165.130.217.234','165.130.217.235','165.130.217.236','165.130.217.237','165.130.217.238'


##### Call the functions

In [33]:
#global i 
#n = 2
#del dfx2
#for i in range(0, n):                # Set n to be the n number of addr/port ads's to create
dfx2 = pd.DataFrame()              # Create the df that will be appended to for each timeframe
Initialize_and_Iterate()           # For each Host/Port, iterate through the timeframes 
Create_date_hour()                 # Create a Day/Hour variable for the final, appended df
  #del dfx2                           # df has been written to BQ so now delete it

For get_recs function date_start= 2017-02-01 00:00:00
For get_recs function date_end= 2017-02-01 00:59:59
The value of local var addr is: '165.130.217.225','165.130.217.226','165.130.217.227','165.130.217.228','165.130.217.229','165.130.217.230','165.130.217.231','165.130.217.232','165.130.217.233','165.130.217.234','165.130.217.235','165.130.217.236','165.130.217.237','165.130.217.238'
Requesting query... ok.
Query running...
Query done.
Processed: 69.9 Gb

Retrieving results...
Got 10 rows.

Total time taken 2.56 s.
Finished at 2017-04-14 19:24:57.
For get_recs function date_start= 2017-02-01 01:00:00
For get_recs function date_end= 2017-02-01 01:59:59
The value of local var addr is: '165.130.217.225','165.130.217.226','165.130.217.227','165.130.217.228','165.130.217.229','165.130.217.230','165.130.217.231','165.130.217.232','165.130.217.233','165.130.217.234','165.130.217.235','165.130.217.236','165.130.217.237','165.130.217.238'
Requesting query... ok.
Query running...
Query done.


In [None]:
#dfx2 = pd.DataFrame()              # Create the df that will be appended to for each timeframe
#Initialize_and_Iterate_lowgrain()           # For each Host/Port, iterate through the timeframes 
#Create_date_hour()                 # Create a Day/Hour variable for the final, appended df

In [35]:
dfx_s=dfx2.sort_values(['date_hour','dst_addr'],ascending=True)
dfx_s

Unnamed: 0,Timestamp,src_addr,src_port,dst_addr,dst_port,duration_ms,bytes,protocol,flow_direction,tcp_flags,date_hour
1,2017-02-01 00:16:06,172.29.236.82,45588,165.130.217.229,443,7.0,1001,tcp,,[FS.PA...],2017-02-01 00:00:00
0,2017-02-01 00:07:30,172.29.237.82,6981,165.130.217.230,443,49.0,3000,tcp,,[FS.PA...],2017-02-01 00:00:00
2,2017-02-01 00:33:12,172.29.236.82,28261,165.130.217.230,443,7.0,1001,tcp,,[FS.PA...],2017-02-01 00:00:00
3,2017-02-01 00:52:49,172.29.236.82,16346,165.130.217.230,443,7.0,1001,tcp,,[FS.PA...],2017-02-01 00:00:00
8,2017-02-01 00:43:28,172.29.237.82,36679,165.130.217.230,443,6.0,2002,tcp,,[FS.PA...],2017-02-01 00:00:00
9,2017-02-01 00:08:15,172.29.237.82,58667,165.130.217.230,443,40658.0,720,tcp,,[..R.A...],2017-02-01 00:00:00
4,2017-02-01 00:46:48,172.29.237.82,2381,165.130.217.231,443,22697.0,640,tcp,,[..R.A...],2017-02-01 00:00:00
5,2017-02-01 00:54:40,172.29.236.82,19986,165.130.217.231,443,8.0,961,tcp,,[FS.PA...],2017-02-01 00:00:00
6,2017-02-01 00:32:23,172.29.236.82,8978,165.130.217.231,443,13.0,1273,tcp,,[FS.PA...],2017-02-01 00:00:00
7,2017-02-01 00:39:28,151.140.78.211,9997,165.130.217.235,50218,271.0,88,tcp,,[FS..A...],2017-02-01 00:00:00


### Create the Behaviorial Metrics (i.e., In-Degree Entropy measures)

In [76]:
host_count = dfx_s.groupby(dfx_s['date_hour']).count()   ## This creates a dataframe
host_count['dst_addr'] = pd.to_numeric(host_count['dst_addr'],downcast='float')   ## another way to conver is below
host_count['dst_addr']

date_hour
2017-02-01 00:00:00    10.0
2017-02-01 01:00:00    10.0
Name: dst_addr, dtype: float32

In [None]:
unq_src_count=dfx_s.groupby(['date_hour','dst_addr']).src_addr.nunique()    ## This creates a Series
unq_src_count[1]

In [62]:
type(unq_src_count)

pandas.core.series.Series

In [75]:
unq_src_count_df = pdsql("""Select * from unq_src_count u""", locals())   ## unq_src_count is a Series
#type(unq_src_count_df)                                                    ## test is a Dataframe
unq_src_count_df['src_addr'] = unq_src_count_df.src_addr.astype(float)
unq_src_count_df['src_addr']                                              ## Dataframe has cols:  date_hour, dst_addr, 
                                                                          ## and src_addr (which are the unique counts)

0    1.0
1    2.0
2    2.0
3    1.0
4    1.0
5    2.0
6    2.0
7    2.0
Name: src_addr, dtype: float64

In [85]:
calc_ent1 = pdsql("""SELECT s.date_hour, s.dst_addr as dst_addr_s, s.src_addr, i.dst_addr, (s.src_addr / i.dst_addr) as ent1 
                  from host_count i join unq_src_count_df s
                  on  i.date_hour = s.date_hour 
                  order by s.date_hour""",locals())
print(calc_ent1)

                    date_hour       dst_addr_s  src_addr  dst_addr  ent1
0  2017-02-01 00:00:00.000000  165.130.217.229       1.0      10.0   0.1
1  2017-02-01 00:00:00.000000  165.130.217.230       2.0      10.0   0.2
2  2017-02-01 00:00:00.000000  165.130.217.231       2.0      10.0   0.2
3  2017-02-01 00:00:00.000000  165.130.217.235       1.0      10.0   0.1
4  2017-02-01 01:00:00.000000  165.130.217.228       1.0      10.0   0.1
5  2017-02-01 01:00:00.000000  165.130.217.229       2.0      10.0   0.2
6  2017-02-01 01:00:00.000000  165.130.217.230       2.0      10.0   0.2
7  2017-02-01 01:00:00.000000  165.130.217.231       2.0      10.0   0.2


In [86]:
calc_ent1['ln_ent1'] = np.log(calc_ent1['ent1'])
calc_ent1

Unnamed: 0,date_hour,dst_addr_s,src_addr,dst_addr,ent1,ln_ent1
0,2017-02-01 00:00:00.000000,165.130.217.229,1.0,10.0,0.1,-2.302585
1,2017-02-01 00:00:00.000000,165.130.217.230,2.0,10.0,0.2,-1.609438
2,2017-02-01 00:00:00.000000,165.130.217.231,2.0,10.0,0.2,-1.609438
3,2017-02-01 00:00:00.000000,165.130.217.235,1.0,10.0,0.1,-2.302585
4,2017-02-01 01:00:00.000000,165.130.217.228,1.0,10.0,0.1,-2.302585
5,2017-02-01 01:00:00.000000,165.130.217.229,2.0,10.0,0.2,-1.609438
6,2017-02-01 01:00:00.000000,165.130.217.230,2.0,10.0,0.2,-1.609438
7,2017-02-01 01:00:00.000000,165.130.217.231,2.0,10.0,0.2,-1.609438


In [89]:
calc_ent2 = pdsql("""SELECT i.*, sum(i.ent1 * i.ln_ent1) as hx_ent, count(dst_addr_s) as unq_dst_host_count
                  from calc_ent1 i
                  group by i.date_hour
                  order by i.date_hour""",locals())
calc_ent2

Unnamed: 0,date_hour,dst_addr_s,src_addr,dst_addr,ent1,ln_ent1,hx_ent,unq_dst_host_count
0,2017-02-01 00:00:00.000000,165.130.217.235,1.0,10.0,0.1,-2.302585,-1.104292,4
1,2017-02-01 01:00:00.000000,165.130.217.231,2.0,10.0,0.2,-1.609438,-1.195921,4


In [91]:
calc_ent2['In_degree_entropy'] = -(calc_ent2['hx_ent'] / np.log(calc_ent2['unq_dst_host_count']))
calc_ent2

Unnamed: 0,date_hour,dst_addr_s,src_addr,dst_addr,ent1,ln_ent1,hx_ent,unq_dst_host_count,entropy,In_degree_entropy
0,2017-02-01 00:00:00.000000,165.130.217.235,1.0,10.0,0.1,-2.302585,-1.104292,4,0.796578,0.796578
1,2017-02-01 01:00:00.000000,165.130.217.231,2.0,10.0,0.2,-1.609438,-1.195921,4,0.862675,0.862675


In [None]:
##### Write the dataframe to BQ
dfx2.to_gbq('prod.host_group2', "network-sec-analytics", verbose=True, reauth=False, 
   if_exists='replace', private_key=None)

In [None]:
def create_unistats(group):
    return {'min': group.min(), 'max': group.max(), 'count': group.count(), 'mean': group.mean()}
  
# These are DataFrames
bytes_dist = dfx2['bytes'].groupby([dfx2['date_hour'],   
                                  dfx2['dst_port']]).apply(create_unistats).unstack().reset_index()
bytes_dist = bytes_dist[(bytes_dist['dst_port']==53) ]   # all stats are displayed for just port 53

duration_dist = dfx2['duration_ms'].groupby([dfx2['date_hour'],   
                                  dfx2['dst_port']]).apply(create_unistats).unstack().reset_index()