-
Notifications
You must be signed in to change notification settings - Fork 116
/
connection.py
2188 lines (1873 loc) · 86.6 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# A proxy interface to initiate and interact with candlepin.
#
# Copyright (c) 2010 - 2012 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import base64
from rhsm import certificate
import datetime
import dateutil.parser
import locale
import logging
import os
import socket
import sys
import time
import traceback
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from pathlib import Path
import re
import enum
from email.utils import format_datetime
from rhsm.https import httplib, ssl
from urllib.request import proxy_bypass
from urllib.parse import urlencode, urlparse, quote, quote_plus
from rhsm.config import get_config_parser
from rhsm import ourjson as json
from rhsm import utils
try:
import subscription_manager.version
subman_version = subscription_manager.version.pkg_version
except ImportError:
subman_version = "unknown"
try:
from subscription_manager.i18n import ugettext as _
except ImportError:
def _(message: str):
return message
# Parsed rhsm configuration; BaseConnection reads its "server" and "rhsm"
# sections for any setting not passed explicitly to the constructor.
config = get_config_parser()
# NOTE(review): presumably the name of a server capability; it is not used in
# the part of the file visible here - confirm against callers.
MULTI_ENV = "multi_environment"
# When True, BaseRestLib._create_connection() keeps the connection it created
# so that later requests can reuse it.
REUSE_CONNECTION = True
def safe_int(value: Any, safe_value: Any = None) -> Union[int, None, Any]:
    """
    Convert ``value`` to ``int`` without ever raising.

    :param value: anything accepted (or not) by ``int()``
    :param safe_value: returned when the conversion fails for any reason
    :return: the converted integer, or ``safe_value`` on failure
    """
    try:
        converted = int(value)
    except Exception:
        return safe_value
    return converted
def normalized_host(host: str) -> str:
    """
    Return ``host`` in a form that can be followed by ``:port``.

    When you want to use IPv6 address and port in e.g. HTTP header, then you cannot use the
    notation common for IPv4 (147.230.16.1:53). You have to use the bracketed notation
    [2001:718:1c01:16::aa]:53.

    :param host: hostname or IPv4 or IPv6 address
    :return: When host is an IPv6 address, it is encapsulated in [] brackets. A value that
        is already enclosed in brackets is returned unchanged, so it is never wrapped twice.
    """
    already_bracketed = host.startswith("[") and host.endswith("]")
    if ":" in host and not already_bracketed:
        return "[%s]" % host
    return host
def get_time_drift(timestamp: str) -> datetime.timedelta:
    """Get a difference between server and local clock.

    The parsed value is always interpreted as UTC: whatever timezone information
    it carries (or lacks) is replaced, not converted. A warning is logged when
    the parsed timestamp does not identify itself as UTC.

    :param timestamp: A timestamp in RFC 1123 format, e.g. 'Fri, 12 Jan 2024 08:10:46 GMT'.
    :returns: Absolute difference between server and local time.
    """
    parsed: datetime.datetime = dateutil.parser.parse(timestamp)
    # datetime.tzname() returns None for a naive value, so a timestamp without
    # timezone information produces the warning instead of an AttributeError.
    if parsed.tzname() != "UTC":
        log.warning("Expected UTC timestamp, got '%s', drift check may be off.", parsed)
    # dateutil has its own tzinfo object representing UTC
    server_time = parsed.replace(tzinfo=datetime.timezone.utc)
    # Server timestamps have second resolution; drop local microseconds to match
    now = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
    drift: datetime.timedelta = abs(server_time - now)
    return drift
class NullHandler(logging.Handler):
    """Logging handler that silently discards every record it is given."""

    def emit(self, record: Any) -> None:
        """Ignore ``record``."""
        return None
# Attach a do-nothing handler to the "rhsm" logger.
h = NullHandler()
logging.getLogger("rhsm").addHandler(h)
# Module-level logger used by the functions and classes in this file.
log = logging.getLogger(__name__)
class NoValidEntitlement(Exception):
    """Raised when no valid entitlement certificate is available for accessing the CDN."""
class ConnectionException(Exception):
    """Base class of the connection-related exceptions defined in this module."""
class ProxyException(Exception):
    """
    Thrown in case of errors related to the proxy server.

    All constructor arguments are optional; the textual representation only
    mentions the parts that were actually provided.
    """

    def __init__(
        self,
        hostname: Optional[str] = None,
        port: Optional[int] = None,
        exc: Optional[Exception] = None,
    ):
        """
        :param hostname: hostname or IP address of the proxy server
        :param port: port of the proxy server
        :param exc: the underlying exception, if any
        """
        self._hostname = hostname
        self.port = port
        self.exc = exc

    @property
    def hostname(self) -> str:
        """Proxy host with IPv6 addresses bracketed; empty string when unknown."""
        if not self._hostname:
            # normalized_host() cannot handle None
            return ""
        return normalized_host(self._hostname)

    @property
    def address(self) -> str:
        """``host:port`` of the proxy, or just the host when the port is unknown."""
        if self.port is None:
            return self.hostname
        return f"{self.hostname}:{self.port}"

    def __str__(self) -> str:
        addr = self.address
        err = f"Proxy error at {addr}" if addr else "Proxy error"
        if self.exc is not None:
            err = f"{err}: {self.exc}"
        return err
class ConnectionSetupException(ConnectionException):
    """Raised when preparing a connection fails, e.g. the CA certificate directory cannot be read."""
class BadCertificateException(ConnectionException):
    """Thrown when an error parsing a certificate is encountered."""

    def __init__(self, cert_path: str, ssl_exc: ssl.SSLError) -> None:
        """
        :param cert_path: full path to the bad certificate
        :param ssl_exc: the SSL error raised while loading it
        """
        self.cert_path, self.ssl_exc = cert_path, ssl_exc

    def __str__(self) -> str:
        message = "Bad certificate at %s" % self.cert_path
        return message
class ConnectionOSErrorException(ConnectionException):
    """
    Thrown in case of OSError during the connect() of HTTPSConnection,
    in case the OSError does not come from a syscall failure (and thus
    its 'errno' attribute is None).
    """

    def __init__(self, host: str, port: int, handler: str, exc: OSError):
        """
        :param host: destination host of the failed connection
        :param port: destination port
        :param handler: API handler (URL prefix) of the connection
        :param exc: the original OSError
        """
        self.exc = exc
        self.handler = handler
        self.port = port
        self._host = host

    @property
    def host(self) -> str:
        """Destination host, with IPv6 addresses enclosed in brackets."""
        raw_host = self._host
        return normalized_host(raw_host)
@enum.unique
class ConnectionType(enum.Enum):
    """
    Enumeration of the allowed connection (authentication) types.
    """

    NO_AUTH = enum.auto()  # no authentication at all
    BASIC_AUTH = enum.auto()  # basic authentication (username and password)
    CONSUMER_CERT_AUTH = enum.auto()  # consumer certificate
    KEYCLOAK_AUTH = enum.auto()  # Keycloak token
class BaseConnection:
    """
    Common setup for connections to a remote server.

    The constructor resolves host, port, handler, timeout, proxy and TLS settings
    from its arguments (falling back to the values in ``config``) and creates the
    low-level :class:`BaseRestLib` object available as ``self.conn``.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        ssl_port: Optional[int] = None,
        handler: Optional[str] = None,
        ca_dir: Optional[str] = None,
        insecure: Optional[bool] = None,
        proxy_hostname: Optional[str] = None,
        proxy_port: Optional[int] = None,
        proxy_user: Optional[str] = None,
        proxy_password: Optional[str] = None,
        no_proxy: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        cert_file: Optional[str] = None,
        key_file: Optional[str] = None,
        cert_dir: Optional[str] = None,
        token: Optional[str] = None,
        user_agent: Optional[str] = None,
        correlation_id: Optional[str] = None,
        timeout: Optional[int] = None,
        auth_type: Optional[ConnectionType] = None,
        **kwargs,
    ) -> None:
        """
        :param host: server hostname; defaults to ``server.hostname`` from config
        :param ssl_port: server port; defaults to ``server.port`` from config
        :param handler: URL prefix; defaults to ``server.prefix`` from config
        :param ca_dir: directory with CA certificates; defaults to ``rhsm.ca_cert_dir``
        :param insecure: skip server certificate verification; when None the
            ``server.insecure`` config value decides
        :param proxy_hostname: proxy host; falls back to config, then environment
        :param proxy_port: proxy port; falls back to config, then environment
        :param proxy_user: proxy user; falls back to config, then environment
        :param proxy_password: proxy password; falls back to config, then environment
        :param no_proxy: value exported as the ``no_proxy`` environment variable;
            defaults to ``server.no_proxy`` from config
        :param username: user name for basic authentication
        :param password: password for basic authentication
        :param cert_file: certificate file for certificate authentication
        :param key_file: key file for certificate authentication
        :param cert_dir: directory with entitlement certificates
        :param token: bearer token
        :param user_agent: value passed on to BaseRestLib as the User-Agent
        :param correlation_id: value passed on to BaseRestLib as the correlation ID
        :param timeout: connection timeout; defaults to ``server.server_timeout``
        :param auth_type: type of authentication, passed on to BaseRestLib
        :param kwargs: accepted and ignored
        """
        self.host = host or config.get("server", "hostname")
        self.handler = handler or config.get("server", "prefix")
        self.ssl_port = ssl_port or safe_int(config.get("server", "port"))
        self.timeout = timeout or safe_int(config.get("server", "server_timeout"))

        # allow specifying no_proxy via api or config
        no_proxy_override = no_proxy or config.get("server", "no_proxy")
        if no_proxy_override:
            os.environ["no_proxy"] = no_proxy_override

        utils.fix_no_proxy()
        log.debug("Environment variable NO_PROXY=%s will be used", no_proxy_override)

        # honor no_proxy environment variable
        if proxy_bypass(self.host):
            self.proxy_hostname = None
            self.proxy_port = None
            self.proxy_user = None
            self.proxy_password = None
        else:
            # Precedence for every proxy setting: argument, config file, environment
            info = utils.get_env_proxy_info()

            if proxy_hostname is not None:
                self.proxy_hostname = proxy_hostname
            else:
                self.proxy_hostname = config.get("server", "proxy_hostname") or info["proxy_hostname"]

            if proxy_port is not None:
                self.proxy_port = proxy_port
            else:
                self.proxy_port = config.get("server", "proxy_port") or info["proxy_port"]

            if proxy_user is not None:
                self.proxy_user = proxy_user
            else:
                self.proxy_user = config.get("server", "proxy_user") or info["proxy_username"]

            if proxy_password is not None:
                self.proxy_password = proxy_password
            else:
                self.proxy_password = config.get("server", "proxy_password") or info["proxy_password"]

        self.cert_file = cert_file
        self.key_file = key_file
        self.username = username
        self.password = password
        self.token = token
        self.auth_type = auth_type

        self.ca_dir = ca_dir or config.get("rhsm", "ca_cert_dir")

        self.insecure = insecure
        if insecure is None:
            # No explicit request: any non-zero integer in the config enables it
            self.insecure = False
            config_insecure = safe_int(config.get("server", "insecure"))
            if config_insecure:
                self.insecure = True

        proxy_description = None
        if self.proxy_hostname and self.proxy_port:
            proxy_description = "http_proxy=%s:%s " % (
                normalized_host(self.proxy_hostname),
                safe_int(self.proxy_port),
            )

        # initialize connection
        self.conn: BaseRestLib = BaseRestLib(
            self.host,
            self.ssl_port,
            self.handler,
            username=self.username,
            password=self.password,
            token=self.token,
            cert_file=self.cert_file,
            key_file=self.key_file,
            proxy_hostname=self.proxy_hostname,
            proxy_port=self.proxy_port,
            proxy_user=self.proxy_user,
            proxy_password=self.proxy_password,
            ca_dir=self.ca_dir,
            insecure=self.insecure,
            cert_dir=cert_dir,
            timeout=self.timeout,
            correlation_id=correlation_id,
            user_agent=user_agent,
            auth_type=auth_type,
        )

        # Describe the authentication for the debug log. The chain is ordered by
        # priority, so when several kinds of credentials are passed, only the
        # first matching one is reported; conflicting credentials are not rejected.
        if username and password:
            auth_description = "auth=basic username=%s" % username
        elif cert_file and key_file:
            auth_description = "auth=identity_cert ca_dir=%s insecure=%s" % (self.ca_dir, self.insecure)
        elif cert_dir:
            auth_description = "auth=entitlement_certs"
        elif token:
            # The token is a credential; never write it to the log
            auth_description = "auth=bearer <redacted>"
        else:
            auth_description = "auth=none"

        self.resources = None
        self.capabilities = None

        connection_description = ""
        if proxy_description:
            connection_description += proxy_description
        connection_description += "host=%s port=%s handler=%s %s" % (
            normalized_host(self.host),
            safe_int(self.ssl_port),
            self.handler,
            auth_description,
        )
        log.debug("Connection built: %s", connection_description)
class TokenAuthException(Exception):
    """Raised when exchanging a refresh token for an access token is rejected with HTTP 400."""
class KeycloakConnection(BaseConnection):
    """
    Keycloak Based Authentication
    """

    def __init__(self, realm: Any, auth_url: str, resource: Any, **kwargs) -> None:
        """
        :param realm: Keycloak realm, used to build the token endpoint path
        :param auth_url: URL of the authentication server; host, port (443 when
            not given) and path are taken from it
        :param resource: sent as ``client_id`` when requesting a token
        :param kwargs: passed through to BaseConnection
        """
        parsed_url = urlparse(auth_url)
        super().__init__(
            host=parsed_url.hostname or "",
            ssl_port=parsed_url.port or 443,
            handler=parsed_url.path,
            **kwargs,
        )
        self.realm = realm
        self.resource = resource

    def get_access_token_through_refresh(self, refreshtoken: Any) -> Optional[Any]:
        """
        Get access token in exchange for refresh token.

        :param refreshtoken: the refresh token
        :return: value of ``access_token`` in the server response
        :raises TokenAuthException: when the server responds with status 400
        """
        endpoint = "/realms/" + self.realm + "/protocol/openid-connect/token"
        form_headers = {"Content-type": "application/x-www-form-urlencoded"}
        form_data = {
            "client_id": self.resource,
            "grant_type": "refresh_token",
            "refresh_token": refreshtoken,
        }
        try:
            response = self.conn.request_post(endpoint, form_data, form_headers)
            return response["access_token"]
        except RestlibException as e:
            if e.code != 400:
                raise
            raise TokenAuthException(e.msg)
class RestlibException(ConnectionException):
    """
    Raised when a response with a valid json body is received along with a status code
    that is not in [200, 202, 204, 410, 429]

    See BaseRestLib.validateResult to see when this and other exceptions are raised.
    """

    def __init__(self, code: int, msg: str = None, headers: dict = None) -> None:
        """
        :param code: HTTP status code
        :param msg: error message; stored as an empty string when not given
        :param headers: response headers; stored as an empty dict when not given
        """
        self.code = code
        self.msg = msg if msg else ""
        self.headers = headers if headers else {}

    @property
    def title(self) -> str:
        """Standard reason phrase for ``code``, or "Unknown"."""
        reason = httplib.responses.get(self.code, "Unknown")
        return reason

    def __str__(self) -> str:
        return f"HTTP error ({self.code} - {self.title}): {self.msg}"
class GoneException(RestlibException):
    """
    GoneException is used to detect when a consumer has been deleted on the candlepin side.

    A client handling a GoneException should verify that GoneException.deleted_id
    matches the consumer uuid before taking any action (like deleting the consumer
    cert from disk).

    This is to prevent an errant 410 response from candlepin (or a reverse_proxy in
    front of it, or its app server, or an injected response) from causing
    accidental consumer cert deletion.
    """

    def __init__(self, code: int, msg: str, deleted_id: Any):
        """
        :param code: HTTP status code
        :param msg: error message
        :param deleted_id: ID of the deleted consumer as reported in the response
        """
        super().__init__(code, msg)
        self.deleted_id = deleted_id
class UnknownContentException(ConnectionException):
    """
    Thrown when the response of a request has no valid json content
    and the http status code is anything other than the following:
    [200, 202, 204, 401, 403, 410, 429, 500, 502, 503, 504]
    """

    def __init__(self, code: int, content_type: Optional[str] = None, content: Optional[str] = None) -> None:
        """
        :param code: HTTP status code
        :param content_type: content type of the response, if known
        :param content: body of the response, if any
        """
        self.code = code
        self.content_type = content_type
        self.content = content

    @property
    def title(self) -> str:
        """Standard reason phrase for ``code``, or "Unknown"."""
        reason = httplib.responses.get(self.code, "Unknown")
        return reason

    def __str__(self) -> str:
        # Collect the comma-separated details shown inside the parentheses
        details = [f"HTTP {self.code} - {self.title}"]
        if self.content_type is not None:
            details.append(f"type {self.content_type}")
        if self.content is not None:
            details.append(f"len {len(self.content)}")
        return "Unknown content error (" + ", ".join(details) + ")"
class RemoteServerException(ConnectionException):
    """
    Thrown when the response to a request has no valid json content and
    one of these http status codes: [404, 410, 500, 502, 503, 504]
    """

    def __init__(self, code: int, request_type: str = None, handler: str = None) -> None:
        """
        :param code: HTTP status code
        :param request_type: HTTP method of the failed request
        :param handler: path the request was sent to
        """
        self.code = code
        self.request_type = request_type
        self.handler = handler

    def __str__(self) -> str:
        # Without both request details only the status code can be reported
        if not (self.request_type and self.handler):
            return "Server returned %s" % self.code
        details = (self.request_type, self.handler, self.code)
        return "Server error attempting a %s to %s returned status %s" % details
class AuthenticationException(RemoteServerException):
    """Server error caused by credentials that were not accepted for the request."""

    # Label that starts the second line of the message; overridden by subclasses
    prefix = "Authentication error"

    def __str__(self) -> str:
        server_part = super().__str__()
        credentials_part = "%s: Invalid credentials for request." % self.prefix
        return server_part + "\n" + credentials_part
class RateLimitExceededException(RestlibException):
    """
    Thrown in response to a http code 429.
    This means that too many requests have been made in a given time period.
    The retry_after attribute is an int of seconds to retry the request after.
    The retry_after attribute is None when the response did not include a usable
    Retry-After header.
    """

    def __init__(self, code: int, msg: str = None, headers: Optional[dict] = None) -> None:
        """
        :param code: HTTP status code
        :param msg: error message; a default text is used when not given
        :param headers: response headers; the Retry-After header is looked up
            regardless of the letter case of its name
        """
        super().__init__(code, msg, headers)
        # HTTP field names are case-insensitive, so "Retry-After" and
        # "retry-after" must be treated the same.
        retry_after_value = None
        for name, value in self.headers.items():
            if str(name).lower() == "retry-after":
                retry_after_value = value
                break
        self.retry_after = safe_int(retry_after_value)
        self.msg = msg or "Access rate limit exceeded"
        if self.retry_after is not None:
            self.msg += ", retry access after: %s seconds." % self.retry_after
class UnauthorizedException(AuthenticationException):
    """
    Thrown in response to http status code 401 with no valid json content
    """

    # Replaces the inherited label in the message built by AuthenticationException.__str__
    prefix = "Unauthorized"
class ForbiddenException(AuthenticationException):
    """
    Thrown in response to http status code 403 with no valid json content
    """

    # Replaces the inherited label in the message built by AuthenticationException.__str__
    prefix = "Forbidden"
class ExpiredIdentityCertException(ConnectionException):
    """
    Connection error related to an expired identity certificate.

    NOTE(review): description derived from the class name; the place where it
    is raised is not part of the reviewed section - confirm.
    """
def _encode_auth(username, password):
    """
    Build the value of an HTTP basic authentication header.

    :param username: user name
    :param password: password
    :return: "Basic " followed by the base64 encoding of "username:password"
    """
    credentials = ":".join([username, password])
    token = base64.b64encode(credentials.encode("utf-8"))
    return "Basic %s" % token.decode("utf-8")
# FIXME: this is terrible, we need to refactor
# Restlib to be Restlib based on a https client class
class ContentConnection(BaseConnection):
    """Connection that uses the "/" handler and entitlement certificates from a directory."""

    def __init__(self, cert_dir: str = None, **kwargs) -> None:
        """
        :param cert_dir: directory with entitlement certificates; defaults to
            /etc/pki/entitlement
        :param kwargs: passed through to BaseConnection; the value of
            ``client_version``, when present, is appended to the User-Agent
        """
        log.debug("ContentConnection")
        agent = "RHSM-content/1.0 (cmd=%s)" % utils.cmd_name(sys.argv)
        if "client_version" in kwargs:
            agent = agent + kwargs["client_version"]
        entitlement_dir = cert_dir if cert_dir else "/etc/pki/entitlement"
        super().__init__(handler="/", cert_dir=entitlement_dir, user_agent=agent, **kwargs)

    def get_versions(self, path: str, cert_key_pairs: Iterable[Tuple[str, str]] = None) -> Union[dict, None]:
        """
        Get list of available release versions from the given path

        :param path: path to a simple text file containing supported release versions
        :param cert_key_pairs: optional list of cert and key pairs to use,
            to reduce the number of failed http requests.
        :return: result of the GET request
        """
        target = "%s/%s" % (self.handler, path)
        return self.conn.request_get(target, cert_key_pairs=cert_key_pairs)

    def _get_versions_for_product(self, product_id) -> None:
        """Placeholder without any implementation."""
        return None
def _get_locale() -> Union[None, str]:
    """
    Return the language code of the current locale.

    ``locale.getlocale()`` is tried first; when it fails,
    ``locale.getdefaultlocale()`` is used as a fallback.

    :return: first item of the locale tuple (e.g. "en_US"), or None when the
        locale cannot be determined
    """
    try:
        current = locale.getlocale()
    except (locale.Error, ValueError):
        try:
            current = locale.getdefaultlocale()
        except (locale.Error, ValueError):
            current = None

    if not current or current == (None, None):
        return None
    return current[0]
class BaseRestLib:
"""
A low-level wrapper around httplib
to make rest calls easy and expose the details of
responses
"""
__conn = None
ALPHA: float = 0.9
# Default value of timeout. This value is set according observed timeout
# on typical installations of candlepin server (hosted 75 seconds,
# tomcat 60 seconds)
KEEP_ALIVE_TIMEOUT: int = 50
def __init__(
    self,
    host: str,
    ssl_port: int,
    apihandler: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    proxy_hostname: Optional[str] = None,
    proxy_port: Optional[int] = None,
    proxy_user: Optional[str] = None,
    proxy_password: Optional[str] = None,
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
    cert_dir: Optional[str] = None,
    ca_dir: Optional[str] = None,
    insecure: Optional[bool] = False,
    timeout: Optional[int] = None,
    correlation_id: Optional[str] = None,
    token: Optional[str] = None,
    user_agent: Optional[str] = None,
    auth_type: Optional[ConnectionType] = None,
) -> None:
    """
    Store the connection settings and prepare the default HTTP headers.

    No network connection is created here.

    :param host: server hostname
    :param ssl_port: server port
    :param apihandler: URL prefix of the API
    :param username: user name for basic authentication
    :param password: password for basic authentication
    :param proxy_hostname: proxy server hostname
    :param proxy_port: proxy server port
    :param proxy_user: proxy user name
    :param proxy_password: proxy password
    :param cert_file: client certificate file
    :param key_file: client key file
    :param cert_dir: directory with certificate/key pairs
    :param ca_dir: directory with CA certificates
    :param insecure: do not verify the server certificate
    :param timeout: connection timeout
    :param correlation_id: when set, sent as the X-Correlation-ID header
    :param token: bearer token, used when username and password are not both set
    :param user_agent: User-Agent value; a generic default is used when empty
    :param auth_type: type of authentication
    """
    log.debug("Creating new BaseRestLib instance")

    # Destination
    self.host = host
    self.ssl_port = ssl_port
    self.apihandler = apihandler

    # Default, updated by UepConnection
    self.user_agent = user_agent if user_agent else "python-rhsm-user-agent"

    # Headers sent with the requests
    default_headers = {
        "Content-type": "application/json",
        "Accept": "application/json",
        "x-subscription-manager-version": subman_version,
    }
    if correlation_id:
        default_headers["X-Correlation-ID"] = correlation_id
    # Basic authentication wins over the bearer token
    if username and password:
        default_headers["Authorization"] = _encode_auth(username, password)
    elif token:
        default_headers["Authorization"] = "Bearer " + token
    self.headers = default_headers

    # TLS material
    self.cert_file = cert_file
    self.key_file = key_file
    self.cert_dir = cert_dir
    self.ca_dir = ca_dir
    self.insecure = insecure

    # Credentials
    self.username = username
    self.password = password
    self.token = token
    self.auth_type = auth_type

    self.timeout = timeout

    # Proxy
    self.proxy_hostname = proxy_hostname
    self.proxy_port = proxy_port
    self.proxy_user = proxy_user
    self.proxy_password = proxy_password

    self.smoothed_rt = None

    # We set this to None, because we don't know the truth unless we get
    # first response from the server using cert/key connection
    self.is_consumer_cert_key_valid = None
def close_connection(self) -> None:
    """
    Try to close connection to server.

    The TLS session is shut down first (best effort), then the TCP connection
    is closed. The stored connection is always closed and forgotten, even
    when the TLS shutdown fails.

    :return: None
    """
    conn = self.__conn
    if conn is None:
        return

    try:
        # Do proper TLS shutdown handshake (TLS tear down) first
        if conn.sock is not None:
            log.debug(f"Closing HTTPS connection {conn.sock}")
            try:
                conn.sock.unwrap()
            except OSError as err:
                # ssl.SSLError is a subclass of OSError; a peer that already
                # dropped the connection raises a plain OSError here.
                log.debug(f"Unable to close TLS connection properly: {err}")
            else:
                log.debug("TLS connection closed")
    finally:
        # Then it is possible to close TCP connection
        conn.close()
        self.__conn = None
def _get_cert_key_list(self) -> List[Tuple[str, str]]:
    """
    Create list of cert-key pairs to be used with the connection.

    Without a certificate directory the single configured cert/key pair is
    returned. Otherwise every ``*.pem`` file in the directory that is not a
    ``*-key.pem`` file is paired with ``<name up to the first dot>-key.pem``
    in the same directory.

    :return: list of (certificate path, key path) tuples
    """
    if self.cert_dir is None:
        return [(self.cert_file, self.key_file)]

    pairs = []
    for entry in os.listdir(self.cert_dir):
        if not entry.endswith(".pem") or entry.endswith("-key.pem"):
            continue
        stem = entry.split(".", 1)[0]
        pairs.append(
            (
                os.path.join(self.cert_dir, entry),
                os.path.join(self.cert_dir, "%s-key.pem" % stem),
            )
        )
    return pairs
def _load_ca_certificates(self, context: ssl.SSLContext) -> None:
    """
    Tries to load CA certificates to SSL context.

    Every ``*.pem`` file in ``self.ca_dir`` is loaded. A missing directory
    only produces a warning.

    :param context: SSL context
    :return: None
    :raises BadCertificateException: when a certificate cannot be loaded
    :raises ConnectionSetupException: on any other OS error while reading
    """
    ca_dir = self.ca_dir
    if not os.path.isdir(ca_dir):
        log.warning('Directory "%s" with CA certificates is missing' % ca_dir)
        return None

    loaded = []
    # Remembered outside the loop so the failing file can be reported
    current_path = ""
    try:
        for name in os.listdir(ca_dir):
            if not name.endswith(".pem"):
                continue
            current_path = os.path.join(ca_dir, name)
            context.load_verify_locations(current_path)
            loaded.append(name)
    except ssl.SSLError as exc:
        raise BadCertificateException(current_path, exc)
    except OSError as e:
        raise ConnectionSetupException(e.strerror)

    if not loaded:
        log.warning("Unable to load any CA certificate from: %s" % ca_dir)
    else:
        log.debug("Loaded CA certificates from %s: %s" % (ca_dir, ", ".join(loaded)))
def _create_connection(self, cert_file: str = None, key_file: str = None) -> httplib.HTTPSConnection:
    """
    This method tries to return existing connection, when connection exists and limit of connection
    has not been reached (timeout, max number of requests). When no connection exists, then this
    method creates new TCP and TLS connection.

    :param cert_file: client certificate loaded into the TLS context when the file exists
    :param key_file: key file belonging to ``cert_file``
    :return: connected HTTPS connection (directly to the server, or tunnelled through the proxy)
    :raises ConnectionOSErrorException: when connect() fails with an OSError that has no errno
    """
    if self.__conn is not None:
        # Check if it is still possible to use existing connection
        now = time.time()
        # NOTE(review): last_request_time is not set in this method; presumably
        # it is maintained by the code performing requests - confirm.
        if now - self.__conn.last_request_time > self.__conn.keep_alive_timeout:
            log.debug(f"Connection timeout {self.__conn.keep_alive_timeout}. Closing connection...")
            self.close_connection()
        elif (
            self.__conn.max_requests_num is not None
            and self.__conn.requests_num > self.__conn.max_requests_num
        ):
            log.debug(
                f"Maximal number of requests ({self.__conn.max_requests_num}) reached. "
                "Closing connection..."
            )
            self.close_connection()
        else:
            log.debug("Reusing connection: %s", self.__conn.sock)
            return self.__conn

    log.debug("Creating new connection")

    # Select the highest TLS version supported by both the client and the server.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    if self.insecure:
        # Allow clients to connect to servers with missing or invalid certificates.
        # check_hostname has to be disabled before verify_mode can be set to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    else:
        context.verify_mode = ssl.CERT_REQUIRED
        if self.ca_dir is not None:
            self._load_ca_certificates(context)
    if cert_file and os.path.exists(cert_file):
        context.load_cert_chain(cert_file, keyfile=key_file)

    if self.proxy_hostname and self.proxy_port:
        log.debug(
            "Using proxy: %s:%s" % (normalized_host(self.proxy_hostname), safe_int(self.proxy_port))
        )
        # Headers sent to the proxy when the tunnel is established
        proxy_headers = {
            "User-Agent": self.user_agent,
            "Host": "%s:%s" % (normalized_host(self.host), safe_int(self.ssl_port)),
        }
        if self.proxy_user and self.proxy_password:
            proxy_headers["Proxy-Authorization"] = _encode_auth(self.proxy_user, self.proxy_password)
        # Note: we use only HTTPS for connection with proxy server, and we ignore proxy_scheme setting
        # from rhsm.conf here. The proxy_scheme is used only for generating redhat.repo. It is even worse.
        # The default value of proxy_scheme is http (not https). It could be very confusing.
        conn = httplib.HTTPSConnection(
            self.proxy_hostname, self.proxy_port, context=context, timeout=self.timeout
        )
        conn.set_tunnel(self.host, safe_int(self.ssl_port), proxy_headers)
        # The requests sent through the tunnel carry the Host of the real server
        self.headers["Host"] = "%s:%s" % (normalized_host(self.host), safe_int(self.ssl_port))
    else:
        conn = httplib.HTTPSConnection(self.host, self.ssl_port, context=context, timeout=self.timeout)

    # Set default keep-alive connection timeout in case server does not
    # send HTTP header Keep-Alive with information about timeout
    conn.keep_alive_timeout = self.KEEP_ALIVE_TIMEOUT
    # Number of requests
    conn.requests_num = 0
    # Maximal number of requests. None means no limit.
    conn.max_requests_num = None

    # Do TCP and TLS handshake here before we make any request
    try:
        conn.connect()
    except OSError as e:
        # in case this OSError does not have an errno set, it means it was
        # not a syscall failure; mostly (if at all) this is raised on proxy
        # connection failures
        if e.errno is None:
            # wrap this to carry also the details on the destination host
            raise ConnectionOSErrorException(self.host, self.ssl_port, self.apihandler, e)
        raise
    log.debug(f"Created connection: {conn.sock}")

    # Store connection object only in the case, when it is not forbidden
    if REUSE_CONNECTION is True:
        self.__conn = conn

    return conn
def _print_debug_info_about_request(
    self, request_type: str, handler: str, final_headers: dict, body: Union[dict, Any]
) -> None:
    """
    Print debug information about the HTTP request that is being sent.

    We do not use httplib.HTTPConnection.debuglevel = 1, because it doesn't provide control
    about displayed information. Nothing is printed unless the environment variable
    SUBMAN_DEBUG_PRINT_REQUEST is set. The output (stdout) can be extended with the following
    environment variables:

     * SUBMAN_DEBUG_TCP_IP: print the socket of the current connection
     * SUBMAN_DEBUG_PRINT_REQUEST_HEADER: print the request headers
     * SUBMAN_DEBUG_PRINT_REQUEST_BODY: print the request body
     * SUBMAN_DEBUG_PRINT_TRACEBACKS: print the current call stack
     * SUBMAN_DEBUG_SAVE_TRACEBACKS: save the current call stack to a new file in /tmp/rhsm/

    Note that the proxy password is part of the output when a proxy with credentials is used.

    :param request_type: (GET, POST, PUT, ...)
    :param handler: e.g. /candlepin/status
    :param final_headers: HTTP headers used by request
    :param body: request can contain body. It is usually dictionary, but it can be anything that
        can be serialized by json.dumps()
    :return: None
    """
    env = os.environ
    if env.get("SUBMAN_DEBUG_PRINT_REQUEST", "") == "":
        return

    print()

    # Information about TCP/IP layer
    if env.get("SUBMAN_DEBUG_TCP_IP", "") and self.__conn is not None and self.__conn.sock is not None:
        print(utils.colorize("TCP socket:", utils.COLOR.GREEN))
        print(utils.colorize(f"{self.__conn.sock}", utils.COLOR.BLUE))

    # Additional information about the proxy connection
    if self.proxy_hostname and self.proxy_port:
        print(utils.colorize("Proxy:", utils.COLOR.GREEN))
        # Note: using only https:// is not a mistake. We use only https for proxy connection.
        proxy_msg: str = "https://"
        # User name and, when available, password
        if self.proxy_user:
            if self.proxy_password:
                proxy_msg += f"{self.proxy_user}:{self.proxy_password}@"
            else:
                proxy_msg += f"{self.proxy_user}@"
        # Hostname and port
        proxy_msg += f"{normalized_host(self.proxy_hostname)}:{safe_int(self.proxy_port)}"
        print(utils.colorize(proxy_msg, utils.COLOR.MAGENTA))

        # HTTP headers used for the proxy tunnel
        tunnel_headers = None
        if self.__conn is not None:
            tunnel_headers = getattr(self.__conn, "_tunnel_headers", None)
        if tunnel_headers is not None:
            tunnel_msg = f"{tunnel_headers}"
            if tunnel_msg:
                print(utils.colorize(tunnel_msg, utils.COLOR.BLUE))

    # Human-readable description of the authentication in use
    auth_labels = (
        (ConnectionType.KEYCLOAK_AUTH, "keycloak auth"),
        (ConnectionType.BASIC_AUTH, "basic auth"),
        (ConnectionType.CONSUMER_CERT_AUTH, "consumer auth"),
        (ConnectionType.NO_AUTH, "no auth"),
    )
    auth = "insecure " if self.insecure else ""
    auth += next((label for kind, label in auth_labels if self.auth_type == kind), "undefined auth")

    print(utils.colorize("Request:", utils.COLOR.GREEN))
    request_line = (
        f"{request_type} " + "https://" + f"{normalized_host(self.host)}:{safe_int(self.ssl_port)}{handler}"
    )
    print(
        utils.colorize(request_line, utils.COLOR.RED)
        + " using "
        + utils.colorize(f"{auth}", utils.COLOR.BLUE)
    )

    if env.get("SUBMAN_DEBUG_PRINT_REQUEST_HEADER", ""):
        print(utils.colorize("Request headers:", utils.COLOR.GREEN))
        print(utils.colorize(f"{final_headers}", utils.COLOR.BLUE))

    if body is not None and env.get("SUBMAN_DEBUG_PRINT_REQUEST_BODY", ""):
        print(utils.colorize("Request body:", utils.COLOR.GREEN))
        print(utils.colorize(f"{body}", utils.COLOR.YELLOW))

    if env.get("SUBMAN_DEBUG_PRINT_TRACEBACKS", ""):
        print(utils.colorize("Current call stack:", utils.COLOR.GREEN))
        traceback.print_stack(file=sys.stdout)

    if env.get("SUBMAN_DEBUG_SAVE_TRACEBACKS", ""):
        debug_dir = Path("/tmp/rhsm/")
        debug_dir.mkdir(exist_ok=True)

        timestamp: str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        # make sure we don't overwrite previous logs
        index = 0
        debug_log = debug_dir / Path(f"{timestamp}_{index}.log")
        while debug_log.exists():
            index += 1
            debug_log = debug_dir / Path(f"{timestamp}_{index}.log")

        with debug_log.open("w", encoding="utf-8") as handle:
            traceback.print_stack(file=handle)

        print(utils.colorize("Call stack file:", utils.COLOR.GREEN))
        print(f"{str(debug_log)}")

    print()
@staticmethod
def _print_debug_info_about_response(result: dict) -> None:
    """
    Print the result of an HTTP request to stdout, when
    environment variable SUBMAN_DEBUG_PRINT_RESPONSE is set.

    Status and headers are always printed; the body only when it is not empty.

    :param result: response from candlepin server, with the keys
        "status", "headers" and "content"
    :return: None
    """
    if not os.environ.get("SUBMAN_DEBUG_PRINT_RESPONSE", ""):
        return

    print(utils.colorize("Response:", utils.COLOR.GREEN))
    print(utils.colorize(f"{result['status']}", utils.COLOR.RED))

    print(utils.colorize("Response headers:", utils.COLOR.GREEN))
    print(utils.colorize(f"{result['headers']}", utils.COLOR.BLUE))

    if result["content"]:
        print(utils.colorize("Response body:", utils.COLOR.GREEN))
        print(utils.colorize(f"{result['content']}", utils.COLOR.YELLOW))

    print()
def _set_accept_language_in_header(self) -> None:
    """
    Set the Accept-Language HTTP header according to current settings.

    The language of ``subscription_manager.i18n.LOCALE`` is preferred; when
    that module or attribute is not available, the process locale is used.
    The header is left untouched when no language can be determined.

    :return: None
    """
    try:
        import subscription_manager.i18n

        active_locale = getattr(subscription_manager.i18n, "LOCALE", None)
        language = getattr(active_locale, "language", None)
    except ImportError:
        language = None

    lc = language if language is not None else _get_locale()
    if not lc:
        return

    # e.g. "en_US.UTF-8" becomes "en-us"
    normalized = lc.lower().replace("_", "-")
    self.headers["Accept-Language"] = normalized.split(".", 1)[0]
@staticmethod
def parse_keep_alive_header(keep_alive_header: str) -> Tuple[Union[None, int], Union[None, int]]:
"""
Try to parse 'Keep-Alive' header received from candlepin server
:param keep_alive_header: string with value of the header
:return: Tuple containing connection timeout and maximal number of requests
"""
keep_alive_timeout = None
max_requests_num = None
# Regular expression pattern represents: key=number
pattern = re.compile(r"^(.*)=(\d+)[,;]*$")
items = keep_alive_header.split()
for item in items:
search_result = pattern.search(item)
if search_result is not None: