forked from pantsbuild/pants
-
Notifications
You must be signed in to change notification settings - Fork 0
/
basic_auth.py
116 lines (89 loc) · 4.13 KB
/
basic_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from collections import namedtuple
from dataclasses import dataclass
from typing import Dict, Optional
import requests
import www_authenticate
from pants.auth.cookies import Cookies
from pants.subsystem.subsystem import Subsystem
from pants.version import VERSION
@dataclass(frozen=True)
class Authentication:
headers: Dict[str, str]
request_args: Dict[str, str]
class BasicAuthException(Exception):
pass
class BasicAuthAttemptFailed(BasicAuthException):
def __init__(self, url, status_code, reason):
msg = "Failed to auth against {}. Status code: {}. Reason: {}.".format(
url, status_code, reason
)
super().__init__(msg)
self.url = url
class Challenged(BasicAuthAttemptFailed):
def __init__(self, url, status_code, reason, realm):
super().__init__(url, status_code, reason)
self.realm = realm
BasicAuthCreds = namedtuple("BasicAuthCreds", ["username", "password"])
class BasicAuth(Subsystem):
"""Support for HTTP basicauth."""
options_scope = "basicauth"
@classmethod
def register_options(cls, register):
super().register_options(register)
register(
"--providers",
type=dict,
help="Map from provider name to config dict. This dict contains the following items: "
"{provider_name: <url of endpoint that accepts basic auth and sets a session "
"cookie>}. For example, `{'prod': 'https://app.pantsbuild.org/auth'}`.",
)
register(
"--allow-insecure-urls",
advanced=True,
type=bool,
default=False,
help="Allow auth against non-HTTPS urls. Must only be set when testing!",
)
@classmethod
def subsystem_dependencies(cls):
return super().subsystem_dependencies() + (Cookies,)
def authenticate(
self,
provider: str,
creds: Optional[BasicAuthCreds] = None,
cookies: Optional[Cookies] = None,
) -> None:
"""Authenticate against the specified provider.
:param provider: Authorize against this provider.
:param creds: The creds to use. If unspecified, assumes that creds are set in the netrc file.
:param cookies: Store the auth cookies in this instance. If unspecified, uses the global instance.
:raises pants.auth.basic_auth.BasicAuthException: If auth fails due to misconfiguration or
rejection by the server.
"""
cookies = cookies or Cookies.global_instance()
if not provider:
raise BasicAuthException("No basic auth provider specified.")
provider_config = self.options.providers.get(provider)
if not provider_config:
raise BasicAuthException(f"No config found for provider {provider}.")
url = provider_config.get("url")
if not url:
raise BasicAuthException(f"No url found in config for provider {provider}.")
if not self.options.allow_insecure_urls and not url.startswith("https://"):
raise BasicAuthException(f"Auth url for provider {provider} is not secure: {url}.")
auth = requests.auth.HTTPBasicAuth(creds.username, creds.password) if creds else None
response = requests.get(url, auth=auth, headers={"User-Agent": f"pants/v{VERSION}"})
if response.status_code != requests.codes.ok:
if response.status_code == requests.codes.unauthorized:
parsed = www_authenticate.parse(response.headers.get("WWW-Authenticate", ""))
if "Basic" in parsed:
raise Challenged(
url, response.status_code, response.reason, parsed["Basic"]["realm"]
)
raise BasicAuthException(url, response.status_code, response.reason)
cookies.update(response.cookies)
def get_auth_for_provider(self, auth_provider: Optional[str]) -> Authentication:
cookies = Cookies.global_instance()
return Authentication(headers={}, request_args={"cookies": cookies.get_cookie_jar()})