Extension to read and ingest iceberg data files #14329
Conversation
Left a partial review. Will finish the detailed review by this week.
Super cool stuff 🚀.
This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.

Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata on metadata files, it is still dependent on a metastore for managing a certain amount of metadata.
This might need rephrasing. Did you mean a metadata store here?
No, I'm referring to a metastore, also known as an Iceberg metadata catalog or just an Iceberg catalog. I've slightly reworded this, let me know if it helps.
### Hive Metastore catalog

For Druid to seamlessly talk to the Hive Metastore, ensure that the Hive specific configuration files such as `hive-site.xml` and `core-site.xml` are available in the Druid classpath.
Where are they needed? I am assuming they are only needed on the peons?
Yes, only on the peons; fixed it in the docs.
</parent>
<modelVersion>4.0.0</modelVersion>

<properties>
Nit: do we require an empty block here?
<properties>
</properties>
<dependencies>
<dependency>
I do not see hadoop 2 / hadoop 3 profiles. For reference, you can have a look here: https://github.com/apache/druid/blob/master/extensions-core/hdfs-storage/pom.xml#L142
In the limitations of the extension, it is specified that Hadoop 2.x support is not tested. Do we still need a hadoop2 profile?
Not required. Though can we avoid adding this module at all if the hadoop2 profile is activated? Assuming such a thing is possible.
We can probably remove it from the distribution pom.xml under the hadoop2 profile, if needed.
I think it's fine since we will remove the Hadoop 2 support very soon anyway.
/*
 * Druid wrapper for an iceberg catalog.
 * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files up to the latest snapshot.
 * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files.
Where is the `icebergFilter` expression filtering happening? Does the filtering happen while pruning the list of the data files that need to be fetched?
Yes, we create an Iceberg table scan and feed it the set of filters before the plan files are identified. Therefore, while the files are being planned, it can prune the list based on the filters provided.
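To make that concrete, here is a minimal sketch of the pattern using the Iceberg scan API (the class and variable names are illustrative, not the extension's actual code):

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FilteredDataFileLister
{
  /**
   * Returns the paths of live data files for a table. The filter is handed to the
   * table scan before planFiles() runs, so Iceberg prunes files while planning
   * rather than after the full file list has been materialized.
   */
  public static List<String> listDataFilePaths(Table table, Expression filter) throws IOException
  {
    TableScan scan = table.newScan();
    if (filter != null) {
      scan = scan.filter(filter);
    }
    List<String> dataFilePaths = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      for (FileScanTask task : tasks) {
        dataFilePaths.add(task.file().path().toString());
      }
    }
    return dataFilePaths;
  }
}
```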
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));

long duration = System.currentTimeMillis() - start;
log.info("Data file scan and fetch took %d ms", duration);
You could also log the number of dataFilePaths here.
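A hedged sketch of what that extra logging could look like, assuming `dataFilePaths` is a `List<String>` and `log` is the class's Druid `Logger`:

```java
// Log the elapsed time along with how many data files survived filter-based pruning.
long duration = System.currentTimeMillis() - start;
log.info("Data file scan and fetch took [%d] ms for [%d] data file paths", duration, dataFilePaths.size());
```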
private void authenticate()
{
  String principal = catalogProperties.getOrDefault("principal", null);
Are there other types of authentication methods, or do we only have support for krb5 in the initial version?
In any case, we should document this explicitly.
Added a line in the doc.
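For context, Kerberos (krb5) login from a principal and keytab usually goes through Hadoop's `UserGroupInformation`. The sketch below only illustrates that pattern; the `principal`/`keytab` property keys are assumptions and may not match the extension's exact keys:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.util.Map;

public class KerberosLoginHelper
{
  /**
   * Logs in to Kerberos if a principal and keytab are supplied in the catalog properties.
   * If either is missing, the catalog falls back to the current (simple) authentication.
   */
  public static void authenticate(Map<String, String> catalogProperties, Configuration conf) throws IOException
  {
    String principal = catalogProperties.getOrDefault("principal", null);
    String keytab = catalogProperties.getOrDefault("keytab", null);
    if (principal != null && keytab != null) {
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      UserGroupInformation.loginUserFromKeytab(principal, keytab);
    }
  }
}
```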
private HiveCatalog setupCatalog()
{
  HiveCatalog catalog = new HiveCatalog();
  authenticate();
Do we need to handle remote HTTP/RPC related exceptions here?
String tableIdentifier = tableNamespace + "." + tableName;

Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()
I think this call needs special error handling to let the user know that there is a connectivity issue or that a bad configuration was passed.
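One possible shape for that error handling, sketched with Druid's `ISE`; the wording and exception choice here are assumptions, not the code that was merged:

```java
import org.apache.druid.java.util.common.ISE;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.List;

public class CatalogCalls
{
  /**
   * Wraps the catalog call so that connectivity or configuration problems surface
   * as a clear, actionable error instead of a raw stack trace.
   */
  public static List<TableIdentifier> listTablesSafely(Catalog catalog, Namespace namespace)
  {
    try {
      return catalog.listTables(namespace);
    }
    catch (RuntimeException e) {
      throw new ISE(
          e,
          "Unable to list tables in namespace [%s]. Verify that the catalog is reachable and the catalog properties are correct.",
          namespace
      );
    }
  }
}
```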
Support for AWS Glue and REST based catalogs are not available yet.

For a given catalog, iceberg table name and filters, The IcebergInputSource works by reading the table from the catalog, applying the filters and extracting all the underlying live data files up to the latest snapshot.
Suggested change:
For a given catalog, iceberg table name and filters, The IcebergInputSource works by reading the table from the catalog, applying the filters and extracting all the underlying live data files up to the latest snapshot.
For a given catalog, iceberg table name, and filters, the IcebergInputSource works by reading the table from the catalog, applying the filters, and extracting all the underlying live data files up to the latest snapshot.
"fs.s3a.endpoint" : "S3_API_ENDPOINT" | ||
} | ||
``` | ||
Since the AWS connector uses the `s3a` filesystem based client, the warehouse path should be specified with the `s3a://` protocol instead of `s3://`. |
Suggested change:
Since the AWS connector uses the `s3a` filesystem based client, the warehouse path should be specified with the `s3a://` protocol instead of `s3://`.
Since the Hadoop AWS connector uses the `s3a` filesystem based client, the warehouse path should be specified with the `s3a://` protocol instead of `s3://`.
```json
"catalogProperties": {
  "fs.s3a.access.key" : "S3_ACCESS_KEY",
```
Can you confirm that these get masked when we log these properties or when someone looks at the ingestion spec?
It wasn't masked earlier; I've added support for the DynamicConfigProvider now, so it should be good.
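For reference, Druid's `DynamicConfigProvider` exposes secrets as a map at runtime, so the catalog can merge resolved secrets into its properties instead of carrying plain-text keys in the spec. A minimal sketch of that merging step; names are illustrative, not the extension's actual code:

```java
import org.apache.druid.metadata.DynamicConfigProvider;

import java.util.HashMap;
import java.util.Map;

public class CatalogPropertiesResolver
{
  /**
   * Combines the static catalog properties from the ingestion spec with any secrets
   * resolved by a DynamicConfigProvider, so credentials such as fs.s3a.access.key
   * never need to appear in plain text in the spec.
   */
  public static Map<String, String> resolve(
      Map<String, String> catalogProperties,
      DynamicConfigProvider<String> secretsProvider
  )
  {
    Map<String, String> resolved = new HashMap<>(catalogProperties);
    if (secretsProvider != null) {
      resolved.putAll(secretsProvider.getConfig());
    }
    return resolved;
  }
}
```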
return inputSource;
}

private static class EmptyInputSource implements SplittableInputSource
You should add a note here that this class exists because some underlying input sources might not accept an empty list of input sources, while an empty list is possible when working with Iceberg.
Added docs, thanks.
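A sketch of what such a note could say (wording assumed, not the javadoc that was actually added):

```java
/**
 * An input source that yields no splits and no rows.
 *
 * This class exists because an Iceberg table scan can legitimately resolve to an
 * empty list of live data files (for example, when the supplied filters prune
 * everything away), while some delegate input sources, such as the S3 or HDFS
 * input sources, do not accept an empty list of input paths.
 */
```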
import java.util.stream.Stream;

/**
 * A wrapper on top of {@link SplittableInputSource} that handles input source creation.
I think this class could be called LazyInputSourceBuilder or something to that effect, since it doesn't seem like an adapter. Its primary responsibility is lazy, on-demand instantiation of input sources.
In fact, this class could be split into one concrete class that does memoization and one interface that has a `build` method. The extensions can just implement the interface.
Thinking a bit more about it, memoization doesn't require a class of its own at all. That's something IcebergInputSource can do itself. So all we require is the ability to generate an input source dynamically, and a single-method interface is good enough to achieve that. We can call it FileInputSourceBuilder or FileInputSourceGenerator.
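A minimal sketch of the suggested single-method interface; the name is taken from the comment above, and the exact signature is an assumption rather than the merged API:

```java
import org.apache.druid.data.input.impl.SplittableInputSource;

import java.util.List;

/**
 * Generates a concrete splittable input source (for example, S3 or HDFS) from the
 * data file paths that the Iceberg catalog resolved. Implementations decide which
 * warehouse-specific input source to build; memoization, if needed, is left to the caller.
 */
public interface FileInputSourceGenerator
{
  SplittableInputSource<?> generate(List<String> dataFilePaths);
}
```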
Just a few minor comments @a2l007 - Looks good to me otherwise.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
I started a discussion on the #dev channel. It will be preferable to use the shaded jars to avoid dependency conflicts in the future. Is this jar (hadoop-mapreduce-client-core) shaded? If you are not seeing any conflicts, it's fine for now.
@JsonProperty("filterValue") String filterValue
)
{
  Preconditions.checkNotNull(filterColumn, "filterColumn can not be null");
Can the error message be adjusted similar to how you have done in IcebergIntervalFilter?
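Purely for illustration, the adjusted checks could read along these lines; the exact wording used in IcebergIntervalFilter is not shown here, so this phrasing is an assumption:

```java
// Hypothetical phrasing; align the message with whatever style IcebergIntervalFilter uses.
Preconditions.checkNotNull(filterColumn, "IcebergEqualsFilter requires a non-null 'filterColumn' property");
Preconditions.checkNotNull(filterValue, "IcebergEqualsFilter requires a non-null 'filterValue' property");
```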
List<Expression> expressions = new ArrayList<>();
for (Interval filterInterval : intervals) {
  Long dateStart = (long) Literal.of(filterInterval.getStart().toString())
      .to(Types.TimestampType.withZone())
can you please add this bit as a doc here?
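For background on what this snippet is doing: each ISO interval endpoint is converted to an Iceberg timestamp-with-zone literal (a long count of microseconds since the epoch), and one inclusive-start/exclusive-end range expression is built per interval. A condensed sketch of that idea, assuming a `filterColumn` name and Joda-Time `Interval`s as in the surrounding code:

```java
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

public class IntervalExpressionSketch
{
  /**
   * Builds one inclusive-start / exclusive-end range expression per interval and ORs them together.
   * Iceberg timestamp-with-zone literals are long values in microseconds since the epoch.
   */
  public static Expression toIcebergExpression(String filterColumn, List<Interval> intervals)
  {
    List<Expression> expressions = new ArrayList<>();
    for (Interval filterInterval : intervals) {
      Long dateStart = (Long) Literal.of(filterInterval.getStart().toString())
                              .to(Types.TimestampType.withZone())
                              .value();
      Long dateEnd = (Long) Literal.of(filterInterval.getEnd().toString())
                            .to(Types.TimestampType.withZone())
                            .value();
      expressions.add(
          Expressions.and(
              Expressions.greaterThanOrEqual(filterColumn, dateStart),
              Expressions.lessThan(filterColumn, dateEnd)
          )
      );
    }
    // OR the per-interval ranges together; alwaysFalse() is the identity element for OR.
    Expression combined = Expressions.alwaysFalse();
    for (Expression expression : expressions) {
      combined = Expressions.or(combined, expression);
    }
    return combined;
  }
}
```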
@a2l007 - Looks good to me. Thank you. We have a release branch already cut. I was thinking that maybe you can backport just the core changes. That way, anyone can build the extension and try it on a production release.
@a2l007 - I built this locally and the size of the extension directory is 431 MB. Half of that is coming from
Some dependencies are not required in the extension since they are already present in the core lib, e.g. guava (2.1 MB).
~ under the License.
-->

## Iceberg Ingest Extension
Suggested change:
## Iceberg Ingest Extension
## Iceberg Ingest extension
I don't think this heading is necessary. If you delete this heading, you can move the other headings up a level.
Since the topic is about Iceberg ingestion, consider introducing the feature first and then talk about the extension as a means of enabling the feature. For example:
Apache Iceberg is an open table format for huge analytic datasets.
The Iceberg input source lets you ingest data stored in the Iceberg table format into Apache Druid. To enable the Iceberg input source, add `druid-iceberg-extensions` to the list of extensions. See Loading extensions for more information.
Iceberg manages most of its metadata in metadata files in the object storage. In some cases, it uses a metastore to manage a certain amount of metadata.
This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.

Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata in metadata files in the object storage, it is still dependent on a metastore for managing a certain amount of metadata.
Suggested change:
Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata in metadata files in the object storage, it is still dependent on a metastore for managing a certain amount of metadata.
Apache Iceberg is an open table format for huge analytic datasets. Iceberg manages most of its metadata in metadata files in the object storage. In some cases, it uses a metastore to manage a certain amount of metadata.
See comment on line 25.
This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.

Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata in metadata files in the object storage, it is still dependent on a metastore for managing a certain amount of metadata.
These metastores are defined as Iceberg catalogs and this extension supports connecting to the following catalog types:
Suggested change:
These metastores are defined as Iceberg catalogs and this extension supports connecting to the following catalog types:
Iceberg refers to these metastores as catalogs. The Iceberg extension lets you connect to the following Iceberg catalog types:
## Iceberg Ingest Extension

This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.
See comment on line 25.
The data files are in either Parquet, ORC or Avro formats and all of these have InputFormat support in Druid. The data files typically reside in a warehouse location which could be in HDFS, S3 or the local filesystem.
This extension relies on the existing InputSource connectors in Druid to read the data files from the warehouse. Therefore, the IcebergInputSource can be considered as an intermediate InputSource which provides the file paths for other InputSource implementations.

### Load the Iceberg Ingest extension
You can completely omit this section if you roll it into the introduction.
|filterColumn|The column name from the iceberg table schema based on which filtering needs to happen|yes|
|intervals|A JSON array containing ISO-8601 interval strings. This defines the time ranges to filter on. The start interval is inclusive and the end interval is exclusive.|yes|

`and` Filter:
`and`, `or`, and `not` filters all have the same properties. Consider not using tables to present this information, or combine `and`, `or`, and `not` into one table.
Also, the definition for the `filters` property is confusing. What exactly do we pass into that property? A column name, a filter name, etc.?
The `not` filter accepts a single filter, whereas `and` and `or` accept a list of iceberg filters.
The `filters` property accepts any of the other iceberg filters mentioned in this section.
I've reviewed this PR from the docs perspective and left some suggestions.
docs/ingestion/input-sources.md
|--------|-----------|---------|
|type|Set this value to `equals`.|yes|
|filterColumn|The column name from the iceberg table schema based on which filtering needs to happen|yes|
|filterValue|The value to filter on|yes|
Suggested change:
|filterValue|The value to filter on|yes|
|`filterValue`|The value to filter on.|Yes|
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `equals`.|yes|
Suggested change:
|type|Set this value to `equals`.|yes|
|`type`|Set this value to `equals`.|Yes|
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `interval`.|yes|
Suggested change:
|type|Set this value to `interval`.|yes|
|`type`|Set this value to `interval`.|Yes|
@@ -0,0 +1,121 @@
---
id: iceberg
title: "Iceberg"
title: "Iceberg" | |
title: "Iceberg extension" |
@ektravel Thank you for your review, I've addressed most of your comments. I haven't code formatted the input source properties as I'm following the same format as the other input sources described on that page. Let me know what you think.
@abhishekagarwal87 Good catch! I've excluded the aws-java-sdk-bundle and changed the scope for a few of the other dependencies.
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
Don't you need any aws dependency? For example in hdfs-storage, where we excluded this, we also added the below:
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-s3</artifactId>
  <version>${aws.sdk.version}</version>
  <scope>runtime</scope>
</dependency>
The metastore only needs the hadoop-aws jar, which provides the `org.apache.hadoop.fs.s3a.S3AFileSystem` class to resolve the `s3a` client. The s3 druid extension (which has the aws-java-sdk-s3 dependency) takes care of operations on the objects retrieved by the metastore.
@a2l007 - PR looks good to me. I will let you merge it.
Though I think there is still a lot of scope for reducing the number of dependencies that this extension has. It has jars for curator, jetty, jersey, protobuf, orc, mysql. There is an iceberg spark runtime jar that I can't figure out how it will be used. This will become an issue for the release manager, as all these extra dependencies are going to have CVEs that require investigation before being suppressed.
distribution/pom.xml
@@ -258,6 +258,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-kubernetes-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-iceberg-extensions</argument>
This is a contrib extension, so we shouldn't be shipping it in the distribution bundle.
@ektravel Do the doc changes look good to you? @abhishekagarwal87 I agree that the dependencies need pruning and this is something that I'm working on. A bunch of the pruning work is going to be on the transitive deps for
Sounds good. I just merged your PR.
@a2l007 - do you want to backport the core changes to 27 so folks can try out the extension with the 27 release?
@a2l007 Thank you for making the requested changes. They look good to me.
@abhishekagarwal87 Sure, raised #14608
This adds a new contrib extension: druid-iceberg-extensions which can be used to ingest data stored in Apache Iceberg format. It adds a new input source of type iceberg that connects to a catalog and retrieves the data files associated with an iceberg table and provides these data file paths to either an S3 or HDFS input source depending on the warehouse location. Two important dependencies associated with Apache Iceberg tables are: Catalog : This extension supports reading from either a Hive Metastore catalog or a Local file-based catalog. Support for AWS Glue is not available yet. Warehouse : This extension supports reading data files from either HDFS or S3. Adapters for other cloud object locations should be easy to add by extending the AbstractInputSourceAdapter.
@a2l007 Does the icebergFilter work if filtering on a column that is not a partition column? My understanding is that if we are filtering on a column that is not a partition column, then a data file returned from the scan may have rows that do not satisfy the filter (basically residuals). How do we deal with this, since we are just passing the list of file paths to Druid to ingest?
@maytasm AFAIK, dynamic filtering cannot be performed on non-partitioned columns. As a workaround for this, we filter based on partitioned columns in the iceberg input source spec and add another filter in the transformSpec for the non-partitioned columns.
@a2l007 Thanks for getting back to me. My understanding is that if we pass filters on non-partitioned columns, we would still get a list of files that may have values not matching our filters. If that is the case, then we should either call that out in the docs or fail the ingestion job, unless we convert the iceberg filters into Druid filters (in the transformSpec) and push the iceberg filters down into the Druid ingestion job (after we get the list of files).
@maytasm Yeah, we should definitely call that out in the docs and also print some warning messages. I'll make sure to include these in my next PR.
Fixes #13923.
Description
This adds a new contrib extension: `druid-iceberg-extensions` which can be used to ingest data stored in Apache Iceberg format. It adds a new input source of type `iceberg` that connects to a catalog and retrieves the data files associated with an iceberg table and provides these data file paths to either an S3 or HDFS input source depending on the warehouse location.
Two important dependencies associated with Apache Iceberg tables are:
- Catalog: This extension supports reading from either a Hive Metastore catalog or a Local file-based catalog. Support for AWS Glue is not available yet.
- Warehouse: This extension supports reading data files from either HDFS or S3. Adapters for other cloud object locations should be easy to add by extending the `AbstractInputSourceAdapter`.

Sample ingestion spec:
Release note
Enhanced ingestion capabilities to support ingestion of Apache Iceberg data.
Key changed/added classes in this PR
- IcebergCatalog.java
- IcebergInputSource.java
This PR has: