Skip to content

Commit

Permalink
feat: add keep alive timeout (#182)
Browse files Browse the repository at this point in the history
* feat: add keep alive timeout

* feat: override channel settings
  • Loading branch information
HemangChothani committed Feb 9, 2021
1 parent e28e770 commit e9637cb
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 13 deletions.
61 changes: 57 additions & 4 deletions google/cloud/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@

from google.cloud import bigtable_v2
from google.cloud import bigtable_admin_v2
from google.cloud.bigtable_v2.gapic.transports import bigtable_grpc_transport
from google.cloud.bigtable_admin_v2.gapic.transports import (
bigtable_table_admin_grpc_transport,
bigtable_instance_admin_grpc_transport,
)

from google.cloud.bigtable import __version__
from google.cloud.bigtable.instance import Instance
Expand All @@ -60,13 +65,14 @@
"""Scope for reading table data."""


def _create_gapic_client(client_class, client_options=None):
def _create_gapic_client(client_class, client_options=None, transport=None):
def inner(self):
if self._emulator_host is None:
return client_class(
credentials=self._credentials,
credentials=None,
client_info=self._client_info,
client_options=client_options,
transport=transport,
)
else:
return client_class(
Expand Down Expand Up @@ -161,7 +167,13 @@ def __init__(
self._emulator_channel = None

if self._emulator_host is not None:
self._emulator_channel = grpc.insecure_channel(self._emulator_host)
self._emulator_channel = grpc.insecure_channel(
target=self._emulator_host,
options={
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)

if channel is not None:
warnings.warn(
Expand Down Expand Up @@ -196,6 +208,29 @@ def _get_scopes(self):

return scopes

def _create_gapic_client_channel(self, client_class, grpc_transport):
if self._client_options and self._client_options.api_endpoint:
api_endpoint = self._client_options.api_endpoint
else:
api_endpoint = client_class.SERVICE_ADDRESS

channel = grpc_transport.create_channel(
api_endpoint,
self._credentials,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)
transport = grpc_transport(
address=api_endpoint,
channel=channel,
credentials=None,
)
return transport

@property
def project_path(self):
"""Project name to be used with Instance Admin API.
Expand Down Expand Up @@ -236,8 +271,14 @@ def table_data_client(self):
:returns: A BigtableClient object.
"""
if self._table_data_client is None:
transport = self._create_gapic_client_channel(
bigtable_v2.BigtableClient,
bigtable_grpc_transport.BigtableGrpcTransport,
)
klass = _create_gapic_client(
bigtable_v2.BigtableClient, client_options=self._client_options
bigtable_v2.BigtableClient,
client_options=self._client_options,
transport=transport,
)
self._table_data_client = klass(self)
return self._table_data_client
Expand All @@ -262,9 +303,15 @@ def table_admin_client(self):
if self._table_admin_client is None:
if not self._admin:
raise ValueError("Client is not an admin client.")

transport = self._create_gapic_client_channel(
bigtable_admin_v2.BigtableTableAdminClient,
bigtable_table_admin_grpc_transport.BigtableTableAdminGrpcTransport,
)
klass = _create_gapic_client(
bigtable_admin_v2.BigtableTableAdminClient,
client_options=self._admin_client_options,
transport=transport,
)
self._table_admin_client = klass(self)
return self._table_admin_client
Expand All @@ -289,9 +336,15 @@ def instance_admin_client(self):
if self._instance_admin_client is None:
if not self._admin:
raise ValueError("Client is not an admin client.")

transport = self._create_gapic_client_channel(
bigtable_admin_v2.BigtableInstanceAdminClient,
bigtable_instance_admin_grpc_transport.BigtableInstanceAdminGrpcTransport,
)
klass = _create_gapic_client(
bigtable_admin_v2.BigtableInstanceAdminClient,
client_options=self._admin_client_options,
transport=transport,
)
self._instance_admin_client = klass(self)
return self._instance_admin_client
Expand Down
36 changes: 27 additions & 9 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ def test_wo_emulator(self):
credentials = _make_credentials()
client = _Client(credentials)
client_info = client._client_info = mock.Mock()
transport = mock.Mock()

result = self._invoke_client_factory(client_class)(client)
result = self._invoke_client_factory(client_class, transport=transport)(client)

self.assertIs(result, client_class.return_value)
client_class.assert_called_once_with(
credentials=client._credentials,
credentials=None,
client_info=client_info,
client_options=None,
transport=transport,
)

def test_wo_emulator_w_client_options(self):
Expand All @@ -47,16 +49,18 @@ def test_wo_emulator_w_client_options(self):
client = _Client(credentials)
client_info = client._client_info = mock.Mock()
client_options = mock.Mock()
transport = mock.Mock()

result = self._invoke_client_factory(
client_class, client_options=client_options
client_class, client_options=client_options, transport=transport
)(client)

self.assertIs(result, client_class.return_value)
client_class.assert_called_once_with(
credentials=client._credentials,
credentials=None,
client_info=client_info,
client_options=client_options,
transport=transport,
)

def test_w_emulator(self):
Expand Down Expand Up @@ -170,7 +174,13 @@ def test_constructor_with_emulator_host(self):

self.assertEqual(client._emulator_host, emulator_host)
self.assertIs(client._emulator_channel, factory.return_value)
factory.assert_called_once_with(emulator_host)
factory.assert_called_once_with(
target=emulator_host,
options={
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 10000,
}.items(),
)
getenv.assert_called_once_with(BIGTABLE_EMULATOR)

def test__get_scopes_default(self):
Expand Down Expand Up @@ -234,7 +244,9 @@ def test_table_data_client_not_initialized_w_client_options(self):
from google.api_core.client_options import ClientOptions

credentials = _make_credentials()
client_options = ClientOptions(quota_project_id="QUOTA-PROJECT")
client_options = ClientOptions(
quota_project_id="QUOTA-PROJECT", api_endpoint="xyz"
)
client = self._make_one(
project=self.PROJECT, credentials=credentials, client_options=client_options
)
Expand All @@ -245,9 +257,11 @@ def test_table_data_client_not_initialized_w_client_options(self):

self.assertIs(table_data_client, mocked.return_value)
self.assertIs(client._table_data_client, table_data_client)

mocked.assert_called_once_with(
client_info=client._client_info,
credentials=mock.ANY, # added scopes
credentials=None,
transport=mock.ANY,
client_options=client_options,
)

Expand Down Expand Up @@ -308,6 +322,7 @@ def test_table_admin_client_not_initialized_w_client_options(self):
admin_client_options=admin_client_options,
)

client._create_gapic_client_channel = mock.Mock()
patch = mock.patch("google.cloud.bigtable_admin_v2.BigtableTableAdminClient")
with patch as mocked:
table_admin_client = client.table_admin_client
Expand All @@ -316,7 +331,8 @@ def test_table_admin_client_not_initialized_w_client_options(self):
self.assertIs(client._table_admin_client, table_admin_client)
mocked.assert_called_once_with(
client_info=client._client_info,
credentials=mock.ANY, # added scopes
credentials=None,
transport=mock.ANY,
client_options=admin_client_options,
)

Expand Down Expand Up @@ -377,6 +393,7 @@ def test_instance_admin_client_not_initialized_w_client_options(self):
admin_client_options=admin_client_options,
)

client._create_gapic_client_channel = mock.Mock()
patch = mock.patch("google.cloud.bigtable_admin_v2.BigtableInstanceAdminClient")
with patch as mocked:
instance_admin_client = client.instance_admin_client
Expand All @@ -385,7 +402,8 @@ def test_instance_admin_client_not_initialized_w_client_options(self):
self.assertIs(client._instance_admin_client, instance_admin_client)
mocked.assert_called_once_with(
client_info=client._client_info,
credentials=mock.ANY, # added scopes
credentials=None,
transport=mock.ANY,
client_options=admin_client_options,
)

Expand Down

0 comments on commit e9637cb

Please sign in to comment.