-
Notifications
You must be signed in to change notification settings - Fork 17
/
spring_boot_admin_registration.py
154 lines (127 loc) · 5.97 KB
/
spring_boot_admin_registration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import http.client
import json
import logging
import os
import ssl
import threading
import urllib.parse
from base64 import b64encode
from datetime import datetime
from http.client import HTTPConnection, HTTPResponse
from typing import Optional, Dict
from pyctuator.auth import Auth, BasicAuth
# pylint: disable=too-many-instance-attributes
class BootAdminRegistrationHandler:
def __init__(
self,
registration_url: str,
registration_auth: Optional[Auth],
application_name: str,
pyctuator_base_url: str,
start_time: datetime,
service_url: str,
registration_interval_sec: float,
application_metadata: Optional[dict] = None
) -> None:
self.registration_url = registration_url
self.registration_auth = registration_auth
self.application_name = application_name
self.pyctuator_base_url = pyctuator_base_url
self.start_time = start_time
self.service_url = service_url if service_url.endswith("/") else service_url + "/"
self.registration_interval_sec = registration_interval_sec
self.instance_id = None
self.application_metadata = application_metadata if application_metadata else {}
self.should_continue_registration_schedule: bool = False
self.disable_certificate_validation_for_https_registration: bool = \
os.getenv("PYCTUATOR_REGISTRATION_NO_CERT") is not None
def _schedule_next_registration(self, registration_interval_sec: float) -> None:
timer = threading.Timer(
registration_interval_sec,
self._register_with_admin_server,
[]
)
timer.setDaemon(True)
timer.start()
def _register_with_admin_server(self) -> None:
# When waking up, make sure registration is still needed
if not self.should_continue_registration_schedule:
return
registration_data = {
"name": self.application_name,
"managementUrl": self.pyctuator_base_url,
"healthUrl": f"{self.pyctuator_base_url}/health",
"serviceUrl": self.service_url,
"metadata": {
"startup": self.start_time.isoformat(),
**self.application_metadata
}
}
logging.debug("Trying to post registration data to %s: %s", self.registration_url, registration_data)
conn: Optional[HTTPConnection] = None
try:
headers = {"Content-type": "application/json"}
self.authenticate(headers)
response = self._http_request(self.registration_url, "POST", headers, json.dumps(registration_data))
if response.status < 200 or response.status >= 300:
logging.warning("Failed registering with boot-admin, got %s - %s", response.status, response.read())
else:
self.instance_id = json.loads(response.read().decode('utf-8'))["id"]
except Exception as e: # pylint: disable=broad-except
logging.warning("Failed registering with boot-admin, %s (%s)", e, type(e))
finally:
if conn:
conn.close()
# Schedule the next registration unless asked to abort
if self.should_continue_registration_schedule:
self._schedule_next_registration(self.registration_interval_sec)
def deregister_from_admin_server(self) -> None:
if self.instance_id is None:
return
headers = {}
self.authenticate(headers)
deregistration_url = f"{self.registration_url}/{self.instance_id}"
logging.info("Deregistering from %s", deregistration_url)
conn: Optional[HTTPConnection] = None
try:
response = self._http_request(deregistration_url, "DELETE", headers)
if response.status < 200 or response.status >= 300:
logging.warning("Failed deregistering from boot-admin, got %s - %s", response.status, response.read())
except Exception as e: # pylint: disable=broad-except
logging.warning("Failed deregistering from boot-admin, %s (%s)", e, type(e))
finally:
if conn:
conn.close()
def authenticate(self, headers: Dict) -> None:
if isinstance(self.registration_auth, BasicAuth):
password = self.registration_auth.password if self.registration_auth.password else ""
authorization_string = self.registration_auth.username + ":" + password
encoded_authorization: str = b64encode(bytes(authorization_string, "utf-8")).decode("ascii")
headers["Authorization"] = f"Basic {encoded_authorization}"
def start(self, initial_delay_sec: float = None) -> None:
logging.info("Starting recurring registration of %s with %s",
self.pyctuator_base_url, self.registration_url)
self.should_continue_registration_schedule = True
self._schedule_next_registration(initial_delay_sec or self.registration_interval_sec)
def stop(self) -> None:
logging.info("Stopping recurring registration")
self.should_continue_registration_schedule = False
def _http_request(self, url: str, method: str, headers: Dict[str, str], body: Optional[str] = None) -> HTTPResponse:
url_parts = urllib.parse.urlsplit(url)
if url_parts.scheme == "http":
conn = http.client.HTTPConnection(url_parts.hostname, url_parts.port)
elif url_parts.scheme == "https":
context = None
if self.disable_certificate_validation_for_https_registration:
context = ssl.SSLContext()
context.verify_mode = ssl.CERT_NONE
conn = http.client.HTTPSConnection(url_parts.hostname, url_parts.port, context=context)
else:
raise ValueError(f"Unknown scheme in {url}")
conn.request(
method,
url_parts.path,
body=body,
headers=headers,
)
return conn.getresponse()