From e39e74007034346eee415733f1002eba3cc9eada Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 5 Jun 2023 09:00:53 +0300 Subject: [PATCH 1/7] draft commit update snapshot producer update the patch clean up fix for previous patch address review comments move key wrapping to metadata encryption encrypt manifest list key metadata new aad util null key needs no encryption comment; clearer method/var names use key encryption key for manifest list keys add encryption util changes update EncryptionTestHelpers handle api change remove unused lines revert revapi.yml KEK cache unitest update rename var address review comments fix timeout default change writer kek timeout default Updates from review. cache unwrapped keys --- .../org/apache/iceberg/ManifestListFile.java | 43 ++++++ .../java/org/apache/iceberg/Snapshot.java | 10 ++ .../iceberg/encryption/EncryptingFileIO.java | 13 +- .../java/org/apache/iceberg/io/FileIO.java | 10 ++ .../apache/iceberg/BaseManifestListFile.java | 69 +++++++++ .../java/org/apache/iceberg/BaseSnapshot.java | 39 ++++- .../org/apache/iceberg/CatalogProperties.java | 6 + .../apache/iceberg/ManifestListWriter.java | 58 ++++++- .../org/apache/iceberg/ManifestLists.java | 25 ++- .../org/apache/iceberg/ManifestWriter.java | 18 ++- .../org/apache/iceberg/SnapshotParser.java | 33 +++- .../org/apache/iceberg/SnapshotProducer.java | 31 ++-- .../iceberg/encryption/AesGcmInputFile.java | 24 ++- .../iceberg/encryption/EncryptionUtil.java | 84 +++++++++- .../NativeEncryptionKeyMetadata.java | 11 ++ .../encryption/StandardEncryptionManager.java | 133 ++++++++++++++-- .../encryption/StandardKeyMetadata.java | 43 +++++- .../encryption/WrappedEncryptionKey.java | 56 +++++++ .../iceberg/TestManifestEncryption.java | 2 +- .../iceberg/TestManifestListEncryption.java | 143 ++++++++++++++++++ .../org/apache/iceberg/TestTableMetadata.java | 9 +- .../encryption/EncryptionTestHelpers.java | 8 +- 22 files changed, 800 insertions(+), 68 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/ManifestListFile.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseManifestListFile.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java create mode 100644 core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java diff --git a/api/src/main/java/org/apache/iceberg/ManifestListFile.java b/api/src/main/java/org/apache/iceberg/ManifestListFile.java new file mode 100644 index 000000000000..e536a0b286ac --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestListFile.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; + +public interface ManifestListFile { + + /** Location of manifest list file. */ + String location(); + + /** Snapshot ID of the manifest list. */ + long snapshotId(); + + /** + * The manifest list key metadata is encrypted with a "key encryption key" (KEK). Returns the KEK + * ID for this manifest file. + */ + String keyMetadataKeyId(); + + /** Returns the manifest list key metadata, encrypted with its KEK. */ + ByteBuffer encryptedKeyMetadata(); + + /** Decrypt and return the encrypted key metadata */ + ByteBuffer decryptKeyMetadata(EncryptionManager em); +} diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index c035259e0e2c..71ba5dbe4955 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -162,6 +162,16 @@ default Iterable removedDeleteFiles(FileIO io) { */ String manifestListLocation(); + /** + * This snapshot's manifest list file info: size, encryption key metadata and location + * + * @return manifest list file info + */ + default ManifestListFile manifestListFile() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement manifestListFile method"); + } + /** * Return the id of the schema used when this snapshot was created, or null if this information is * not available. diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index 0203361844a5..cb2bc93ba657 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -109,13 +110,21 @@ public InputFile newInputFile(ManifestFile manifest) { } } + @Override + public InputFile newInputFile(ManifestListFile manifestList) { + if (manifestList.encryptedKeyMetadata() != null) { + ByteBuffer keyMetadata = manifestList.decryptKeyMetadata(em); + return newDecryptingInputFile(manifestList.location(), keyMetadata); + } else { + return newInputFile(manifestList.location()); + } + } + public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) { return em.decrypt(wrap(io.newInputFile(path), buffer)); } public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) { - // TODO: is the length correct for the encrypted file? It may be the length of the plaintext - // stream return em.decrypt(wrap(io.newInputFile(path, length), buffer)); } diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index de4bc2e12a81..eff084c7dd34 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -70,6 +71,15 @@ default InputFile newInputFile(ManifestFile manifest) { return newInputFile(manifest.path(), manifest.length()); } + default InputFile newInputFile(ManifestListFile manifestList) { + Preconditions.checkArgument( + manifestList.encryptedKeyMetadata() == null, + "Cannot decrypt manifest list: %s (use EncryptingFileIO)", + manifestList.location()); + // cannot pass length because it is not tracked outside of key metadata + return newInputFile(manifestList.location()); + } + /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java new file mode 100644 index 000000000000..d253f884779b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.util.ByteBuffers; + +class BaseManifestListFile implements ManifestListFile, Serializable { + private final String location; + private final long snapshotId; + private final String keyMetadataKeyID; + // stored as a byte array to be Serializable + private final byte[] encryptedKeyMetadata; + + BaseManifestListFile( + String location, + long snapshotId, + String keyMetadataKeyID, + ByteBuffer encryptedKeyMetadata) { + this.location = location; + this.snapshotId = snapshotId; + this.encryptedKeyMetadata = ByteBuffers.toByteArray(encryptedKeyMetadata); + this.keyMetadataKeyID = keyMetadataKeyID; + } + + @Override + public String location() { + return location; + } + + @Override + public long snapshotId() { + return snapshotId; + } + + @Override + public String keyMetadataKeyId() { + return keyMetadataKeyID; + } + + @Override + public ByteBuffer encryptedKeyMetadata() { + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + @Override + public ByteBuffer decryptKeyMetadata(EncryptionManager em) { + return EncryptionUtil.decryptSnapshotKeyMetadata(this, em); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index a3c4fc8738cd..649ebe865ed5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -26,6 +26,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -38,11 +39,11 @@ class BaseSnapshot implements Snapshot { private final Long parentId; private final long sequenceNumber; private final long timestampMillis; - private final String manifestListLocation; private final String operation; private final Map summary; private final Integer schemaId; private final String[] v1ManifestLocations; + private final ManifestListFile manifestListFile; // lazily initialized private transient List allManifests = null; @@ -53,6 +54,7 @@ class BaseSnapshot implements Snapshot { private transient List addedDeleteFiles = null; private transient List removedDeleteFiles = null; + @VisibleForTesting BaseSnapshot( long sequenceNumber, long snapshotId, @@ -62,6 +64,26 @@ class BaseSnapshot implements Snapshot { Map summary, Integer schemaId, String manifestList) { + this( + sequenceNumber, + snapshotId, + parentId, + timestampMillis, + operation, + summary, + schemaId, + new BaseManifestListFile(manifestList, snapshotId, null, null)); + } + + BaseSnapshot( + long sequenceNumber, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + Integer schemaId, + ManifestListFile manifestListFile) { this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -69,7 +91,7 @@ class BaseSnapshot implements Snapshot { this.operation = operation; this.summary = summary; this.schemaId = schemaId; - this.manifestListLocation = manifestList; + this.manifestListFile = manifestListFile; this.v1ManifestLocations = null; } @@ -89,7 +111,7 @@ class BaseSnapshot implements Snapshot { this.operation = operation; this.summary = summary; this.schemaId = schemaId; - this.manifestListLocation = null; + this.manifestListFile = new BaseManifestListFile(null, snapshotId, null, null); this.v1ManifestLocations = v1ManifestLocations; } @@ -128,6 +150,11 @@ public Integer schemaId() { return schemaId; } + @Override + public ManifestListFile manifestListFile() { + return manifestListFile; + } + private void cacheManifests(FileIO fileIO) { if (fileIO == null) { throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); @@ -143,7 +170,7 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListFile)); } if (dataManifests == null || deleteManifests == null) { @@ -216,7 +243,7 @@ public Iterable removedDeleteFiles(FileIO fileIO) { @Override public String manifestListLocation() { - return manifestListLocation; + return manifestListFile.location(); } private void cacheDeleteFileChanges(FileIO fileIO) { @@ -317,7 +344,7 @@ public String toString() { .add("timestamp_ms", timestampMillis) .add("operation", operation) .add("summary", summary) - .add("manifest-list", manifestListLocation) + .add("manifest-list", manifestListFile.location()) .add("schema-id", schemaId) .toString(); } diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 339c59b45d1b..27d7cd91131b 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -160,4 +160,10 @@ private CatalogProperties() {} public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type"; public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl"; + public static final String WRITER_KEK_TIMEOUT_SEC = "encryption.kek-timeout-sec"; + + /** + * Default time-out of key encryption keys. Per NIST SP 800-57 P1 R5 section 5.3.6, set to 1 week. + */ + public static final long WRITER_KEK_TIMEOUT_SEC_DEFAULT = TimeUnit.DAYS.toSeconds(7); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index b17eedad18af..f7b193caaee2 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -19,9 +19,15 @@ package org.apache.iceberg; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -29,10 +35,29 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; abstract class ManifestListWriter implements FileAppender { + private final long snapshotId; + private final StandardEncryptionManager em; + private final NativeEncryptionKeyMetadata keyMetadata; private final FileAppender writer; + private final OutputFile file; + + private ManifestListWriter( + OutputFile file, EncryptionManager em, long snapshotId, Map meta) { + if (em instanceof StandardEncryptionManager) { + // only encrypt the manifest list if standard table encryption is used because the ability to + // encrypt the manifest list key was introduced for standard encryption. + this.em = (StandardEncryptionManager) em; + NativeEncryptionOutputFile encryptedFile = this.em.encrypt(file); + this.file = encryptedFile.encryptingOutputFile(); + this.keyMetadata = encryptedFile.keyMetadata(); + } else { + this.em = null; + this.file = file; + this.keyMetadata = null; + } - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + this.snapshotId = snapshotId; + this.writer = newAppender(this.file, meta); } protected abstract ManifestFile prepare(ManifestFile manifest); @@ -70,6 +95,7 @@ public long length() { return writer.length(); } +<<<<<<< HEAD static class V3Writer extends ManifestListWriter { private final V3Metadata.IndexedManifestFile wrapper; @@ -102,15 +128,33 @@ protected FileAppender newAppender(OutputFile file, Map>>>>>> 684e5e3f3 (draft commit) } } static class V2Writer extends ManifestListWriter { private final V2Metadata.IndexedManifestFile wrapper; - V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V2Writer( + OutputFile snapshotFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber) { super( snapshotFile, + encryptionManager, + snapshotId, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), @@ -143,9 +187,15 @@ protected FileAppender newAppender(OutputFile file, Map>>>>>> 684e5e3f3 (draft commit) } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 88587a1ebc89..8cada08cd4b3 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -39,7 +41,7 @@ public abstract class ManifestWriter> implements FileAp static final long UNASSIGNED_SEQ = -1L; private final OutputFile file; - private final ByteBuffer keyMetadataBuffer; + private final EncryptionKeyMetadata keyMetadata; private final int specId; private final FileAppender> writer; private final Long snapshotId; @@ -62,7 +64,7 @@ private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapsh this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); - this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer(); + this.keyMetadata = file.keyMetadata(); } protected abstract ManifestEntry prepare(ManifestEntry entry); @@ -189,6 +191,18 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); + + // if key metadata can store the length, add it + ByteBuffer keyMetadataBuffer; + if (keyMetadata instanceof NativeEncryptionKeyMetadata) { + keyMetadataBuffer = + ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer(); + } else if (keyMetadata != null) { + keyMetadataBuffer = keyMetadata.buffer(); + } else { + keyMetadataBuffer = null; + } + // if the minSequenceNumber is null, then no manifests with a sequence number have been written, // so the min data sequence number is the one that will be assigned when this is committed. // pass UNASSIGNED_SEQ to inherit it. diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index bc5ef6094695..04234d0b4273 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Base64; import java.util.Iterator; import java.util.Map; import org.apache.iceberg.io.FileIO; @@ -30,6 +32,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.JsonUtil; public class SnapshotParser { @@ -48,6 +51,8 @@ private SnapshotParser() {} private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String ENCRYPTED_KEY_METADATA = "encrypted-key-metadata"; + private static final String KEY_METADATA_KEY_ID = "key-metadata-key-id"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -76,10 +81,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeEndObject(); } - String manifestList = snapshot.manifestListLocation(); - if (manifestList != null) { + ManifestListFile manifestList = snapshot.manifestListFile(); + if (manifestList.location() != null) { // write just the location. manifests should not be embedded in JSON along with a list - generator.writeStringField(MANIFEST_LIST, manifestList); + generator.writeStringField(MANIFEST_LIST, manifestList.location()); } else { // embed the manifest list in the JSON, v1 only JsonUtil.writeStringArray( @@ -93,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (manifestList.encryptedKeyMetadata() != null) { + String encodedKeyMetadata = + Base64.getEncoder() + .encodeToString(ByteBuffers.toByteArray(manifestList.encryptedKeyMetadata())); + generator.writeStringField(ENCRYPTED_KEY_METADATA, encodedKeyMetadata); + generator.writeStringField(KEY_METADATA_KEY_ID, manifestList.keyMetadataKeyId()); + } + generator.writeEndObject(); } @@ -147,6 +160,18 @@ static Snapshot fromJson(JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); + + // Manifest list can be encrypted + ManifestListFile manifestListFile; + if (node.has(ENCRYPTED_KEY_METADATA)) { + String encodedKeyMetadata = JsonUtil.getString(ENCRYPTED_KEY_METADATA, node); + ByteBuffer keyMetadata = ByteBuffer.wrap(Base64.getDecoder().decode(encodedKeyMetadata)); + String keyId = JsonUtil.getString(KEY_METADATA_KEY_ID, node); + manifestListFile = new BaseManifestListFile(manifestList, snapshotId, keyId, keyMetadata); + } else { + manifestListFile = new BaseManifestListFile(manifestList, snapshotId, null, null); + } + return new BaseSnapshot( sequenceNumber, snapshotId, @@ -155,7 +180,7 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList); + manifestListFile); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 74997cc89849..146a75076ccf 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -236,14 +236,16 @@ public Snapshot apply() { List manifests = apply(base, parentSnapshot); OutputFile manifestList = manifestListPath(); - - try (ManifestListWriter writer = - ManifestLists.write( - ops.current().formatVersion(), - manifestList, - snapshotId(), - parentSnapshotId, - sequenceNumber)) { + ManifestListWriter writer = null; + try { + writer = + ManifestLists.write( + ops.current().formatVersion(), + ops.encryption(), + manifestList, + snapshotId(), + parentSnapshotId, + sequenceNumber); // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -257,8 +259,15 @@ public Snapshot apply() { .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index))); writer.addAll(Arrays.asList(manifestFiles)); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to write manifest list file"); + + } finally { + if (writer != null) { + try { + writer.close(); // must close before getting file length + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest list file writer"); + } + } } return new BaseSnapshot( @@ -269,7 +278,7 @@ public Snapshot apply() { operation(), summary(base), base.currentSchemaId(), - manifestList.location()); + writer.toManifestListFile()); } protected abstract Map summary(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java index a43643fcc779..b03944859b6e 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java @@ -26,20 +26,34 @@ public class AesGcmInputFile implements InputFile { private final InputFile sourceFile; private final byte[] dataKey; private final byte[] fileAADPrefix; - private long plaintextLength; + private Long encryptedLength; + private Long plaintextLength; public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix) { + this(sourceFile, dataKey, fileAADPrefix, null); + } + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix, Long length) { this.sourceFile = sourceFile; this.dataKey = dataKey; this.fileAADPrefix = fileAADPrefix; - this.plaintextLength = -1; + this.encryptedLength = length; + this.plaintextLength = null; + } + + private long encryptedLength() { + if (encryptedLength == null) { + this.encryptedLength = sourceFile.getLength(); + } + + return encryptedLength; } @Override public long getLength() { - if (plaintextLength == -1) { + if (plaintextLength == null) { // Presumes all streams use hard-coded plaintext block size. - plaintextLength = AesGcmInputStream.calculatePlaintextLength(sourceFile.getLength()); + plaintextLength = AesGcmInputStream.calculatePlaintextLength(encryptedLength()); } return plaintextLength; @@ -47,7 +61,7 @@ public long getLength() { @Override public SeekableInputStream newStream() { - long ciphertextLength = sourceFile.getLength(); + long ciphertextLength = encryptedLength(); Preconditions.checkState( ciphertextLength >= Ciphers.MIN_STREAM_LENGTH, "Invalid encrypted stream: %d is shorter than the minimum possible stream length", diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index e2cf98bf767f..1523055911e1 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,12 +18,17 @@ */ package org.apache.iceberg.encryption; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Map; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; public class EncryptionUtil { @@ -70,31 +75,94 @@ public static KeyManagementClient createKmsClient(Map catalogPro return kmsClient; } + /** + * @deprecated will be removed in 2.0.0. use {@link #createEncryptionManager(String, int, + * KeyManagementClient, long)} instead. + */ + @Deprecated public static EncryptionManager createEncryptionManager( Map tableProperties, KeyManagementClient kmsClient) { - Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); - - if (null == tableKeyId) { - // Unencrypted table - return PlaintextEncryptionManager.instance(); - } - int dataKeyLength = PropertyUtil.propertyAsInt( tableProperties, TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + return createEncryptionManager( + tableKeyId, dataKeyLength, kmsClient, CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); + } + + public static EncryptionManager createEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient, long writerKekTimeout) { + Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); + + if (null == tableKeyId) { + // Unencrypted table + return PlaintextEncryptionManager.instance(); + } + Preconditions.checkState( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient, writerKekTimeout); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); } + + /** + * Decrypt the key metadata for a snapshot. + * + *

Encryption for snapshot key metadata is only available for tables using standard encryption. + * + * @param manifestList a ManifestListFile + * @param em the table's EncryptionManager + * @return a decrypted key metadata buffer + */ + public static ByteBuffer decryptSnapshotKeyMetadata( + ManifestListFile manifestList, EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Snapshot key metadata encryption requires a StandardEncryptionManager"); + ByteBuffer unwrappedKey = + ((StandardEncryptionManager) em).unwrapKey(manifestList.keyMetadataKeyId()); + return decryptSnapshotKeyMetadata( + unwrappedKey, manifestList.snapshotId(), manifestList.encryptedKeyMetadata()); + } + + private static ByteBuffer decryptSnapshotKeyMetadata( + ByteBuffer key, long snapshotId, ByteBuffer encryptedKeyMetadata) { + Ciphers.AesGcmDecryptor decryptor = new Ciphers.AesGcmDecryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(encryptedKeyMetadata); + byte[] decryptedKeyMetadata = decryptor.decrypt(keyMetadataBytes, snapshotIdAsAAD(snapshotId)); + return ByteBuffer.wrap(decryptedKeyMetadata); + } + + /** + * Encrypts the key metadata for a snapshot. + * + *

Encryption for snapshot key metadata is only available for tables using standard encryption. + * + * @param key unwrapped snapshot key bytes + * @param snapshotId ID of the table snapshot + * @param keyMetadata unencrypted EncryptionKeyMetadata + * @return a Pair of the key ID used to encrypt and the encrypted key metadata + */ + public static ByteBuffer encryptSnapshotKeyMetadata( + ByteBuffer key, long snapshotId, EncryptionKeyMetadata keyMetadata) { + Ciphers.AesGcmEncryptor encryptor = new Ciphers.AesGcmEncryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(keyMetadata.buffer()); + byte[] encryptedKeyMetadata = encryptor.encrypt(keyMetadataBytes, snapshotIdAsAAD(snapshotId)); + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + private static byte[] snapshotIdAsAAD(long snapshotId) { + ByteBuffer asBuffer = + ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, snapshotId); + return ByteBuffers.toByteArray(asBuffer); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java index c2ed9d564d1e..127cf8a8b63a 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -27,4 +27,15 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { /** Additional authentication data as a {@link ByteBuffer} */ ByteBuffer aadPrefix(); + + /** Encrypted file length */ + Long fileLength(); + + /** + * Copy this key metadata and set the file length. + * + * @param length length of the encrypted file in bytes + * @return a copy of this key metadata (key and AAD) with the file length + */ + NativeEncryptionKeyMetadata copyWithLength(long length); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 119d2a5f9ae2..8186bcf3e97b 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -20,28 +20,70 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; +import java.util.Base64; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { - private final transient KeyManagementClient kmsClient; private final String tableKeyId; private final int dataKeyLength; + private final long writerKekTimeout; + + // a holder class of metadata that is not available after serialization + private static class KeyManagementMetadata { + private final KeyManagementClient kmsClient; + private final Map encryptionKeys; + private WrappedEncryptionKey currentEncryptionKey; + + private KeyManagementMetadata(KeyManagementClient kmsClient) { + this.kmsClient = kmsClient; + this.encryptionKeys = Maps.newLinkedHashMap(); + this.currentEncryptionKey = null; + } + } + + private final transient KeyManagementMetadata keyData; private transient volatile SecureRandom lazyRNG = null; + /** + * @deprecated will be removed in 2.0.0. use {@link #StandardEncryptionManager(String, int, List, + * KeyManagementClient, long)} instead. + */ + @Deprecated + public StandardEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + this( + tableKeyId, + dataKeyLength, + ImmutableList.of(), + kmsClient, + CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); + } + /** * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption + * @param writerKekTimeout timeout of kek (key encryption key) cache entries */ - public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + StandardEncryptionManager( + String tableKeyId, + int dataKeyLength, + List keys, + KeyManagementClient kmsClient, + long writerKekTimeout) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -49,8 +91,18 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.kmsClient = kmsClient; + this.keyData = new KeyManagementMetadata(kmsClient); this.dataKeyLength = dataKeyLength; + this.writerKekTimeout = writerKekTimeout; + + for (WrappedEncryptionKey key : keys) { + keyData.encryptionKeys.put(key.id(), key); + + if (keyData.currentEncryptionKey == null + || keyData.currentEncryptionKey.timestamp() < key.timestamp()) { + keyData.currentEncryptionKey = key; + } + } } @Override @@ -82,21 +134,79 @@ private SecureRandom workerRNG() { } public ByteBuffer wrapKey(ByteBuffer secretKey) { - if (kmsClient == null) { + if (keyData == null) { throw new IllegalStateException( "Cannot wrap key after called after serialization (missing KMS client)"); } - return kmsClient.wrapKey(secretKey, tableKeyId); + return keyData.kmsClient.wrapKey(secretKey, tableKeyId); } public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (kmsClient == null) { - throw new IllegalStateException( - "Cannot wrap key after called after serialization (missing KMS client)"); + if (keyData == null) { + throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); } - return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + return keyData.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + } + + public String currentSnapshotKeyId() { + if (keyData == null) { + throw new IllegalStateException("Cannot return the current snapshot key after serialization"); + } + + if (keyData.currentEncryptionKey == null + || keyData.currentEncryptionKey.timestamp() + < System.currentTimeMillis() - writerKekTimeout) { + createNewEncryptionKey(); + } + + return keyData.currentEncryptionKey.id(); + } + + public ByteBuffer unwrapKey(String keyId) { + if (keyData == null) { + throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); + } + + WrappedEncryptionKey cachedKey = keyData.encryptionKeys.get(keyId); + ByteBuffer key = cachedKey.key(); + + if (key == null) { + key = unwrapKey(cachedKey.wrappedKey()); + cachedKey.setUnwrappedKey(key); + } + + return key; + } + + Collection keys() { + if (keyData == null) { + throw new IllegalStateException("Cannot return the current keys after serialization"); + } + + return keyData.encryptionKeys.values(); + } + + private ByteBuffer newKey() { + byte[] newKey = new byte[dataKeyLength]; + workerRNG().nextBytes(newKey); + return ByteBuffer.wrap(newKey); + } + + private String newKeyId() { + byte[] idBytes = new byte[6]; + workerRNG().nextBytes(idBytes); + return Base64.getEncoder().encodeToString(idBytes); + } + + private void createNewEncryptionKey() { + long now = System.currentTimeMillis(); + ByteBuffer keyBytes = newKey(); + WrappedEncryptionKey key = + new WrappedEncryptionKey(newKeyId(), keyBytes, wrapKey(keyBytes), now); + keyData.encryptionKeys.put(key.id(), key); + keyData.currentEncryptionKey = key; } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { @@ -173,7 +283,8 @@ private AesGcmInputFile decrypted() { new AesGcmInputFile( encryptedInputFile.encryptedInputFile(), ByteBuffers.toByteArray(keyMetadata().encryptionKey()), - ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + ByteBuffers.toByteArray(keyMetadata().aadPrefix()), + keyMetadata().fileLength()); } return lazyDecryptedInputFile; diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 98f87c65d95f..6ddea184d8c4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -36,7 +36,8 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), - optional(1, "aad_prefix", Types.BinaryType.get())); + optional(1, "aad_prefix", Types.BinaryType.get()), + optional(2, "file_length", Types.LongType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = AvroSchemaUtil.convert(SCHEMA_V1, StandardKeyMetadata.class.getCanonicalName()); @@ -49,20 +50,31 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private ByteBuffer encryptionKey; private ByteBuffer aadPrefix; - private org.apache.avro.Schema avroSchema; + private Long fileLength; /** Used by Avro reflection to instantiate this class * */ StandardKeyMetadata() {} StandardKeyMetadata(byte[] key, byte[] aad) { + this(key, aad, null); + } + + StandardKeyMetadata(byte[] key, byte[] aad, Long fileLength) { this.encryptionKey = ByteBuffer.wrap(key); this.aadPrefix = ByteBuffer.wrap(aad); + this.fileLength = fileLength; } - private StandardKeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { - this.encryptionKey = encryptionKey; - this.aadPrefix = aadPrefix; - this.avroSchema = AVRO_SCHEMA_V1; + /** + * Copy constructor. + * + * @param toCopy a StandardKeymetadata to copy + * @param fileLength file length that overrides toCopy if not null + */ + private StandardKeyMetadata(StandardKeyMetadata toCopy, Long fileLength) { + this.encryptionKey = toCopy.encryptionKey; + this.aadPrefix = toCopy.aadPrefix; + this.fileLength = fileLength != null ? fileLength : toCopy.fileLength; } static Map supportedSchemaVersions() { @@ -83,6 +95,11 @@ public ByteBuffer aadPrefix() { return aadPrefix; } + @Override + public Long fileLength() { + return fileLength; + } + static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { if (keyMetadata instanceof StandardKeyMetadata) { return (StandardKeyMetadata) keyMetadata; @@ -116,7 +133,12 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - return new StandardKeyMetadata(encryptionKey(), aadPrefix()); + return new StandardKeyMetadata(this, null); + } + + @Override + public NativeEncryptionKeyMetadata copyWithLength(long length) { + return new StandardKeyMetadata(this, length); } @Override @@ -128,6 +150,9 @@ public void put(int i, Object v) { case 1: this.aadPrefix = (ByteBuffer) v; return; + case 2: + this.fileLength = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -140,6 +165,8 @@ public Object get(int i) { return encryptionKey; case 1: return aadPrefix; + case 2: + return fileLength; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -147,6 +174,6 @@ public Object get(int i) { @Override public org.apache.avro.Schema getSchema() { - return avroSchema; + return AVRO_SCHEMA_V1; } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java new file mode 100644 index 000000000000..c5bad6ca1251 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.Serializable; +import java.nio.ByteBuffer; + +public class WrappedEncryptionKey implements Serializable { + private final String keyID; + private final ByteBuffer wrappedKey; + private final long timestamp; + private ByteBuffer keyBytes; + + public WrappedEncryptionKey( + String keyID, ByteBuffer keyBytes, ByteBuffer wrappedKey, long timestamp) { + this.keyID = keyID; + this.wrappedKey = wrappedKey; + this.timestamp = timestamp; + } + + public String id() { + return keyID; + } + + public ByteBuffer wrappedKey() { + return wrappedKey; + } + + public long timestamp() { + return timestamp; + } + + public ByteBuffer key() { + return keyBytes; + } + + public void setUnwrappedKey(ByteBuffer key) { + keyBytes = key; + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java index 13e8985cdb56..7c32d8a6245d 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java @@ -85,7 +85,7 @@ public class TestManifestEncryption { private static final DataFile DATA_FILE = new GenericDataFile( - 0, + SPEC.specId(), PATH, FORMAT, PARTITION, diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java new file mode 100644 index 000000000000..90feed4789f1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestManifestListEncryption { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + + private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 10); + private static final ByteBuffer FIRST_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 100); + private static final ByteBuffer SECOND_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 20); + private static final ByteBuffer SECOND_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 200); + + private static final List PARTITION_SUMMARIES = + Lists.newArrayList( + new GenericPartitionFieldSummary( + false, FIRST_SUMMARY_LOWER_BOUND, FIRST_SUMMARY_UPPER_BOUND), + new GenericPartitionFieldSummary( + true, false, SECOND_SUMMARY_LOWER_BOUND, SECOND_SUMMARY_UPPER_BOUND)); + private static final ByteBuffer MANIFEST_KEY_METADATA = ByteBuffer.allocate(100); + + private static final ManifestFile TEST_MANIFEST = + new GenericManifestFile( + PATH, + LENGTH, + SPEC_ID, + ManifestContent.DATA, + SEQ_NUM, + MIN_SEQ_NUM, + SNAPSHOT_ID, + ADDED_FILES, + ADDED_ROWS, + EXISTING_FILES, + EXISTING_ROWS, + DELETED_FILES, + DELETED_ROWS, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA); + + private static final EncryptionManager ENCRYPTION_MANAGER = + EncryptionTestHelpers.createEncryptionManager(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadEncryptedManifestList(); + + assertThat(manifest.path()).isEqualTo(PATH); + assertThat(manifest.length()).isEqualTo(LENGTH); + assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM); + assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM); + assertThat((long) manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); + assertThat((int) manifest.addedFilesCount()).isEqualTo(ADDED_FILES); + assertThat((long) manifest.addedRowsCount()).isEqualTo(ADDED_ROWS); + assertThat((int) manifest.existingFilesCount()).isEqualTo(EXISTING_FILES); + assertThat((long) manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS); + assertThat((int) manifest.deletedFilesCount()).isEqualTo(DELETED_FILES); + assertThat((long) manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + } + + private ManifestFile writeAndReadEncryptedManifestList() throws IOException { + OutputFile rawOutput = new InMemoryOutputFile(); + EncryptedOutputFile encryptedOutput = ENCRYPTION_MANAGER.encrypt(rawOutput); + EncryptionKeyMetadata keyMetadata = encryptedOutput.keyMetadata(); + + try (FileAppender writer = + ManifestLists.write( + 2, encryptedOutput.encryptingOutputFile(), SNAPSHOT_ID, SNAPSHOT_ID - 1, SEQ_NUM)) { + writer.add(TEST_MANIFEST); + } + + InputFile rawInput = rawOutput.toInputFile(); + + // First try to read without decryption + assertThatThrownBy(() -> ManifestLists.read(rawInput)) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + EncryptedInputFile encryptedManifestListInput = + EncryptedFiles.encryptedInput(rawInput, keyMetadata); + InputFile manifestListInput = ENCRYPTION_MANAGER.decrypt(encryptedManifestListInput); + + List manifests = ManifestLists.read(manifestListInput); + assertThat(manifests.size()).isEqualTo(1); + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index bcde7a7f31d3..dcfb36b1111c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -54,6 +54,7 @@ import java.util.stream.Stream; import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.TableMetadata.SnapshotLogEntry; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -1605,7 +1606,13 @@ private String createManifestListWithManifestFile( manifestList.deleteOnExit(); try (ManifestListWriter writer = - ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) { + ManifestLists.write( + 1, + PlaintextEncryptionManager.instance(), + Files.localOutput(manifestList), + snapshotId, + parentSnapshotId, + 0)) { writer.addAll( ImmutableList.of(new GenericManifestFile(localInput(manifestFile), SPEC_5.specId()))); } diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index aa49e1c40fe2..94f8eb603d5e 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -31,11 +31,11 @@ public static EncryptionManager createEncryptionManager() { Map catalogProperties = Maps.newHashMap(); catalogProperties.put( CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); - Map tableProperties = Maps.newHashMap(); - tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); - tableProperties.put(TableProperties.FORMAT_VERSION, "2"); return EncryptionUtil.createEncryptionManager( - tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); + UnitestKMS.MASTER_KEY_NAME1, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT, + EncryptionUtil.createKmsClient(catalogProperties), + CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); } } From cc27c3f2f3d3d8a8a184bf9500ab03b415f1de9f Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 12 Sep 2024 10:03:52 +0300 Subject: [PATCH 2/7] compile and ci fixes --- .palantir/revapi.yml | 11 ++++++ .../apache/iceberg/BaseManifestListFile.java | 9 ++--- .../apache/iceberg/ManifestListWriter.java | 35 +++++++++++-------- .../org/apache/iceberg/ManifestLists.java | 6 +--- .../hadoop/TestCatalogUtilDropTable.java | 4 +++ 5 files changed, 42 insertions(+), 23 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 9b8017f0beec..4a300820e2bb 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1058,6 +1058,11 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" justification: "Deprecations for 1.6.0 release" "1.6.0": + org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.encryption.EncryptingFileIO" + new: "class org.apache.iceberg.encryption.EncryptingFileIO" + justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-common: - code: "java.method.removed" old: "method org.apache.iceberg.common.DynFields.StaticField org.apache.iceberg.common.DynFields.Builder::buildStaticChecked()\ @@ -1091,6 +1096,12 @@ acceptedBreaks: - code: "java.class.removed" old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus" justification: "Removing deprecated code" + - code: "java.method.addedToInterface" + new: "method java.lang.Long org.apache.iceberg.encryption.NativeEncryptionKeyMetadata::fileLength()" + justification: "New method in interface" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.encryption.NativeEncryptionKeyMetadata org.apache.iceberg.encryption.NativeEncryptionKeyMetadata::copyWithLength(long)" + justification: "New method in interface" - code: "java.method.removed" old: "method java.lang.String org.apache.iceberg.FileScanTaskParser::toJson(org.apache.iceberg.FileScanTask)" justification: "Removing deprecated code" diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java index d253f884779b..9f49d167158f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -32,10 +32,7 @@ class BaseManifestListFile implements ManifestListFile, Serializable { private final byte[] encryptedKeyMetadata; BaseManifestListFile( - String location, - long snapshotId, - String keyMetadataKeyID, - ByteBuffer encryptedKeyMetadata) { + String location, long snapshotId, String keyMetadataKeyID, ByteBuffer encryptedKeyMetadata) { this.location = location; this.snapshotId = snapshotId; this.encryptedKeyMetadata = ByteBuffers.toByteArray(encryptedKeyMetadata); @@ -59,6 +56,10 @@ public String keyMetadataKeyId() { @Override public ByteBuffer encryptedKeyMetadata() { + if (encryptedKeyMetadata == null) { + return null; + } + return ByteBuffer.wrap(encryptedKeyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index f7b193caaee2..5c3cec9ebfa3 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -63,7 +63,19 @@ private ManifestListWriter( protected abstract ManifestFile prepare(ManifestFile manifest); protected abstract FileAppender newAppender( - OutputFile file, Map meta); + OutputFile outputFile, Map meta); + + public ManifestListFile toManifestListFile() { + if (em != null) { + String keyId = em.currentSnapshotKeyId(); + ByteBuffer encryptedKeyMetadata = + EncryptionUtil.encryptSnapshotKeyMetadata( + em.unwrapKey(keyId), snapshotId, keyMetadata.copyWithLength(writer.length())); + return new BaseManifestListFile(file.location(), snapshotId, keyId, encryptedKeyMetadata); + } else { + return new BaseManifestListFile(file.location(), snapshotId, null, null); + } + } @Override public void add(ManifestFile manifest) { @@ -95,13 +107,19 @@ public long length() { return writer.length(); } -<<<<<<< HEAD static class V3Writer extends ManifestListWriter { private final V3Metadata.IndexedManifestFile wrapper; - V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V3Writer( + OutputFile snapshotFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber) { super( snapshotFile, + encryptionManager, + snapshotId, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), @@ -128,17 +146,6 @@ protected FileAppender newAppender(OutputFile file, Map>>>>>> 684e5e3f3 (draft commit) } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index 1c32b6a10ed2..7012d652abc8 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -84,14 +84,10 @@ static ManifestListWriter write( manifestListFile, encryptionManager, snapshotId, parentSnapshotId); case 2: return new ManifestListWriter.V2Writer( -<<<<<<< HEAD - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + manifestListFile, encryptionManager, snapshotId, parentSnapshotId, sequenceNumber); case 3: return new ManifestListWriter.V3Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); -======= manifestListFile, encryptionManager, snapshotId, parentSnapshotId, sequenceNumber); ->>>>>>> 684e5e3f3 (draft commit) } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index 79f30e109f7d..22c3f8a139c7 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -36,6 +36,7 @@ import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; @@ -196,6 +197,9 @@ private static FileIO createMockFileIO(FileIO wrapped) { .thenAnswer( invocation -> wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + Mockito.when(mockIO.newInputFile(Mockito.any(ManifestListFile.class))) + .thenAnswer( + invocation -> wrapped.newInputFile((ManifestListFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class))) .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class))) From 0988f6fc1f1b4b2f18c273092b52b4ad7a6b3b7e Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 12 Sep 2024 10:10:23 +0300 Subject: [PATCH 3/7] spotless apply --- .../java/org/apache/iceberg/encryption/EncryptionUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 1523055911e1..40147e1f27ff 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -107,7 +107,8 @@ public static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient, writerKekTimeout); + return new StandardEncryptionManager( + tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient, writerKekTimeout); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { From 6e7de370d11f77b93dd110010bd69ad9995a8a47 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 12 Sep 2024 11:03:33 +0300 Subject: [PATCH 4/7] re-apply unwrapped cache patch Co-authored-by: Ryan Blue --- .../encryption/StandardEncryptionManager.java | 89 ++++++++++--------- .../encryption/WrappedEncryptionKey.java | 12 +-- 2 files changed, 49 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 8186bcf3e97b..ff1b399166bc 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -18,12 +18,14 @@ */ package org.apache.iceberg.encryption; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.Base64; -import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; @@ -41,24 +43,29 @@ public class StandardEncryptionManager implements EncryptionManager { private final long writerKekTimeout; // a holder class of metadata that is not available after serialization - private static class KeyManagementMetadata { + private class TransientEncryptionState { private final KeyManagementClient kmsClient; private final Map encryptionKeys; + private final LoadingCache unwrappedKeyCache; private WrappedEncryptionKey currentEncryptionKey; - private KeyManagementMetadata(KeyManagementClient kmsClient) { + private TransientEncryptionState(KeyManagementClient kmsClient) { this.kmsClient = kmsClient; this.encryptionKeys = Maps.newLinkedHashMap(); + this.unwrappedKeyCache = + Caffeine.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(wrappedKey -> kmsClient.unwrapKey(wrappedKey, tableKeyId)); this.currentEncryptionKey = null; } } - private final transient KeyManagementMetadata keyData; + private final transient TransientEncryptionState transientState; private transient volatile SecureRandom lazyRNG = null; /** - * @deprecated will be removed in 2.0.0. use {@link #StandardEncryptionManager(String, int, List, + * @deprecated will be removed in 1.8.0. use {@link #StandardEncryptionManager(String, int, List, * KeyManagementClient, long)} instead. */ @Deprecated @@ -91,16 +98,16 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.keyData = new KeyManagementMetadata(kmsClient); + this.transientState = new TransientEncryptionState(kmsClient); this.dataKeyLength = dataKeyLength; this.writerKekTimeout = writerKekTimeout; for (WrappedEncryptionKey key : keys) { - keyData.encryptionKeys.put(key.id(), key); + transientState.encryptionKeys.put(key.id(), key); - if (keyData.currentEncryptionKey == null - || keyData.currentEncryptionKey.timestamp() < key.timestamp()) { - keyData.currentEncryptionKey = key; + if (transientState.currentEncryptionKey == null + || transientState.currentEncryptionKey.timestamp() < key.timestamp()) { + transientState.currentEncryptionKey = key; } } } @@ -133,59 +140,52 @@ private SecureRandom workerRNG() { return lazyRNG; } + /** + * @deprecated will be removed in 1.8.0; use {@link #currentSnapshotKeyId()} instead. + */ + @Deprecated public ByteBuffer wrapKey(ByteBuffer secretKey) { - if (keyData == null) { + if (transientState == null) { throw new IllegalStateException( "Cannot wrap key after called after serialization (missing KMS client)"); } - return keyData.kmsClient.wrapKey(secretKey, tableKeyId); + return transientState.kmsClient.wrapKey(secretKey, tableKeyId); } + /** + * @deprecated will be removed in 1.8.0; use {@link #unwrapKey(String)}} instead. + */ + @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (keyData == null) { + if (transientState == null) { throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); } - return keyData.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + return transientState.unwrappedKeyCache.get(wrappedSecretKey); } public String currentSnapshotKeyId() { - if (keyData == null) { + if (transientState == null) { throw new IllegalStateException("Cannot return the current snapshot key after serialization"); } - if (keyData.currentEncryptionKey == null - || keyData.currentEncryptionKey.timestamp() + if (transientState.currentEncryptionKey == null + || transientState.currentEncryptionKey.timestamp() < System.currentTimeMillis() - writerKekTimeout) { createNewEncryptionKey(); } - return keyData.currentEncryptionKey.id(); + return transientState.currentEncryptionKey.id(); } public ByteBuffer unwrapKey(String keyId) { - if (keyData == null) { + if (transientState == null) { throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); } - WrappedEncryptionKey cachedKey = keyData.encryptionKeys.get(keyId); - ByteBuffer key = cachedKey.key(); - - if (key == null) { - key = unwrapKey(cachedKey.wrappedKey()); - cachedKey.setUnwrappedKey(key); - } - - return key; - } - - Collection keys() { - if (keyData == null) { - throw new IllegalStateException("Cannot return the current keys after serialization"); - } - - return keyData.encryptionKeys.values(); + return transientState.unwrappedKeyCache.get( + transientState.encryptionKeys.get(keyId).wrappedKey()); } private ByteBuffer newKey() { @@ -201,12 +201,19 @@ private String newKeyId() { } private void createNewEncryptionKey() { - long now = System.currentTimeMillis(); - ByteBuffer keyBytes = newKey(); + if (transientState == null) { + throw new IllegalStateException("Cannot create encryption keys after serialization"); + } + + ByteBuffer unwrapped = newKey(); + ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); WrappedEncryptionKey key = - new WrappedEncryptionKey(newKeyId(), keyBytes, wrapKey(keyBytes), now); - keyData.encryptionKeys.put(key.id(), key); - keyData.currentEncryptionKey = key; + new WrappedEncryptionKey(newKeyId(), wrapped, System.currentTimeMillis()); + + // update internal tracking + transientState.unwrappedKeyCache.put(wrapped, unwrapped); + transientState.encryptionKeys.put(key.id(), key); + transientState.currentEncryptionKey = key; } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { diff --git a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java index c5bad6ca1251..480d6b0d14d1 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java +++ b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java @@ -25,10 +25,8 @@ public class WrappedEncryptionKey implements Serializable { private final String keyID; private final ByteBuffer wrappedKey; private final long timestamp; - private ByteBuffer keyBytes; - public WrappedEncryptionKey( - String keyID, ByteBuffer keyBytes, ByteBuffer wrappedKey, long timestamp) { + public WrappedEncryptionKey(String keyID, ByteBuffer wrappedKey, long timestamp) { this.keyID = keyID; this.wrappedKey = wrappedKey; this.timestamp = timestamp; @@ -45,12 +43,4 @@ public ByteBuffer wrappedKey() { public long timestamp() { return timestamp; } - - public ByteBuffer key() { - return keyBytes; - } - - public void setUnwrappedKey(ByteBuffer key) { - keyBytes = key; - } } From 1c03cd19c6fc68f4a086340e7f884d69580930ca Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 12 Sep 2024 13:12:42 +0300 Subject: [PATCH 5/7] caching limit --- .../apache/iceberg/encryption/StandardEncryptionManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index ff1b399166bc..b7da59648bb2 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -54,7 +54,8 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { this.encryptionKeys = Maps.newLinkedHashMap(); this.unwrappedKeyCache = Caffeine.newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(1000) + .expireAfterWrite(1, TimeUnit.DAYS) .build(wrappedKey -> kmsClient.unwrapKey(wrappedKey, tableKeyId)); this.currentEncryptionKey = null; } From e637adcbb2ed18ec7debcab1333e4ebc886f726e Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Tue, 17 Sep 2024 14:05:03 +0300 Subject: [PATCH 6/7] address review comments --- .palantir/revapi.yml | 6 --- .../org/apache/iceberg/CatalogProperties.java | 6 --- .../iceberg/encryption/EncryptionUtil.java | 11 +++-- .../NativeEncryptionKeyMetadata.java | 10 ++++- .../encryption/StandardEncryptionManager.java | 42 +++++++------------ .../encryption/WrappedEncryptionKey.java | 4 ++ .../encryption/EncryptionTestHelpers.java | 3 +- 7 files changed, 34 insertions(+), 48 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4a300820e2bb..aa0b974ca2ca 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1096,12 +1096,6 @@ acceptedBreaks: - code: "java.class.removed" old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus" justification: "Removing deprecated code" - - code: "java.method.addedToInterface" - new: "method java.lang.Long org.apache.iceberg.encryption.NativeEncryptionKeyMetadata::fileLength()" - justification: "New method in interface" - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.encryption.NativeEncryptionKeyMetadata org.apache.iceberg.encryption.NativeEncryptionKeyMetadata::copyWithLength(long)" - justification: "New method in interface" - code: "java.method.removed" old: "method java.lang.String org.apache.iceberg.FileScanTaskParser::toJson(org.apache.iceberg.FileScanTask)" justification: "Removing deprecated code" diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 27d7cd91131b..339c59b45d1b 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -160,10 +160,4 @@ private CatalogProperties() {} public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type"; public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl"; - public static final String WRITER_KEK_TIMEOUT_SEC = "encryption.kek-timeout-sec"; - - /** - * Default time-out of key encryption keys. Per NIST SP 800-57 P1 R5 section 5.3.6, set to 1 week. - */ - public static final long WRITER_KEK_TIMEOUT_SEC_DEFAULT = TimeUnit.DAYS.toSeconds(7); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 40147e1f27ff..bb2b9d76460b 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -25,6 +25,7 @@ import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -77,7 +78,7 @@ public static KeyManagementClient createKmsClient(Map catalogPro /** * @deprecated will be removed in 2.0.0. use {@link #createEncryptionManager(String, int, - * KeyManagementClient, long)} instead. + * KeyManagementClient)} instead. */ @Deprecated public static EncryptionManager createEncryptionManager( @@ -89,12 +90,11 @@ public static EncryptionManager createEncryptionManager( TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); - return createEncryptionManager( - tableKeyId, dataKeyLength, kmsClient, CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); + return createEncryptionManager(tableKeyId, dataKeyLength, kmsClient); } public static EncryptionManager createEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient, long writerKekTimeout) { + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); if (null == tableKeyId) { @@ -107,8 +107,7 @@ public static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager( - tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient, writerKekTimeout); + return new StandardEncryptionManager(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java index 127cf8a8b63a..2188378a4e87 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -29,7 +29,10 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { ByteBuffer aadPrefix(); /** Encrypted file length */ - Long fileLength(); + default Long fileLength() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement fileLength"); + } /** * Copy this key metadata and set the file length. @@ -37,5 +40,8 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { * @param length length of the encrypted file in bytes * @return a copy of this key metadata (key and AAD) with the file length */ - NativeEncryptionKeyMetadata copyWithLength(long length); + default NativeEncryptionKeyMetadata copyWithLength(long length) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement copyWithLength"); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index b7da59648bb2..3a613ffd3241 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -38,15 +37,19 @@ import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { + /** + * Default time-out of key encryption keys. Per NIST SP 800-57 P1 R5 section 5.3.6, set to 1 week. + */ + private static final long KEY_ENCRYPTION_KEY_TIMEOUT = TimeUnit.DAYS.toMillis(7); + private final String tableKeyId; private final int dataKeyLength; - private final long writerKekTimeout; // a holder class of metadata that is not available after serialization private class TransientEncryptionState { private final KeyManagementClient kmsClient; private final Map encryptionKeys; - private final LoadingCache unwrappedKeyCache; + private final LoadingCache unwrappedKeyCache; private WrappedEncryptionKey currentEncryptionKey; private TransientEncryptionState(KeyManagementClient kmsClient) { @@ -54,9 +57,9 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { this.encryptionKeys = Maps.newLinkedHashMap(); this.unwrappedKeyCache = Caffeine.newBuilder() - .maximumSize(1000) - .expireAfterWrite(1, TimeUnit.DAYS) - .build(wrappedKey -> kmsClient.unwrapKey(wrappedKey, tableKeyId)); + .expireAfterWrite(1, TimeUnit.HOURS) + .build( + keyId -> kmsClient.unwrapKey(encryptionKeys.get(keyId).wrappedKey(), tableKeyId)); this.currentEncryptionKey = null; } } @@ -67,31 +70,24 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { /** * @deprecated will be removed in 1.8.0. use {@link #StandardEncryptionManager(String, int, List, - * KeyManagementClient, long)} instead. + * KeyManagementClient)} instead. */ @Deprecated public StandardEncryptionManager( String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { - this( - tableKeyId, - dataKeyLength, - ImmutableList.of(), - kmsClient, - CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); + this(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient); } /** * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption - * @param writerKekTimeout timeout of kek (key encryption key) cache entries */ StandardEncryptionManager( String tableKeyId, int dataKeyLength, List keys, - KeyManagementClient kmsClient, - long writerKekTimeout) { + KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -101,7 +97,6 @@ public StandardEncryptionManager( this.tableKeyId = tableKeyId; this.transientState = new TransientEncryptionState(kmsClient); this.dataKeyLength = dataKeyLength; - this.writerKekTimeout = writerKekTimeout; for (WrappedEncryptionKey key : keys) { transientState.encryptionKeys.put(key.id(), key); @@ -159,11 +154,7 @@ public ByteBuffer wrapKey(ByteBuffer secretKey) { */ @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (transientState == null) { - throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); - } - - return transientState.unwrappedKeyCache.get(wrappedSecretKey); + throw new UnsupportedOperationException("Use unwrapKey(String) instead"); } public String currentSnapshotKeyId() { @@ -173,7 +164,7 @@ public String currentSnapshotKeyId() { if (transientState.currentEncryptionKey == null || transientState.currentEncryptionKey.timestamp() - < System.currentTimeMillis() - writerKekTimeout) { + < System.currentTimeMillis() - KEY_ENCRYPTION_KEY_TIMEOUT) { createNewEncryptionKey(); } @@ -185,8 +176,7 @@ public ByteBuffer unwrapKey(String keyId) { throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); } - return transientState.unwrappedKeyCache.get( - transientState.encryptionKeys.get(keyId).wrappedKey()); + return transientState.unwrappedKeyCache.get(keyId); } private ByteBuffer newKey() { @@ -212,7 +202,7 @@ private void createNewEncryptionKey() { new WrappedEncryptionKey(newKeyId(), wrapped, System.currentTimeMillis()); // update internal tracking - transientState.unwrappedKeyCache.put(wrapped, unwrapped); + transientState.unwrappedKeyCache.put(key.id(), unwrapped); transientState.encryptionKeys.put(key.id(), key); transientState.currentEncryptionKey = key; } diff --git a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java index 480d6b0d14d1..c3824f66e9de 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java +++ b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java @@ -21,6 +21,10 @@ import java.io.Serializable; import java.nio.ByteBuffer; +/** + * This class keeps a wrapped (KMS-encrypted) version of the keys used to encrypt manifest list key + * metadata. These keys have an ID and a creation timestamp. + */ public class WrappedEncryptionKey implements Serializable { private final String keyID; private final ByteBuffer wrappedKey; diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index 94f8eb603d5e..cdc8c2e1515b 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -35,7 +35,6 @@ public static EncryptionManager createEncryptionManager() { return EncryptionUtil.createEncryptionManager( UnitestKMS.MASTER_KEY_NAME1, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT, - EncryptionUtil.createKmsClient(catalogProperties), - CatalogProperties.WRITER_KEK_TIMEOUT_SEC_DEFAULT); + EncryptionUtil.createKmsClient(catalogProperties)); } } From 1fbdaa68d87a2224d753615780a82d61ef279a2b Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Tue, 17 Sep 2024 14:15:33 +0300 Subject: [PATCH 7/7] spotless apply --- .../main/java/org/apache/iceberg/encryption/EncryptionUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index bb2b9d76460b..dfc00f256e12 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -25,7 +25,6 @@ import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;