
RIPE Expert bot added

Brajneesh Kumar Brajneesh Kumar
Brajneesh Kumar authored and Brajneesh Kumar committed Sep 6, 2018
1 parent bbe07d3 commit f1cf742498aa82fadd9db700ff5d8b40c2c09680
@@ -27,4 +27,3 @@ debian/intelmq.prerm.debhelper
debian/intelmq.substvars
debian/intelmq/
/.pc/*
.cache
@@ -740,6 +740,13 @@
"policy": "del,drop,drop"
}
},
"RIPE": {
"description": "RIPE is the bot responsible for adding geolocation information to events (Country, City, Longitude, Latitude, etc..)",
"module": "intelmq.bots.experts.ripe.expert",
"parameters": {
"overwrite": false
}
},
"RIPENCC": {
"description": "RIPENCC is the bot responsible to get the correspondent abuse contact from source IP and destination IP of the events. RIPEstat documentation: https://stat.ripe.net/docs/data_api ",
"module": "intelmq.bots.experts.ripencc_abuse_contact.expert",
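Review bot: The `description` of the new `RIPE` entry does not say that the bot gets its geolocation data by sending each event's source and destination IP to the RIPEstat HTTP API, whereas the neighbouring `RIPENCC` entry does name RIPEstat. Operators choose bots from this catalogue, and an external lookup means event addresses leave the host and the bot needs outbound HTTPS. I would extend the text along the lines of `"... by querying the RIPEstat geoloc API (https://stat.ripe.net/data/geoloc/) for the source and destination IP"`. The only parameter exposed is `overwrite`, so there is also no way to configure a request timeout.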
@@ -0,0 +1 @@
requests>=2.2.0
No changes.
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
import requests
import json
from intelmq.lib.bot import Bot
class RIPEExpertBot(Bot):
def init(self):
self.overwrite = getattr(self.parameters, 'overwrite', False)
def process(self):
event = self.receive_message()
for key in ["source.%s", "destination.%s"]:
geo_key = key % "geolocation.%s"
if key % "ip" not in event:
continue
ip = event.get(key % "ip")
try:
data = requests.get('https://stat.ripe.net/data/geoloc/data.json?resource=' + str(ip)).content
info = (json.loads(data))['data']['locations'][0]
if info['country']:
event.add(geo_key % "cc", info['country'].split('-')[0],
overwrite=self.overwrite)
if info['latitude']:
event.add(geo_key % "latitude", info['latitude'],
overwrite=self.overwrite)
if info['longitude']:
event.add(geo_key % "longitude", info['longitude'],
overwrite=self.overwrite)
if info['city']:
event.add(geo_key % "city", info['city'],
overwrite=self.overwrite)
except:
pass
self.send_message(event)
self.acknowledge_message()
BOT = RIPEExpertBot
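Review bot: The `requests.get(...)` call in `process()` has no `timeout`, and the bare `except: pass` around it hides every failure. A stalled RIPEstat response therefore blocks the bot's queue indefinitely, while an HTTP error, a malformed body or an empty `locations` list lets the event through unenriched with nothing in the log. The lookup should set a timeout, check the status, pass the address through `params=` instead of concatenating it into the URL, and catch only the expected exceptions with a logged warning, as sketched below. The timeout value there is a placeholder that would be better read as a bot parameter in `init()`, and exceptions from `event.add(...)` outside the listed types would then propagate instead of being silently dropped. Each event also triggers up to two outbound requests, so a cache may be worth adding, though the page does not show the expected volume.

```python
            try:
                response = requests.get(
                    'https://stat.ripe.net/data/geoloc/data.json',
                    params={'resource': str(ip)},
                    timeout=10)  # placeholder value; better read from a bot parameter
                response.raise_for_status()
                info = response.json()['data']['locations'][0]
                # … unchanged: the four event.add(...) calls …
            except (requests.RequestException, ValueError, KeyError, IndexError) as exc:
                self.logger.warning('RIPE geolocation lookup failed for %s: %s', ip, exc)
```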
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""
Testing RIPE Expert
"""
import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.ripe.expert import RIPEExpertBot
EXAMPLE_INPUT1 = {"__type": "Event",
"source.ip": "96.30.37.204"
}
EXAMPLE_INPUT2 = {"__type": "Event",
"source.geolocation.cc": "IN",
"source.ip": "96.30.37.204"
}
EXAMPLE_OUTPUT1 = {"__type": "Event",
"source.ip": "96.30.37.204",
"source.geolocation.cc": "US",
"source.geolocation.city": "Lansing",
"source.geolocation.latitude": 42.7257,
"source.geolocation.longitude": -84.636
}
EXAMPLE_OUTPUT2 = {"__type": "Event",
"source.ip": "96.30.37.204",
"source.geolocation.cc": "US",
"source.geolocation.city": "Lansing",
"source.geolocation.latitude": 42.7257,
"source.geolocation.longitude": -84.636
}
EXAMPLE_OUTPUT3 = {"__type": "Event",
"source.ip": "96.30.37.204",
"source.geolocation.cc": "IN",
"source.geolocation.city": "Lansing",
"source.geolocation.latitude": 42.7257,
"source.geolocation.longitude": -84.636
}
class TestRIPEExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for RIPEExpertBot.
"""
@classmethod
def set_bot(self):
self.bot_reference = RIPEExpertBot
def test(self):
self.input_message = EXAMPLE_INPUT1
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT1)
def test_overwrite(self):
self.input_message = EXAMPLE_INPUT2
self.sysconfig = {"overwrite": True}
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT2)
def test_not_overwrite(self):
self.input_message = EXAMPLE_INPUT2
self.sysconfig = {"overwrite": False}
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT3)
if __name__ == '__main__': # pragma: no cover
unittest.main()
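Review bot: All three tests call `run_bot()` against the live RIPEstat service and compare the result with a fixed city and coordinates, so they fail without network access and whenever RIPE's geolocation data for 96.30.37.204 changes. Because the bot swallows lookup errors, such a failure shows up as a message mismatch rather than as a connection error. I would patch the HTTP call with a canned response, e.g. `@unittest.mock.patch('intelmq.bots.experts.ripe.expert.requests.get')`, and add a case where the lookup fails and the event passes through unchanged. `EXAMPLE_OUTPUT1` and `EXAMPLE_OUTPUT2` are identical and could be a single constant.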
