Skip to content

Commit

Permalink
[java] KUDU-3349 Fix the failure to demote a leader
Browse files Browse the repository at this point in the history
KuduScanToken gets a wrong tserver's uuid whose format is something
like: '<ByteString@6dffd497 size=32 contents="fc07f681d3ea4bab9bc5ec8090ab9437">',
the expected uuid should be "fc07f681d3ea4bab9bc5ec8090ab9437".

This issue caused RemoteTablet to fail to demote a leader, and the java
client always sends write ops to the demoted leader. As a result, there
are a lot of "PendingErrors overflowed. Failed to write at least 1000 rows to Kudu".

After this fix, the write ops, especially the deleting, will be faster.

Change-Id: I2974b6ec2cec2f0120b113d1bcf89fe3793a1ec5
Reviewed-on: http://gerrit.cloudera.org:8080/18166
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
  • Loading branch information
Hongjiang Zhang authored and alexeyserbin committed Feb 10, 2022
1 parent 205710c commit 90895ce
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 24 deletions.
Expand Up @@ -205,6 +205,44 @@ private static List<Integer> computeProjectedColumnIndexesForScanner(ScanTokenPB
return columns;
}

/**
* create a new RemoteTablet from TabletMetadata
* @param tabletMetadata the tablet metadata
* @param tableId the table Id
* @param partition the partition
* @return a RemoteTablet object
*/
public static RemoteTablet newRemoteTabletFromTabletMetadata(
Client.TabletMetadataPB tabletMetadata,
String tableId,
Partition partition) {
List<LocatedTablet.Replica> replicas = new ArrayList<>();
for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
tabletMetadata.getReplicasList()) {
Client.ServerMetadataPB server =
tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
LocatedTablet.Replica replica = new LocatedTablet.Replica(
server.getRpcAddresses(0).getHost(),
server.getRpcAddresses(0).getPort(),
replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel());
replicas.add(replica);
}

List<ServerInfo> servers = new ArrayList<>();
for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) {
HostAndPort hostPort =
ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toStringUtf8(),
hostPort, inetAddress, serverMetadataPB.getLocation());
servers.add(serverInfo);
}

RemoteTablet remoteTablet = new RemoteTablet(tableId,
tabletMetadata.getTabletId(), partition, replicas, servers);
return remoteTablet;
}

@SuppressWarnings("deprecation")
private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder(
ScanTokenPB message, KuduClient client) throws KuduException {
Expand All @@ -226,30 +264,8 @@ private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder(
TableLocationsCache tableLocationsCache =
client.asyncClient.getOrCreateTableLocationsCache(table.getTableId());

List<LocatedTablet.Replica> replicas = new ArrayList<>();
for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
tabletMetadata.getReplicasList()) {
Client.ServerMetadataPB server =
tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
LocatedTablet.Replica replica = new LocatedTablet.Replica(
server.getRpcAddresses(0).getHost(),
server.getRpcAddresses(0).getPort(),
replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel());
replicas.add(replica);
}

List<ServerInfo> servers = new ArrayList<>();
for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) {
HostAndPort hostPort =
ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toString(),
hostPort, inetAddress, serverMetadataPB.getLocation());
servers.add(serverInfo);
}

RemoteTablet remoteTablet = new RemoteTablet(table.getTableId(),
tabletMetadata.getTabletId(), partition, replicas, servers);
RemoteTablet remoteTablet =
newRemoteTabletFromTabletMetadata(tabletMetadata, table.getTableId(), partition);

tableLocationsCache.cacheTabletLocations(Collections.singletonList(remoteTablet),
partition.partitionKeyStart, 1, tabletMetadata.getTtlMillis());
Expand Down
Expand Up @@ -32,6 +32,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -41,6 +42,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.CodedInputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -51,6 +54,7 @@
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.cluster.KuduBinaryLocator;
import org.apache.kudu.test.cluster.MiniKuduCluster;

public class TestScanToken {
Expand Down Expand Up @@ -137,6 +141,104 @@ public void testScanTokens() throws Exception {
}
}

/**
* Regression test for KUDU-3349
*/
@Test
public void testScanTokenWithWrongUuidSerialization() throws Exception {
// Prepare the table for testing.
Schema schema = createManyStringsSchema();
CreateTableOptions createOptions = new CreateTableOptions();
final int buckets = 8;
createOptions.addHashPartitions(ImmutableList.of("key"), buckets);
client.createTable(testTableName, schema, createOptions);

KuduSession session = client.newSession();
KuduTable table = client.openTable(testTableName);
final int totalRows = 100;
for (int i = 0; i < totalRows; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("key", String.format("key_%02d", i));
row.addString("c1", "c1_" + i);
row.addString("c2", "c2_" + i);
assertEquals(session.apply(insert).hasRowError(), false);
}
KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
List<KuduScanToken> tokens = tokenBuilder.build();
assertEquals(buckets, tokens.size());

// Create a new client, open the newly created kudu table, and new scanners.
AsyncKuduClient newAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
harness.getMasterAddressesAsString())
.build();
KuduClient newClient = newAsyncClient.syncClient();
KuduTable newTable = newClient.openTable(testTableName);
List<KuduScanner> kuduScanners = new ArrayList<>(buckets);
List<String> tabletIds = new ArrayList<>(buckets);
for (KuduScanToken token : tokens) {
tabletIds.add(new String(token.getTablet().getTabletId(),
java.nio.charset.StandardCharsets.UTF_8));
KuduScanner kuduScanner = token.intoScanner(newAsyncClient.syncClient());
kuduScanners.add(kuduScanner);
}

// Step down all tablet leaders.
KuduBinaryLocator.ExecutableInfo exeInfo = null;
try {
exeInfo = KuduBinaryLocator.findBinary("kudu");
} catch (FileNotFoundException e) {
LOG.error(e.getMessage());
fail();
}
for (String tabletId : tabletIds) {
List<String> commandLine = Lists.newArrayList(exeInfo.exePath(),
"tablet",
"leader_step_down",
harness.getMasterAddressesAsString(),
tabletId);
ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
processBuilder.environment().putAll(exeInfo.environment());
// Step down the tablet leaders one by one after a fix duration.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error(e.getMessage());
}
}
// Delete all rows first through the new client.
KuduSession newSession = newClient.newSession();

for (int i = 0; i < totalRows; i++) {
Operation del = newTable.newDelete();
PartialRow row = del.getRow();
row.addString("key", String.format("key_%02d", i));
del.setRow(row);
OperationResponse response = newSession.apply(del);
assertEquals(response.hasRowError(), false);
}

// Insert all rows again through the new client.
for (int i = 0; i < totalRows; i++) {
Insert insert = newTable.newInsert();
PartialRow row = insert.getRow();
row.addString("key", String.format("key_%02d", i));
row.addString("c1", "c1_" + i);
row.addString("c2", "c2_" + i);
assertEquals(newSession.apply(insert).hasRowError(), false);
}

// Verify all the row count.
int rowCount = 0;
for (KuduScanner kuduScanner : kuduScanners) {
while (kuduScanner.hasMoreRows()) {
rowCount += kuduScanner.nextRows().numRows;
}
}
assertEquals(totalRows, rowCount);
}

/**
* Tests scan token creation and execution on a table with non-covering range partitions.
*/
Expand Down Expand Up @@ -694,6 +796,36 @@ public void testScanTokensWithExtraPredicate() throws IOException {
assertEquals(PREDICATE_VAL, Iterables.getOnlyElement(resultKeys).intValue());
}

/**
* Verify the deserialization of RemoteTablet from KuduScanToken.
* Regression test for KUDU-3349.
*/
@Test
public void testRemoteTabletVerification() throws IOException {
final int NUM_ROWS_DESIRED = 100;
KuduTable table = createDefaultTable(client, testTableName);
loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED);
KuduScanToken.KuduScanTokenBuilder builder =
new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
List<KuduScanToken> tokens = builder.build();
List<HostAndPort> tservers = harness.getTabletServers();
for (KuduScanToken token : tokens) {
byte[] serialized = token.serialize();
Client.ScanTokenPB scanTokenPB =
Client.ScanTokenPB.parseFrom(CodedInputStream.newInstance(serialized));
Client.TabletMetadataPB tabletMetadata = scanTokenPB.getTabletMetadata();
Partition partition =
ProtobufHelper.pbToPartition(tabletMetadata.getPartition());
RemoteTablet remoteTablet = KuduScanToken.newRemoteTabletFromTabletMetadata(tabletMetadata,
table.getTableId(), partition);
for (ServerInfo si : remoteTablet.getTabletServersCopy()) {
assertEquals(si.getUuid().length(), 32);
HostAndPort hostAndPort = si.getHostAndPort();
assertEquals(tservers.contains(hostAndPort), true);
}
}
}

/**
* Regression test for KUDU-3205.
*/
Expand Down

0 comments on commit 90895ce

Please sign in to comment.