Introduce SystemSchema tables (#5989) #6094

Merged
61 commits merged on Oct 11, 2018
Commits
44c6337
Added SystemSchema with following tables (#5989)
Jul 31, 2018
52b6115
Add documentation for system schema
Aug 1, 2018
335fc2b
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 1, 2018
4f34202
Fix static-analysis warnings
Aug 1, 2018
59e996b
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 2, 2018
7991720
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 13, 2018
3fd41de
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 19, 2018
816552e
Address PR comments
Aug 22, 2018
d74040c
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 22, 2018
a8038ee
Fix a test
Aug 22, 2018
456a0ad
Try to fix a test
Aug 22, 2018
5728364
Fix a bug around replica count
Aug 22, 2018
05fd4ce
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 27, 2018
cec8737
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 30, 2018
54dd64c
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 30, 2018
c99f027
Merge branch 'master' of github.com:druid-io/druid into system-table
Aug 31, 2018
7a57b1e
rename io.druid to org.apache.druid
Aug 31, 2018
3cb0f52
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 7, 2018
cf18959
Major change is to make tasks and segment queries streaming
Sep 7, 2018
68d45a0
Fix docs, make num_rows column nullable, some unit test changes
Sep 11, 2018
0239f94
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 11, 2018
4e3b013
make num_rows column type long, allow it to be null
Sep 11, 2018
495883a
Filter null rows for segments table from Linq4j enumerable
Sep 11, 2018
b6fe553
change num_replicas datatype to long in segments table
Sep 11, 2018
bab61c6
Fix some tests and address comments
Sep 22, 2018
14064c5
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 22, 2018
1f44382
Doc updates, other PR comments
Sep 24, 2018
8f7b0b6
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 24, 2018
b66a81b
Update tests
Sep 25, 2018
e92237f
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 25, 2018
9efbe96
Merge branch 'master' of github.com:druid-io/druid into system-table
Sep 27, 2018
95b5bc8
Address comments
Oct 1, 2018
b605ab9
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 1, 2018
1569aa5
Fix teamcity warning, change the getQueryableServer in TimelineServer…
Oct 1, 2018
b1a219a
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 1, 2018
ba7afe9
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 2, 2018
44d7285
Fix compilation after rebase
Oct 2, 2018
be5e9d7
Use the stream API from AuthorizationUtils
Oct 2, 2018
f53600f
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 2, 2018
100fa46
Added LeaderClient interface and NoopDruidLeaderClient class
Oct 2, 2018
a0dc468
Revert "Added LeaderClient interface and NoopDruidLeaderClient class"
Oct 3, 2018
0f96043
Make the naming consistent to server_segments for the join table
Oct 3, 2018
689f655
Try to fix a test in CalciteQueryTest due to rename of server_segments
Oct 3, 2018
3806a9c
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 3, 2018
dc9fa4c
Fix the json output format in the coordinator API
Oct 4, 2018
132404d
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 4, 2018
7ffc2b4
Use annonymous class object instead of mock for DruidLeaderClient in …
Oct 4, 2018
1bdff58
Fix test failures, type long/BIGINT can be nullable
Oct 4, 2018
26acfe8
Revert long nullability to fix tests
Oct 5, 2018
3fbbdc6
Fix style for tests
Oct 5, 2018
ccc7f18
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 5, 2018
23112a5
PR comments
Oct 8, 2018
b84d728
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 8, 2018
3cd1722
Address PR comments
Oct 8, 2018
1022693
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 8, 2018
d63469d
Add the missing BytesAccumulatingResponseHandler class
Oct 8, 2018
9f396aa
Use Sequences.withBaggage in DruidPlanner
Oct 8, 2018
e0657e5
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 9, 2018
83c74fe
Fix docs, add comments
Oct 9, 2018
1873c92
Merge branch 'master' of github.com:druid-io/druid into system-table
Oct 9, 2018
892ee80
Close the iterator if hasNext returns false
Oct 10, 2018
@@ -44,6 +44,8 @@
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
@@ -111,10 +113,12 @@ public void setup()
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final PlannerConfig plannerConfig = new PlannerConfig();

final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
plannerFactory = new PlannerFactory(
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig),
druidSchema,
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
100 changes: 100 additions & 0 deletions docs/content/querying/sql.md
@@ -477,6 +477,11 @@ plan SQL queries. This metadata is cached on broker startup and also updated per
[SegmentMetadata queries](segmentmetadataquery.html). Background metadata refreshing is triggered by
segments entering and exiting the cluster, and can also be throttled through configuration.

Druid exposes system information through special system tables. Two such schemas are available: the Information Schema and the Sys Schema.
The Information Schema provides details about table and column types, while the "sys" schema provides information about Druid internals such as segments, tasks, and servers. An example query listing the tables in both schemas appears below.
Contributor: Ah, sys is the real name. Got it.
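
As a sketch (assuming the system tables are registered in INFORMATION_SCHEMA.TABLES like any other schema), the tables exposed by both schemas can be listed with:

```sql
-- List every table in the INFORMATION_SCHEMA and sys schemas
SELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA IN ('INFORMATION_SCHEMA', 'sys');
```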


## INFORMATION SCHEMA

You can access table and column metadata through JDBC using `connection.getMetaData()`, or through the
INFORMATION_SCHEMA tables described below. For example, to retrieve metadata for the Druid
datasource "foo", use the query:
@@ -528,6 +533,101 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'druid' AND TABLE_
|COLLATION_NAME||
|JDBC_TYPE|Type code from java.sql.Types (Druid extension)|

## SYSTEM SCHEMA

The "sys" schema provides visibility into Druid segments, servers and tasks.
For example to retrieve all segments for datasource "wikipedia", use the query:
```sql
SELECT * FROM sys.segments WHERE datasource = 'wikipedia'
```

### SEGMENTS table
The segments table provides details on all Druid segments, whether or not they have been published yet.


|Column|Notes|
|------|-----|
|segment_id|Unique segment identifier|
|datasource|Name of datasource|
|start|Interval start time (in ISO 8601 format)|
|end|Interval end time (in ISO 8601 format)|
|size|Size of segment in bytes|
|version|Version string (generally an ISO 8601 timestamp corresponding to when the segment set was first started). A higher version indicates a more recently created segment; versions are compared as strings.|
|partition_num|Partition number (an integer, unique within a datasource+interval+version; may not necessarily be contiguous)|
|num_replicas|Number of replicas of this segment currently being served|
|num_rows|Number of rows in the current segment; this value may be null if it is unknown to the Broker at query time|
|is_published|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment has been published to the metadata store|
|is_available|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment is currently being served by any server (Historical or realtime)|
|is_realtime|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment is being served by a realtime task|
|payload|JSON-serialized data segment payload|
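
For instance, a sketch query using only the columns documented above reports the number of published segments and their total size per datasource:

```sql
-- Published segment count and total size per datasource
SELECT datasource, COUNT(*) AS num_segments, SUM("size") AS total_size
FROM sys.segments
WHERE is_published = 1
GROUP BY datasource
ORDER BY total_size DESC;
```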

### SERVERS table
Contributor: There should be a blurb here explaining what this table is all about. Currently, it's listing all data servers (anything that might host a segment) and that includes both historicals and ingestion tasks.

Author: added blurb

The servers table lists all data servers (any server that hosts a segment). It includes both Historicals and Peons.

|Column|Notes|
|------|-----|
|server|Server name in the form host:port|
Contributor: It'd be useful to have another field for just the host. Imagine doing stuff like GROUP BY server_host to collect together all servers run on the same machine.

Author: sure, added another column for server_host

Contributor: Does SystemSchema support concatenating strings? Then, server isn't necessary. Users can just do `select concat(host, cast(plaintext_port as VARCHAR)) from sys.servers`.

Contributor: It does support concatenating, but I think we should keep this here. It's basically the primary key of the server table, and is used for joins with the segment_servers table. The other fields (host, plaintext_port, etc.) are provided too as conveniences. With system tables, since they're all generated dynamically anyway, it's ok to have some redundancy when it makes the user experience more convenient.

Author: Just tried concat; it does not support that call (cannot translate call CONCAT($t1, $t2)). Maybe there is a way to support concat in the future, but for now users can just do `select server from sys.servers;` :). Also, server is the primary key of this table and is used in the join table segment_servers.

|host|Hostname of the server|
|plaintext_port|Unsecured port of the server, or -1 if plaintext traffic is disabled|
|tls_port|TLS port of the server, or -1 if TLS is disabled|
|server_type|Type of Druid service. Possible values include: historical, realtime, and indexer_executor (peon).|
|tier|Distribution tier; see [druid.server.tier](../configuration/index.html#Historical-General-Configuration)|
|current_size|Current size of segments in bytes on this server|
|max_size|Maximum size in bytes this server recommends assigning to segments; see [druid.server.maxSize](../configuration/index.html#Historical-General-Configuration)|

To retrieve information about all servers, use the query:
```sql
SELECT * FROM sys.servers;
```
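
Following the review discussion above about grouping together servers that run on the same machine, a sketch query using only the host, current_size, and max_size columns documented in this table might look like:

```sql
-- Per-machine rollup of segment storage across all server processes on a host
SELECT host, SUM(current_size) AS total_current_size, SUM(max_size) AS total_max_size
FROM sys.servers
GROUP BY host;
```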

### SERVER_SEGMENTS table

The SERVER_SEGMENTS table is used to join the servers and segments tables.

|Column|Notes|
|------|-----|
|server|Server name in the form host:port (primary key of the [servers table](#SERVERS-table))|
|segment_id|Segment identifier (primary key of the [segments table](#SEGMENTS-table))|

JOIN between "servers" and "segments" can be used to query the number of segments for a specific datasource,
grouped by server, example query:
```sql
SELECT COUNT(segments.segment_id) AS num_segments
FROM sys.segments AS segments
INNER JOIN sys.server_segments AS server_segments
  ON segments.segment_id = server_segments.segment_id
INNER JOIN sys.servers AS servers
  ON servers.server = server_segments.server
WHERE segments.datasource = 'wikipedia'
GROUP BY servers.server;
```

### TASKS table

The tasks table provides information about active and recently-completed indexing tasks. For more information
Contributor: "check out" not "checkout out"

Author: fixed

check out [ingestion tasks](../ingestion/tasks.html).

|Column|Notes|
|------|-----|
|task_id|Unique task identifier|
|type|Task type; for example, this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.md)|
|datasource|Datasource name being indexed|
|created_time|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z|
|queue_insertion_time|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the overlord|
|status|Status of a task; can be RUNNING, FAILED, or SUCCESS|
|runner_status|Runner status of a completed task is NONE; for in-progress tasks this can be RUNNING, WAITING, or PENDING|
|duration|Time it took to finish the task, in milliseconds; this value is present only for completed tasks|
|location|Server name where this task is running in the format host:port; this information is present only for RUNNING tasks|
|host|Hostname of the server where the task is running|
|plaintext_port|Unsecured port of the server, or -1 if plaintext traffic is disabled|
|tls_port|TLS port of the server, or -1 if TLS is disabled|
|error_msg|Detailed error message in case of FAILED tasks|

For example, to retrieve task information filtered by status, use the query:
```sql
SELECT * FROM sys.tasks WHERE status = 'FAILED';
```
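
As a further sketch using only the columns listed above, the average duration of successfully completed tasks per datasource could be computed with:

```sql
-- Average indexing time (in seconds) per datasource for successful tasks
SELECT datasource, COUNT(*) AS num_tasks, AVG(duration) / 1000.0 AS avg_duration_seconds
FROM sys.tasks
WHERE status = 'SUCCESS'
GROUP BY datasource;
```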


## Server configuration

The Druid SQL server is configured through the following properties on the broker.
@@ -60,6 +60,7 @@
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -150,13 +151,15 @@ public void setUp() throws Exception

final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new QuantileSqlAggregator()),
ImmutableSet.of()
);

plannerFactory = new PlannerFactory(
druidSchema,
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
10 changes: 10 additions & 0 deletions server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -45,12 +45,14 @@
import org.apache.druid.timeline.partition.PartitionChunk;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
*/
@@ -322,4 +324,12 @@ private void runTimelineCallbacks(final Function<TimelineCallback, CallbackActio
);
}
}

@Override
public List<ImmutableDruidServer> getDruidServers()
{
return clients.values().stream()
.map(queryableDruidServer -> queryableDruidServer.getServer().toImmutableDruidServer())
.collect(Collectors.toList());
}
}
119 changes: 1 addition & 118 deletions server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -19,9 +19,6 @@

package org.apache.druid.client;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
@@ -32,7 +29,6 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@@ -52,14 +48,12 @@
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -71,20 +65,16 @@
import org.joda.time.Duration;

import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -550,7 +540,7 @@ public void onFailure(Throwable t)
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url, query);
return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
}

@Override
@@ -576,113 +566,6 @@ public void cleanup(JsonParserIterator<T> iterFromMake)
return retVal;
}

private class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;

public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url, Query<T> query)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
}

@Override
public boolean hasNext()
{
init();

if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}

return true;
}

@Override
public T next()
{
init();

try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}

private void init()
{
if (jp == null) {
try {
InputStream is = future.get();
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
jp = objectMapper.getFactory().createParser(is);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query.getId(),
url,
e.getMessage()
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}

@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}

@Override
public String toString()
{
@@ -65,6 +65,16 @@ public String getHost()
return metadata.getHost();
}

public String getHostAndPort()
{
return metadata.getHostAndPort();
}

public String getHostAndTlsPort()
{
return metadata.getHostAndTlsPort();
}

public long getCurrSize()
{
return currSize;