Skip to content

Commit

Permalink
feat: add configurable leader placement support (#399)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-spanner/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
  • Loading branch information
zoercai authored Jul 29, 2021
1 parent 9c529f3 commit 7f1b120
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 1 deletion.
11 changes: 11 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def __init__(
self._version_retention_period = None
self._earliest_version_time = None
self._encryption_info = None
self._default_leader = None
self.log_commit_stats = False
self._logger = logger
self._encryption_config = encryption_config
Expand Down Expand Up @@ -279,6 +280,15 @@ def encryption_info(self):
"""
return self._encryption_info

@property
def default_leader(self):
"""The read-write region which contains the database's leader replicas.
:rtype: str
:returns: a string representing the read-write region
"""
return self._default_leader

@property
def ddl_statements(self):
"""DDL Statements used to define database schema.
Expand Down Expand Up @@ -414,6 +424,7 @@ def reload(self):
self._earliest_version_time = response.earliest_version_time
self._encryption_config = response.encryption_config
self._encryption_info = response.encryption_info
self._default_leader = response.default_leader

def update_ddl(self, ddl_statements, operation_id=""):
"""Update DDL for this database.
Expand Down
83 changes: 83 additions & 0 deletions tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
INSTANCE_ID = os.environ.get(
"GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE", "google-cloud-python-systest"
)
MULTI_REGION_INSTANCE_ID = "multi-region" + unique_resource_id("-")
EXISTING_INSTANCES = []
COUNTERS_TABLE = "counters"
COUNTERS_COLUMNS = ("name", "value")
Expand Down Expand Up @@ -353,9 +354,25 @@ def setUpClass(cls):
SPANNER_OPERATION_TIMEOUT_IN_SECONDS
) # raises on failure / timeout.

# Create a multi-region instance
multi_region_config = "nam3"
config_name = "{}/instanceConfigs/{}".format(
Config.CLIENT.project_name, multi_region_config
)
create_time = str(int(time.time()))
labels = {"python-spanner-systests": "true", "created": create_time}
cls._instance = Config.CLIENT.instance(
instance_id=MULTI_REGION_INSTANCE_ID,
configuration_name=config_name,
labels=labels,
)
operation = cls._instance.create()
operation.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS)

@classmethod
def tearDownClass(cls):
cls._db.drop()
cls._instance.delete()

def setUp(self):
self.to_delete = []
Expand Down Expand Up @@ -443,6 +460,42 @@ def test_create_database_pitr_success(self):
for result in results:
self.assertEqual(result[0], retention_period)

@unittest.skipIf(
USE_EMULATOR, "Default leader setting is not supported by the emulator"
)
def test_create_database_with_default_leader_success(self):
pool = BurstyPool(labels={"testcase": "create_database_default_leader"})

temp_db_id = "temp_db" + unique_resource_id("_")
default_leader = "us-east4"
ddl_statements = [
"ALTER DATABASE {}"
" SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader)
]
temp_db = self._instance.database(
temp_db_id, pool=pool, ddl_statements=ddl_statements
)
operation = temp_db.create()
self.to_delete.append(temp_db)

# We want to make sure the operation completes.
operation.result(30) # raises on failure / timeout.

database_ids = [database.name for database in self._instance.list_databases()]
self.assertIn(temp_db.name, database_ids)

temp_db.reload()
self.assertEqual(temp_db.default_leader, default_leader)

with temp_db.snapshot() as snapshot:
results = snapshot.execute_sql(
"SELECT OPTION_VALUE AS default_leader "
"FROM INFORMATION_SCHEMA.DATABASE_OPTIONS "
"WHERE SCHEMA_NAME = '' AND OPTION_NAME = 'default_leader'"
)
for result in results:
self.assertEqual(result[0], default_leader)

def test_table_not_found(self):
temp_db_id = "temp_db" + unique_resource_id("_")

Expand Down Expand Up @@ -551,6 +604,36 @@ def test_update_database_ddl_pitr_success(self):
self.assertEqual(temp_db.version_retention_period, retention_period)
self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements))

@unittest.skipIf(
USE_EMULATOR, "Default leader update is not supported by the emulator"
)
def test_update_database_ddl_default_leader_success(self):
pool = BurstyPool(labels={"testcase": "update_database_ddl_default_leader"})

temp_db_id = "temp_db" + unique_resource_id("_")
default_leader = "us-east4"
temp_db = self._instance.database(temp_db_id, pool=pool)
create_op = temp_db.create()
self.to_delete.append(temp_db)

# We want to make sure the operation completes.
create_op.result(240) # raises on failure / timeout.

self.assertIsNone(temp_db.default_leader)

ddl_statements = DDL_STATEMENTS + [
"ALTER DATABASE {}"
" SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader)
]
operation = temp_db.update_ddl(ddl_statements)

# We want to make sure the operation completes.
operation.result(240) # raises on failure / timeout.

temp_db.reload()
self.assertEqual(temp_db.default_leader, default_leader)
self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements))

def test_db_batch_insert_then_db_snapshot_read(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()
Expand Down
6 changes: 5 additions & 1 deletion tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TestClient(unittest.TestCase):
PROCESSING_UNITS = 5000
LABELS = {"test": "true"}
TIMEOUT_SECONDS = 80
LEADER_OPTIONS = ["leader1", "leader2"]

def _get_target_class(self):
from google.cloud import spanner
Expand Down Expand Up @@ -457,7 +458,9 @@ def test_list_instance_configs(self):
instance_config_pbs = ListInstanceConfigsResponse(
instance_configs=[
InstanceConfigPB(
name=self.CONFIGURATION_NAME, display_name=self.DISPLAY_NAME
name=self.CONFIGURATION_NAME,
display_name=self.DISPLAY_NAME,
leader_options=self.LEADER_OPTIONS,
)
]
)
Expand All @@ -473,6 +476,7 @@ def test_list_instance_configs(self):
self.assertIsInstance(instance_config, InstanceConfigPB)
self.assertEqual(instance_config.name, self.CONFIGURATION_NAME)
self.assertEqual(instance_config.display_name, self.DISPLAY_NAME)
self.assertEqual(instance_config.leader_options, self.LEADER_OPTIONS)

expected_metadata = (
("google-cloud-resource-prefix", client.project_name),
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ def test_encryption_info(self):
]
self.assertEqual(database.encryption_info, encryption_info)

def test_default_leader(self):
instance = _Instance(self.INSTANCE_NAME)
pool = _Pool()
database = self._make_one(self.DATABASE_ID, instance, pool=pool)
default_leader = database._default_leader = "us-east4"
self.assertEqual(database.default_leader, default_leader)

def test_spanner_api_property_w_scopeless_creds(self):

client = _Client()
Expand Down Expand Up @@ -715,6 +722,7 @@ def test_reload_success(self):
kms_key_version="kms_key_version",
)
]
default_leader = "us-east4"
api = client.database_admin_api = self._make_database_admin_api()
api.get_database_ddl.return_value = ddl_pb
db_pb = Database(
Expand All @@ -725,6 +733,7 @@ def test_reload_success(self):
earliest_version_time=_datetime_to_pb_timestamp(timestamp),
encryption_config=encryption_config,
encryption_info=encryption_info,
default_leader=default_leader,
)
api.get_database.return_value = db_pb
instance = _Instance(self.INSTANCE_NAME, client=client)
Expand All @@ -740,6 +749,7 @@ def test_reload_success(self):
self.assertEqual(database._ddl_statements, tuple(DDL_STATEMENTS))
self.assertEqual(database._encryption_config, encryption_config)
self.assertEqual(database._encryption_info, encryption_info)
self.assertEqual(database._default_leader, default_leader)

api.get_database_ddl.assert_called_once_with(
database=self.DATABASE_NAME,
Expand Down

0 comments on commit 7f1b120

Please sign in to comment.