Add JoinQuery #4118

Closed
wants to merge 20 commits into from

Contributor

@jihoonson jihoonson commented Mar 26, 2017

Second patch for #4032.

Here are the highlights of the changes.

  • Extended Query to be able to have multiple data sources.
  • Added JoinQuery
  • Added a new method annotateDistributionTarget to QueryToolChest. CachingClusteredClient can figure out which data source is the target of query distribution when choosing nodes for query processing.
  • DimensionSpec is changed to have an optional dataSource name field. Another option for this is to force the name of DimensionSpec to always be prefixed with its dataSource name like 'foo.dim1'. I think this is error-prone, and thus the former option is better.
  • Currently, metrics are represented as a list of simple strings, so their names must be prefixed with their dataSource names. In the future, however, I think we need a new data structure for metrics like DimensionSpec, or to extend DimensionSpec to cover metrics as well. (This is possible because it now supports long and double columns.)
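
The two naming options for dimensions mentioned in the list above can be sketched roughly as follows. This is only an illustration, not code from this patch: `SketchDimensionSpec`, its accessors and `fromPrefixedName` are hypothetical names, and the ambiguity noted in the comment is an assumed reason why prefixing might be error-prone.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for a DimensionSpec with an optional data source name.
final class SketchDimensionSpec
{
  private final String dimension;
  private final String dataSourceName; // null when the data source is not specified

  SketchDimensionSpec(String dimension, String dataSourceName)
  {
    this.dimension = Objects.requireNonNull(dimension, "dimension");
    this.dataSourceName = dataSourceName;
  }

  String getDimension()
  {
    return dimension;
  }

  Optional<String> getDataSourceName()
  {
    return Optional.ofNullable(dataSourceName);
  }

  // The alternative option: derive both parts from a prefixed name such as "foo.dim1".
  // Assumption for illustration: a dimension whose own name contains '.' would be
  // split incorrectly here, which is one way prefixing could go wrong.
  static SketchDimensionSpec fromPrefixedName(String prefixed)
  {
    final int dot = prefixed.indexOf('.');
    return dot < 0
           ? new SketchDimensionSpec(prefixed, null)
           : new SketchDimensionSpec(prefixed.substring(dot + 1), prefixed.substring(0, dot));
  }
}
```

With the explicit field, `new SketchDimensionSpec("dim1", "foo")` needs no parsing at all, whereas the prefixed form depends on the separator never appearing inside a name.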


@jihoonson jihoonson changed the title Extend Query to be able to have multiple data sources and add JoinQuery Add JoinQuery Mar 26, 2017

weijietong commented Mar 27, 2017

It's good to define a SingleSourceBaseQuery and let the Query interface support multiple DataSources!

@fjy fjy added the Feature label Mar 27, 2017
@fjy fjy added this to the 0.10.1 milestone Mar 27, 2017
@jon-wei jon-wei self-requested a review April 3, 2017 21:54
Member

@leventov leventov left a comment

Review up to TimewarpOperator.java


public BaseQuery(
DataSource dataSource,
Member

Please don't change public API of BaseQuery.

Contributor Author

I think this change is inevitable because a query can now involve multiple data sources. If you're concerned about compatibility with existing user-defined queries, I added SingleSourceBaseQuery for them, which keeps the original API of BaseQuery.

Member

It will break source compatibility anyway.

Instead of changing BaseQuery and adding SingleSourceBaseQuery, maybe leave BaseQuery compatible and add "MultiSourceBaseQuery"?

Contributor Author

Sounds good. I reverted BaseQuery and added MultiSourceBaseQuery.

public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
return runner.run(this, context);
}

@Override
public List<Interval> getIntervals()
public static Duration initDuration(QuerySegmentSpec querySegmentSpec)
Member

The method name is unclear; I don't see the connection with what this method actually does.

Contributor Author

Changed.

@@ -61,7 +62,11 @@ public BySegmentQueryRunner(
new BySegmentResultValueClass<T>(
results,
segmentIdentifier,
query.getIntervals().get(0)
Iterables.getOnlyElement(
Iterables.getOnlyElement(query.getDataSources())
Member

Query should still have a method getDataSource(), which fails if there are multiple data sources.

Contributor Author

Could you tell me why you think so? I think every query should fundamentally be regarded as having one or more data sources.

Member

For convenience. You had to add a lot of boilerplate that effectively emulates the behaviour I suggested, because Iterables.getOnlyElement() throws an exception if there is more than one element.

Contributor Author

I think getDataSources() is mostly used internally, for example in ServerManager or QueryManager, and developers should keep in mind that a query can have multiple data sources when they modify code that works with data sources.

There are some exceptions, such as BySegmentQueryRunner, SinkQuerySegmentWalker, and SpecificSegmentQueryRunner, which expect a query to have a single data source. I think these cases will be rare, so I would like to keep the current implementation.
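
The convenience accessor suggested in this thread could look roughly like the sketch below. It is only an illustration under simplifying assumptions: `SketchQuery` is a hypothetical stand-in for the Query interface, data sources are plain strings, and plain Java is used instead of Guava's Iterables.getOnlyElement().

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for the Query interface; data sources are plain strings here.
interface SketchQuery
{
  List<String> getDataSources();

  // Convenience accessor: returns the only data source, or fails if there is not exactly one.
  default String getDataSource()
  {
    final Iterator<String> it = getDataSources().iterator();
    if (!it.hasNext()) {
      throw new IllegalStateException("Query has no data source");
    }
    final String only = it.next();
    if (it.hasNext()) {
      throw new IllegalStateException("Query has multiple data sources: " + getDataSources());
    }
    return only;
  }
}
```

Callers such as the single-data-source runners named above would then call `getDataSource()` once instead of repeating the only-element check at every call site.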

default String getConcatenatedName()
{
final List<String> names = getNames();
return names.size() > 1 ? names.toString() : names.get(0);
Member

getFirstName() handles an empty getNames(), but getConcatenatedName() doesn't.

Contributor Author

Fixed.
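
One way the empty case could be handled is sketched below. The actual fix is not shown in this conversation, so this is an assumption: `SketchDataSource` is a hypothetical stand-in, and treating an empty name list as an error is only one possible choice.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the DataSource interface discussed above.
interface SketchDataSource
{
  List<String> getNames();

  default String getConcatenatedName()
  {
    final List<String> names = getNames();
    if (names.isEmpty()) {
      // Assumption: an empty name list is treated as an error. Without this check,
      // names.get(0) would fail with a less descriptive IndexOutOfBoundsException.
      throw new IllegalStateException("Data source has no names");
    }
    return names.size() > 1 ? names.toString() : names.get(0);
  }
}
```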


public static String getMetricName(Iterable<DataSourceWithSegmentSpec> dataSources)
{
return StreamSupport.stream(dataSources.spliterator(), false)
Member

Note that Iterables.toString() does this; however, it adds spaces after commas.

Contributor Author

Thanks. Changed.


Query<T> replaceQuerySegmentSpecWith(DataSource dataSource, QuerySegmentSpec spec);

Query<T> replaceQuerySegmentSpecWith(String dataSource, QuerySegmentSpec spec);
Member

Same

@@ -24,4 +24,5 @@
public static final String PRIORITY = "priority";
public static final String TIMEOUT = "timeout";
public static final String CHUNK_PERIOD = "chunkPeriod";
public static final String DIST_TARGET_SOURCE = "distTargetSource";
Member

I suggest "DISTRIBUTION_TARGET_SOURCE"; I don't see why it should be abbreviated. The same goes for the String value.

Contributor Author

Done.

final Iterable<DataSourceWithSegmentSpec> sourceSpecs = query.getDataSources();
return StreamSupport.stream(sourceSpecs.spliterator(), false)
.flatMap(spec -> spec.getDataSource().getNames().stream())
.collect(Collectors.toList());
Member

What if there are duplicates in this stream?

Contributor Author

Since every data source in Druid has a unique name, duplicate names can appear in this stream in only one case: the same data source appears multiple times in the query's data sources, as in a self-join query. In that case, the data source's name should be included multiple times in the result.
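
The behaviour described here can be illustrated with a small sketch. `DataSourceNamesSketch` and `allNames` are hypothetical names, and each data source is reduced to its list of names; the point is only that flatMap followed by Collectors.toList() keeps duplicates and order.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; each inner list stands for the names of one data source of the query.
final class DataSourceNamesSketch
{
  // Flattens the names of every data source, keeping duplicates and encounter order.
  static List<String> allNames(List<List<String>> namesPerDataSource)
  {
    return namesPerDataSource.stream()
                             .flatMap(List::stream)
                             .collect(Collectors.toList());
  }
}
```

For a self-join where both sides are the data source named foo, the result contains foo twice rather than once.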


/**
* Wraps a QueryRunner. The output QueryRunner must contain the query distribution information
* required by CachingClusteredClient in its context. The query distribution information represents that
Member

Please make this a Javadoc link.

Contributor Author

I'd like to, but I can't because of a dependency problem.

(Map<String, List<SegmentDescriptor>>) responseContext.computeIfAbsent(
Result.MISSING_SEGMENTS_KEY, k -> new HashMap<>()
);
missingSegments.putAll(segmentDescMap);
Member

Merge value lists instead of replacing?

Contributor Author

Yes, because every missing segment must be reported via responseContext.
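
The difference between replacing and merging can be sketched as follows. This is an illustration only: `MissingSegmentsSketch` and `mergeInto` are hypothetical names, and plain generic maps stand in for the response context entry. Map.putAll() overwrites the list stored under an existing key, while the version below appends to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing a merge of per-key lists instead of Map.putAll().
final class MissingSegmentsSketch
{
  // Appends each incoming list to the existing list for the same key instead of replacing it.
  // Lists already stored in target must be mutable.
  static <K, V> void mergeInto(Map<K, List<V>> target, Map<K, List<V>> incoming)
  {
    incoming.forEach(
        (key, values) -> target.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values)
    );
  }
}
```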

@jihoonson
Contributor Author

@leventov thanks for your review. I addressed your comments.

@jihoonson
Contributor Author

I don't understand why Travis failed; another Travis run succeeded. Could someone please restart the test?

@gianm
Contributor

gianm commented Apr 14, 2017

I just did. You can also get Travis to run again by closing and re-opening your PR.

@jihoonson
Contributor Author

@gianm thanks. I realized that some code on the master branch causes the test failure. I'll fix it soon.

@jihoonson jihoonson closed this Apr 25, 2017
@jihoonson jihoonson reopened this Apr 25, 2017
@jihoonson
Contributor Author

Reopened this PR due to a Travis failure. I also raised an issue to investigate the failure.

import java.util.Map;
import java.util.concurrent.TimeUnit;

public abstract class AbstractQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
Member

@leventov leventov Apr 26, 2017

I'm against fragmentation of QueryMetrics implementations. Some day another query type will be added or changed in a way that requires generifying some of the existing QueryMetrics methods, and neither AbstractQueryMetrics nor DefaultQueryMetrics will help.

I suggest removing the dataSource(), interval() and duration() methods from QueryMetrics and adding a single method dataSourcesAndIntervalsAndDurations() instead.

Contributor Author

As we discussed above, intervals are not very useful. I added dataSourcesAndDurations() and intervals() as two separate methods, and intervals are not included in JoinQueryMetrics by default.

{
/**
* Creates a {@link QueryMetrics} for query, which doesn't have predefined QueryMetrics subclass. This method must
* call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning.
*/
QueryMetrics<Query<?>> makeMetrics(Query<?> query);
QueryMetrics<QueryType> makeMetrics(QueryType query);
Member

This doesn't make sense. GenericQueryMetricsFactory is not a "generic base" for other QueryMetricsFactories; it is a query metrics factory specifically for "any" query. It must be able to accept any query type.

Contributor Author

Ah right. My bad.

* Sets {@link Query#getDuration()} of the given query as dimension.
*/
void duration(QueryType query);
// /**
Member

This should be removed

Contributor Author

Done.

@Override
public void intervals(JoinQuery query)
{
builder.setDimension(
Member

In the comment: #4118 you said intervals are not included by default, but they are included here.

Contributor Author

I meant, intervals() is not called in DefaultJoinQueryMetrics.query().

Member

This is contrary to the contract of QueryMetrics, which says that query() calls all methods of "the first type" (those that take a Query parameter and extract something from it). So intervals() should be called from query(), but its body should be empty in DefaultJoinQueryMetrics.
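
The arrangement requested here can be sketched as below. This is a heavily simplified illustration and not Druid's QueryMetrics API: `SketchJoinQueryMetrics`, `IntervalEmittingMetrics` and the string-based parameters are all hypothetical. The idea is that query() always calls intervals(), the default body does nothing, and a subclass can opt in.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified metrics class; not Druid's QueryMetrics API.
class SketchJoinQueryMetrics
{
  private final Map<String, String> dimensions = new LinkedHashMap<>();

  // query() calls every method that extracts something from the query.
  public void query(List<String> dataSources, List<String> intervals)
  {
    dataSources(dataSources);
    intervals(intervals);
  }

  public void dataSources(List<String> dataSources)
  {
    setDimension("dataSources", String.join(",", dataSources));
  }

  // Intentionally empty by default; a subclass may override it to emit intervals.
  public void intervals(List<String> intervals)
  {
  }

  protected void setDimension(String name, String value)
  {
    dimensions.put(name, value);
  }

  public Map<String, String> getDimensions()
  {
    return dimensions;
  }
}

// A user-defined subclass that opts in to emitting intervals.
class IntervalEmittingMetrics extends SketchJoinQueryMetrics
{
  @Override
  public void intervals(List<String> intervals)
  {
    setDimension("intervals", String.join(",", intervals));
  }
}
```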

{
builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
}
// /**
Member

This should be removed

Contributor Author

Done.

{
builder.setDimension(
"dataSourcesAndDurations",
DataSourceUtil.getMetricName(query.getDataSources())
Member

This should emit a list of values, using setDimension(String, String[]). Also, the dimension is called "dataSourcesAndDurations", but only data source names are emitted. And if this change is made, the getMetricName() method name will become confusing.

Contributor Author

Done. Removed getMetricName(List<DataSourceWithSegmentSpec>).

@@ -45,47 +45,64 @@ public DefaultQueryMetrics(ObjectMapper jsonMapper)
@Override
public void query(QueryType query)
{
dataSource(query);
// dataSource(query);
Member

Commented lines should be removed

Contributor Author

Done.

hasFilters(query);
duration(query);
// duration(query);
Member

Same

Contributor Author

Done.

}

// @Override
Member

This should be removed

Contributor Author

Done.

Member

@leventov leventov left a comment

Review until JoinQuery.java


Query<T> withDataSource(DataSource dataSource);
Query<T> replaceDataSourceWith(DataSource src, DataSource dst);
Member

Since this method accepts the old data source, I think it shouldn't have the "With" suffix, just "replaceDataSource". Also, I would call the parameters "oldDataSource" and "newDataSource".

Contributor Author

Changed.

)
)
);
} else {
Member

This could fall through, so that the method has only one return baseRunner.run(query, responseContext); statement.

@@ -41,31 +41,36 @@ public UnionQueryRunner(
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
if (query instanceof BaseQuery) {
Member

Could you explain why this doesn't apply to multi-data-source queries?

Contributor Author

The processing of multi-data-source queries is not covered by this patch and will be handled in a follow-up PR. This method will need to be fixed to support multi-data-source queries. For now, I changed it to throw an exception if the query is not a BaseQuery.

query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
query.withQuerySegmentSpec(
spec.getDataSource(),
new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))
Member

Prefer singletonList

Contributor Author

Done.

@@ -64,17 +64,20 @@ public DimensionSpec apply(String input)
}

private static final byte CACHE_TYPE_ID = 0x0;
private final String dataSourceName;
Member

If this field is not a part of DimensionSpec, it's better if it goes last in the list of fields and constructor parameters, rather than first.

@JsonProperty("dimension") DimensionSpec dimension
)
{
this.dimension = dimension;
Member

requireNonNull

Contributor Author

Added.


import java.util.Objects;

public class DimExtractPredicate implements JoinPredicate
Member

Please add class comment and explain the meaning of this class.

Contributor Author

Hmm, this class simply represents a dimension in join predicates. For example, given the SQL query SELECT count(*) FROM t1 JOIN t2 ON t1.bar = t2.bar, t1.bar is a DimExtractPredicate. Maybe DimensionPredicate is a more appropriate name.

Member

Please add this as a comment to the class.

{
default JoinPredicate visit(AndPredicate predicate)
{
for (JoinPredicate eachPredicate: predicate.getPredicates()) {
Member

Space

Member

Could be predicate.getPredicates().forEach(p -> p.accept(this));

Contributor Author

Done.

Member

Doesn't seem to be done
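
The forEach form suggested in this thread can be sketched on a toy visitor. All types below (`SketchPredicate`, `SketchDimension`, `SketchAnd`, `SketchVisitor`, `DimensionCollector`) are hypothetical, and the visit methods return void for brevity, unlike the JoinPredicate-returning methods in the patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified predicate tree and visitor; not the JoinPredicate API of this patch.
interface SketchPredicate
{
  void accept(SketchVisitor visitor);
}

final class SketchDimension implements SketchPredicate
{
  final String name;

  SketchDimension(String name)
  {
    this.name = name;
  }

  @Override
  public void accept(SketchVisitor visitor)
  {
    visitor.visit(this);
  }
}

final class SketchAnd implements SketchPredicate
{
  final List<SketchPredicate> predicates;

  SketchAnd(List<SketchPredicate> predicates)
  {
    this.predicates = predicates;
  }

  @Override
  public void accept(SketchVisitor visitor)
  {
    visitor.visit(this);
  }
}

interface SketchVisitor
{
  void visit(SketchDimension dimension);

  // Default traversal: visit every child, written with forEach instead of a for loop.
  default void visit(SketchAnd and)
  {
    and.predicates.forEach(p -> p.accept(this));
  }
}

// Example visitor that records the dimensions it sees, in traversal order.
final class DimensionCollector implements SketchVisitor
{
  final List<String> seen = new ArrayList<>();

  @Override
  public void visit(SketchDimension dimension)
  {
    seen.add(dimension.name);
  }
}
```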


default JoinPredicate visit(OrPredicate predicate)
{
for (JoinPredicate eachPredicate: predicate.getPredicates()) {
Member

Same

Member

Same


package io.druid.query.join;

public interface JoinPredicateVisitor
Member

No implementations are committed, so it's hard to tell, but it doesn't seem useful to make all methods default. Also, they all return the parameter as the return value, which seems pointless.

Contributor Author

@jihoonson jihoonson May 3, 2017

I chose an interface because it doesn't have any fields and its methods can be overridden according to the caller's purpose. The return value is useful when rewriting predicates. Please refer to JoinSpecVisitor.

@leventov leventov modified the milestones: 0.11.0, 0.10.1 Apr 28, 2017
@leventov
Member

Since this PR breaks compatibility of the Query interface, it can't be released in 0.10.x. Changed the milestone to 0.11.0.

@@ -186,6 +215,7 @@ public boolean equals(Object o)
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (dataSourceName != null ? dataSourceName.hashCode() : 0);
Member

Please follow the same order in fields, toString, hashCode and equals

private final String dimension;
private final String outputName;
private final ValueType outputType;

@JsonCreator
public DefaultDimensionSpec(
@JsonProperty("dataSource") String dataSourceName,
Member

Please add a test demonstrating that old JSON is successfully deserialized.

private final String dimension;
private final ExtractionFn extractionFn;
private final String outputName;
private final ValueType outputType;

@JsonCreator
public ExtractionDimensionSpec(
@JsonProperty("dataSource") String dataSourceName,
Member

Please add a test demonstrating that old JSON is successfully deserialized.

@@ -173,6 +187,7 @@ public boolean equals(Object o)
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (dataSourceName != null ? dataSourceName.hashCode() : 0);
Member

Follow the same order

@@ -169,6 +195,9 @@ public boolean equals(Object o)

DefaultDimensionSpec that = (DefaultDimensionSpec) o;

if (dataSourceName != null ? !dataSourceName.equals(that.dataSourceName) : that.dataSourceName != null) {
Member

Objects.equals()
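
The null-safe comparison suggested here, together with the earlier request to keep the same order in fields, equals and hashCode, can be sketched as follows. `SketchSpec` is a hypothetical class with only two fields, not the DefaultDimensionSpec from the patch, and Objects.hash() is used here in place of the hand-written 31-based hash.

```java
import java.util.Objects;

// Hypothetical, simplified class; fields, equals() and hashCode() all use the same order.
final class SketchSpec
{
  private final String dimension;
  private final String dataSourceName; // optional, may be null

  SketchSpec(String dimension, String dataSourceName)
  {
    this.dimension = Objects.requireNonNull(dimension, "dimension");
    this.dataSourceName = dataSourceName;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final SketchSpec that = (SketchSpec) o;
    // Objects.equals() is null-safe, so no explicit null checks are needed.
    return Objects.equals(dimension, that.dimension)
           && Objects.equals(dataSourceName, that.dataSourceName);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(dimension, dataSourceName);
  }
}
```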

{
private final DataSource dataSource;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
Member

It's safe to make it non-volatile because Joda-Time ensures "final" semantics for its basic immutable classes: http://cs.oswego.edu/pipermail/concurrency-interest/2011-June/007976.html

@Override
public void numDataSources(JoinQuery query)
{
builder.setDimension("numDataSources", String.valueOf(query.getDataSources().size()));
Member

It makes sense to emit this from dataSourcesAndDurations() rather than having a separate "numDataSources" method. The idea of dataSourcesAndDurations() is "emit everything related to data sources and durations from this query object with whatever level of detail you want".

@Override
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{
return run(getDistributionTarget().getQuerySegmentSpec().lookup(this, walker), context);
@janpychou janpychou Aug 22, 2017

MultiSourceBaseQuery should override the getDistributionTarget() method; otherwise a NullPointerException will be thrown.

@leventov
Member

leventov commented Sep 9, 2017

@jihoonson do you plan to continue to work on this issue?

@jihoonson
Contributor Author

@leventov yes, sorry for the delay. However, I'm currently working on #4479, and I can only come back to this after that issue is finished. I don't want to block others from working on this issue because of me, so if anyone is interested, please go ahead. I'll try to finish #4479 as soon as possible and come back to this issue if it's still open.

@jon-wei jon-wei modified the milestones: 0.11.0, 0.11.1 Sep 20, 2017
@jon-wei jon-wei modified the milestones: 0.12.0, 0.13.0 Jan 9, 2018
@gianm gianm removed this from the 0.13.0 milestone Mar 14, 2018
@jihoonson
Contributor Author

I'm closing this PR because I haven't been able to spend much time on this issue for a while and it has gone too stale. There is also probably a better approach that doesn't modify so many classes. I'll think about it and open another PR later.

@jihoonson jihoonson closed this Oct 17, 2018
@gianm gianm mentioned this pull request Oct 24, 2019
7 participants