Introduce SystemSchema tables (#5989) #6094

Merged
merged 61 commits on Oct 11, 2018
Changes from 24 commits
44c6337
Added SystemSchema with following tables (#5989)
Jul 31, 2018
52b6115
Add documentation for system schema
Aug 1, 2018
335fc2b
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 1, 2018
4f34202
Fix static-analysis warnings
Aug 1, 2018
59e996b
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 2, 2018
7991720
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 13, 2018
3fd41de
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 19, 2018
816552e
Address PR comments
Aug 22, 2018
d74040c
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 22, 2018
a8038ee
Fix a test
Aug 22, 2018
456a0ad
Try to fix a test
Aug 22, 2018
5728364
Fix a bug around replica count
Aug 22, 2018
05fd4ce
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 27, 2018
cec8737
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 30, 2018
54dd64c
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 30, 2018
c99f027
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 31, 2018
7a57b1e
rename io.druid to org.apache.druid
Aug 31, 2018
3cb0f52
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 7, 2018
cf18959
Major change is to make tasks and segment queries streaming
Sep 7, 2018
68d45a0
Fix docs, make num_rows column nullable, some unit test changes
Sep 11, 2018
0239f94
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 11, 2018
4e3b013
make num_rows column type long, allow it to be null
Sep 11, 2018
495883a
Filter null rows for segments table from Linq4j enumerable
Sep 11, 2018
b6fe553
change num_replicas datatype to long in segments table
Sep 11, 2018
bab61c6
Fix some tests and address comments
Sep 22, 2018
14064c5
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 22, 2018
1f44382
Doc updates, other PR comments
Sep 24, 2018
8f7b0b6
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 24, 2018
b66a81b
Update tests
Sep 25, 2018
e92237f
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 25, 2018
9efbe96
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 27, 2018
95b5bc8
Address comments
Oct 1, 2018
b605ab9
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 1, 2018
1569aa5
Fix teamcity warning, change the getQueryableServer in TimelineServer…
Oct 1, 2018
b1a219a
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 1, 2018
ba7afe9
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 2, 2018
44d7285
Fix compilation after rebase
Oct 2, 2018
be5e9d7
Use the stream API from AuthorizationUtils
Oct 2, 2018
f53600f
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 2, 2018
100fa46
Added LeaderClient interface and NoopDruidLeaderClient class
Oct 2, 2018
a0dc468
Revert "Added LeaderClient interface and NoopDruidLeaderClient class"
Oct 3, 2018
0f96043
Make the naming consistent to server_segments for the join table
Oct 3, 2018
689f655
Try to fix a test in CalciteQueryTest due to rename of server_segments
Oct 3, 2018
3806a9c
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 3, 2018
dc9fa4c
Fix the json output format in the coordinator API
Oct 4, 2018
132404d
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 4, 2018
7ffc2b4
Use annonymous class object instead of mock for DruidLeaderClient in …
Oct 4, 2018
1bdff58
Fix test failures, type long/BIGINT can be nullable
Oct 4, 2018
26acfe8
Revert long nullability to fix tests
Oct 5, 2018
3fbbdc6
Fix style for tests
Oct 5, 2018
ccc7f18
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 5, 2018
23112a5
PR comments
Oct 8, 2018
b84d728
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 8, 2018
3cd1722
Address PR comments
Oct 8, 2018
1022693
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 8, 2018
d63469d
Add the missing BytesAccumulatingResponseHandler class
Oct 8, 2018
9f396aa
Use Sequences.withBaggage in DruidPlanner
Oct 8, 2018
e0657e5
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 9, 2018
83c74fe
Fix docs, add comments
Oct 9, 2018
1873c92
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 9, 2018
892ee80
Close the iterator if hasNext returns false
Oct 10, 2018
@@ -25,6 +25,7 @@
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.data.input.Row;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -45,9 +46,11 @@
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -111,16 +114,20 @@ public void setup()
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
Contributor: nit: I would suggest adding NoopDruidLeaderClient rather than mocking with EasyMock. Even though this is not being used in this benchmark, it feels a bit weird because EasyMock is mostly for testing purposes, not for benchmarks.

Author: created a LeaderClient interface and added a NoopDruidLeaderClient class.

Author: using an empty anonymous subclass instead.


this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
plannerFactory = new PlannerFactory(
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig),
new TestServerInventoryView(walker.getSegments()),
Contributor: Let's move this to /druid/sql/src/main/java/org/apache/druid/sql/calcite/util and rename it to something like SimpleServerView.

Author: hm, TestServerInventoryView is already in /druid/sql/src/main/java/org/apache/druid/sql/calcite/util. And I'm not sure if I should rename it in this PR, as it already existed.

Author: I didn't look carefully; it's actually in /druid/sql/src/test/java/org/apache/druid/sql/calcite/util. I think we can keep it there, as it's mostly used by tests. Will use a mock in this benchmark instead of TestServerInventoryView.

CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper()
CalciteTests.getJsonMapper(),
druidLeaderClient,
druidLeaderClient
);
groupByQuery = GroupByQuery
.builder()
88 changes: 88 additions & 0 deletions docs/content/querying/sql.md
@@ -468,6 +468,11 @@ plan SQL queries. This metadata is cached on broker startup and also updated per
[SegmentMetadata queries](segmentmetadataquery.html). Background metadata refreshing is triggered by
segments entering and exiting the cluster, and can also be throttled through configuration.

Druid exposes system information through special system tables. There are two such schemas available: Information Schema and System Schema
Contributor: Please use better grammar here: some punctuation is missing.

Author: added the missing period; let me know if you want to restructure it further.

Information schema provides details about table and column types. Sys schema provides information about Druid internals like segments/tasks/servers.
Contributor: Sys -> System?

Author: So we'll keep it "sys".


## INFORMATION SCHEMA

You can access table and column metadata through JDBC using `connection.getMetaData()`, or through the
INFORMATION_SCHEMA tables described below. For example, to retrieve metadata for the Druid
datasource "foo", use the query:
Expand Down Expand Up @@ -519,6 +524,89 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'druid' AND TABLE_
|COLLATION_NAME||
|JDBC_TYPE|Type code from java.sql.Types (Druid extension)|

## SYSTEM SCHEMA

The SYS schema provides visibility into Druid segments, servers and tasks.
Contributor: It's "sys" now (not SYS).

Author: fixed

For example, to retrieve all segments for datasource "wikipedia", use the query:
```sql
SELECT * FROM sys.segments WHERE datasource = 'wikipedia'
```

### SEGMENTS table
Segments table provides details on all Druid segments, whether they are published yet or not.


|Column|Notes|
|------|-----|
|segment_id|Unique segment identifier|
|datasource|Name of datasource|
|start|Interval start time (in ISO 8601 format)|
|end|Interval end time (in ISO 8601 format)|
|size|Size of segment in bytes|
|version|Version number (generally an ISO8601 timestamp corresponding to when the segment set was first started)|
Contributor: The version format can change in the future. It's probably better to say that this is a version string, that a higher version means a more recently created segment, and that version comparison is based on string comparison.

Author: I think I copied it from some other documentation for version, so if the version format changes in the future, the documentation would need to change in all places. But the extra information you provided seems useful and I will append it.

Contributor: Got it. Let's do this later.

|partition_num|Partition number (an integer, unique within a datasource+interval+version; may not necessarily be contiguous)|
|num_replicas|Number replicas of this segment currently being served|
Contributor: Number of replicas

Author: fixed

|num_rows|Number of rows in current segment, this value could be null if unknown to broker at query time|
|is_published|True if this segment has been published to the metadata store|
|is_available|True if this segment is currently being served by any server|
Contributor: I think server here is also ambiguous. Does it mean historicals and realtime tasks?

Author: This means is_available is true for a segment if it's being served by any server (either historical or realtime).

|is_realtime|True if this segment is being served on a realtime server|
Contributor: realtime server sounds ambiguous. How about any type of realtime task?

Author: I didn't understand your suggestion. Do you mean to provide some example of realtime servers? Or something like "True if this segment is being served by any realtime tasks"?

Contributor: We have realtime nodes and realtime tasks (kafka index tasks, realtime index task). I believe you meant all types of realtime tasks. So, "True if this segment is being served on any type of realtime task."

Author: okay, changed the doc

|payload|Jsonified datasegment payload|
Contributor: suggest "JSON-serialized DataSegment payload" instead

Author: changed

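To make the flags above concrete, here is a sketch of my own (not taken from this PR) of a query that counts, per datasource, segments that are served but not yet published. It assumes the boolean-style columns are exposed as 1/0 and that num_rows may be null, which may differ in the merged implementation:

```sql
-- Segments that are available on some server but not yet published,
-- grouped by datasource (sketch; column semantics as described above).
SELECT
  datasource,
  COUNT(*) AS num_unpublished_segments,
  SUM(num_rows) AS total_rows   -- SUM skips null num_rows values
FROM sys.segments
WHERE is_available = 1 AND is_published = 0
GROUP BY datasource;
```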

### SERVERS table
Contributor: There should be a blurb here explaining what this table is all about. Currently, it's listing all data servers (anything that might host a segment), and that includes both historicals and ingestion tasks.

Author: added blurb

Servers table lists all data servers (any server that hosts a segment). It includes both historicals and peons.

|Column|Notes|
|------|-----|
|server|Server name in the form host:port|
Contributor: It'd be useful to have another field for just the host. Imagine doing stuff like GROUP BY server_host to collect together all servers run on the same machine.

Author: sure, added another column for server_host

Contributor: Does SystemSchema support concatenating strings? Then server isn't necessary; users can just do select concat(host, cast(plaintext_port as VARCHAR)) from sys.servers.

Contributor: It does support concatenating, but I think we should keep this here. It's basically the primary key of the server table, and is used for joins with the segment_servers table. The other fields (host, plaintext_port, etc.) are provided too as conveniences. With system tables, since they're all generated dynamically anyway, it's ok to have some redundancy when it makes the user experience more convenient.

Author: Just tried concat; it does not support that call (cannot translate call CONCAT($t1, $t2)). Maybe there is a way to support concat in the future, but for now users can just do select server from sys.servers; :). Also, server is the primary key of this table and is used in the join table segment_servers.

|scheme|Server scheme http or https|
|server_type|Type of druid service for example historical, realtime, bridge, indexer_executor|
Contributor: Capitalize Druid, and this sentence needs a bit more punctuation. How about:

Type of Druid service. Possible values include: historical, realtime, bridge, and indexer_executor.

Author: changed

|tier|Distribution tier see [druid.server.tier](#../configuration/index.html#Historical-General-Configuration)|
|current_size|Current size of segments in bytes on this server|
|max_size|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](#../configuration/index.html#Historical-General-Configuration)|

To retrieve information about all servers, use the query:
```sql
SELECT * FROM sys.servers;
```
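
As a further sketch (mine, not part of this PR), the same table can be grouped to summarize capacity by service type and tier, using only the columns documented above:

```sql
-- Capacity summary per service type and tier (sketch).
SELECT
  server_type,
  tier,
  COUNT(*) AS num_servers,
  SUM(current_size) AS used_bytes,
  SUM(max_size) AS max_bytes
FROM sys.servers
GROUP BY server_type, tier;
```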

### SEGMENT_SERVERS table

The SEGMENT_SERVERS table is used to join the SEGMENTS table with the SERVERS table.

|Column|Notes|
|------|-----|
|server|Server name in format host:port (Primary key of [servers table](#SERVERS-table))|
|segment_id|Segment identifier (Primary key of [segments table](#SEGMENTS-table))|

To retrieve information from segment_servers table, use the query:
```sql
SELECT * FROM sys.segment_servers;
```

Contributor: Would be awesome to include an example here of a JOIN between "segments" and "servers". Maybe a query that shows the breakdown of the number of segments for a specific datasource, by server: something like grouping by server, filtering by datasource, and counting segments.

Author: Added an example of a query using JOIN.
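
The JOIN example the reviewer asked for was added in a later commit; the query below is only my sketch of what it could look like, built from the columns documented above (the example in the merged docs may differ):

```sql
-- Number of segments of datasource 'wikipedia' served by each server (sketch).
SELECT
  servers.server,
  COUNT(segments.segment_id) AS num_segments
FROM sys.segments AS segments
JOIN sys.segment_servers AS segment_servers
  ON segments.segment_id = segment_servers.segment_id
JOIN sys.servers AS servers
  ON segment_servers.server = servers.server
WHERE segments.datasource = 'wikipedia'
GROUP BY servers.server;
```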

### TASKS table

The tasks table provides information about active and recently-completed indexing tasks. For more information
Contributor: "check out" not "checkout out"

Author: fixed

checkout out [ingestion tasks](#../ingestion/tasks.html)

|Column|Notes|
|------|-----|
|task_id|Unique task identifier|
|type|Task type, this should be "index" for indexing tasks|
Contributor: "this should be "index" for indexing tasks" looks not necessary. Better to add a link to http://druid.io/docs/latest/ingestion/tasks.html.

Author: modified it

|datasource|Datasource name being indexed|
|created_time|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to DateTimes.EPOCH|
Contributor: DateTimes.EPOCH is in Druid's source code, and I don't think every user checks Druid's internal code. Better to say 1970-01-01T00:00:00Z instead.

Author: ok done

|queue_insertion_time|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the overlord|
|status|Status of a task can be RUNNING, FAILED, SUCCESS|
|runner_status|Runner status of a completed task would be NONE, for in-progress tasks this can be RUNNING, WAITING, PENDING|
|duration|Time it took to finish the task in milliseconds, this value is present only for completed tasks|
|location|Server name where this task is running in the format host:port, this information is present only for RUNNING tasks|
|error_msg|Detailed error message in case of FAILED tasks|

For example, to retrieve task information filtered by status, use the query:
```sql
SELECT * FROM sys.tasks where status='FAILED';
```
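
And, as one more sketch of my own (not part of this PR), a query that summarizes tasks by datasource and status using the columns listed above; AVG skips the null durations of tasks that have not completed:

```sql
-- Task counts and average duration per datasource and status (sketch).
SELECT
  datasource,
  status,
  COUNT(*) AS num_tasks,
  AVG(duration) AS avg_duration_ms
FROM sys.tasks
GROUP BY datasource, status;
```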


## Server configuration

The Druid SQL server is configured through the following properties on the broker.
@@ -23,7 +23,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
@@ -64,8 +66,10 @@
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -149,6 +153,8 @@ public void setUp() throws Exception
);

final PlannerConfig plannerConfig = new PlannerConfig();
final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments());
final DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new QuantileSqlAggregator()),
@@ -157,12 +163,15 @@

plannerFactory = new PlannerFactory(
druidSchema,
serverView,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper()
CalciteTests.getJsonMapper(),
druidLeaderClient,
druidLeaderClient
);
}

@@ -322,4 +322,10 @@ private void runTimelineCallbacks(final Function<TimelineCallback, CallbackActio
);
}
}

@Override
public Map<String, QueryableDruidServer> getClients()
{
return clients;
Contributor: The caller needs only DruidServer, not the entire map or QueryableDruidServer. Also, it's much better not to return mutable objects. Please change this to something like:

return clients.values().stream()
           .map(queryableDruidServer -> queryableDruidServer.getServer().toImmutableDruidServer())
           .collect(Collectors.toList());

Author: good suggestion, changed.

}
}
119 changes: 1 addition & 118 deletions server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -19,9 +19,6 @@

package org.apache.druid.client;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
@@ -32,7 +29,6 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@@ -52,14 +48,12 @@
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -71,20 +65,16 @@
import org.joda.time.Duration;

import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -543,7 +533,7 @@ public void onFailure(Throwable t)
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url, query);
return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
}

@Override
@@ -569,113 +559,6 @@ public void cleanup(JsonParserIterator<T> iterFromMake)
return retVal;
}

private class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;

public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url, Query<T> query)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
}

@Override
public boolean hasNext()
{
init();

if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}

return true;
}

@Override
public T next()
{
init();

try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}

private void init()
{
if (jp == null) {
try {
InputStream is = future.get();
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
jp = objectMapper.getFactory().createParser(is);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query.getId(),
url,
e.getMessage()
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}

@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}

@Override
public String toString()
{