Skip to content

Commit

Permalink
DRILL-1752: Fixing query failures of unsharded db/collection in shard…
Browse files Browse the repository at this point in the history
…ed cluster

- Add logback.xml in test/resources
- Add test cases and upgrade the pom versions of the mongo and mongo test dependencies
  • Loading branch information
kameshb authored and jacques-n committed Nov 2, 2015
1 parent fafde48 commit 454b499
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 33 deletions.
4 changes: 2 additions & 2 deletions contrib/storage-mongo/pom.xml
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.0.2</version>
<version>3.1.0</version>
</dependency>

<!-- Test dependencies -->
Expand All @@ -70,7 +70,7 @@
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<version>1.48.0</version>
<version>1.50.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Expand Up @@ -42,4 +42,10 @@ public interface DrillMongoConstants {
public static final String MIN = "min";

public static final String MAX = "max";

public static final String PARTITIONED = "partitioned";

public static final String PRIMARY = "primary";

public static final String DATABASES = "databases";
}
Expand Up @@ -47,6 +47,7 @@
import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec;
import org.apache.drill.exec.store.mongo.common.ChunkInfo;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.MaxKey;
import org.bson.types.MinKey;
import org.slf4j.Logger;
Expand All @@ -59,6 +60,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -201,6 +203,7 @@ private void init() throws IOException {
projection = new Document();
projection.put(HOST, select);

boolean hasChunks = false;
while (iterator.hasNext()) {
Document chunkObj = iterator.next();
String shardName = (String) chunkObj.get(SHARD);
Expand Down Expand Up @@ -260,30 +263,62 @@ private void init() throws IOException {
chunkInfo.setMaxFilters(maxFilters);
chunkList.add(chunkInfo);
}
hasChunks = true;
}
} else {
String chunkName = scanSpec.getDbName() + "."
+ scanSpec.getCollectionName();
List<String> hosts = storagePluginConfig.getHosts();
Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
if (addressList == null) {
addressList = Sets.newHashSet();
for (String host : hosts) {
addressList.add(new ServerAddress(host));
}
// In a sharded environment, if a collection doesn't have any chunks, it is considered as an
// unsharded collection and it will be stored in the primary shard of that database.
if (!hasChunks) {
handleUnshardedCollection(getPrimaryShardInfo(client));
}
chunksMapping.put(chunkName, addressList);

String host = hosts.get(0);
ServerAddress address = new ServerAddress(host);
ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName);
chunkInfo.setMinFilters(Collections.<String, Object> emptyMap());
chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap());
List<ChunkInfo> chunksList = Lists.newArrayList();
chunksList.add(chunkInfo);
chunksInverseMapping.put(address.getHost(), chunksList);
} else {
handleUnshardedCollection(storagePluginConfig.getHosts());
}

}

/**
 * Registers an unsharded collection as a single pseudo-chunk covering the
 * supplied hosts. The chunk is named {@code <db>.<collection>} and carries
 * empty min/max filters, so the entire collection is scanned as one unit.
 *
 * @param hosts host:port strings on which the collection's data resides;
 *              expected to be non-empty (the first entry keys the inverse mapping)
 */
private void handleUnshardedCollection(List<String> hosts) {
  String chunkId = Joiner.on('.').join(scanSpec.getDbName(), scanSpec.getCollectionName());

  // Forward mapping: the pseudo-chunk resolves to every candidate host.
  Set<ServerAddress> addresses = Sets.newHashSet();
  for (String hostEntry : hosts) {
    addresses.add(new ServerAddress(hostEntry));
  }
  chunksMapping.put(chunkId, addresses);

  // A single chunk with no range filters represents the whole collection.
  ChunkInfo wholeCollectionChunk = new ChunkInfo(hosts, chunkId);
  wholeCollectionChunk.setMinFilters(Collections.<String, Object> emptyMap());
  wholeCollectionChunk.setMaxFilters(Collections.<String, Object> emptyMap());

  // Inverse mapping is keyed by the first host, mirroring the forward entry.
  ServerAddress firstHost = new ServerAddress(hosts.get(0));
  chunksInverseMapping.put(firstHost.getHost(), Lists.newArrayList(wholeCollectionChunk));
}

/**
 * Looks up, in the cluster's config database, the primary shard of the
 * database being queried and returns the host(s) that shard resides on.
 * In a sharded cluster an unsharded collection is stored entirely on its
 * database's primary shard.
 *
 * @param client the mongos client connected to the sharded cluster
 * @return host:port strings on which the primary shard resides
 * @throws NullPointerException if the database or shard metadata is absent
 *         from the config database
 */
private List<String> getPrimaryShardInfo(MongoClient client) {
  // Use the client handed in by the caller; previously this parameter was
  // ignored and a client was re-fetched from the plugin.
  MongoDatabase database = client.getDatabase(CONFIG);
  // Identify the primary shard of the queried database.
  MongoCollection<Document> collection = database.getCollection(DATABASES);
  Bson filter = new Document(ID, this.scanSpec.getDbName());
  Bson projection = new Document(PRIMARY, select);
  Document document = collection.find(filter).projection(projection).first();
  Preconditions.checkNotNull(document,
      "No config.databases entry for database %s", this.scanSpec.getDbName());
  String shardName = document.getString(PRIMARY);
  Preconditions.checkNotNull(shardName,
      "config.databases entry for %s has no primary shard", this.scanSpec.getDbName());

  // Identify the host(s) on which this shard resides.
  MongoCollection<Document> shardsCol = database.getCollection(SHARDS);
  filter = new Document(ID, shardName);
  projection = new Document(HOST, select);
  Document hostInfo = shardsCol.find(filter).projection(projection).first();
  Preconditions.checkNotNull(hostInfo,
      "No config.shards entry for shard %s", shardName);
  String hostEntry = hostInfo.getString(HOST);
  Preconditions.checkNotNull(hostEntry,
      "config.shards entry for %s has no host field", shardName);

  // Host field is either "host1,host2" or "replicaSetTag/host1,host2";
  // strip the replica-set tag when present, then split the host list.
  String[] tagAndHost = StringUtils.split(hostEntry, '/');
  String[] hosts = tagAndHost.length > 1 ? StringUtils.split(tagAndHost[1],
      ',') : StringUtils.split(tagAndHost[0], ',');
  return Lists.newArrayList(hosts);
}

@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -32,11 +32,8 @@ public interface MongoTestConstants {

public static final int MONGOS_PORT = 27017;

public static final String CONFIG_DB = "config";
public static final String ADMIN_DB = "admin";

public static final String TEST_DB = "testDB";
public static final String EMPLOYEE_DB = "employee";
public static final String DONUTS_DB = "donuts";

public static final String DONUTS_COLLECTION = "donuts";
public static final String EMPINFO_COLLECTION = "empinfo";
Expand Down Expand Up @@ -70,4 +67,7 @@ public interface MongoTestConstants {
public static final String TEST_BOOLEAN_FILTER_QUERY_TEMPLATE3 = "select `employee_id` from mongo.%s.`%s` where position_id = 16 and isFTE = true";
public static final String TEST_BOOLEAN_FILTER_QUERY_TEMPLATE4 = "select `employee_id` from mongo.%s.`%s` where (position_id = 16 and isFTE = true) or last_name = 'Yonce'";

public static final String TEST_STAR_QUERY_UNSHARDED_DB = "select * from mongo.%s.`%s`";
public static final String TEST_STAR_QUERY_UNSHARDED_DB_PROJECT_FILTER = "select t.name as name,t.topping.type as type from mongo.%s.`%s` t where t.sales >= 150";
public static final String TEST_STAR_QUERY_UNSHARDED_DB_GROUP_PROJECT_FILTER = "select t.topping.type as type,count(t.topping.type) as typeCount from mongo.%s.`%s` t group by t.topping.type order by typeCount";
}
Expand Up @@ -107,14 +107,14 @@ private static void setup() throws Exception {
mongosTestFactory = new MongosSystemForTestFactory(mongosConfig,
replicaSets, configServers, EMPLOYEE_DB, EMPINFO_COLLECTION,
"employee_id");

try {
mongosTestFactory.start();
mongoClient = (MongoClient) mongosTestFactory.getMongo();
} catch (Throwable e) {
logger.error(" Error while starting shrded cluster. ", e);
throw new Exception(" Error while starting shrded cluster. ", e);
}
createDbAndCollections(DONUTS_DB, DONUTS_COLLECTION, "id");
}

private static IMongodConfig crateConfigServerConfig(int configServerPort,
Expand Down Expand Up @@ -194,7 +194,7 @@ private static void cleanup() {
mongod.stop();
}
if (mongodExecutable != null) {
mongodExecutable.cleanup();
mongodExecutable.stop();
}
}
}
Expand All @@ -211,6 +211,7 @@ public static void initMongo() throws Exception {
}
TestTableGenerator.importData(EMPLOYEE_DB, EMPINFO_COLLECTION, EMP_DATA);
TestTableGenerator.importData(EMPLOYEE_DB, SCHEMA_CHANGE_COLLECTION, SCHEMA_CHANGE_DATA);
TestTableGenerator.importData(DONUTS_DB, DONUTS_COLLECTION, DONUTS_DATA);
}
}

Expand All @@ -232,7 +233,6 @@ private static void createDbAndCollections(String dbName,
@AfterClass
public static void tearDownCluster() throws Exception {
if (mongoClient != null) {
mongoClient.dropDatabase(TEST_DB);
mongoClient.dropDatabase(EMPLOYEE_DB);
mongoClient.close();
}
Expand Down
Expand Up @@ -35,8 +35,6 @@ public void testFilterPushDownLessThanWithSingleField() throws Exception {
String queryString = String.format(
TEST_FILTER_PUSH_DOWN_LESS_THAN_QUERY_TEMPLATE_1, EMPLOYEE_DB,
EMPINFO_COLLECTION);
System.out.println("testFilterPushDownLessThanWithSingleField Query: "
+ queryString);
String expectedExpr = "\"$lt\" : 52.17";
testHelper(queryString, expectedExpr, 9);
}
Expand All @@ -46,8 +44,6 @@ public void testFilterPushDownGreaterThanWithSingleField() throws Exception {
String queryString = String.format(
TEST_FILTER_PUSH_DOWN_GREATER_THAN_QUERY_TEMPLATE_1, EMPLOYEE_DB,
EMPINFO_COLLECTION);
System.out.println("testFilterPushDownGreaterThanWithSingleField Query: "
+ queryString);
String expectedExpr = "\"$gt\" : 52.17";
testHelper(queryString, expectedExpr, 9);
}
Expand Down
Expand Up @@ -52,4 +52,24 @@ public void testResultCount() throws Exception {
runMongoSQLVerifyCount(queryString, 5);
}

@Test
public void testUnShardedDBInShardedCluster() throws Exception {
String queryString = String.format(TEST_STAR_QUERY_UNSHARDED_DB,
DONUTS_DB, DONUTS_COLLECTION);
runMongoSQLVerifyCount(queryString, 5);
}

@Test
public void testUnShardedDBInShardedClusterWithProjectionAndFilter() throws Exception {
String queryString = String.format(TEST_STAR_QUERY_UNSHARDED_DB_PROJECT_FILTER,
DONUTS_DB, DONUTS_COLLECTION);
runMongoSQLVerifyCount(queryString, 2);
}

@Test
public void testUnShardedDBInShardedClusterWithGroupByProjectionAndFilter() throws Exception {
String queryString = String.format(TEST_STAR_QUERY_UNSHARDED_DB_GROUP_PROJECT_FILTER,
DONUTS_DB, DONUTS_COLLECTION);
runMongoSQLVerifyCount(queryString, 5);
}
}
59 changes: 59 additions & 0 deletions contrib/storage-mongo/src/test/resources/logback.xml
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss"/>

<appender name="SOCKET"
class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
<Compressing>true</Compressing>
<ReconnectionDelay>10000</ReconnectionDelay>
<IncludeCallerData>true</IncludeCallerData>
<RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
</appender>

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<!-- The property 'logback.log.dir' is defined in pom.xml -->
<file>${logback.log.dir:-./target/surefire-reports}/mongo-tests-${bySecond}.log</file>
<append>false</append>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<!-- NOTE(review): both <logger> elements below target the same logger name
     ("org.apache.drill"); logback merges duplicate logger definitions, so
     the appender-refs accumulate (FILE and SOCKET) and the level declared
     last (debug) wins. Consider consolidating into one element if separate
     levels per appender were intended. -->
<logger name="org.apache.drill" additivity="false">
<level value="info" />
<appender-ref ref="FILE" />
</logger>

<logger name="org.apache.drill" additivity="false">
<level value="debug" />
<appender-ref ref="SOCKET" />
</logger>

<root>
<level value="error" />
<appender-ref ref="STDOUT" />
</root>

</configuration>

0 comments on commit 454b499

Please sign in to comment.