Skip to content

Commit

Permalink
Some changes from the review.
Browse files Browse the repository at this point in the history
  • Loading branch information
akkapur committed Jun 22, 2020
1 parent e0f8665 commit 967ed22
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 50 deletions.
2 changes: 1 addition & 1 deletion contrib/storage-druid/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Filters pushed down to native druid filter structure, converting SQL where claus

### Plugin Registration

The plugin can be register in Apache Drill using the drill web interface by navigating to the ```storage``` page.
The plugin can be registered in Apache Drill using the drill web interface by navigating to the ```storage``` page.
Following is the default registration configuration.
```json
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,17 @@ public DruidScanSpec visitFunctionCall(FunctionCall call, Void value)
DruidCompareFunctionProcessor processor = DruidCompareFunctionProcessor
.process(call);
if (processor.isSuccess()) {
try {
DruidScanSpec scanSpec = groupScan.getScanSpec();
nodeScanSpec =
druidScanSpecBuilder
.build(scanSpec.getDataSourceName(),
scanSpec.getDataSourceSize(),
scanSpec.getDataSourceMinTime(),
scanSpec.getDataSourceMaxTime(),
processor.getFunctionName(),
processor.getPath(),
processor.getValue()
);
} catch (Exception e) {
logger.error("Failed to create Filter ", e);
throw new RuntimeException(e.getMessage(), e);
}
DruidScanSpec scanSpec = groupScan.getScanSpec();
nodeScanSpec =
druidScanSpecBuilder
.build(scanSpec.getDataSourceName(),
scanSpec.getDataSourceSize(),
scanSpec.getDataSourceMinTime(),
scanSpec.getDataSourceMaxTime(),
processor.getFunctionName(),
processor.getPath(),
processor.getValue()
);
}
} else {
switch (functionName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
Expand Down Expand Up @@ -66,6 +66,7 @@ public class DruidRecordReader extends AbstractRecordReader {
private VectorContainerWriter writer;

private final FragmentContext fragmentContext;
private final DruidQueryClient druidQueryClient;

public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
Expand All @@ -79,6 +80,7 @@ public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
scanSpec = subScanSpec;
fragmentContext = context;
this.filter = subScanSpec.getFilter();
this.druidQueryClient = plugin.getDruidQueryClient();
}

@Override
Expand Down Expand Up @@ -111,10 +113,10 @@ public void setup(OperatorContext context, OutputMutator output) {
public int next() {
writer.allocate();
writer.reset();
DruidQueryClient druidQueryClient = plugin.getDruidQueryClient();
Stopwatch watch = Stopwatch.createStarted();
try {
DruidSelectResponse druidSelectResponse = druidQueryClient.executeQuery(getQuery());
String query = getQuery();
DruidSelectResponse druidSelectResponse = druidQueryClient.executeQuery(query);
setNextPagingIdentifiers(druidSelectResponse);

int docCount = 0;
Expand All @@ -124,9 +126,13 @@ public int next() {
try {
jsonReader.write(writer);
} catch (IOException e) {
String msg = "Failure while reading document. - Parser was at record: " + eventNode.toString();
logger.error(msg, e);
throw new DrillRuntimeException(msg, e);
throw UserException
.dataReadError(e)
.message("Failure while reading document")
.addContext("Failed Query", query)
.addContext("Parser was at record", eventNode.toString())
.addContext(e.getMessage())
.build(logger);
}
docCount++;
}
Expand All @@ -135,9 +141,11 @@ public int next() {
logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount);
return docCount;
} catch (Exception e) {
String msg = "Failure while reading documents";
logger.error(msg, e);
throw new DrillRuntimeException(msg, e);
throw UserException
.dataReadError(e)
.message("Failure while executing druid query")
.addContext(e.getMessage())
.build(logger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ public void setup() {
}

@Test
public void parse_tree_with_and_of_two_selector_filters() {
public void parseTreeWithAndOfTwoSelectorFilters() {
DruidScanSpec parsedSpec = druidFilterBuilder.parseTree();
String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
String actual = parsedSpec.getFilter().toJson();
assertThat(actual).isEqualTo(expectedFilterJson);
}

@Test
public void visit_boolean_operator_with_and_operator() {
public void visitBooleanOperatorWithAndOperator() {
LogicalExpression logicalExpression2 = mock(LogicalExpression.class);
try {
when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecLeft);
Expand All @@ -103,7 +103,7 @@ public void visit_boolean_operator_with_and_operator() {
}

@Test
public void visit_boolean_operator_with_or_operator() {
public void visitBooleanOperatorWithOrOperator() {
LogicalExpression logicalExpression2 = mock(LogicalExpression.class);
try {
when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecLeft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void setup() {
}

@Test
public void build_called_with_equal_fx_should_build_selector_filter() {
public void buildCalledWithEqualFxShouldBuildSelectorFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder
Expand All @@ -59,7 +59,7 @@ public void build_called_with_equal_fx_should_build_selector_filter() {
}

@Test
public void build_called_with_equal_fx_but_interval_field_should_build_interval_filter() {
public void buildCalledWithEqualFxIntervalFieldShouldBuildIntervalFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(DruidConstants.INTERVAL_DIMENSION_NAME);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -75,7 +75,7 @@ public void build_called_with_equal_fx_but_interval_field_should_build_interval_
}

@Test
public void build_called_with_not_equal_fx_should_build_selector_filter() {
public void buildCalledWithNotEqualFxShouldBuildSelectorFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -91,7 +91,7 @@ public void build_called_with_not_equal_fx_should_build_selector_filter() {
}

@Test
public void build_called_with_greater_than_or_equal_to_fx_should_build_bound_filter() {
public void buildCalledWithGreaterThanOrEqualToFxShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -108,7 +108,7 @@ public void build_called_with_greater_than_or_equal_to_fx_should_build_bound_fil
}

@Test
public void build_called_with_greater_than_fx_should_build_bound_filter() {
public void buildCalledWithGreaterThanFxShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -125,7 +125,7 @@ public void build_called_with_greater_than_fx_should_build_bound_filter() {
}

@Test
public void build_called_with_greater_than_fx_and_numeric_value_should_build_bound_filter() {
public void buildCalledWithGreaterThanFxAndNumericValueShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -142,7 +142,7 @@ public void build_called_with_greater_than_fx_and_numeric_value_should_build_bou
}

@Test
public void build_called_with_less_than_or_equal_to_fx_should_build_bound_filter() {
public void buildCalledWithLessThanOrEqualToFxShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -158,7 +158,7 @@ public void build_called_with_less_than_or_equal_to_fx_should_build_bound_filter
}

@Test
public void build_called_with_less_than_fx_should_build_bound_filter() {
public void buildCalledWithLessThanFxShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(SOME_DATASOURCE_NAME,
Expand All @@ -173,7 +173,7 @@ public void build_called_with_less_than_fx_should_build_bound_filter() {
}

@Test
public void build_called_with_less_than_fx_and_numeric_value_should_build_bound_filter() {
public void buildCalledWithLessThanFxAndNumericValueShouldBuildBoundFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(SOME_DATASOURCE_NAME,
Expand All @@ -188,7 +188,7 @@ public void build_called_with_less_than_fx_and_numeric_value_should_build_bound_
}

@Test
public void build_called_with_is_null_fx_should_build_selector_filter() {
public void buildCalledWithIsNullFxShouldBuildSelectorFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -204,7 +204,7 @@ public void build_called_with_is_null_fx_should_build_selector_filter() {
}

@Test
public void build_called_with_is_not_null_fx_should_build_selector_filter() {
public void buildCalledWithIsNotNullFxShouldBuildSelectorFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder.build(
Expand All @@ -220,7 +220,7 @@ public void build_called_with_is_not_null_fx_should_build_selector_filter() {
}

@Test
public void build_called_with_like_fx__but_if_value_is_prefixed_with_regex_keyword_hint_should_build_regex_filter() {
public void buildCalledWithLikeFxButIfValueIsPrefixedWithRegexKeywordHintShouldBuildRegexFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder
Expand All @@ -236,7 +236,7 @@ public void build_called_with_like_fx__but_if_value_is_prefixed_with_regex_keywo
}

@Test
public void build_called_with_like_fx__but_should_build_search_filter() {
public void buildCalledWithLikeFxShouldBuildSearchFilter() {
SchemaPath schemaPath = SchemaPath.getSimplePath(SOME_FIELD);
DruidScanSpec druidScanSpec =
druidScanSpecBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class DruidStoragePluginConfigTest {

@Test
public void test_druid_storage_plugin_config_successfully_parsed()
public void testDruidStoragePluginConfigSuccessfullyParsed()
throws URISyntaxException, IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode storagePluginJson = mapper.readTree(new File(
Expand All @@ -47,7 +47,7 @@ public void test_druid_storage_plugin_config_successfully_parsed()
}

@Test
public void test_default_row_size_used_when_not_provided_in_config()
public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
throws JsonProcessingException {
String druidConfigStr = "{\n" + " \"storage\":{\n" + " \"druid\" : {\n"
+ " \"type\" : \"druid\",\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class DruidQueryClientTest {
private HttpEntity httpEntity;

private DruidQueryClient druidQueryClient;
private static String BROKER_URI = "some broker uri";
private static String QUERY = "some query";
private static Header ENCODING_HEADER =
private static final String BROKER_URI = "some broker uri";
private static final String QUERY = "some query";
private static final Header ENCODING_HEADER =
new BasicHeader(HttpHeaders.CONTENT_ENCODING, StandardCharsets.UTF_8.name());

@Before
Expand All @@ -77,14 +77,14 @@ public void setup() throws IOException {
}

@Test(expected=Exception.class)
public void executeQuery_called_druid_returns_non_200_should_throw_error()
public void executeQueryCalledDruidReturnsNon200ShouldThrowError()
throws Exception {
when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
druidQueryClient.executeQuery(QUERY);
}

@Test
public void executeQuery_called_no_responses_found_returns_empty_event_list()
public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList()
throws Exception {
InputStream inputStream =
new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8.name()));
Expand All @@ -96,7 +96,7 @@ public void executeQuery_called_no_responses_found_returns_empty_event_list()
}

@Test
public void executeQuery_called_successfully_parse_query_results()
public void executeQueryCalledSuccessfullyParseQueryResults()
throws Exception {
String result = "[{\"result\":{\"pagingIdentifiers\":{\"some_segment_identifier\":500,\"some_other_segment_identifier\":501},\"events\":[{\"event\":{\"some_property\":\"some value\"}},{\"event\":{\"some_property\":\"some other value\"}}]}}]";
InputStream inputStream =
Expand Down

0 comments on commit 967ed22

Please sign in to comment.