Druid Catalog Proposal #12546
Let's add additional design details as notes to keep the description light.

Security
The catalog is implemented as another table in Druid's metastore DB. Operations are via the REST API. When we support SQL DDL statements, the implementation of those statements will use the same REST API (with some form of user impersonation). In other systems, there are separate permissions for table metadata and table data. Druid only has read and write access to each datasource, so we work within those limitations. The basic security rules for the catalog are:
Druid allows granting permissions via a regular expression. So, the admin could cobble together a form of private/temporary tables by allowing, say, write access to all tables of the form "paul-.*", such as "paul-test", "paul-temp-metrics" and "paul-temp3". A future enhancement would be to invent a more advanced security model, but that is seen as a task separate from the catalog itself.
First, I want to say, this is a great proposal that lays some foundation work to introduce DDL to Druid. My question is: what's the relationship between the current INFORMATION_SCHEMA in Druid and the proposed Catalog here?
@FrankChen021, thanks for your encouragement! Yes, I hope that by borrowing a bit of RDBMS functionality we can make Druid a bit easier to use by letting Druid keep track of its own details. You asked about INFORMATION_SCHEMA, so let's address that. Revised on June 1.

INFORMATION_SCHEMA Revisions
Druid's … Since … In this proposal, we modify the … We may add a "hide" option to columns to mark a column that exists in segments but, for whatever reason, is unwanted. A hidden column would not appear in …
Drive-by comment re …
@clintropolis, thanks for the note on … The key fact is that … Updated the …
Work is far enough along to propose the REST API. This material (with any needed updates) will go into the API docs as part of the PR.
REST API
The catalog adds two sets of APIs: one for users, the other internal for use by Brokers. All APIs are based at …
Note that all these APIs are for the metadata catalog entries, not the actual datasource. The table metadata can exist before the underlying datasource. Also, the definition can be deleted without deleting the datasource. The primary catalog entry is the "table specification" (…).

Create or Update Table Specification
Configuration-as-code API to create or update a table definition within the indicated schema. Payload is a … The schema must be the name of a valid writable Druid schema to which the user has write access. The valid schemas at present are … The user must have write access to the underlying datasource, even if the datasource does not yet exist. (That is, the user requires the same permissions as they will require when they first ingest data into the datasource.) For input tables, the user must have the extended … Creates or replaces the table spec for the given table depending on the …
In all cases, the operation uses the provided spec as is. See the … For the …

Update Table Specification
Incremental API to update an existing table within the indicated schema and with the given name. Payload is a … The table spec can be partial and is merged with the existing spec. Merge rules are:
Columns are merged differently for different table types.
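As a purely illustrative sketch (borrowing the datasource spec fields from the examples later in this thread; the exact payload shape is defined by this API, not by this sketch), an incremental update that changes only the target segment row count might send a partial spec such as:

{
  "type": "datasource",
  "targetSegmentRows": 5000000
}

Per the merge rules above, properties not present in the payload would presumably be left untouched.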
The API supports two "levels" of synchronization. By default, the new entry simply overwrites the existing entry. However, if …

Read Table
Configuration-as-code API to read the table specification for the table given by a schema and table name. The user must have read access to the table. Returns a 404 (NOT FOUND) if the entry does not exist. Remember: the metadata entry exists independently of the datasource itself. The result is the … Note that the above are defined so that if one does a …

Drop Table
Drop the catalog entry for the given table and schema. The schema must exist. The table must also exist, and the user must have write access to the underlying datasource. The optional …

Read Full Table Metadata
Returns the full metadata for a table, including system-maintained properties such as name, update time, table spec, and more. Use this form to obtain the update timestamp used for the …
Reorder Column
Columns a…
The payload is a JSON object of this form:

{
"column": "<name>",
"where": "first|last|before|after",
"anchor": "<name>"
}

A column can be moved to the start or end of the list, or it can be moved to appear before or after some other column. The … The operation fails if either the column or the anchor (if provided) does not exist (which may occur if another writer deleted the column in the meantime). The columns refer to entries in the catalog schema. A column may exist in the datasource but not in the catalog; such columns can't be referenced in this API.

Delete Column Metadata
A client may wish to remove a specific datasource column. The …
The payload is a JSON list of strings that identifies the columns to be dropped. Note that deleting a column means to remove the metadata entry for the column. This is not the same as hiding the column. This operation does not physically remove the column: if the column still exists in any segment, then the column will appear in the merged schema. Use this operation for the case that a metadata column entry was added by mistake, or if all instances of a previously-existing physical column have expired out of the segments. Dropping a column does not drop the column from the hidden columns list. It is expected that, if a column is deleted, it would likely appear in the hidden columns list until all old segments with that column expire out of the system.

Hide (and Unhide) Datasource Columns
Datasources provide a "hide" operation for columns. Segments may contain columns which are no longer needed. To avoid the need to rewrite segments, the catalog can simply "hide" existing columns. The … The payload is a:

{
"hide": [ "a", "b", "c" ],
"unhide": [ "d", "e" ]
}

List Schema Names
Retrieves the list of the names of schemas known to the catalog, which includes the same set of schemas in the … The list is not filtered by permissions, as Druid does not have schema-level permissions. All users can see all schemas (but not necessarily the contents of the schemas).

List All Table Names
Retrieves the list of the names of all tables known to the catalog across all schemas. Only some schemas allow definitions, and only definitions appear. This is not a list of actual datasources or system tables: only a list of definitions. The list is filtered based on user permissions: the list will omit tables for which the user does not have read access.

List Table Names in a Schema
Retrieves the list of the names of tables within the given schema. This list contains only those tables for which metadata entries appear, and is thus a subset of those returned by …

List Tables in a Schema
Returns the list of all tables within the given schema to which the user has access. The return value is a list of the same objects returned from …

Flush Cache
Causes the catalog to invalidate any caches. Available on both the Coordinator and the Broker. This API is required only if the catalog DB changes outside of Druid, and is primarily for testing.

Synchronize Table (Internal)
Retrieve the entry for a single table within the given schema as a …

Synchronize Schema (Internal)
Returns the list of all table metadata, as …
Extended Table Functions
The present version of Druid uses a Calcite feature to specify an ingest input table:

INSERT INTO dst SELECT *
FROM TABLE(extern(
'{
"type": "inline",
"data": "a,b,1\nc,d,2\n"
}',
'{
"type": "csv",
"columns": ["x","y","z"],
"listDelimiter": null,
"findColumnsFromHeader": false,
"skipHeaderRows": 0
}',
'[
{"name": "x", "type": "STRING"},
{"name": "y", "type": "STRING"},
{"name": "z", "type": "LONG"}
]'
))
PARTITIONED BY ALL TIME

As it turns out, SQL (and Calcite) allows the use of named parameters. We can rewrite the above as follows. Notice the …

INSERT INTO dst SELECT *
FROM TABLE(extern(
inputSource => '{
"type": "inline",
"data": "a,b,1\nc,d,2\n"
}',
inputFormat => '{
"type": "csv",
"columns": ["x","y","z"],
"listDelimiter": null,
"findColumnsFromHeader": false,
"skipHeaderRows": 0
}',
signature => '[
{"name": "x", "type": "STRING"},
{"name": "y", "type": "STRING"},
{"name": "z", "type": "LONG"}
]'
))
PARTITIONED BY ALL TIME

The above is great, but can be a bit awkward: we have to encode JSON in SQL (which, when we send it via the REST API, we encode again in JSON). Let's see how we can use SQL named parameters to streamline the syntax (and set ourselves up for the catalog). SQL requires that parameter names be "simple identifiers": that is, no dots. So, we can't just say:
Instead, we have to "flatten" the names. That is, define SQL names that, internally, we map the the JSON names. The mapping is just code, so we omit the details here. Suppose we do the mapping. We now have a different set of arguments, so we need a different function. For now, let's call it We also need a way to specify the input table schema. Here we borrow another bit of Calcite functionality, the INSERT INTO dst SELECT *
FROM TABLE(staged(
source => 'inline',
data => 'a,b,1
c,d,2
',
format => 'csv'
))
(x VARCHAR NOT NULL, y VARCHAR NOT NULL, z BIGINT NOT NULL)
PARTITIONED BY ALL TIME

Notice how the keywords in the …

Table "Templates"
Using the above, the catalog allows the definition of a template table. To motivate this, let's start with a complete input table:

{
"type": "input",
"properties": {
"source": "local",
"file": "wikipedia.csv",
"format": "csv",
},
"columns": ...
}

The above can be run by referencing the name, say:

SELECT * FROM `input`.`myWiki`

Druid, however, never ingests the same data twice: we want to read different files each time. Say …

{
"type": "input",
"properties": {
"source": "local",
"format": "csv",
},
"columns": ...
}

We have to parameterize the template to run it, using a table function with the same name as the input table:

SELECT * FROM TABLE(`input`.`myWiki`(file => 'wiki-2016-06-02.csv'))

The result of the table function is a complete table, ready to run as if it were fully defined in the catalog. Notice how three pieces come together:
Primary Partition
The catalog allows the user to define the ingest partitioning:

{
"dbSchema": "druid",
"name": "fiveMinDs",
"spec": {
"type": "datasource",
"segmentGranularity": "PT5M"
}
}

By doing so, the user can drop the PARTITIONED BY clause from the SQL statement. Supported values include:
Only the standard Druid values are supported: providing a non-standard interval will raise an error.

Secondary Partitioning (Clustering)
The multi-stage ingest engine allows the user to specify secondary partitioning, expressed as the CLUSTERED BY clause. The catalog models this with a list of JSON objects:

{
"dbSchema": "druid",
"name": "fiveMinDs",
"spec": {
"type": "datasource",
"segmentGranularity": "PT5M",
"clusterKeys": [
{"column": "x"},
{"column": "y", "desc": true} ]
}
}

At present, the catalog supports only column names: additional work is needed in the SQL query layer to support expressions. (There is also some debate about whether the optimizer can correctly use expression columns, and so whether we actually need them.) When this information is present in the catalog, the user can omit the CLUSTERED BY clause.

Target Segment Row Count
The multi-stage engine allows the user to specify the desired number of rows per output segment. This is presently done as a context setting. With the catalog, it can be specified in table metadata:

{
"dbSchema": "druid",
"name": "fiveMinDs",
"spec": {
"type": "datasource",
"segmentGranularity": "PT5M",
"targetSegmentRows": 4000000
}
}

A user-provided context setting takes precedence. If unset, the value is the default set by the multi-stage engine, which is currently 3 million rows.
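Putting the properties above together, an illustrative (not prescriptive) spec for a single datasource that records segment granularity, clustering, and target segment rows might look like:

{
  "dbSchema": "druid",
  "name": "fiveMinDs",
  "spec": {
    "type": "datasource",
    "segmentGranularity": "PT5M",
    "clusterKeys": [
      {"column": "x"},
      {"column": "y", "desc": true}
    ],
    "targetSegmentRows": 4000000
  }
}

With such an entry, an ingest statement into fiveMinDs could omit the PARTITIONED BY and CLUSTERED BY clauses and rely on the catalog defaults.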
Input Formats
This section describes input format properties. (It will make more sense if you read the following "Metadata structure" comment first. The joys of using issue comments for documentation...) Every input table definition (or …
CSV
Defines a CSV input format.
Only the … TODO: Need a way to escape special characters for the … When used in an input table, Druid does not yet support the ability to infer column names from the input file, so the CSV format requires a list of columns. Provide these in the …

SELECT *
FROM TABLE(staged(
source => 'inline',
data => 'a,b,1
c,d,2
',
format => 'csv'
))
(x VARCHAR NOT NULL, y VARCHAR NOT NULL, z BIGINT NOT NULL)

Delimited Text
Defines a delimited text input format as a generalization of the CSV format. Properties default to provide a TSV (tab-separated values) format.
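For reference, a sketch of the underlying Druid inputFormat that a TSV catalog entry would map to (this shows the standard Druid "tsv" format with hypothetical column names; the catalog's own flattened property names may differ):

{
  "type": "tsv",
  "delimiter": "\t",
  "columns": ["x", "y", "z"],
  "findColumnsFromHeader": false,
  "skipHeaderRows": 0
}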
Usage and limitations are the same as for the CSV format above.

JSON
Defines a JSON input format.
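For comparison, the native Druid JSON inputFormat that such a catalog entry would presumably map to is minimal; in its simplest form it is just:

{
  "type": "json"
}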
The catalog does not yet support the … Although batch ingestion does not require a list of columns, the multi-stage engine does. Provide columns the same way as described for the CSV format above.

Generic
The …
Metadata Structure
The catalog stores metadata in a generalized format designed to support a number of operations:
The general approach is to divide metadata into top-level objects. At present, only one object is available: tables. Others (connections, secrets, schedules) are envisioned for later. Within each object, there are one or more types. For tables, these are different kinds of datasources, different kinds of input tables, views, and so on. Each object has a set of properties, described as key/value pairs. Tables also have a set of columns. The term specification (or spec) is used for the JSON object which the application writes into the catalog. The term metadata includes the spec, plus other general information such as a name, timestamp and so on.

Table Metadata
The table metadata object holds two kinds of information: the system-defined metadata about the entry, and the user-provided table specification (…). Example:

{
"Id": {
"schema":"druid",
"name":"read"
},
"creationTime":1654634106432,
"updateTime":1654634106432,
"state":"ACTIVE",
"spec": <TableSpec>
}
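Here <TableSpec> stands for the user-provided spec. As a sketch, a minimal datasource spec (borrowing the example from the partitioning discussion earlier in this thread) might be:

{
  "type": "datasource",
  "segmentGranularity": "PT5M"
}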
The user provides the id (schema and name) and the spec; the system maintains the other fields.

Table Specification (…)
Thanks for the additional details 👍
How do you plan to support Druid's complex typed columns? (Such as the recently added …) The reason I'm asking is that I'm still a bit worried about how we are going to cleanly map this to Druid's type system. Is it going to be a strict mapping, like exactly 1 SQL type to 1 Druid type? Or will it be permissive? (e.g. …)
@clint, thanks for the heads-up on the complex types. Can you point me to documentation on the details of the type? To any SQL support we already have? One question is whether a … If a JSON column is a blob, then we could look at the Drill … If the user must declare the structure of a JSON object, then we do have a compound type. In that case, each column is, itself, a record, and can have a nested schema, to however many levels we choose to support. Experience with Drill showed that users' ability to deal with schema is limited when it is one level, and rapidly falls to zero when schemas are nested: most of us just don't want to think that hard! Java (and Go, etc.) have language tools to work with nested records; SQL was designed for an earlier era and working with nested data is painful. Regardless of how we proceed, we can use the Postgres JSON operators to access fields within a JSON blob, but that would require Calcite parser changes. On the other hand, Drill did coax Calcite into allowing references to …
@clintropolis, you also asked about SQL mapping. My suggestion is to enforce a limited set of types: …
(heh, I think you tagged the wrong person in your comments, sorry other @clint 😅). Nested data columns are described in proposal #12695 and PR #12753. They are wired up to SQL, though I'm mostly just using them as an example. Like all complex types, it is currently handled in a more or less opaque manner (functions which know how to deal with …). Complex types can be defined as dimensions or metrics, so we can't count on defining them all in terms of aggregators. Internally, we currently build the SQL schemas for Calcite with …
This is my main point: these are not the only Druid storage types; the current proposal is only able to model a rather small subset of the types which can appear in Druid segments. The complex type system is extensible, meaning there is potentially a large set of complex types based on what set of extensions is loaded. Internally these are all basically opaque, which is why we have the generic … There will also likely be …
Based on advice from others, I've dropped the ideas around rollup tables: there will be no attempt to describe the aggregations for a rollup table. We'll leave that to the user to decide how to structure rollups.
@clintropolis notes:
The intention is that a combination of the column spec and column type provides a description of all possible column types. Sorry if that was not clear: the focus in the aggregate section was on, well, aggregates. I just hadn't gotten far enough to need to deal with the others yet. One constraint I want us to keep in mind is that we'd like to eventually allow DDL statements something like:

CREATE ROLLUP TABLE foo (
__time TIMESTAMP,
a IP_ADDRESS,
b ARRAY(STRING),
c SUM(LONG),
d STRUCT(e STRING, f DOUBLE),
g VARCHAR WITH COMPACT INDEX
)
PARTITION BY DAY
CLUSTER BY a, g

So, the type names have to be SQL-like and SQL-parsable. With a bit more research on complex types, it sounds like we have three categories:
My proposal (since withdrawn until we rethink it) is:
There is no good answer for user-visible structures because those are not part of the SQL domain of discourse. There is an ill-fated project, SQL++, that tried to find a solution. It seems it was adopted by Apache AsterixDB and Couchbase. In Drill, we handled the types outside of SQL by using (an earlier version of) an Arrow-like format. The current thinking is to adapt that pattern to be more SQL- and Druid-like for use in the catalog, and in eventual SQL DDL statements. For example, we could invent syntax such as … Array columns can be represented similarly: … For the first catalog PR, the types are "to be named later": we're just focusing on storing the type names, whatever we decide they are. This gives us time to continue the type name discussion. The catalog proposes using a different column "kind" for dimensions and measures. (Where "kind" is the Jackson type field in the JSON.) In this way, we know the difference between a complex dimension (such as …). The kind, by the way, allows us to specify other per-column-kind information. For example, if there are multiple choices for an index type for dimensions, that would be a dimension-only attribute of a column, as suggested in the DDL sketch above. Anyway, the point is taken: we do need a full design for all column types. I'll work up something.
Updated the proposal to remove the idea of a rollup table. That idea will come as a separate proposal later. The non-spec comments above preserve the discussion; the "spec" comments describe the updated design. Since column type is now just the storage type, we can use the Druid names and optional SQL aliases. The type used in the catalog is the Druid type, converted to SQL syntax. That is, …
Druid is a powerful database optimized for time series at extreme scale. Druid provides features beyond those of a typical RDBMS: a flexible schema, ability to ingest from external data sources, support for long-running ingest jobs, and more. Users coming from a traditional database can be overwhelmed by the many choices. Any given application or datasource uses a subset of those features; it would be convenient for Druid, rather than the user, to remember the feature choices which the user made.
For example, Druid provides many different segment granularities, yet any given datasource tends to prefer one of them on ingest. Druid allows each segment to have a distinct schema, but many datasources want to ensure that at least some minimal set of “key” columns exist. Most datasources use the same metric definitions on each ingest. And so on.
Traditional RDBMS systems use a catalog to record the schema of tables, the structure of indexes, entity-relationships between tables and so on. In such systems, the catalog is an essential part of the system: it is the only way to interpret the layout of the binary table data, say, or to know which indexes relate to which tables. Druid is much different: each segment is self-contained: it has its own “mini-catalog.”
Still, as Druid adds more SQL functionality, we believe it will be convenient for users to have an optional catalog of table (datasource) definitions to avoid the need to repeat common table properties. This is especially useful for the proposed multi-stage ingest project.
Proposal Summary
Proposed is an add-on metadata catalog that allows the user to record data shape decisions in Druid and reuse them. The catalog contains:
Technically, the proposal envisions the following:
Motivation
With the catalog, a user can define an ingestion input source separate from a SQL INSERT statement. This is handy as the current EXTERN syntax requires that the user write out the input definition in JSON within a SQL statement.
The user first defines the input table, using the REST API or (eventually) the SQL DDL statements. Then, the user references the input table as if it were a SQL table. An example of one of the CalciteInsertDmlTest cases using an input table definition:

…

Here input is a schema that contains input table definitions, while inline is a user-defined table that is an in-line CSV input source.

Similarly, when using SQL to ingest into a datasource, the user can define things like segment granularity in the catalog rather than manually including it in each SQL statement.
We expect to support additional use cases over time: the above should provide a sense of how the catalog can be used.
Catalog as "Hints"
Druid has gotten by this long without a catalog, so the use of the catalog is entirely optional: use it if it is convenient, specify things explicitly if that is more convenient. For this reason, the catalog can be seen as a set of hints. The "hint" idea contrasts with the traditional RDBMS (or the Hive) model in which the catalog is required.
External Tables
Unlike query tools such as Drill, Impala or Presto, Druid never reads the same input twice: each read ingests a distinct set of input files. The external table definition provides a way to parameterize the actual set of files: perhaps the S3 bucket or HDFS location is the same, and the file layout is the same, but the specific files differ on each run.
Resolve the Chicken-and-Egg Dilemma
We noted above that segments are their own "mini-catalogs" and provide the information needed for compaction and native queries to do their job. The problem, however, is creating segments, especially the first ones: there is no "mini-catalog" to consult, so the user has to spell out the details. The catalog resolves this dilemma by allowing the metadata to exist before the first segment. As a bonus, once a table (datasource) is defined in the catalog, it can be queried, though the query will obviously return no rows. A SELECT * will return the defined schema. Similarly, if a user adds a column to the table, that column is immediately available for querying, even if it returns all NULL values. This makes the Druid experience just a bit simpler as the user need not remember when a datasource (or column) will appear (after the first ingestion of non-null data).

Query Column Governance
Druid allows columns to contain any kind of data: you might start with a long (BIGINT) column, later ingest double (DOUBLE) values, and even later decide to make the column a string (VARCHAR). The SQL layer uses the latest segment type to define the one type which SQL uses. The catalog lets the user specify this type: if the catalog defines a type for a column, then all values are cast to that type. This means that, even if a column is all-null (or never ingested), SQL still knows the type.

Cold Storage
Druid caches segments locally in Historical nodes. Historicals report the schema of each segment to the Broker, which uses them, as described above, to work out the "SQL schema" for a datasource. But, what if Druid were to provide a "cold tier" mode in which seldom-used data resides only in cold storage? No Historical would load the segment, so the Broker would be unaware of the schema. The catalog resolves this issue by letting the user define the schema separately from the segments that make up the datasource.
Components
The major components of the metadata system follow along the lines of similar mechanisms within Druid: basic authentication, segment publish state, etc. There appears to be no single Druid sync framework to keep nodes synchronized with the Coordinator, so we adopt bits and pieces from each.
Metadata DB Extension
Defines a new table, perversely named "tables", that holds the metadata for a "table." A datasource is a table, but so is a view or an input source. The metadata DB extension is modeled after many others: it provides the basic CRUD semantics. It also maintains a simple version (timestamp) to catch concurrent updates.
REST Endpoint
Provides the usual CRUD operations via REST calls as operations on the Coordinator, proxied through the Router. Security in these endpoints is simple: it is based on security of the underlying object: view, datasource, etc.
DB Synchronization
Keeps Broker nodes updated to the latest state of the catalog DB. Patterned after the mechanism in the basic auth extension, but with a delta update feature borrowed from an extension that has that feature.
Planner Integration
The primary focus of this project is using catalog metadata for SQL statements and, in particular, INSERT and REPLACE statements. Input tables replace the need for the EXTERN macro; datasource metadata replaces the need to spell out partitioning and clustering.
SQL DDL Statements
As Druid extends its SQL support, an obvious part of this catalog proposal would be DDL statements such as CREATE/ALTER/DROP TABLE, etc. This support is considered a lower priority because:

Rollup Datasources
NOTE: This section is now out of scope and is no longer planned. Leaving this here to spur future discussion.
The main challenge is around rollup datasources. In rollup, the datasource performs aggregation. It is easy to think that ingestion does the aggregation, but consider this example: ingest a set of files, each with one row. You'll get a set of, say, dozens of single-row segments, each with the "aggregation" of a single row. The compaction mechanism then combines these segments to produce one with overall totals. This process continues if we add more segments in the same time interval and compact again.
This little example points out that compaction knows how to further aggregate segments: even those with a single row. Of course, ingestion can do the same trick, if there happen to be rows with the same dimensions. But, since compaction can also do it, we know that there is sufficient state in the one-row "seed" aggregate for further compaction to occur. We want to leverage this insight.
The idea is, in SQL INSERT-style ingestion, the work happens in three parts:
This means that we can convert the following example:
To this form:
Here:

- The __time column: the metadata says the rollup grain, so that the user can omit the TIME_FLOOR in the SQL; the metadata will cause the planner to insert the proper TIME_FLOOR function.
- The COUNT column is not specified: it is implicitly 1 for every row, so there is no need to have the user tell us that. Later stages use a "sum" to accumulate the counts, as today.
- The APPROX_COUNT_DISTINCT_DS_HLL function takes a single argument, so the planner can infer to use that function to convert from a scalar to a "seed" aggregate.

The column-level rules operate much like the built-in type coercion rules which SQL provides. Instead of simply converting an INT to a BIGINT, Druid adds rules to implicitly convert a scalar BIGINT to a SUM(BIGINT) column.

Extensions
A possible feature is to allow an external service to provide the catalog via an extension. An example of this is to use the Confluent schema registry, the Hive Metastore, etc. We'll flesh out this option a bit more as we get further along.
Alternatives
The three main alternatives are:
Since the catalog will become core to Druid, we tend to favor creating one focused on the rather unique needs which Druid has.
Development Phases
This project has multiple parts. A basic plan is:
… (CREATE/ALTER/DROP TABLE, etc.)

Backward Compatibility
Existing Druid installations will create the new catalog table upon upgrade. It will start empty. If a datasource has no metadata then Druid will behave exactly as it did before the upgrade.
If a version with the catalog is downgraded, the old Druid version will simply ignore the catalog and the user must explicitly provide the properties formerly provided by the catalog.