Skip to content

Commit

Permalink
DRILL-5015: Randomly select the drillbit from the list provided by us…
Browse files Browse the repository at this point in the history
…er in connection string

            Note: Improved the connection string validation and error handling during parsing.
                  Added unit test for the new parsing mechanism.

close #648
  • Loading branch information
Sorabh Hamirwasia authored and Aman Sinha committed Dec 5, 2016
1 parent 68bd27a commit 351dea6
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 13 deletions.
Expand Up @@ -79,6 +79,7 @@
import org.apache.drill.exec.rpc.NamedThreadFactory; import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.RpcConnectionHandler; import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.InvalidConnectionInfoException;
import org.apache.drill.exec.rpc.TransportCheck; import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserClient; import org.apache.drill.exec.rpc.user.UserClient;
Expand Down Expand Up @@ -223,19 +224,100 @@ public void connect(Properties props) throws RpcException {
connect(null, props); connect(null, props);
} }


/**
* Populates the endpointlist with drillbits information provided in the connection string by client.
* For direct connection we can have connection string with drillbit property as below:
* <dl>
* <dt>drillbit=ip</dt>
* <dd>use the ip specified as the Foreman ip with default port in config file</dd>
* <dt>drillbit=ip:port</dt>
* <dd>use the ip and port specified as the Foreman ip and port</dd>
* <dt>drillbit=ip1:port1,ip2:port2,...</dt>
* <dd>randomly select the ip and port pair from the specified list as the Foreman ip and port.</dd>
* </dl>
*
* @param drillbits string with drillbit value provided in connection string
* @param defaultUserPort string with default userport of drillbit specified in config file
* @return list of drillbit endpoints parsed from connection string
* @throws InvalidConnectionInfoException if the connection string has invalid or no drillbit information
*/
static List<DrillbitEndpoint> parseAndVerifyEndpoints(String drillbits, String defaultUserPort)
throws InvalidConnectionInfoException {
// If no drillbits is provided then throw exception
drillbits = drillbits.trim();
if (drillbits.isEmpty()) {
throw new InvalidConnectionInfoException("No drillbit information specified in the connection string");
}

final List<DrillbitEndpoint> endpointList = new ArrayList<>();
final String[] connectInfo = drillbits.split(",");

// Fetch ip address and port information for each drillbit and populate the list
for (String drillbit : connectInfo) {

// Trim all the empty spaces and check if the entry is empty string.
// Ignore the empty ones.
drillbit = drillbit.trim();

if (!drillbit.isEmpty()) {
// Verify if we have only ":" or only ":port" pattern
if (drillbit.charAt(0) == ':') {
// Invalid drillbit information
throw new InvalidConnectionInfoException("Malformed connection string with drillbit hostname or " +
"hostaddress missing for an entry: " + drillbit);
}

// We are now sure that each ip:port entry will have both the values atleast once.
// Split each drillbit connection string to get ip address and port value
final String[] drillbitInfo = drillbit.split(":");

// Check if we have more than one port
if (drillbitInfo.length > 2) {
throw new InvalidConnectionInfoException("Malformed connection string with more than one port in a " +
"drillbit entry: " + drillbit);
}

// At this point we are sure that drillbitInfo has atleast hostname or host address
// trim all the empty spaces which might be present in front of hostname or
// host address information
final String ipAddress = drillbitInfo[0].trim();
String port = defaultUserPort;

if (drillbitInfo.length == 2) {
// We have a port value also given by user. trim all the empty spaces between : and port value before
// validating the correctness of value.
port = drillbitInfo[1].trim();
}

try {
final DrillbitEndpoint endpoint = DrillbitEndpoint.newBuilder()
.setAddress(ipAddress)
.setUserPort(Integer.parseInt(port))
.build();

endpointList.add(endpoint);
} catch (NumberFormatException e) {
throw new InvalidConnectionInfoException("Malformed port value in entry: " + ipAddress + ":" + port + " " +
"passed in connection string");
}
}
}
if (endpointList.size() == 0) {
throw new InvalidConnectionInfoException("No valid drillbit information specified in the connection string");
}
return endpointList;
}

public synchronized void connect(String connect, Properties props) throws RpcException { public synchronized void connect(String connect, Properties props) throws RpcException {
if (connected) { if (connected) {
return; return;
} }


final DrillbitEndpoint endpoint; final List<DrillbitEndpoint> endpoints = new ArrayList<>();
if (isDirectConnection) { if (isDirectConnection) {
final String[] connectInfo = props.getProperty("drillbit").split(":"); // Populate the endpoints list with all the drillbit information provided in the connection string
final String port = connectInfo.length==2?connectInfo[1]:config.getString(ExecConstants.INITIAL_USER_PORT); endpoints.addAll(parseAndVerifyEndpoints(props.getProperty("drillbit"),
endpoint = DrillbitEndpoint.newBuilder() config.getString(ExecConstants.INITIAL_USER_PORT)));
.setAddress(connectInfo[0])
.setUserPort(Integer.parseInt(port))
.build();
} else { } else {
if (ownsZkConnection) { if (ownsZkConnection) {
try { try {
Expand All @@ -245,14 +327,15 @@ public synchronized void connect(String connect, Properties props) throws RpcExc
throw new RpcException("Failure setting up ZK for client.", e); throw new RpcException("Failure setting up ZK for client.", e);
} }
} }

endpoints.addAll(clusterCoordinator.getAvailableEndpoints());
final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints()); // Make sure we have at least one endpoint in the list
checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found"); checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper");
// shuffle the collection then get the first endpoint
Collections.shuffle(endpoints);
endpoint = endpoints.iterator().next();
} }


// shuffle the collection then get the first endpoint
Collections.shuffle(endpoints);
final DrillbitEndpoint endpoint = endpoints.get(0);

if (props != null) { if (props != null) {
final UserProperties.Builder upBuilder = UserProperties.newBuilder(); final UserProperties.Builder upBuilder = UserProperties.newBuilder();
for (final String key : props.stringPropertyNames()) { for (final String key : props.stringPropertyNames()) {
Expand Down
@@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.client;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.DrillSystemTestBase;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.rpc.InvalidConnectionInfoException;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;


/**
* The unit test case will read a physical plan in json format. The physical plan contains a "trace" operator,
* which will produce a dump file. The dump file will be input into DumpCat to test query mode and batch mode.
*/

public class DrillClientTest extends DrillSystemTestBase {

private final DrillConfig config = DrillConfig.create();

@Test
public void testParseAndVerifyEndpointsSingleDrillbitIp() throws Exception {

// Test with single drillbit ip
final String drillBitConnection = "10.10.100.161";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
final CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpointsList.size(), 1);
assertEquals(endpoint.getAddress(), drillBitConnection);
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));
}

@Test
public void testParseAndVerifyEndpointsSingleDrillbitIpPort() throws Exception {

// Test with single drillbit ip:port
final String drillBitConnection = "10.10.100.161:5000";
final String[] ipAndPort = drillBitConnection.split(":");
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
assertEquals(endpointsList.size(), 1);

final CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpoint.getAddress(), ipAndPort[0]);
assertEquals(endpoint.getUserPort(), Integer.parseInt(ipAndPort[1]));
}

@Test
public void testParseAndVerifyEndpointsMultipleDrillbitIp() throws Exception {

// Test with multiple drillbit ip
final String drillBitConnection = "10.10.100.161,10.10.100.162";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
assertEquals(endpointsList.size(), 2);

CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));

endpoint = endpointsList.get(1);
assertEquals(endpoint.getAddress(), "10.10.100.162");
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));
}

@Test
public void testParseAndVerifyEndpointsMultipleDrillbitIpPort() throws Exception {

// Test with multiple drillbit ip:port
final String drillBitConnection = "10.10.100.161:5000,10.10.100.162:5000";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
assertEquals(endpointsList.size(), 2);

CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), 5000);

endpoint = endpointsList.get(1);
assertEquals(endpoint.getAddress(), "10.10.100.162");
assertEquals(endpoint.getUserPort(), 5000);
}

@Test
public void testParseAndVerifyEndpointsMultipleDrillbitIpPortIp() throws Exception {

// Test with multiple drillbit with mix of ip:port and ip
final String drillBitConnection = "10.10.100.161:5000,10.10.100.162";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
assertEquals(endpointsList.size(), 2);

CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), 5000);

endpoint = endpointsList.get(1);
assertEquals(endpoint.getAddress(), "10.10.100.162");
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));
}

@Test
public void testParseAndVerifyEndpointsEmptyString() throws Exception {

// Test with empty string
final String drillBitConnection = "";
try {
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}

@Test
public void testParseAndVerifyEndpointsOnlyPortDelim() throws Exception{
// Test to check when connection string only has delimiter
final String drillBitConnection = ":";

try {
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}

@Test
public void testParseAndVerifyEndpointsWithOnlyPort() throws Exception{
// Test to check when connection string has port with no ip
final String drillBitConnection = ":5000";

try {
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}

@Test
public void testParseAndVerifyEndpointsWithMultiplePort() throws Exception{
// Test to check when connection string has multiple port with one ip
final String drillBitConnection = "10.10.100.161:5000:6000";

try {
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}

@Test
public void testParseAndVerifyEndpointsIpWithDelim() throws Exception{
// Test to check when connection string has ip with delimiter
final String drillBitConnection = "10.10.100.161:";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
final CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpointsList.size(), 1);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));
}

@Test
public void testParseAndVerifyEndpointsIpWithEmptyPort() throws Exception{
// Test to check when connection string has ip with delimiter
final String drillBitConnection = "10.10.100.161: ";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
final CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpointsList.size(), 1);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), config.getInt(ExecConstants.INITIAL_USER_PORT));
}

@Test
public void testParseAndVerifyEndpointsIpWithSpaces() throws Exception{
// Test to check when connection string has spaces in between
final String drillBitConnection = "10.10.100.161 : 5000, 10.10.100.162:6000 ";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));

CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpointsList.size(), 2);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), 5000);

endpoint = endpointsList.get(1);
assertEquals(endpoint.getAddress(), "10.10.100.162");
assertEquals(endpoint.getUserPort(), 6000);
}

@Test
public void testParseAndVerifyEndpointsStringWithSpaces() throws Exception{
// Test to check when connection string has ip with delimiter
final String drillBitConnection = "10.10.100.161 : 5000";
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
final CoordinationProtos.DrillbitEndpoint endpoint = endpointsList.get(0);
assertEquals(endpointsList.size(), 1);
assertEquals(endpoint.getAddress(), "10.10.100.161");
assertEquals(endpoint.getUserPort(), 5000);
}

@Test
public void testParseAndVerifyEndpointsNonNumericPort() throws Exception{
// Test to check when connection string has non-numeric port
final String drillBitConnection = "10.10.100.161:5ab0";

try{
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}

@Test
public void testParseAndVerifyEndpointsOnlyDelim() throws Exception{
// Test to check when connection string has only delimiter coma
final String drillBitConnection = " , ";

try{
final List<CoordinationProtos.DrillbitEndpoint> endpointsList = DrillClient.parseAndVerifyEndpoints
(drillBitConnection, config.getString(ExecConstants.INITIAL_USER_PORT));
fail();
} catch (InvalidConnectionInfoException e) {
System.out.println(e.getMessage());
}
}
}

0 comments on commit 351dea6

Please sign in to comment.