forked from yandex-cloud/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_channels.py
63 lines (51 loc) · 2.48 KB
/
_channels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import grpc
import pkg_resources
from yandex.cloud.endpoint.api_endpoint_service_pb2 import ListApiEndpointsRequest
from yandex.cloud.endpoint.api_endpoint_service_pb2_grpc import ApiEndpointServiceStub
from yandexcloud import _auth_plugin
from yandexcloud._auth_fabric import YC_API_ENDPOINT, get_auth_token_requester
# Look up the version of the installed "yandexcloud" distribution; when the
# distribution cannot be found, fall back to the placeholder "0.0.0".
try:
    _installed_dist = pkg_resources.get_distribution("yandexcloud")
except pkg_resources.DistributionNotFound:
    VERSION = "0.0.0"
else:
    VERSION = _installed_dist.version

# User-agent string identifying this SDK; used by Channels.channel_options().
SDK_USER_AGENT = "yandex-cloud-python-sdk/{version}".format(version=VERSION)
class Channels(object):
    """Build and cache gRPC channels for the API endpoints of the service.

    The endpoint list is fetched from the endpoint-listing service the first
    time :meth:`channel` is called; one authenticated channel is then created
    per returned endpoint and kept in a dict keyed by endpoint id.
    """

    def __init__(self, client_user_agent=None, **kwargs):
        """Store credentials and settings; no channel is opened here.

        Args:
            client_user_agent: Optional user-agent string that is passed to
                gRPC ahead of the SDK's own user agent.
            **kwargs: Optional settings read by key:
                ``root_certificates``, ``private_key``, ``certificate_chain``
                are forwarded to ``grpc.ssl_channel_credentials``;
                ``endpoint`` is the address of the endpoint-listing service
                (defaults to ``YC_API_ENDPOINT``);
                ``token``, ``service_account_key``, ``iam_token`` are
                forwarded to ``get_auth_token_requester``.
        """
        self._client_user_agent = client_user_agent

        # Populated lazily by channel().
        self._unauthenticated_channel = None
        self._channels = None

        self._channel_creds = grpc.ssl_channel_credentials(
            root_certificates=kwargs.get("root_certificates"),
            private_key=kwargs.get("private_key"),
            certificate_chain=kwargs.get("certificate_chain"),
        )

        self._endpoint = kwargs.get("endpoint", YC_API_ENDPOINT)

        self._token_requester = get_auth_token_requester(
            token=kwargs.get("token"),
            service_account_key=kwargs.get("service_account_key"),
            iam_token=kwargs.get("iam_token"),
            endpoint=self._endpoint,
        )

    def channel_options(self):
        """Return the gRPC channel options as a tuple of ``(name, value)`` pairs.

        One ``grpc.primary_user_agent`` entry is produced for each user agent
        that is not ``None``: the client-supplied one first, then the SDK's.
        """
        options = []
        for agent in (self._client_user_agent, SDK_USER_AGENT):
            if agent is None:
                continue
            options.append(("grpc.primary_user_agent", agent))
        return tuple(options)

    def channel(self, endpoint):
        """Return the cached gRPC channel for the endpoint with id ``endpoint``.

        While no channels are cached (``self._channels`` is ``None`` or
        empty), the endpoint list is requested over an unauthenticated
        channel and an authenticated channel is created for every endpoint
        in the response.

        Raises:
            RuntimeError: If ``endpoint`` is not among the known endpoint ids.
        """
        if not self._channels:
            # Plain TLS channel (no call credentials) used to list endpoints.
            self._unauthenticated_channel = grpc.secure_channel(
                self._endpoint, self._channel_creds, options=self.channel_options()
            )
            stub = ApiEndpointServiceStub(self._unauthenticated_channel)
            listing = stub.List(ListApiEndpointsRequest())
            discovered = listing.endpoints

            def iam_channel():
                # Resolved lazily: this callable is created before
                # self._channels is assigned, so the "iam" entry is looked up
                # only when the callable is actually invoked.
                return self._channels["iam"]

            auth_plugin = _auth_plugin.Credentials(self._token_requester, iam_channel)
            per_call_creds = grpc.metadata_call_credentials(auth_plugin)
            combined_creds = grpc.composite_channel_credentials(self._channel_creds, per_call_creds)

            # Build the full mapping first and publish it in one assignment.
            by_id = {}
            for item in discovered:
                by_id[item.id] = grpc.secure_channel(item.address, combined_creds, options=self.channel_options())
            self._channels = by_id

        known = self._channels
        if endpoint in known:
            return known[endpoint]
        raise RuntimeError("Unknown endpoint: {}".format(endpoint))