
Commit 10fdaf6

mreddy7 authored and alexeyserbin committed
KUDU-3476: Make replica placement range and table aware
Previously, the replica selection policy randomly selected two tablet servers and placed the replica on the tserver with fewer replicas. This could lead to hotspotting when replicas from the same range landed on the same set of tservers, since the policy did not discriminate by range.

With this patch, the replica selection policy ranks the available tservers by range and table load and places the replica accordingly. It prioritizes the number of replicas per range first, uses the number of replicas per table as a tiebreaker, and uses the total number of replicas as the final tiebreaker. The range and table load is determined by the number of existing replicas before placement begins plus the number of pending replicas placed on the tserver while placement is in progress.

The flag --enable_range_replica_placement on the master side controls whether this new policy is used. For this feature to work, both the range start key and the id of the table the range belongs to must be defined. This is because multiple tables could have ranges defined by the same range start key, so the table id is required to differentiate the ranges.

The link to the design doc is here:
https://docs.google.com/document/d/1r-p0GW8lj2iLA3VGvZWAem09ykCmR5jEe8npUhJ07G8/edit?usp=sharing

Change-Id: I9caeb8d5547e946bfeb152a99e1ec034c3fa0a0f
Reviewed-on: http://gerrit.cloudera.org:8080/19931
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Alexey Serbin <alexey@apache.org>
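The ranking described above amounts to a three-level lexicographic comparison of per-tserver load counters. Below is a minimal sketch of that comparison; the struct and field names are hypothetical illustrations, not the actual PlacementPolicy internals:

#include <cstdint>
#include <tuple>

// Hypothetical per-tserver load counters: existing replicas counted before
// placement begins plus pending replicas placed during the current round.
struct TServerLoad {
  int32_t replicas_in_range;  // replicas of the target range (primary criterion)
  int32_t replicas_in_table;  // replicas of the target table (first tiebreaker)
  int32_t total_replicas;     // all replicas on the tserver (final tiebreaker)
};

// Returns true if tserver 'a' is a better candidate for the new replica
// than tserver 'b': lower range load wins, then lower table load, then
// lower total replica count.
bool IsBetterCandidate(const TServerLoad& a, const TServerLoad& b) {
  return std::tie(a.replicas_in_range, a.replicas_in_table, a.total_replicas) <
         std::tie(b.replicas_in_range, b.replicas_in_table, b.total_replicas);
}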
1 parent ec43817 commit 10fdaf6

14 files changed: +1052 -108 lines changed

src/kudu/integration-tests/create-table-itest.cc

Lines changed: 105 additions & 6 deletions
@@ -40,6 +40,7 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/gutil/mathlimits.h"
@@ -53,7 +54,9 @@
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/tablet.pb.h"
 #include "kudu/tools/tool_test_util.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -199,7 +202,10 @@ TEST_F(CreateTableITest, TestCreateWhenMajorityOfReplicasFailCreation) {
 TEST_F(CreateTableITest, TestSpreadReplicasEvenly) {
   const int kNumServers = 10;
   const int kNumTablets = 20;
-  NO_FATALS(StartCluster({}, {}, kNumServers));
+  vector<string> master_flags = {
+    "--enable_range_replica_placement=false",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumServers));
 
   unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
   auto client_schema = KuduSchema::FromSchema(GetSimpleTestSchema());
@@ -399,6 +405,99 @@ TEST_F(CreateTableITest, TestSpreadReplicasEvenlyWithDimension) {
   }
 }
 
+// Tests the range aware replica placement by adding multiple tables with multiple ranges
+// and checking the replica distribution.
+TEST_F(CreateTableITest, TestSpreadReplicas) {
+  const int kNumServers = 5;
+  const int kNumReplicas = 3;
+  NO_FATALS(StartCluster({ }, { }, kNumServers));
+
+  Schema schema = Schema({ ColumnSchema("key1", INT32),
+                           ColumnSchema("key2", INT32),
+                           ColumnSchema("int_val", INT32),
+                           ColumnSchema("string_val", STRING, true) }, 2);
+  auto client_schema = KuduSchema::FromSchema(schema);
+
+  auto create_table_func = [](KuduClient* client,
+                              KuduSchema* client_schema,
+                              const string& table_name,
+                              const vector<std::pair<int32_t, int32_t>> range_bounds,
+                              const int num_buckets) {
+    unique_ptr<client::KuduTableCreator> table_creator(client->NewTableCreator());
+    table_creator->table_name(table_name)
+        .schema(client_schema)
+        .add_hash_partitions({ "key1" }, num_buckets)
+        .set_range_partition_columns({ "key2" })
+        .num_replicas(kNumReplicas);
+    for (const auto& range_bound : range_bounds) {
+      unique_ptr<KuduPartialRow> lower_bound(client_schema->NewRow());
+      RETURN_NOT_OK(lower_bound->SetInt32("key2", range_bound.first));
+      unique_ptr<KuduPartialRow> upper_bound(client_schema->NewRow());
+      RETURN_NOT_OK(upper_bound->SetInt32("key2", range_bound.second));
+      table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
+    }
+    return table_creator->Create();
+  };
+
+  vector<string> tables = {"table1", "table2", "table3", "table4"};
+  vector<std::pair<int32_t, int32_t>> range_bounds =
+      { {0, 100}, {100, 200}, {200, 300}, {300, 400}};
+  const int doubleNumBuckets = 10;
+  const int numBuckets = 5;
+  for (const auto& table : tables) {
+    if (table == "table1") {
+      ASSERT_OK(create_table_func(
+          client_.get(), &client_schema, table, range_bounds, doubleNumBuckets));
+    } else {
+      ASSERT_OK(create_table_func(
+          client_.get(), &client_schema, table, range_bounds, numBuckets));
+    }
+  }
+
+  // Stats of number of replicas per range per table per tserver.
+  typedef std::unordered_map<string, std::unordered_map<string, int>> replicas_per_range_per_table;
+  std::unordered_map<int, replicas_per_range_per_table> stats;
+  for (int ts_idx = 0; ts_idx < kNumServers; ts_idx++) {
+    rpc::RpcController rpc;
+    tserver::ListTabletsRequestPB req;
+    tserver::ListTabletsResponsePB resp;
+    cluster_->tserver_proxy(ts_idx)->ListTablets(req, &resp, &rpc);
+    for (auto i = 0; i < resp.status_and_schema_size(); ++i) {
+      auto tablet_status = resp.status_and_schema(i).tablet_status();
+      if (tablet_status.has_partition()) {
+        Partition partition;
+        Partition::FromPB(tablet_status.partition(), &partition);
+        auto range_start_key = partition.begin().range_key();
+        auto table_name = tablet_status.table_name();
+        ++stats[ts_idx][table_name][range_start_key];
+      }
+    }
+  }
+
+  ASSERT_EQ(kNumServers, stats.size());
+  for (const auto& stat : stats) {
+    int tserver_replicas = 0;
+    // Verifies that four tables exist on each tserver.
+    ASSERT_EQ(tables.size(), stat.second.size());
+    for (const auto& table : stat.second) {
+      // Verifies that the four ranges exist for each table on each tserver.
+      ASSERT_EQ(range_bounds.size(), table.second.size());
+      for (const auto& ranges : table.second) {
+        // Since there are ten buckets instead of five for table "table1",
+        // we expect twice as many replicas (6 instead of 3).
+        if (table.first == "table1") {
+          ASSERT_EQ(doubleNumBuckets * kNumReplicas / kNumServers, ranges.second);
+        } else {
+          ASSERT_EQ(numBuckets * kNumReplicas / kNumServers, ranges.second);
+        }
+        tserver_replicas += ranges.second;
+      }
+    }
+    // Verifies that 60 replicas are placed on each tserver, 300 total across 5 tservers.
+    ASSERT_EQ(60, tserver_replicas);
+  }
+}
+
 static void LookUpRandomKeysLoop(const std::shared_ptr<master::MasterServiceProxy>& master,
                                  const char* table_name,
                                  AtomicBool* quit) {
@@ -472,7 +571,7 @@ TEST_F(CreateTableITest, TestCreateTableWithDeadTServers) {
   unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
 
   // Don't bother waiting for table creation to finish; it'll never happen
-  // because all of the tservers are dead.
+  // because all the tservers are dead.
   CHECK_OK(table_creator->table_name(kTableName)
            .schema(&client_schema)
            .set_range_partition_columns({ "key" })
@@ -688,7 +787,7 @@ TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
   }
   // Wait the 3 tablet servers heartbeat timeout and unresponsive timeout. Then catalog
   // manager will take them as unavailable tablet servers. KSCK gets the status of tablet
-  // server from tablet serve interface. Here must wait the caltalog manager to take the
+  // server from tablet serve interface. Here must wait the catalog manager to take the
   // as unavailable.
   SleepFor(MonoDelta::FromMilliseconds(3*(kTSUnresponsiveTimeoutMs + kHeartbeatIntervalMs)));
 }
@@ -703,7 +802,7 @@ TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
   {
     // Restart the first tablet server.
     NO_FATALS(cluster_->tablet_server(0)->Restart());
-    // Wait the restarted tablet server to send a heartbeat and be registered in catalog manaager.
+    // Wait the restarted tablet server to send a heartbeat and be registered in catalog manager.
     SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
   }
 
@@ -713,7 +812,7 @@ TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
   {
     // Restart the second tablet server.
     NO_FATALS(cluster_->tablet_server(1)->Restart());
-    // Wait the restarted tablet server to send a heartbeat and be registered in catalog manaager.
+    // Wait the restarted tablet server to send a heartbeat and be registered in catalog manager.
     SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs));
   }
 
@@ -755,7 +854,7 @@ TEST_P(NotEnoughHealthyTServersTest, TestNotEnoughHealthyTServers) {
     // Add one new tablet server.
    NO_FATALS(cluster_->AddTabletServer());
   } else {
-    // Restart the stopped tablet server
+    // Restart the stopped tablet server.
     NO_FATALS(cluster_->tablet_server(2)->Restart());
   }
 

src/kudu/master/catalog_manager.cc

Lines changed: 42 additions & 10 deletions
@@ -228,7 +228,7 @@ TAG_FLAG(allow_unsafe_replication_factor, runtime);
 
 DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
              "Amount of time the catalog manager background task thread waits "
-             "between runs");
+             "between runs.");
 TAG_FLAG(catalog_manager_bg_task_wait_ms, hidden);
 
 DEFINE_int32(max_create_tablets_per_ts, 60,
@@ -357,11 +357,11 @@ TAG_FLAG(auto_leader_rebalancing_enabled, runtime);
 
 DEFINE_uint32(table_locations_cache_capacity_mb, 0,
               "Capacity for the table locations cache (in MiB); a value "
-              "of 0 means table locations are not be cached");
+              "of 0 means table locations are not be cached.");
 TAG_FLAG(table_locations_cache_capacity_mb, advanced);
 
 DEFINE_bool(enable_per_range_hash_schemas, true,
-            "Whether to support range-specific hash schemas for tables");
+            "Whether to support range-specific hash schemas for tables.");
 TAG_FLAG(enable_per_range_hash_schemas, advanced);
 TAG_FLAG(enable_per_range_hash_schemas, runtime);
 
@@ -372,6 +372,10 @@ DEFINE_bool(enable_table_write_limit, false,
 TAG_FLAG(enable_table_write_limit, experimental);
 TAG_FLAG(enable_table_write_limit, runtime);
 
+DEFINE_bool(enable_range_replica_placement, true,
+            "Whether to use range aware replica placement for newly created tablets.");
+TAG_FLAG(enable_range_replica_placement, runtime);
+
 DEFINE_int64(table_disk_size_limit, -1,
              "Set the target size in bytes of a table to write. "
              "This is a system wide configuration for every newly "
@@ -385,7 +389,7 @@ DEFINE_int64(table_row_count_limit, -1,
 TAG_FLAG(table_row_count_limit, experimental);
 
 DEFINE_double(table_write_limit_ratio, 0.95,
-              "Set the ratio of how much write limit can be reached");
+              "Set the ratio of how much write limit can be reached.");
 TAG_FLAG(table_write_limit_ratio, experimental);
 
 DEFINE_bool(enable_metadata_cleanup_for_deleted_tables_and_tablets, false,
@@ -410,7 +414,7 @@ TAG_FLAG(enable_chunked_tablet_writes, runtime);
 DEFINE_bool(require_new_spec_for_custom_hash_schema_range_bound, false,
             "Whether to require the client to use newer signature to specify "
             "range bounds when working with a table having custom hash schema "
-            "per range");
+            "per range.");
 TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, experimental);
 TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, runtime);
 
@@ -5088,16 +5092,29 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
   TSDescriptorVector ts_descs;
   master_->ts_manager()->GetDescriptorsAvailableForPlacement(&ts_descs);
 
-  // Get the dimension of the tablet. Otherwise, it will be nullopt.
+  // Get the dimension, table id, and range start key of the tablet.
   optional<string> dimension = nullopt;
+  optional<string> table_id = nullopt;
+  optional<string> range_key_start = nullopt;
   {
     TabletMetadataLock l(tablet_.get(), LockMode::READ);
     if (tablet_->metadata().state().pb.has_dimension_label()) {
       dimension = tablet_->metadata().state().pb.dimension_label();
     }
+    if (FLAGS_enable_range_replica_placement) {
+      Partition partition;
+      if (tablet_->metadata().state().pb.has_partition()) {
+        const auto& tablet_partition = tablet_->metadata().state().pb.partition();
+        Partition::FromPB(tablet_partition, &partition);
+      }
+      range_key_start = partition.begin().range_key();
+      VLOG(1) << Substitute("range_key_start is set to $1", range_key_start.value());
+      table_id = tablet_->metadata().state().pb.table_id();
+      VLOG(1) << Substitute("table_id is set to $1", table_id.value());
+    }
   }
 
-  // Some of the tablet servers hosting the current members of the config
+  // Some tablet servers hosting the current members of the config
   // (see the 'existing' populated above) might be presumably dead.
   // Inclusion of a presumably dead tablet server into 'existing' is OK:
   // PlacementPolicy::PlaceExtraTabletReplica() does not require elements of
@@ -5106,7 +5123,8 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
   // to host the extra replica is 'ts_descs' after blacklisting all elements
   // common with 'existing'.
   PlacementPolicy policy(std::move(ts_descs), rng_);
-  s = policy.PlaceExtraTabletReplica(std::move(existing), dimension, &extra_replica);
+  s = policy.PlaceExtraTabletReplica(
+      std::move(existing), dimension, range_key_start, table_id, &extra_replica);
 }
 if (PREDICT_FALSE(!s.ok())) {
   auto msg = Substitute("no extra replica candidate found for tablet $0: $1",
@@ -6153,15 +6171,29 @@ Status CatalogManager::SelectReplicasForTablet(const PlacementPolicy& policy,
   config->set_obsolete_local(nreplicas == 1);
   config->set_opid_index(consensus::kInvalidOpIdIndex);
 
-  // Get the dimension of the tablet. Otherwise, it will be nullopt.
+  // Get the dimension, table id, and range start key of the tablet.
   optional<string> dimension = nullopt;
+  optional<string> table_id = nullopt;
+  optional<string> range_key_start = nullopt;
   if (tablet->metadata().state().pb.has_dimension_label()) {
     dimension = tablet->metadata().state().pb.dimension_label();
   }
+  if (FLAGS_enable_range_replica_placement) {
+    Partition partition;
+    if (tablet->metadata().state().pb.has_partition()) {
+      const auto& tablet_partition = tablet->metadata().state().pb.partition();
+      Partition::FromPB(tablet_partition, &partition);
+    }
+    range_key_start = partition.begin().range_key();
+    VLOG(1) << Substitute("range_key_start is set to $1", range_key_start.value());
+    table_id = tablet->metadata().state().pb.table_id();
+    VLOG(1) << Substitute("table_id is set to $1", table_id.value());
+  }
 
   // Select the set of replicas for the tablet.
   TSDescriptorVector descriptors;
-  RETURN_NOT_OK_PREPEND(policy.PlaceTabletReplicas(nreplicas, dimension, &descriptors),
+  RETURN_NOT_OK_PREPEND(policy.PlaceTabletReplicas(
+                            nreplicas, dimension, range_key_start, table_id, &descriptors),
                         Substitute("failed to place replicas for tablet $0 "
                                    "(table '$1')",
                                    tablet->id(), table_guard.data().name()));

src/kudu/master/master.proto

Lines changed: 17 additions & 1 deletion
@@ -335,6 +335,17 @@ message TabletReportUpdatesPB {
   repeated ReportedTabletUpdatesPB tablets = 1;
 }
 
+// The number of tablets that are BOOTSTRAPPING or RUNNING per range.
+message TabletsByRangePB {
+  optional bytes range_start_key = 1;
+  optional int32 tablets = 2;
+}
+
+// The number of tablets that are BOOTSTRAPPING or RUNNING per each range for each table.
+message TabletsByRangePerTablePB {
+  repeated TabletsByRangePB num_live_tablets_by_range = 1;
+}
+
 // Heartbeat sent from the tablet-server to the master
 // to establish liveness and report back any status changes.
 message TSHeartbeatRequestPB {
@@ -370,9 +381,14 @@ message TSHeartbeatRequestPB {
   optional consensus.ReplicaManagementInfoPB replica_management_info = 7;
 
   // The number of tablets that are BOOTSTRAPPING or RUNNING in each dimension.
-  // Used by the master to determine load when creating new tablet replicas
+  // Used by the master to determine load when placing new tablet replicas
   // based on dimension.
   map<string, int32> num_live_tablets_by_dimension = 8;
+
+  // Per table, the number of tablets that are BOOTSTRAPPING or RUNNING
+  // in each range. Used by the master to determine load when placing
+  // new tablet replicas based on range and table.
+  map<string, TabletsByRangePerTablePB> num_live_tablets_by_range_per_table = 9;
 }
 
 message TSHeartbeatResponsePB {
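As a rough illustration of how a tablet server could populate the new heartbeat field, here is a minimal sketch that fills num_live_tablets_by_range_per_table from precomputed counts. The accessors follow standard protoc-generated C++ conventions for the messages above; the FillTabletsByRange helper and its 'counts' input are hypothetical, not the actual tserver heartbeater code:

#include <cstdint>
#include <string>
#include <unordered_map>

#include "kudu/master/master.pb.h"  // generated from master.proto

// 'counts' is keyed by table id, then by range start key, and holds the
// number of live (BOOTSTRAPPING or RUNNING) tablets for each range.
void FillTabletsByRange(
    const std::unordered_map<std::string,
                             std::unordered_map<std::string, int32_t>>& counts,
    kudu::master::TSHeartbeatRequestPB* req) {
  for (const auto& [table_id, by_range] : counts) {
    // Map entry for this table; created on first access.
    auto& per_table =
        (*req->mutable_num_live_tablets_by_range_per_table())[table_id];
    for (const auto& [range_start_key, num_tablets] : by_range) {
      // One TabletsByRangePB entry per range of the table.
      auto* entry = per_table.add_num_live_tablets_by_range();
      entry->set_range_start_key(range_start_key);
      entry->set_tablets(num_tablets);
    }
  }
}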

src/kudu/master/master_runner.cc

Lines changed: 3 additions & 3 deletions
@@ -348,9 +348,9 @@ void SetMasterFlagDefaults() {
                                to_string(kDefaultRpcServiceQueueLength).c_str(),
                                SET_FLAGS_DEFAULT));
   // Master always reads the latest data snapshot from the system catalog and
-  // never uses any specific timestatmp in past for a read snapshot. With that,
-  // here isn't much sense to keep long chain of UNDO deltas in addition to the
-  // latest version in the MVCC. Keeping short history of deltas frees CPU
+  // never uses any specific past timestamp for a read snapshot. With that,
+  // it doesn't make sense to keep a long chain of UNDO deltas in addition to the
+  // latest version in the MVCC. Keeping a short history of deltas frees CPU
   // cycles, memory, and IO bandwidth that otherwise would be consumed by
   // background maintenance jobs running compactions. In addition, less disk
   // space is consumed to store the system tablet's data.

src/kudu/master/master_service.cc

Lines changed: 10 additions & 0 deletions
@@ -451,11 +451,21 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
   }
 
   // 4. Update tserver soft state based on the heartbeat contents.
+  // TODO(mreddy) If --enable_range_replica_placement is set to false, don't populate ranges map.
   ts_desc->UpdateHeartbeatTime();
   ts_desc->set_num_live_replicas(req->num_live_tablets());
   ts_desc->set_num_live_replicas_by_dimension(
       TabletNumByDimensionMap(req->num_live_tablets_by_dimension().begin(),
                               req->num_live_tablets_by_dimension().end()));
+  for (auto it = req->num_live_tablets_by_range_per_table().begin();
+       it != req->num_live_tablets_by_range_per_table().end(); ++it) {
+    TabletNumByRangeMap ranges;
+    for (auto range = it->second.num_live_tablets_by_range().begin();
+         range != it->second.num_live_tablets_by_range().end(); ++range) {
+      ranges[range->range_start_key()] = range->tablets();
+    }
+    ts_desc->set_num_live_replicas_by_range_per_table(it->first, ranges);
+  }
 
   // 5. Only leaders handle tablet reports.
   if (is_leader_master && req->has_tablet_report()) {
