-
Notifications
You must be signed in to change notification settings - Fork 11
/
endpoint.py
102 lines (82 loc) · 2.83 KB
/
endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""DefCon Endpoint plugin."""
import requests
import logging
from defcon.plugins import base
class EndpointPlugin(base.Plugin):
    """DefCon Endpoint plugin.

    Set an endpoint to get a defcon level.
    The endpoint should have the info {...'defcon':[1-5],...}

    Config:
    ```python
    {
      'url': 'http://defcon.com/api/simple/production/',  # Url to endpoint
      'timeout': 10,  # Optional: seconds to wait for the endpoint
    }
    ```

    The response from the url should look like the DefCon simple API:
    {
      "name": "Production",
      "contact": "production@prod.com",
      "link": "https://confluence/production+Home",
      "defcon": 3,
      "description": "fooo"
    }

    When the endpoint cannot be reached, answers with an HTTP error, or
    does not return a JSON object, a single defcon 5 status describing the
    failure is reported instead of raising.
    """

    # Seconds to wait for the endpoint when the config gives no 'timeout'.
    # Without a timeout a hung endpoint would block statuses() forever.
    DEFAULT_TIMEOUT = 10

    def __init__(self, config=None):
        """Create an instance of the plugin.

        Args:
            config: optional dict; reads 'url' (endpoint to query) and
                'timeout' (seconds, defaults to DEFAULT_TIMEOUT).
        """
        super(EndpointPlugin, self).__init__(config)
        if config is None:
            config = {}
        self.url = config.get('url')
        self.timeout = config.get('timeout', self.DEFAULT_TIMEOUT)

    @property
    def short_name(self):
        """Return the short name."""
        return 'endpoint'

    @property
    def name(self):
        """Return the name."""
        return 'endpoint'

    @property
    def description(self):
        """Return the description."""
        return 'Returns statuses based on endpoint.'

    @property
    def link(self):
        """Return the link."""
        return 'https://github.com/criteo/defcon'

    def _get_defcon_from_url(self, url):
        """Fetch `url` and return its decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection errors,
                timeouts or non-2xx HTTP responses.
            ValueError: if the response body is not valid JSON.
        """
        # Use the url argument (the original ignored it and always hit
        # self.url) and bound the wait so a dead endpoint cannot hang us.
        r = requests.get(url, timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def statuses(self):
        """Return the generated statuses.

        Returns:
            A dict mapping status id to status. It is empty when the plugin
            has no config; otherwise it holds one status built from the
            endpoint payload, or from a defcon 5 placeholder payload when
            the endpoint could not be read.
        """
        ret = {}
        if self._config is None:
            return ret

        try:
            payload = self._get_defcon_from_url(self.url)
            if not isinstance(payload, dict):
                # Valid JSON but not an object: _to_status needs .get().
                raise ValueError(
                    "Unexpected payload type {} from {}".format(
                        type(payload).__name__, self.url))
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers undecodable JSON on requests versions where
            # the decode error is not a RequestException subclass.
            logging.exception('Failed to get info from %s', self.url)
            failure = "Failed to get info from {}".format(self.url)
            payload = {
                "name": failure,
                "contact": failure,
                "link": failure,
                "defcon": 5,
                "description": failure,
            }

        status = self._to_status(payload, self.url)
        if status is not None:
            ret[status['id']] = status
        return ret

    def _to_status(self, payload, url):
        """Return a status built from `payload`.

        Keys missing from the payload are replaced by a default message
        mentioning `url`; a missing 'defcon' defaults to 5.
        """
        logging.debug('Handling %s', url)

        status = base.Status(
            title=payload.get('name', "name not found from {}".format(url)),
            link=payload.get('link', "link not found from {}".format(url)),
            defcon=payload.get('defcon', 5),
            description=payload.get(
                'description', "description not found from {}".format(url)),
        )
        return status