Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class BalancedRandomRoutingTableBuilder extends BaseRoutingTableBuilder {
public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics) {
super.init(configuration, tableConfig, propertyStore, brokerMetrics);
_numRoutingTables = configuration.getInt(NUM_ROUTING_TABLES_KEY, DEFAULT_NUM_ROUTING_TABLES);
int numReplicas = tableConfig.getValidationConfig().getReplicationNumber();
_numRoutingTables = numReplicas == 1 ? 1 : configuration.getInt(NUM_ROUTING_TABLES_KEY, DEFAULT_NUM_ROUTING_TABLES);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this configuration coming from? My main concern about this change is if the replicationFactor changes and we invoke rebalance, the number of routingTable remains the same

Copy link
Contributor Author

@siddharthteotia siddharthteotia Dec 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The replication is coming from tableConfig -- segmentsConfig part of tableConfig.

"segmentsConfig": {
      "completionConfig": null, 
      "hllConfig": null, 
      "replicaGroupStrategyConfig": {
        "numInstancesPerPartition": 2, 
        "partitionColumn": null
      }, 
      "replicasPerPartition": null, 
      "replication": "3",
      "retentionTimeUnit": "DAYS", 
      "retentionTimeValue": "7320", 
    }

If the replication changes, the change should be reflected in table config right? So on the next change to external view (after changing the table config and ideal state with new servers, rebalance etc), the routing table builder should see the updated config and thus the new replication factor. Is my understanding correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI.... we had seen this in couple of non prod deployments where there were 1000s of segments across 20+ servers but replication 1 and the routing table looked weird with duplicate server -> segments mapping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are not guaranteed to see the updated table config after the replication factor changes then yes we should not be doing this change. AFAIK, there is no way to change the replication factor without going via table config

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.transport.ServerInstance;


Expand All @@ -40,10 +45,21 @@
public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTableBuilder {

/** Number of routing tables to keep */
private static final int ROUTING_TABLE_COUNT = 500;
private int routingTableCount = 500;

/** Number of routing tables to generate during the optimization phase */
private static final int ROUTING_TABLE_GENERATION_COUNT = 1000;
private int routingTableGenerationCount = 1000;

@Override
public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics) {
super.init(configuration, tableConfig, propertyStore, brokerMetrics);
int numReplicas = tableConfig.getValidationConfig().getReplicationNumber();
if (numReplicas == 1) {
routingTableCount = 1;
routingTableGenerationCount = 1;
}
}

/**
* Generates a routing table, decorated with a metric.
Expand Down Expand Up @@ -254,18 +270,18 @@ protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmen
// according to a per-routing table metric and discard the worst routing tables.

PriorityQueue<Pair<Map<ServerInstance, List<String>>, Float>> topRoutingTables =
new PriorityQueue<>(ROUTING_TABLE_COUNT, (left, right) -> {
new PriorityQueue<>(routingTableCount, (left, right) -> {
// Float.compare sorts in ascending order and we want a max heap, so we need to return the negative
// of the comparison
return -Float.compare(left.getValue(), right.getValue());
});

for (int i = 0; i < ROUTING_TABLE_COUNT; i++) {
for (int i = 0; i < routingTableCount; i++) {
topRoutingTables.add(generateRoutingTableWithMetric(segmentToServersMap));
}

// Generate routing more tables and keep the ROUTING_TABLE_COUNT top ones
for (int i = 0; i < (ROUTING_TABLE_GENERATION_COUNT - ROUTING_TABLE_COUNT); ++i) {
for (int i = 0; i < (routingTableGenerationCount - routingTableCount); ++i) {
Pair<Map<ServerInstance, List<String>>, Float> newRoutingTable =
generateRoutingTableWithMetric(segmentToServersMap);
Pair<Map<ServerInstance, List<String>>, Float> worstRoutingTable = topRoutingTables.peek();
Expand Down