This repository has been archived by the owner on Jun 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
zzmq.py
66 lines (48 loc) · 1.61 KB
/
zzmq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import logging
from time import sleep
logger = logging.getLogger(__name__)
import logging
import os.path
from csirtg_indicator import Indicator
import zmq
from pprint import pprint
# Defaults for the plugin; each can be overridden through the matching
# CSIRTG_SMRT_ZMQ_* environment variable.
TYPE = os.getenv('CSIRTG_SMRT_ZMQ_TYPE', 'PUB')
TOPIC = os.getenv('CSIRTG_SMRT_ZMQ_TOPIC', 'scanners')
ENDPOINT = os.getenv('CSIRTG_SMRT_ZMQ_ENDPOINT', 'ipc:///tmp/csirtg_smrt.ipc')

# Symbolic socket-type name -> pyzmq socket-type constant.
TYPE_MAPPING = dict(
    PUB=zmq.PUB,
    PUSH=zmq.PUSH,
    PUSH_ZYRE_GATEWAY=zmq.DEALER,
)

# NOTE: called without a name, so `logger` is bound to the root logger here,
# replacing the module-named logger bound earlier in the file.
logger = logging.getLogger()
class _Zmq(object):
    """Send indicators to a ZeroMQ endpoint.

    The socket is created and connected when the object is constructed.
    How each indicator is framed on the wire depends on ``socket_type``:

    * ``'PUB'``: two frames, ``[topic, indicator]``.
    * ``'PUSH_ZYRE_GATEWAY'``: three frames, ``['PUB', topic, indicator]``.
    * anything else (e.g. ``'PUSH'``): a single frame holding the indicator.
    """

    __name__ = 'zmq'

    def __init__(self, socket_type=TYPE, topic=TOPIC, endpoint=ENDPOINT, *args, **kwargs):
        """Create the socket and connect it to ``endpoint``.

        Args:
            socket_type: key of ``TYPE_MAPPING`` selecting the zmq socket type.
            topic: topic frame sent ahead of each indicator for the
                ``'PUB'`` and ``'PUSH_ZYRE_GATEWAY'`` socket types.
            endpoint: zmq endpoint string to connect to.
            *args, **kwargs: accepted and ignored.

        Raises:
            ValueError: if ``endpoint`` is empty or otherwise falsy.
            KeyError: if ``socket_type`` is not a key of ``TYPE_MAPPING``.
        """
        self.socket_type = socket_type
        self.topic = topic
        self.endpoint = endpoint

        if not endpoint:
            raise ValueError("Invalid endpoint: '{}'".format(endpoint))

        context = zmq.Context()
        self.socket = context.socket(TYPE_MAPPING[socket_type])

        self.start()

    @staticmethod
    def _to_frame(value):
        """Return ``value`` as a bytes object suitable for a zmq frame.

        pyzmq only accepts bytes-like frames; handing it a text string
        raises ``TypeError`` on Python 3. Bytes pass through untouched, text
        is UTF-8 encoded, and any other object is rendered with ``str()``
        first (which already yields bytes on Python 2).
        """
        if not isinstance(value, (bytes, type(u''))):
            value = str(value)
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    def start(self):
        """Connect the socket to the configured endpoint."""
        self.socket.connect(self.endpoint)

    def stop(self):
        """Close the socket."""
        self.socket.close()

    def indicators_create(self, data):
        """Send one or more indicators over the socket.

        Args:
            data: a dict (expanded into ``Indicator(**data)``), a single
                ``Indicator``, or an iterable of items to send. Each item
                is rendered with ``str()`` and sent as bytes.

        Returns:
            None.
        """
        if isinstance(data, dict):
            data = Indicator(**data)

        if isinstance(data, Indicator):
            data = [data]

        # Leading frames are the same for every message, so build them once.
        if self.socket_type == 'PUB':
            header = [self._to_frame(self.topic)]
        elif self.socket_type == 'PUSH_ZYRE_GATEWAY':
            header = [b'PUB', self._to_frame(self.topic)]
        else:
            header = None

        for indicator in data:
            payload = self._to_frame(indicator)
            if header is None:
                self.socket.send(payload)
            else:
                self.socket.send_multipart(header + [payload])
# Export the client class under the generic name `Plugin`.
# NOTE(review): presumably a plugin loader looks this name up — confirm against the caller.
Plugin = _Zmq