Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,9 @@ private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final Task task = runningItem.getTask();

if (analysis.getBaseTableDataSource().isPresent()
&& task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) {
if ((analysis.getBaseTableDataSource().isPresent()
&& task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName()))
|| analysis.getBaseTableDataSourceNames().contains(task.getDataSource())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.VersionedIntervalTimeline;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -97,12 +99,12 @@ public ServerManagerForQueryRetryTest(
}

@Override
<T> QueryRunner<T> buildQueryRunnerForSegment(
<T> List<QueryRunner<T>> buildQueryRunnersForSegment(
Query<T> query,
SegmentDescriptor descriptor,
QueryRunnerFactory<T, Query<T>> factory,
QueryToolChest<T, Query<T>> toolChest,
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
List<VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
Function<SegmentReference, SegmentReference> segmentMapFn,
AtomicLong cpuTimeAccumulator
)
Expand All @@ -125,15 +127,15 @@ <T> QueryRunner<T> buildQueryRunnerForSegment(

if (isIgnoreSegment.isTrue()) {
LOG.info("Pretending I don't have segment[%s]", descriptor);
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
}
}
return super.buildQueryRunnerForSegment(
return super.buildQueryRunnersForSegment(
query,
descriptor,
factory,
toolChest,
timeline,
timelines,
segmentMapFn,
cpuTimeAccumulator
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public FluentQueryRunner applyPostMergeDecoration()

public FluentQueryRunner applyPreMergeDecoration()
{
return from(new UnionQueryRunner<>(toolChest.preMergeQueryDecoration(baseRunner)));
return from((toolChest.preMergeQueryDecoration(baseRunner)));
}

public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
* Represents a source of data for a query obtained from multiple base tables. Implementations of this interface
* must handle more than one table dataSource.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why not make TableDataSource an implementation of this interface, and give it a pretty basic retrieveSegmentsForIntervals method? Then, we know that anything that access tables directly is an implementation of this interface. It might make some of the logic in DataSourceAnalysis simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. Thanks

*/
public interface MultiTableDataSource extends DataSource
{
/**
 * Finds, for each underlying table, the timeline objects (segment holders) that must be queried to cover the
 * given intervals.
 *
 * @param intervals The intervals to find the timeline objects for
 * @param timelineMap Table dataSource names mapped to the timeline to look up for that table
 * @param biFunction Lookup function that, given an interval and a timeline, returns the timeline objects
 *                   overlapping that interval
 * @param <ObjectType> Type of the overshadowable object handled by the timeline
 * @return One entry per table dataSource; each entry is the list of timeline objects which needs to be queried
 *         for that table (note: a list of lists, not a Map)
 */
<ObjectType extends Overshadowable<ObjectType>> List<List<TimelineObjectHolder<String, ObjectType>>> retrieveSegmentsForIntervals(
List<Interval> intervals,
Map<String, TimelineLookup<String, ObjectType>> timelineMap,
BiFunction<Interval, TimelineLookup<String, ObjectType>, List<TimelineObjectHolder<String, ObjectType>>> biFunction
);

/**
 * Returns the base table dataSources from which the data for a query is retrieved.
 */
List<TableDataSource> getDataSources();
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static <T> Query<T> withSpecificSegments(final Query<T> query, final List
// Verify preconditions and invariants, just in case.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource());

if (!analysis.getBaseTableDataSource().isPresent()) {
if (analysis.getBaseTableDataSourceNames().isEmpty()) {
throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class UnionDataSource implements DataSource
public class UnionDataSource implements MultiTableDataSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, if we wanted to extend UnionDataSource to support unioning nontables (like, perhaps, unioning the results of queries) then do you see a good migration path to get there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, I would expect us to have a MultiTableDataSource implementation say UnionTableDataSource and the current UnionDataSource would go back to being a DataSource implementation that would start handling a list of DataSource objects.
Also, this would mean that we'd still have to keep the UnionQueryRunner to execute the non-table dataSources and the UnionTableDataSource separately and merge the results. Therefore, for a union query over a mix of tables and non-tables, the tables would still be executed together under UnionTableDataSource while the non-table dataSources would be executed individually. Some refactoring within ClientQuerySegmentWalker would be needed in case we have to handle non-tables that can be fulfilled by LocalQuerySegmentWalker.
Do you see any issues with the MultiTableDataSource design in this PR?

{
@JsonProperty
private final List<TableDataSource> dataSources;
Expand All @@ -51,6 +58,7 @@ public Set<String> getTableNames()
.collect(Collectors.toSet());
}

@Override
@JsonProperty
public List<TableDataSource> getDataSources()
{
Expand Down Expand Up @@ -79,6 +87,27 @@ public DataSource withChildren(List<DataSource> children)
);
}

@Override
public <ObjectType extends Overshadowable<ObjectType>> List<List<TimelineObjectHolder<String, ObjectType>>> retrieveSegmentsForIntervals(
    List<Interval> intervals,
    Map<String, TimelineLookup<String, ObjectType>> timelineMap,
    BiFunction<Interval, TimelineLookup<String, ObjectType>, List<TimelineObjectHolder<String, ObjectType>>> biLookupFn
)
{
  // One inner list per timeline: all timeline objects overlapping any requested interval for that table.
  // Iterating values() avoids the per-key get() the previous keySet() loop performed.
  // NOTE(review): result order follows timelineMap's iteration order; if callers expect alignment with
  // getDataSources(), an ordered map (e.g. LinkedHashMap) must be passed in — TODO confirm.
  final List<List<TimelineObjectHolder<String, ObjectType>>> segmentsList = new ArrayList<>(timelineMap.size());
  for (TimelineLookup<String, ObjectType> timeline : timelineMap.values()) {
    final List<TimelineObjectHolder<String, ObjectType>> dataSourceSegments =
        intervals.stream()
                 .flatMap(interval -> biLookupFn.apply(interval, timeline).stream())
                 .collect(Collectors.toList());
    segmentsList.add(dataSourceSegments);
  }
  return segmentsList;
}

@Override
public boolean isCacheable()
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.MultiTableDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.spec.QuerySegmentSpec;

import javax.annotation.Nullable;
Expand All @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
* Analysis of a datasource for purposes of deciding how to execute a particular query.
Expand Down Expand Up @@ -188,6 +189,19 @@ public Optional<TableDataSource> getBaseTableDataSource()
}
}

/**
 * Returns the names of all base table dataSources. Nonempty only when the base dataSource is a single table
 * ({@link TableDataSource}) or a multi-table dataSource such as a union ({@link MultiTableDataSource});
 * otherwise returns an empty set.
 */
public Set<String> getBaseTableDataSourceNames()
{
  if (baseDataSource instanceof MultiTableDataSource || baseDataSource instanceof TableDataSource) {
    // Both implementations report exactly the names of their underlying tables.
    return baseDataSource.getTableNames();
  } else {
    return Collections.emptySet();
  }
}

/**
* Returns the bottommost (i.e. innermost) {@link Query} from a possible stack of outer queries at the root of
* the datasource tree. This is the query that will be applied to the base datasource plus any joinables that might
Expand Down Expand Up @@ -252,7 +266,7 @@ public boolean isConcreteTableBased()
// check is redundant. But in the future, we will likely want to support unions of things other than tables,
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
|| (baseDataSource instanceof MultiTableDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class MultiTableDataSourceTest
{
  private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();

  /**
   * Round-trips a programmatically-constructed union of two tables through JSON and verifies equality.
   */
  @Test
  public void testSerialization() throws IOException
  {
    final MultiTableDataSource dataSource = new UnionDataSource(ImmutableList.of(
        new TableDataSource("datasource1"),
        new TableDataSource("datasource2")
    ));
    final String json = JSON_MAPPER.writeValueAsString(dataSource);
    final MultiTableDataSource serdeDataSource = JSON_MAPPER.readValue(json, MultiTableDataSource.class);
    Assert.assertEquals(dataSource, serdeDataSource);
  }

  /**
   * Verifies that the JSON "union" type key deserializes to the concrete {@link UnionDataSource}
   * implementation and exposes the expected tables and names.
   */
  @Test
  public void testUnionDataSource() throws IOException
  {
    final MultiTableDataSource dataSource = JSON_MAPPER.readValue(
        "{\"type\":\"union\", \"dataSources\":[\"datasource1\", \"datasource2\"]}",
        MultiTableDataSource.class
    );
    // Assert the concrete type: checking "instanceof MultiTableDataSource" would be tautological
    // because the variable is already declared with that type.
    Assert.assertTrue(dataSource instanceof UnionDataSource);
    Assert.assertEquals(
        Lists.newArrayList(new TableDataSource("datasource1"), new TableDataSource("datasource2")),
        Lists.newArrayList(dataSource.getDataSources())
    );
    Assert.assertEquals(
        ImmutableSet.of("datasource1", "datasource2"),
        dataSource.getTableNames()
    );

    // Serde round-trip must preserve equality.
    final MultiTableDataSource serde =
        JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), MultiTableDataSource.class);
    Assert.assertEquals(dataSource, serde);
  }
}
Loading