Skip to content

Commit

Permalink
JAVA-2654: Make AdminRequestHandler handle integer serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
GregBestland authored and olim7t committed Feb 24, 2020
1 parent 2ac7f83 commit 2027e27
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.5.0 (in progress)

- [bug] JAVA-2654: Make AdminRequestHandler handle integer serialization
- [improvement] JAVA-2618: Improve error handling in request handlers
- [new feature] JAVA-2064: Add support for DSE 6.8 graph options in schema builder
- [documentation] JAVA-2559: Fix GraphNode javadocs
Expand Down
Expand Up @@ -253,6 +253,8 @@ private static ByteBuffer serialize(Object parameter, ProtocolVersion protocolVe
@SuppressWarnings("unchecked")
List<String> l = (List<String>) parameter;
return AdminRow.LIST_OF_TEXT.encode(l, protocolVersion);
} else if (parameter instanceof Integer) {
return TypeCodecs.INT.encode((Integer) parameter, protocolVersion);
} else {
throw new IllegalArgumentException(
"Unsupported variable type for admin query: " + parameter.getClass());
Expand Down
Expand Up @@ -250,9 +250,15 @@ public CompletionStage<Void> forceCloseAsync() {
@VisibleForTesting
protected CompletionStage<AdminResult> query(
DriverChannel channel, String queryString, Map<String, Object> parameters) {
return AdminRequestHandler.query(
channel, queryString, parameters, timeout, INFINITE_PAGE_SIZE, logPrefix)
.start();
AdminRequestHandler<AdminResult> handler;
try {
handler =
AdminRequestHandler.query(
channel, queryString, parameters, timeout, INFINITE_PAGE_SIZE, logPrefix);
} catch (Exception e) {
return CompletableFutures.failedFuture(e);
}
return handler.start();
}

private CompletionStage<AdminResult> query(DriverChannel channel, String queryString) {
Expand Down
Expand Up @@ -333,8 +333,8 @@ private void setState(DefaultNode node, NodeState newState, String reason) {
(success, error) -> {
try {
if (error != null) {
LOG.debug(
"[{}] Error while refreshing info for {}", logPrefix, node, error);
Loggers.warnWithException(
LOG, "[{}] Error while refreshing info for {}", logPrefix, node, error);
}
// Fire the event whether the refresh succeeded or not
eventBus.fire(NodeStateEvent.changed(oldState, newState, node));
Expand Down
@@ -0,0 +1,95 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.core;

import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.common.cluster.QueryLog;
import com.datastax.oss.simulacron.server.BoundCluster;
import com.datastax.oss.simulacron.server.Server;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/** Test for JAVA-2654. */
@Category(ParallelizableTests.class)
public class PeersV2NodeRefreshIT {

private static Server peersV2Server;
private static BoundCluster cluster;

@BeforeClass
public static void setup() {
peersV2Server = Server.builder().withMultipleNodesPerIp(true).build();
cluster = peersV2Server.register(ClusterSpec.builder().withNodes(2));
}

@AfterClass
public static void tearDown() {
cluster.stop();
peersV2Server.close();
}

@Test
public void should_successfully_send_peers_v2_node_refresh_query()
throws InterruptedException, ExecutionException {
CqlSession session =
CqlSession.builder().addContactPoint(cluster.node(1).inetSocketAddress()).build();
Node node = findNonControlNode(session);
((InternalDriverContext) session.getContext())
.getMetadataManager()
.refreshNode(node)
.toCompletableFuture()
.get();
assertThat(hasNodeRefreshQuery())
.describedAs("Expecting peers_v2 node refresh query to be present but it wasn't")
.isTrue();
}

private Node findNonControlNode(CqlSession session) {
EndPoint controlNode =
((InternalDriverContext) session.getContext())
.getControlConnection()
.channel()
.getEndPoint();
return session.getMetadata().getNodes().values().stream()
.filter(node -> !node.getEndPoint().equals(controlNode))
.findAny()
.orElseThrow(() -> new IllegalStateException("Expecting at least one non-control node"));
}

private boolean hasNodeRefreshQuery() {
for (QueryLog log : cluster.getLogs().getQueryLogs()) {
if (log.getFrame().message instanceof Query) {
if (((Query) log.getFrame().message)
.query.contains(
"SELECT * FROM system.peers_v2 WHERE peer = :address and peer_port = :port")) {
return true;
}
}
}
return false;
}
}
Expand Up @@ -32,6 +32,7 @@
public class SimulacronRule extends CassandraResourceRule {
// TODO perhaps share server some other way
// TODO: Temporarily do not release addresses to ensure IPs are always ordered
// TODO: Add a way to configure the server for multiple nodes per ip
public static final Server server =
Server.builder()
.withAddressResolver(
Expand Down

0 comments on commit 2027e27

Please sign in to comment.