In [None]:
import csv
import os

import matplotlib.pyplot as plt
from netaddr import IPNetwork, IPAddress

# Path configuration. The notebook carries three path sets labelled c, m and s
# (presumably one per user/account - TODO confirm). Only the m set is active;
# RUN_ON_DATABRICKS selects its Databricks FileStore or its local variant.

# c
#CITY_BLOCKS_PATH    = ''
#CITY_LOCATIONS_PATH = ''
#DATA_PATHS          = ''

RUN_ON_DATABRICKS = False # True: read from the Databricks FileStore; False: read local files

if RUN_ON_DATABRICKS:
  # m Databricks paths
  CITY_BLOCKS_PATH    = '/FileStore/tables/5ne9pwzz1484570889985/GeoLite2_City_Blocks_IPv4-82d63.csv'
  CITY_LOCATIONS_PATH = '/FileStore/tables/fcj10i6i1484571614099/GeoLite2_City_Locations_en-6f8fe.csv'
  DATA_PATHS          = ['/FileStore/tables/xqubiq301484572436491/NetworkTraffic100.csv',
                         '/FileStore/tables/810vguox1485019521316/100000am.csv',
                         '/FileStore/tables/810vguox1485019521316/100000pm.csv']
else:
  # m local paths
  CITY_BLOCKS_PATH    = 'geolite/GeoLite2-City-Blocks-IPv4.csv'
  CITY_LOCATIONS_PATH = 'geolite/GeoLite2-City-Locations-en.csv'
  # Every *.csv file in ./data, e.g. data/NetworkTraffic100.csv, data/100000am.csv,
  # data/100000pm.csv, data/1000000am.csv, data/1000000pm.csv.
  # Sorted because os.listdir returns entries in arbitrary order.
  DATA_PATHS = sorted(os.path.join('data', name) for name in os.listdir('data') if name.endswith('.csv'))

# s
#CITY_BLOCKS_PATH    = '/FileStore/tables/jk3parwb1484571151117/GeoLite2_City_Blocks_IPv4-82d63.csv'
#CITY_LOCATIONS_PATH = '/FileStore/tables/1rn58fwl1484571610700/GeoLite2_City_Locations_en-6f8fe.csv'
#DATA_PATHS          = ['/FileStore/tables/tjpvf32z1484575566663/100.csv']

minColSet            = True # True to restrict the RDD to the required columns
persist              = True # True to persist the data RDD. Can lead to warnings 'Not enough memory to cache ...'; partitions that do not fit are not cached and get recomputed from the input files
useBroadcastVariable = True # True to broadcast the reduced network map

In [None]:
def isIpInNet(ip, net):
  """
  Tells whether the address `ip` lies inside the CIDR network `net`.
  e.g. isIpInNet("192.168.0.1", "192.168.0.0/24") -> True

  Both strings are parsed with netaddr (IPAddress / IPNetwork) on every call.
  """
  return IPAddress(ip) in IPNetwork(net)

def _splitCsvLine(line):
  """
  Splits one CSV line into its fields, honouring double-quoted fields:
  '1,"North America","Bonaire, Sint Eustatius, and Saba"' ->
  ['1', 'North America', 'Bonaire, Sint Eustatius, and Saba'].
  Lines without any quote character take the fast path of a plain split.
  """
  if '"' not in line:
    return line.split(',')
  if isinstance(line, str):
    return next(csv.reader([line]))
  # Python 2 unicode line: the Python 2 csv module only reads byte strings,
  # so round-trip through UTF-8.
  return [field.decode('utf-8') for field in next(csv.reader([line.encode('utf-8')]))]

def buildNetworkCountryMap():
  """
  Builds a map with data from geoIp2 (http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz)
  1st file (CITY_BLOCKS_PATH) contains networks and their geoname_id (much more data available but we consider only this)
  2nd file (CITY_LOCATIONS_PATH) contains geoname_id and country name (column 5).
  The method joins these two data sets on geoname_id; networks without a
  matching location are dropped by the join.

  Lines are split with _splitCsvLine, so quoted values such as "United States"
  lose their quotes and quoted values containing commas stay in one field.
  Header rows and blank/short lines are dropped.

  returns an RDD as [('network', 'country'), ...]
  """
  geoipfile = sc.textFile(CITY_BLOCKS_PATH, 2)

  # (geoname_id, network); exact header comparison - a substring test would also match other values
  geoipdata = geoipfile.map(_splitCsvLine)\
    .filter(lambda p: len(p) > 1 and p[0] != 'network')\
    .map(lambda p: (p[1], p[0]))

  locationsfile = sc.textFile(CITY_LOCATIONS_PATH, 2)

  # (geoname_id, country_name); header and rows without a country column are dropped
  locationsdata = locationsfile.map(_splitCsvLine)\
    .filter(lambda p: len(p) > 5 and p[0] != 'geoname_id')\
    .map(lambda p: (p[0], p[5]))

  # join on geoname_id -> (geoname_id, (network, country_name))
  joineddata = geoipdata.join(locationsdata)

  # (network, country_name)
  networkmap = joineddata.map(lambda d: (d[1][0], d[1][1]))
  return networkmap

def getCountryByIp(ip):
  """
  Returns the country for the given ip or None if not found.

  Reads collectedNetworkMap (first octet -> list of (network, country) pairs,
  built further down in the notebook). Only the bucket for the ip's first octet
  is scanned, and the first network containing the ip wins.
  """
  candidates = collectedNetworkMap.get(ip.split(".")[0])
  if not candidates:
    return None
  address = IPAddress(ip)  # parse once instead of once per candidate network
  for entry in candidates:  # entry is (network, country)
    if address in IPNetwork(entry[0]):
      return entry[1]
  return None

def getCountryByIpBc(ip):
  """
  Returns the country for the given ip or None if not found; using a broad cast variable.

  Same lookup as getCountryByIp, but the first-octet -> [(network, country), ...]
  map is read from reducednetworkmapbc, which is only created when
  useBroadcastVariable is True.
  """
  candidates = reducednetworkmapbc.value.get(ip.split(".")[0])
  if not candidates:
    return None
  address = IPAddress(ip)  # parse once instead of once per candidate network
  for entry in candidates:  # entry is (network, country)
    if address in IPNetwork(entry[0]):
      return entry[1]
  return None

def loadData(filePath):
  """ Loads the network traffic data from the specified file.
      Args:     filePath: path to the file to load.
      Returns:  an RDD containing each data line of the file as a list of string values.
                The header line (first column exactly 'ts') and lines with fewer than
                15 columns are dropped, since the queries read columns up to index 14.
  """
  # Exact comparison: the former substring test (`not in 'ts'`) also dropped
  # data rows whose first field was '', 't' or 's'.
  return sc.textFile(filePath)\
    .map(lambda l: l.split(","))\
    .filter(lambda l: len(l) > 14 and l[0] != 'ts')

In [None]:
networkmap = buildNetworkCountryMap()

# Bucket the networks by the first octet of their address, e.g. '1' -> [('1.0.0.0/24', <country>), ...],
# so a lookup only scans the networks sharing the ip's first octet instead of all of them.
# groupByKey rather than reduceByKey(lambda a, b: a + b): the values are only collected, never
# reduced, and repeated list concatenation copies the growing list on every merge.
reducednetworkmap = networkmap\
  .map(lambda e: (e[0].split(".")[0], e))\
  .groupByKey()\
  .mapValues(list)

# Measured on the full GeoLite2 data:
#   networkmap.count()        -> 2'766'452
#   reducednetworkmap.count() -> 221
# i.e. ~12'500 networks per bucket, ~6'250 comparisons per lookup on average instead of
# 2'766'452 / 2 = 1'383'226 (assuming a uniform distribution, which is likely not the case).

collectedNetworkMap = reducednetworkmap.collectAsMap()
if useBroadcastVariable:
  reducednetworkmapbc = sc.broadcast(collectedNetworkMap)

In [None]:
# Read the firewall data: one RDD over all files in DATA_PATHS
if not DATA_PATHS:
  raise ValueError('DATA_PATHS is empty - no traffic data files to load')
data = sc.union([loadData(path) for path in DATA_PATHS])

# Column positions in the raw traffic CSV
sourceAddressIndex = 3
destinationAddressIndex = 4
sourcePortIndex = 5
destinationPortIndex = 6
protocolIndex = 7
inBytesIndex = 12
outBytesIndex = 14

if minColSet:
  # Keep only the columns used by the queries below. The raw positions are bound through a
  # default argument: a lambda reading the index globals directly would only look them up
  # when it runs (late binding), i.e. after they were re-assigned to the compact positions below.
  rawColumns = (sourceAddressIndex, destinationAddressIndex, sourcePortIndex,
                destinationPortIndex, protocolIndex, inBytesIndex, outBytesIndex)
  data = data.map(lambda r, cols=rawColumns: [r[i] for i in cols])
  sourceAddressIndex = 0
  destinationAddressIndex = 1
  sourcePortIndex = 2
  destinationPortIndex = 3
  protocolIndex = 4
  inBytesIndex = 5
  outBytesIndex = 6

if persist:
  data.persist() # lazy execution (cache is a synonym of persist w/ MEMORY_ONLY storage level)

# print(data.getNumPartitions())

In [None]:
# Requests by country (top 10). Note: the source addresses are made distinct first, so each
# country's count is the number of distinct source addresses, not the number of requests.
# PERFORMANCE   2m 26s (8 cores)
# minColSet:    2m 22s
# minColSet && persist(): (1st call: 2m 32s, subsequent calls: 1m 57s) - few 'Not enough memory to cache ...' warnings
# persist():              (1st call: 2m 28s, subsequent calls: 2m 8s) - some 'Not enough memory to cache ...' warnings
# Without the broadcast variable the CPU load is about 20% for the first 30-40s, compared to
# 100% from the get-go when using the broadcast variable.
lookupCountry = getCountryByIpBc if useBroadcastVariable else getCountryByIp

requestsByCountry = data\
  .map(lambda line: line[sourceAddressIndex]).distinct()\
  .map(lambda sa: (lookupCountry(sa), 1))\
  .filter(lambda r: r[0] is not None)\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(10, lambda x: -x[1]) # use takeOrdered for small result sets only

In [None]:
# Visualize requests by country: one slice per country; the first (largest) slice is pulled out.
if not requestsByCountry:
  print('No source address could be mapped to a country - nothing to plot')
else:
  fig, ax = plt.subplots(figsize=(10, 4))
  ax.pie([r[1] for r in requestsByCountry],
         explode=[0.1 if i == 0 else 0.0 for i in range(len(requestsByCountry))],
         labels=[r[0] for r in requestsByCountry],
         autopct='%1.1f%%', shadow=True, startangle=90)
  ax.axis('equal')  # equal aspect ratio so the pie is drawn as a circle
  ax.set_title('Distinct source addresses by country (top 10)')
  plt.show()

In [None]:
# VPN traffic: a lazy RDD definition, nothing is collected here. Keeps every row whose
# source or destination address starts with the VPN prefix.
vpnIdentifier = '192.168'
vpnData = data.filter(
  lambda row: any(row[idx].startswith(vpnIdentifier)
                  for idx in (sourceAddressIndex, destinationAddressIndex)))

In [None]:
# Requests by protocol within the VPN traffic (top 10)
requestsByProtocol = vpnData\
  .map(lambda r: (r[protocolIndex], 1))\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(10, lambda x: -x[1])
print(requestsByProtocol)

In [None]:
# Requests by port within the VPN traffic (top 10); every row counts once for its source
# port and once for its destination port. (53: name resolution / DNS, 3128: proxy)
requestsByPort = vpnData\
  .flatMap(lambda r: [(r[sourcePortIndex], 1), (r[destinationPortIndex], 1)])\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(10, lambda x: -x[1])
print(requestsByPort)

In [None]:
# Heavy senders: top 20 source addresses by summed out-bytes, reported in MiB.
# (194.9.121.8 is the proxy.)
heavySenders = data\
  .map(lambda r: (r[sourceAddressIndex], int(r[outBytesIndex])))\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(20, lambda x: -x[1])

print([(x[0], x[1] / 1024.0 / 1024.0) for x in heavySenders])

In [None]:
# Heavy receivers: top 20 source addresses by summed in-bytes, reported in MiB.
# NOTE(review): keyed by the source address like the senders query; assumes the in-bytes
# column counts traffic received by that address - confirm against the firewall export.
heavyReceivers = data\
  .map(lambda r: (r[sourceAddressIndex], int(r[inBytesIndex])))\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(20, lambda x: -x[1])

print([(x[0], x[1] / 1024.0 / 1024.0) for x in heavyReceivers])

In [None]:
# Overall top talkers: top 20 source addresses by summed in- plus out-bytes, reported in MiB.
overallDataWhores = data\
  .map(lambda r: (r[sourceAddressIndex], int(r[inBytesIndex]) + int(r[outBytesIndex])))\
  .reduceByKey(lambda a, b: a + b)\
  .takeOrdered(20, lambda x: -x[1])

print([(x[0], x[1] / 1024.0 / 1024.0) for x in overallDataWhores])

In [None]:
# Senders and receivers: the first 10 records that carried traffic in both directions
# (in-bytes > 0 and out-bytes > 0), shown as (source address, in + out bytes).
bothDirections = data.filter(lambda row: int(row[inBytesIndex]) > 0 and int(row[outBytesIndex]) > 0)
bothDirections.map(lambda row: (row[sourceAddressIndex], int(row[inBytesIndex]) + int(row[outBytesIndex]))).take(10)

### Execution times

<!-- Without the pre wrapper this would render as an HTML table, but the raw text is easier to read because the columns stay aligned. -->
<pre>
|---------------------------------|------------|-----------|-----------|-----------|
| Query                           | all cols   | min cols  | min cols  | min cols  |
|                                 | no persist | persist   | persist   | persist   |
|                                 |  (8 cores) | (8 cores) | (4 cores) | (2 cores) |
|---------------------------------|------------|-----------|-----------|-----------|
| Requests by Country             |  3m 36.00s | 3m 43.00s | 4m 12.00s | 5m 18.00s |
| Requests by Country (broadcast) |  2m 21.00s | 2m 32.00s | 2m 53.00s | 3m 37.00s |
| Requests by Protocol            |     29.00s |     2.90s |     3.28s |     5.41s |
| Requests by Port                |     31.70s |     5.01s |     5.90s |     9.08s |
| Heavy Senders (in MiB)          |     31.50s |     3.67s |     4.44s |     6.66s |
| Heavy Receivers (in MiB)        |     30.65s |     3.61s |     4.71s |     6.93s |
| Overall Top Talkers (in MiB)    |     31.90s |     4.32s |     5.47s |     8.91s |
|---------------------------------|------------|-----------|-----------|-----------|

* 1 run per measurement, no averaging
* Requests by Country is the first query and therefore also pays for caching the RDD (~40s)
* ~800 MB of data
* 4 physical cores x 2 hyper-threads (8 logical cores)
</pre>