In [1]:
import csv
import sys

from netaddr import AddrFormatError, IPAddress, IPNetwork
# Input CSV locations. Exactly one profile below should be active; the others
# stay commented out so the notebook can be pointed at a different environment
# by swapping which profile is uncommented.

# Profile "c": paths not filled in yet.
#CITY_BLOCKS_PATH, CITY_LOCATIONS_PATH, DATA_SAMPLE_PATH = '', '', ''

# Profile "m" (active): bare file names, i.e. relative paths.
CITY_BLOCKS_PATH, CITY_LOCATIONS_PATH, DATA_SAMPLE_PATH = (
  'GeoLite2-City-Blocks-IPv4.csv',
  'GeoLite2-City-Locations-en.csv',
  '100.csv',
)

# Profile "s": uploaded copies under /FileStore/tables
# (presumably Databricks FileStore -- confirm).
#CITY_BLOCKS_PATH, CITY_LOCATIONS_PATH, DATA_SAMPLE_PATH = (
#  '/FileStore/tables/jk3parwb1484571151117/GeoLite2_City_Blocks_IPv4-82d63.csv',
#  '/FileStore/tables/1rn58fwl1484571610700/GeoLite2_City_Locations_en-6f8fe.csv',
#  '/FileStore/tables/tjpvf32z1484575566663/100.csv',
#)

def isIpInNet(ip, net):
  """
  Tell whether the address `ip` lies inside the network `net`.

  Both arguments are strings parsed by netaddr, e.g.
  isIpInNet("192.168.0.1", "192.168.0.0/24") -> True
  """
  # Parse the address before the network, as callers may rely on which one raises first.
  address = IPAddress(ip)
  return address in IPNetwork(net)

def _splitCsvLine(line):
  """
  Split one CSV record into its fields, honouring double-quoted fields so that a
  quoted value containing commas (e.g. a country name) stays a single field.
  Returns [] for an empty line.
  """
  if sys.version_info[0] >= 3:
    return next(csv.reader([line]))
  # Python 2's csv module only reads byte strings, while sc.textFile yields
  # unicode by default (use_unicode=True), so round-trip through UTF-8.
  return [field.decode('utf-8') for field in next(csv.reader([line.encode('utf-8')]))]

def buildNetworkCountryMap(minPartitions=2):
  """
  Build an RDD of (network, country_name) pairs from the GeoLite2 City CSV files.

  CITY_BLOCKS_PATH    : rows of network, geoname_id, ... (only these two columns are used)
  CITY_LOCATIONS_PATH : rows of geoname_id, ..., country_name (column 5)
  The two datasets are joined on geoname_id.

  minPartitions: minimum number of partitions requested for each input file
                 (default 2, the previously hard-coded value).

  returns an RDD as [('network', 'country'), ...]
  """
  # (geoname_id, network). Drop the header, blank/short rows and rows without a
  # geoname_id, since those carry no key to join on.
  geoipdata = (sc.textFile(CITY_BLOCKS_PATH, minPartitions)
               .map(_splitCsvLine)
               .filter(lambda f: len(f) > 1 and f[0] != 'network' and f[1] != '')
               .map(lambda f: (f[1], f[0])))

  # (geoname_id, country_name). Drop the header and blank/short rows.
  locationsdata = (sc.textFile(CITY_LOCATIONS_PATH, minPartitions)
                   .map(_splitCsvLine)
                   .filter(lambda f: len(f) > 5 and f[0] != 'geoname_id')
                   .map(lambda f: (f[0], f[5])))

  # The join yields (geoname_id, (network, country_name)); keep only the value pair.
  return geoipdata.join(locationsdata).values()

def getCountryByIpBc(ip):
  """
  Returns the country for the given dotted IPv4 string, or None if not found.

  Uses the broadcast map reducednetworkmapbc, whose value maps the first octet of a
  network (e.g. "192") to a list of (network, country) tuples. Only the networks that
  share the ip's first octet are checked, not every known network.

  None is returned when no bucket exists for the ip's first octet, when no network
  in that bucket contains the ip, or when the ip cannot be parsed as an address.
  """
  candidates = reducednetworkmapbc.value.get(ip.split(".")[0])
  if not candidates:
    return None

  try:
    # Parse once here instead of once per candidate network.
    address = IPAddress(ip)
  except AddrFormatError:
    # A malformed address in the input must not abort the whole Spark job.
    return None

  for network, country in candidates:
    if address in IPNetwork(network):
      return country
  return None
  
# (network, country) pairs, persisted so the map is not rebuilt if used again.
networkmap = buildNetworkCountryMap().persist()

# Bucket the networks by the first octet of their address ("1.0.0.0/24" -> "1"),
# so a lookup only scans the networks sharing the ip's first octet.
# groupByKey + list builds each bucket in linear time. reduceByKey(lambda a, b: a + b)
# on lists copies the growing accumulator on every merge, which is quadratic per bucket.
reducednetworkmap = (networkmap
                     .map(lambda e: (e[0].split(".")[0], e))
                     .groupByKey()
                     .mapValues(list))

# Broadcast the bucket dict to all nodes so it can be read inside map functions
# (see getCountryByIpBc).
reducednetworkmapbc = sc.broadcast(reducednetworkmap.collectAsMap())

In [2]:
# Read the firewall log sample (CSV), dropping blank lines and the header row.
# The header's first column is presumably "ts" -- TODO confirm against the file.
sourcefile = sc.textFile(DATA_SAMPLE_PATH).filter(lambda l: l and l.split(',')[0] != 'ts')

# (ip, country) for every distinct address in column 3 (presumably the source
# address -- confirm) that resolves to a non-empty country. Rows with fewer than
# four columns are skipped.
# Deduplicating before the lookup matters because the lookup dominates the cost:
# without distinct(), measurements were ~4.5s for take(5) and ~47s for take(100).
ipCountry = (sourcefile
             .map(lambda l: l.split(','))
             .filter(lambda f: len(f) > 3)
             .map(lambda f: f[3])
             .distinct()
             .map(lambda ip: (ip, getCountryByIpBc(ip)))
             .filter(lambda kv: kv[1]))

# Sizing notes from an earlier run:
#   networkmap.count()        -> 2,766,452 networks
#   reducednetworkmap.count() -> 221 first-octet buckets
# That is ~12,500 networks per bucket, so a lookup scans ~6,250 networks on average
# after the dict access, instead of ~1,383,226 without bucketing. This assumes a
# uniform distribution, which is likely not the case.

In [3]:
# Distinct countries seen among the looked-up addresses, sorted so that re-runs
# print the same order. print(...) works on both Python 2 and Python 3 kernels.
countries = sorted(ipCountry.values().distinct().collect())
print(countries)