New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Key metadata in Avro format #6450
Core: Key metadata in Avro format #6450
Conversation
c87ef1b
to
19baeb2
Compare
public class EnvelopeKeyMetadata implements EncryptionKeyMetadata { | ||
|
||
public static final byte magic = 'I'; | ||
public static final byte version = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, we don't need this header, since we go directly to data+manifest encryption. The version can be added as a field in the Avro struct. TBD.
import org.apache.avro.specific.SpecificRecordBuilderBase; | ||
|
||
public class AvroKeyRecord extends SpecificRecordBase { | ||
public static final org.apache.avro.Schema SCHEMA$ = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iceberg doesn't define Avro schemas directly. Instead, this should create an Iceberg schema and convert it to Avro if needed.
import org.apache.avro.specific.SpecificRecordBase; | ||
import org.apache.avro.specific.SpecificRecordBuilderBase; | ||
|
||
public class AvroKeyRecord extends SpecificRecordBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avro-serializable classes should use IndexedRecord
rather than specific classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a simple example, see GenericPartitionFieldSummary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will handle.
} | ||
} | ||
|
||
public ByteBuffer getEncryptionKey() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iceberg classes do not use get
in method names. Get is not helpful or specific enough, so we prefer to either omit it for getters (like this) or replace it with a more helpful verb, like find
or load
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto-generated code (here, and in the 2 comments above..). I'll change this.
return this; | ||
} | ||
|
||
public Builder setWrappingKeyId(CharSequence value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use the term "wrapping" key? Is it to avoid terms like key-encryption-key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep; seems to be a standard / popular term, per google search (I looked for "encryption key wrapping")
return new Builder(); | ||
} | ||
|
||
public static class Builder extends SpecificRecordBuilderBase<AvroKeyRecord> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to expose any Avro classes here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
@@ -20,12 +20,13 @@ | |||
|
|||
import org.apache.iceberg.io.OutputFile; | |||
|
|||
class BaseEncryptedOutputFile implements EncryptedOutputFile { | |||
public class BaseEncryptedOutputFile implements EncryptedOutputFile { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this made public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be available in the envelope subpackage; but since we move classes to the encryption package, no need in this change anymore, I'll remove.
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.util.PropertyUtil; | ||
|
||
public class EnvelopeEncryptionManager implements EncryptionManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep all of this package-private until we need to expose it.
public org.apache.avro.Schema getSchema() { | ||
return SCHEMA$; | ||
} | ||
// Used by DatumWriter. Applications should not call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing whitespace.
throw new AvroRuntimeException("Bad index"); | ||
} | ||
} | ||
// Used by DatumReader. Applications should not call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing whitespace.
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.encryption.envelope; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just put all this in the existing package. No need for a different one.
* @param encryptionProperties encryption properties | ||
*/ | ||
public EnvelopeEncryptionManager( | ||
String tableKeyId, KmsClient kmsClient, Map<String, String> encryptionProperties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this makes this PR dependent on KMS changes.
Instead, can you just define the Avro format and classes for serialization/deserialization? I think we need to focus on the format and keep PRs separate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will move this to a separate PR
import org.apache.avro.specific.SpecificDatumWriter; | ||
import org.apache.iceberg.encryption.EncryptionKeyMetadata; | ||
|
||
public class EnvelopeKeyMetadata implements EncryptionKeyMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class duplicates quite a bit that's already done in IcebergDecoder
and IcebergEncoder
.
Those classes support Avro's single-message encoding, which we don't really need. But there's a RawDecoder
there that would be useful. I'd prefer to adapt classes in the data.avro package to do the raw encoding and decoding, then delegate to those. That way this is more of a refactor than a re-implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will handle this.
62a43fa
to
af0c163
Compare
I've leveraged the IcebergDecoder and IcebergEncoder as they are, including the 8-byte header (schema hash) - this enables updating the key metadata schema in the future, while making minimal / no changes in the code. But if it's better to use an explicit 1-2 byte version header, instead of the 8-byte hash (to save 6 bytes? to make versions more formal?) - and to call RawDecoder (plus add RawEncoder), lets discuss. |
|
||
@Override | ||
public EncryptionKeyMetadata copy() { | ||
KeyMetadata metadata = new KeyMetadata(encryptionKey(), wrappingKeyId(), aadPrefix()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not actually copying buffers here, is that an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the copy
method of the EncryptionKeyMetadata
interface is not called anywhere. Maybe we should deprecate it.
|
||
@Override | ||
public org.apache.avro.Schema getSchema() { | ||
return AvroSchemaUtil.convert(SCHEMA_V1, this.getClass().getCanonicalName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than doing this every time the schema is requested, can you make it a static final variable?
private static final ThreadLocal<KeyMetadataDecoder<IndexedRecord>> DECODER = | ||
ThreadLocal.withInitial(() -> new KeyMetadataDecoder<>(V1)); | ||
|
||
static final Map<Byte, Schema> supportedSchemaVersions = ImmutableMap.of(V1, SCHEMA_V1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be private. Rather than accessing the map directly, have a package-private method that exposes these.
import org.apache.iceberg.data.avro.RawDecoder; | ||
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; | ||
|
||
public class KeyMetadataDecoder<D> extends MessageDecoder.BaseDecoder<D> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this parameterized? Doesn't it always produce KeyMetadata?
* <p>The {@code readSchemaVersion} is as used the version of the expected (read) schema. Datum | ||
* instances created by this class will are described by the expected schema. | ||
*/ | ||
public KeyMetadataDecoder(byte readSchemaVersion) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need for this class to be public right?
try { | ||
writeSchemaVersion = (byte) stream.read(); | ||
} catch (IOException e) { | ||
throw new IOException("Failed to read the version byte", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wrap an IOException
with a checked exception that has to be handled elsewhere? I'd just use UncheckedIOException
here and not worry about it in KeyMetadata
KeyMetadata.supportedSchemaVersions.get(writeSchemaVersion); | ||
|
||
if (writeSchema == null) { | ||
throw new MissingSchemaException("Cannot resolve schema for version: " + writeSchemaVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not leak an Avro exception through the Iceberg API. Instead, I think this should use UnsupportedOperationException
. That's what is thrown most often in cases where we encounter something from a future version of Iceberg (which is what would trigger this).
|
||
if (decoder == null) { | ||
Function<Schema, DatumReader<?>> createReaderFunc = | ||
schema -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just use GenericAvroReader::create
instead of all this.
decoders.put(writeSchemaVersion, decoder); | ||
} | ||
|
||
return decoder.decode(stream, reuse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catch IOException and wrap with UncheckedIOException here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the decoder.decode method doesn't throw an IOException
import org.apache.iceberg.avro.AvroSchemaUtil; | ||
import org.apache.iceberg.avro.GenericAvroWriter; | ||
|
||
public class KeyMetadataEncoder<D> implements MessageEncoder<D> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also doesn't need to be parameterized.
* <p>Buffers returned by {@code encode} are copied and will not be modified by future calls to | ||
* {@code encode}. | ||
*/ | ||
public KeyMetadataEncoder(byte schemaVersion) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't need to be public right?
class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { | ||
private static final String encryptionKeyField = "encryption_key"; | ||
private static final String wrappingKeyIdField = "wrapping_key_id"; | ||
private static final String aadPrefixField = "aad_prefix"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these constants needed? They're only used to construct the schema, right?
core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
Outdated
Show resolved
Hide resolved
private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions = | ||
ImmutableMap.of(V1, AVRO_SCHEMA_V1); | ||
|
||
private static final KeyMetadataEncoder keyMetadataEncoder = new KeyMetadataEncoder(V1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't all the static final
names be ALL_CAPS?
core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
Outdated
Show resolved
Hide resolved
return schemaVersions; | ||
} | ||
|
||
static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this exposed? I don't think that we want to expose Avro schemas in the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for package-internal usage by KeyMetadataEncoder and KeyMetadataDecoder - they need an access to the map of supported schema versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least we can remove it if we refactor.
@@ -30,13 +30,17 @@ | |||
import org.apache.iceberg.common.DynClasses; | |||
import org.apache.iceberg.data.avro.DecoderResolver; | |||
|
|||
class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition { | |||
public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note about this to myself for later...
This needs to use GenericAvroReader
for a couple reasons. First, GenericAvroReader
is the only one that supports creating records with a specific class. We could extend support so that DataReader
can create StructLike
instances, but that hits the second reason: using Iceberg generics will use different representations for some types, like using LocalDate
instead of int
for date
type. The representation differences don't affect the KeyMetadata
class right now, but it's still more correct to use the internal representation.
AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName()); | ||
|
||
private static final Map<Byte, Schema> schemaVersions = ImmutableMap.of(V1, SCHEMA_V1); | ||
private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be ALL_CAPS as well.
Co-authored-by: Jack Ye yezhaoqin@gmail.com