diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 9b8017f0beec..aa0b974ca2ca 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1058,6 +1058,11 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" justification: "Deprecations for 1.6.0 release" "1.6.0": + org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.encryption.EncryptingFileIO" + new: "class org.apache.iceberg.encryption.EncryptingFileIO" + justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-common: - code: "java.method.removed" old: "method org.apache.iceberg.common.DynFields.StaticField org.apache.iceberg.common.DynFields.Builder::buildStaticChecked()\ diff --git a/api/src/main/java/org/apache/iceberg/ManifestListFile.java b/api/src/main/java/org/apache/iceberg/ManifestListFile.java new file mode 100644 index 000000000000..e536a0b286ac --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestListFile.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; + +public interface ManifestListFile { + + /** Location of manifest list file. */ + String location(); + + /** Snapshot ID of the manifest list. */ + long snapshotId(); + + /** + * The manifest list key metadata is encrypted with a "key encryption key" (KEK). Returns the KEK + * ID for this manifest file. + */ + String keyMetadataKeyId(); + + /** Returns the manifest list key metadata, encrypted with its KEK. */ + ByteBuffer encryptedKeyMetadata(); + + /** Decrypt and return the encrypted key metadata */ + ByteBuffer decryptKeyMetadata(EncryptionManager em); +} diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index c035259e0e2c..71ba5dbe4955 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -162,6 +162,16 @@ default Iterable removedDeleteFiles(FileIO io) { */ String manifestListLocation(); + /** + * This snapshot's manifest list file info: size, encryption key metadata and location + * + * @return manifest list file info + */ + default ManifestListFile manifestListFile() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement manifestListFile method"); + } + /** * Return the id of the schema used when this snapshot was created, or null if this information is * not available. diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index 0203361844a5..cb2bc93ba657 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -109,13 +110,21 @@ public InputFile newInputFile(ManifestFile manifest) { } } + @Override + public InputFile newInputFile(ManifestListFile manifestList) { + if (manifestList.encryptedKeyMetadata() != null) { + ByteBuffer keyMetadata = manifestList.decryptKeyMetadata(em); + return newDecryptingInputFile(manifestList.location(), keyMetadata); + } else { + return newInputFile(manifestList.location()); + } + } + public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) { return em.decrypt(wrap(io.newInputFile(path), buffer)); } public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) { - // TODO: is the length correct for the encrypted file? It may be the length of the plaintext - // stream return em.decrypt(wrap(io.newInputFile(path, length), buffer)); } diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index de4bc2e12a81..eff084c7dd34 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -70,6 +71,15 @@ default InputFile newInputFile(ManifestFile manifest) { return newInputFile(manifest.path(), manifest.length()); } + default InputFile newInputFile(ManifestListFile manifestList) { + Preconditions.checkArgument( + manifestList.encryptedKeyMetadata() == null, + "Cannot decrypt manifest list: %s (use EncryptingFileIO)", + manifestList.location()); + // cannot pass length because it is not tracked outside of key metadata + return newInputFile(manifestList.location()); + } + /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java new file mode 100644 index 000000000000..9f49d167158f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.util.ByteBuffers; + +class BaseManifestListFile implements ManifestListFile, Serializable { + private final String location; + private final long snapshotId; + private final String keyMetadataKeyID; + // stored as a byte array to be Serializable + private final byte[] encryptedKeyMetadata; + + BaseManifestListFile( + String location, long snapshotId, String keyMetadataKeyID, ByteBuffer encryptedKeyMetadata) { + this.location = location; + this.snapshotId = snapshotId; + this.encryptedKeyMetadata = ByteBuffers.toByteArray(encryptedKeyMetadata); + this.keyMetadataKeyID = keyMetadataKeyID; + } + + @Override + public String location() { + return location; + } + + @Override + public long snapshotId() { + return snapshotId; + } + + @Override + public String keyMetadataKeyId() { + return keyMetadataKeyID; + } + + @Override + public ByteBuffer encryptedKeyMetadata() { + if (encryptedKeyMetadata == null) { + return null; + } + + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + @Override + public ByteBuffer decryptKeyMetadata(EncryptionManager em) { + return EncryptionUtil.decryptSnapshotKeyMetadata(this, em); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index a3c4fc8738cd..649ebe865ed5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -26,6 +26,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -38,11 +39,11 @@ class BaseSnapshot implements Snapshot { private final Long parentId; private final long sequenceNumber; private final long timestampMillis; - private final String manifestListLocation; private final String operation; private final Map summary; private final Integer schemaId; private final String[] v1ManifestLocations; + private final ManifestListFile manifestListFile; // lazily initialized private transient List allManifests = null; @@ -53,6 +54,7 @@ class BaseSnapshot implements Snapshot { private transient List addedDeleteFiles = null; private transient List removedDeleteFiles = null; + @VisibleForTesting BaseSnapshot( long sequenceNumber, long snapshotId, @@ -62,6 +64,26 @@ class BaseSnapshot implements Snapshot { Map summary, Integer schemaId, String manifestList) { + this( + sequenceNumber, + snapshotId, + parentId, + timestampMillis, + operation, + summary, + schemaId, + new BaseManifestListFile(manifestList, snapshotId, null, null)); + } + + BaseSnapshot( + long sequenceNumber, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + Integer schemaId, + ManifestListFile manifestListFile) { this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -69,7 +91,7 @@ class BaseSnapshot implements Snapshot { this.operation = operation; this.summary = summary; this.schemaId = schemaId; - this.manifestListLocation = manifestList; + this.manifestListFile = manifestListFile; this.v1ManifestLocations = null; } @@ -89,7 +111,7 @@ class BaseSnapshot implements Snapshot { this.operation = operation; this.summary = summary; this.schemaId = schemaId; - this.manifestListLocation = null; + this.manifestListFile = new BaseManifestListFile(null, snapshotId, null, null); this.v1ManifestLocations = v1ManifestLocations; } @@ -128,6 +150,11 @@ public Integer schemaId() { return schemaId; } + @Override + public ManifestListFile manifestListFile() { + return manifestListFile; + } + private void cacheManifests(FileIO fileIO) { if (fileIO == null) { throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); @@ -143,7 +170,7 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListFile)); } if (dataManifests == null || deleteManifests == null) { @@ -216,7 +243,7 @@ public Iterable removedDeleteFiles(FileIO fileIO) { @Override public String manifestListLocation() { - return manifestListLocation; + return manifestListFile.location(); } private void cacheDeleteFileChanges(FileIO fileIO) { @@ -317,7 +344,7 @@ public String toString() { .add("timestamp_ms", timestampMillis) .add("operation", operation) .add("summary", summary) - .add("manifest-list", manifestListLocation) + .add("manifest-list", manifestListFile.location()) .add("schema-id", schemaId) .toString(); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index b17eedad18af..5c3cec9ebfa3 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -19,9 +19,15 @@ package org.apache.iceberg; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -29,16 +35,47 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; abstract class ManifestListWriter implements FileAppender { + private final long snapshotId; + private final StandardEncryptionManager em; + private final NativeEncryptionKeyMetadata keyMetadata; private final FileAppender writer; + private final OutputFile file; + + private ManifestListWriter( + OutputFile file, EncryptionManager em, long snapshotId, Map meta) { + if (em instanceof StandardEncryptionManager) { + // only encrypt the manifest list if standard table encryption is used because the ability to + // encrypt the manifest list key was introduced for standard encryption. + this.em = (StandardEncryptionManager) em; + NativeEncryptionOutputFile encryptedFile = this.em.encrypt(file); + this.file = encryptedFile.encryptingOutputFile(); + this.keyMetadata = encryptedFile.keyMetadata(); + } else { + this.em = null; + this.file = file; + this.keyMetadata = null; + } - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + this.snapshotId = snapshotId; + this.writer = newAppender(this.file, meta); } protected abstract ManifestFile prepare(ManifestFile manifest); protected abstract FileAppender newAppender( - OutputFile file, Map meta); + OutputFile outputFile, Map meta); + + public ManifestListFile toManifestListFile() { + if (em != null) { + String keyId = em.currentSnapshotKeyId(); + ByteBuffer encryptedKeyMetadata = + EncryptionUtil.encryptSnapshotKeyMetadata( + em.unwrapKey(keyId), snapshotId, keyMetadata.copyWithLength(writer.length())); + return new BaseManifestListFile(file.location(), snapshotId, keyId, encryptedKeyMetadata); + } else { + return new BaseManifestListFile(file.location(), snapshotId, null, null); + } + } @Override public void add(ManifestFile manifest) { @@ -73,9 +110,16 @@ public long length() { static class V3Writer extends ManifestListWriter { private final V3Metadata.IndexedManifestFile wrapper; - V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V3Writer( + OutputFile snapshotFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber) { super( snapshotFile, + encryptionManager, + snapshotId, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), @@ -108,9 +152,16 @@ protected FileAppender newAppender(OutputFile file, Map newAppender(OutputFile file, Map> implements FileAp static final long UNASSIGNED_SEQ = -1L; private final OutputFile file; - private final ByteBuffer keyMetadataBuffer; + private final EncryptionKeyMetadata keyMetadata; private final int specId; private final FileAppender> writer; private final Long snapshotId; @@ -62,7 +64,7 @@ private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapsh this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); - this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer(); + this.keyMetadata = file.keyMetadata(); } protected abstract ManifestEntry prepare(ManifestEntry entry); @@ -189,6 +191,18 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); + + // if key metadata can store the length, add it + ByteBuffer keyMetadataBuffer; + if (keyMetadata instanceof NativeEncryptionKeyMetadata) { + keyMetadataBuffer = + ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer(); + } else if (keyMetadata != null) { + keyMetadataBuffer = keyMetadata.buffer(); + } else { + keyMetadataBuffer = null; + } + // if the minSequenceNumber is null, then no manifests with a sequence number have been written, // so the min data sequence number is the one that will be assigned when this is committed. // pass UNASSIGNED_SEQ to inherit it. diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index bc5ef6094695..04234d0b4273 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Base64; import java.util.Iterator; import java.util.Map; import org.apache.iceberg.io.FileIO; @@ -30,6 +32,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.JsonUtil; public class SnapshotParser { @@ -48,6 +51,8 @@ private SnapshotParser() {} private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String ENCRYPTED_KEY_METADATA = "encrypted-key-metadata"; + private static final String KEY_METADATA_KEY_ID = "key-metadata-key-id"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -76,10 +81,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeEndObject(); } - String manifestList = snapshot.manifestListLocation(); - if (manifestList != null) { + ManifestListFile manifestList = snapshot.manifestListFile(); + if (manifestList.location() != null) { // write just the location. manifests should not be embedded in JSON along with a list - generator.writeStringField(MANIFEST_LIST, manifestList); + generator.writeStringField(MANIFEST_LIST, manifestList.location()); } else { // embed the manifest list in the JSON, v1 only JsonUtil.writeStringArray( @@ -93,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (manifestList.encryptedKeyMetadata() != null) { + String encodedKeyMetadata = + Base64.getEncoder() + .encodeToString(ByteBuffers.toByteArray(manifestList.encryptedKeyMetadata())); + generator.writeStringField(ENCRYPTED_KEY_METADATA, encodedKeyMetadata); + generator.writeStringField(KEY_METADATA_KEY_ID, manifestList.keyMetadataKeyId()); + } + generator.writeEndObject(); } @@ -147,6 +160,18 @@ static Snapshot fromJson(JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); + + // Manifest list can be encrypted + ManifestListFile manifestListFile; + if (node.has(ENCRYPTED_KEY_METADATA)) { + String encodedKeyMetadata = JsonUtil.getString(ENCRYPTED_KEY_METADATA, node); + ByteBuffer keyMetadata = ByteBuffer.wrap(Base64.getDecoder().decode(encodedKeyMetadata)); + String keyId = JsonUtil.getString(KEY_METADATA_KEY_ID, node); + manifestListFile = new BaseManifestListFile(manifestList, snapshotId, keyId, keyMetadata); + } else { + manifestListFile = new BaseManifestListFile(manifestList, snapshotId, null, null); + } + return new BaseSnapshot( sequenceNumber, snapshotId, @@ -155,7 +180,7 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList); + manifestListFile); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 74997cc89849..146a75076ccf 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -236,14 +236,16 @@ public Snapshot apply() { List manifests = apply(base, parentSnapshot); OutputFile manifestList = manifestListPath(); - - try (ManifestListWriter writer = - ManifestLists.write( - ops.current().formatVersion(), - manifestList, - snapshotId(), - parentSnapshotId, - sequenceNumber)) { + ManifestListWriter writer = null; + try { + writer = + ManifestLists.write( + ops.current().formatVersion(), + ops.encryption(), + manifestList, + snapshotId(), + parentSnapshotId, + sequenceNumber); // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -257,8 +259,15 @@ public Snapshot apply() { .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index))); writer.addAll(Arrays.asList(manifestFiles)); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to write manifest list file"); + + } finally { + if (writer != null) { + try { + writer.close(); // must close before getting file length + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest list file writer"); + } + } } return new BaseSnapshot( @@ -269,7 +278,7 @@ public Snapshot apply() { operation(), summary(base), base.currentSchemaId(), - manifestList.location()); + writer.toManifestListFile()); } protected abstract Map summary(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java index a43643fcc779..b03944859b6e 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java @@ -26,20 +26,34 @@ public class AesGcmInputFile implements InputFile { private final InputFile sourceFile; private final byte[] dataKey; private final byte[] fileAADPrefix; - private long plaintextLength; + private Long encryptedLength; + private Long plaintextLength; public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix) { + this(sourceFile, dataKey, fileAADPrefix, null); + } + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix, Long length) { this.sourceFile = sourceFile; this.dataKey = dataKey; this.fileAADPrefix = fileAADPrefix; - this.plaintextLength = -1; + this.encryptedLength = length; + this.plaintextLength = null; + } + + private long encryptedLength() { + if (encryptedLength == null) { + this.encryptedLength = sourceFile.getLength(); + } + + return encryptedLength; } @Override public long getLength() { - if (plaintextLength == -1) { + if (plaintextLength == null) { // Presumes all streams use hard-coded plaintext block size. - plaintextLength = AesGcmInputStream.calculatePlaintextLength(sourceFile.getLength()); + plaintextLength = AesGcmInputStream.calculatePlaintextLength(encryptedLength()); } return plaintextLength; @@ -47,7 +61,7 @@ public long getLength() { @Override public SeekableInputStream newStream() { - long ciphertextLength = sourceFile.getLength(); + long ciphertextLength = encryptedLength(); Preconditions.checkState( ciphertextLength >= Ciphers.MIN_STREAM_LENGTH, "Invalid encrypted stream: %d is shorter than the minimum possible stream length", diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index e2cf98bf767f..dfc00f256e12 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,12 +18,17 @@ */ package org.apache.iceberg.encryption; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Map; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; public class EncryptionUtil { @@ -70,31 +75,93 @@ public static KeyManagementClient createKmsClient(Map catalogPro return kmsClient; } + /** + * @deprecated will be removed in 2.0.0. use {@link #createEncryptionManager(String, int, + * KeyManagementClient)} instead. + */ + @Deprecated public static EncryptionManager createEncryptionManager( Map tableProperties, KeyManagementClient kmsClient) { - Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); - - if (null == tableKeyId) { - // Unencrypted table - return PlaintextEncryptionManager.instance(); - } - int dataKeyLength = PropertyUtil.propertyAsInt( tableProperties, TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + return createEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + } + + public static EncryptionManager createEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); + + if (null == tableKeyId) { + // Unencrypted table + return PlaintextEncryptionManager.instance(); + } + Preconditions.checkState( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); } + + /** + * Decrypt the key metadata for a snapshot. + * + *

Encryption for snapshot key metadata is only available for tables using standard encryption. + * + * @param manifestList a ManifestListFile + * @param em the table's EncryptionManager + * @return a decrypted key metadata buffer + */ + public static ByteBuffer decryptSnapshotKeyMetadata( + ManifestListFile manifestList, EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Snapshot key metadata encryption requires a StandardEncryptionManager"); + ByteBuffer unwrappedKey = + ((StandardEncryptionManager) em).unwrapKey(manifestList.keyMetadataKeyId()); + return decryptSnapshotKeyMetadata( + unwrappedKey, manifestList.snapshotId(), manifestList.encryptedKeyMetadata()); + } + + private static ByteBuffer decryptSnapshotKeyMetadata( + ByteBuffer key, long snapshotId, ByteBuffer encryptedKeyMetadata) { + Ciphers.AesGcmDecryptor decryptor = new Ciphers.AesGcmDecryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(encryptedKeyMetadata); + byte[] decryptedKeyMetadata = decryptor.decrypt(keyMetadataBytes, snapshotIdAsAAD(snapshotId)); + return ByteBuffer.wrap(decryptedKeyMetadata); + } + + /** + * Encrypts the key metadata for a snapshot. + * + *

Encryption for snapshot key metadata is only available for tables using standard encryption. + * + * @param key unwrapped snapshot key bytes + * @param snapshotId ID of the table snapshot + * @param keyMetadata unencrypted EncryptionKeyMetadata + * @return a Pair of the key ID used to encrypt and the encrypted key metadata + */ + public static ByteBuffer encryptSnapshotKeyMetadata( + ByteBuffer key, long snapshotId, EncryptionKeyMetadata keyMetadata) { + Ciphers.AesGcmEncryptor encryptor = new Ciphers.AesGcmEncryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(keyMetadata.buffer()); + byte[] encryptedKeyMetadata = encryptor.encrypt(keyMetadataBytes, snapshotIdAsAAD(snapshotId)); + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + private static byte[] snapshotIdAsAAD(long snapshotId) { + ByteBuffer asBuffer = + ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, snapshotId); + return ByteBuffers.toByteArray(asBuffer); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java index c2ed9d564d1e..2188378a4e87 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -27,4 +27,21 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { /** Additional authentication data as a {@link ByteBuffer} */ ByteBuffer aadPrefix(); + + /** Encrypted file length */ + default Long fileLength() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement fileLength"); + } + + /** + * Copy this key metadata and set the file length. + * + * @param length length of the encrypted file in bytes + * @return a copy of this key metadata (key and AAD) with the file length + */ + default NativeEncryptionKeyMetadata copyWithLength(long length) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement copyWithLength"); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 119d2a5f9ae2..3a613ffd3241 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -18,30 +18,76 @@ */ package org.apache.iceberg.encryption; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import java.nio.ByteBuffer; import java.security.SecureRandom; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { - private final transient KeyManagementClient kmsClient; + /** + * Default time-out of key encryption keys. Per NIST SP 800-57 P1 R5 section 5.3.6, set to 1 week. + */ + private static final long KEY_ENCRYPTION_KEY_TIMEOUT = TimeUnit.DAYS.toMillis(7); + private final String tableKeyId; private final int dataKeyLength; + // a holder class of metadata that is not available after serialization + private class TransientEncryptionState { + private final KeyManagementClient kmsClient; + private final Map encryptionKeys; + private final LoadingCache unwrappedKeyCache; + private WrappedEncryptionKey currentEncryptionKey; + + private TransientEncryptionState(KeyManagementClient kmsClient) { + this.kmsClient = kmsClient; + this.encryptionKeys = Maps.newLinkedHashMap(); + this.unwrappedKeyCache = + Caffeine.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .build( + keyId -> kmsClient.unwrapKey(encryptionKeys.get(keyId).wrappedKey(), tableKeyId)); + this.currentEncryptionKey = null; + } + } + + private final transient TransientEncryptionState transientState; + private transient volatile SecureRandom lazyRNG = null; + /** + * @deprecated will be removed in 1.8.0. use {@link #StandardEncryptionManager(String, int, List, + * KeyManagementClient)} instead. + */ + @Deprecated + public StandardEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + this(tableKeyId, dataKeyLength, ImmutableList.of(), kmsClient); + } + /** * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ - public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + StandardEncryptionManager( + String tableKeyId, + int dataKeyLength, + List keys, + KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -49,8 +95,17 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.kmsClient = kmsClient; + this.transientState = new TransientEncryptionState(kmsClient); this.dataKeyLength = dataKeyLength; + + for (WrappedEncryptionKey key : keys) { + transientState.encryptionKeys.put(key.id(), key); + + if (transientState.currentEncryptionKey == null + || transientState.currentEncryptionKey.timestamp() < key.timestamp()) { + transientState.currentEncryptionKey = key; + } + } } @Override @@ -81,22 +136,75 @@ private SecureRandom workerRNG() { return lazyRNG; } + /** + * @deprecated will be removed in 1.8.0; use {@link #currentSnapshotKeyId()} instead. + */ + @Deprecated public ByteBuffer wrapKey(ByteBuffer secretKey) { - if (kmsClient == null) { + if (transientState == null) { throw new IllegalStateException( "Cannot wrap key after called after serialization (missing KMS client)"); } - return kmsClient.wrapKey(secretKey, tableKeyId); + return transientState.kmsClient.wrapKey(secretKey, tableKeyId); } + /** + * @deprecated will be removed in 1.8.0; use {@link #unwrapKey(String)}} instead. + */ + @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (kmsClient == null) { - throw new IllegalStateException( - "Cannot wrap key after called after serialization (missing KMS client)"); + throw new UnsupportedOperationException("Use unwrapKey(String) instead"); + } + + public String currentSnapshotKeyId() { + if (transientState == null) { + throw new IllegalStateException("Cannot return the current snapshot key after serialization"); + } + + if (transientState.currentEncryptionKey == null + || transientState.currentEncryptionKey.timestamp() + < System.currentTimeMillis() - KEY_ENCRYPTION_KEY_TIMEOUT) { + createNewEncryptionKey(); + } + + return transientState.currentEncryptionKey.id(); + } + + public ByteBuffer unwrapKey(String keyId) { + if (transientState == null) { + throw new IllegalStateException("Cannot unwrap key after serialization (missing KMS client)"); } - return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + return transientState.unwrappedKeyCache.get(keyId); + } + + private ByteBuffer newKey() { + byte[] newKey = new byte[dataKeyLength]; + workerRNG().nextBytes(newKey); + return ByteBuffer.wrap(newKey); + } + + private String newKeyId() { + byte[] idBytes = new byte[6]; + workerRNG().nextBytes(idBytes); + return Base64.getEncoder().encodeToString(idBytes); + } + + private void createNewEncryptionKey() { + if (transientState == null) { + throw new IllegalStateException("Cannot create encryption keys after serialization"); + } + + ByteBuffer unwrapped = newKey(); + ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); + WrappedEncryptionKey key = + new WrappedEncryptionKey(newKeyId(), wrapped, System.currentTimeMillis()); + + // update internal tracking + transientState.unwrappedKeyCache.put(key.id(), unwrapped); + transientState.encryptionKeys.put(key.id(), key); + transientState.currentEncryptionKey = key; } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { @@ -173,7 +281,8 @@ private AesGcmInputFile decrypted() { new AesGcmInputFile( encryptedInputFile.encryptedInputFile(), ByteBuffers.toByteArray(keyMetadata().encryptionKey()), - ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + ByteBuffers.toByteArray(keyMetadata().aadPrefix()), + keyMetadata().fileLength()); } return lazyDecryptedInputFile; diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 98f87c65d95f..6ddea184d8c4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -36,7 +36,8 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), - optional(1, "aad_prefix", Types.BinaryType.get())); + optional(1, "aad_prefix", Types.BinaryType.get()), + optional(2, "file_length", Types.LongType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = AvroSchemaUtil.convert(SCHEMA_V1, StandardKeyMetadata.class.getCanonicalName()); @@ -49,20 +50,31 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private ByteBuffer encryptionKey; private ByteBuffer aadPrefix; - private org.apache.avro.Schema avroSchema; + private Long fileLength; /** Used by Avro reflection to instantiate this class * */ StandardKeyMetadata() {} StandardKeyMetadata(byte[] key, byte[] aad) { + this(key, aad, null); + } + + StandardKeyMetadata(byte[] key, byte[] aad, Long fileLength) { this.encryptionKey = ByteBuffer.wrap(key); this.aadPrefix = ByteBuffer.wrap(aad); + this.fileLength = fileLength; } - private StandardKeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { - this.encryptionKey = encryptionKey; - this.aadPrefix = aadPrefix; - this.avroSchema = AVRO_SCHEMA_V1; + /** + * Copy constructor. + * + * @param toCopy a StandardKeymetadata to copy + * @param fileLength file length that overrides toCopy if not null + */ + private StandardKeyMetadata(StandardKeyMetadata toCopy, Long fileLength) { + this.encryptionKey = toCopy.encryptionKey; + this.aadPrefix = toCopy.aadPrefix; + this.fileLength = fileLength != null ? fileLength : toCopy.fileLength; } static Map supportedSchemaVersions() { @@ -83,6 +95,11 @@ public ByteBuffer aadPrefix() { return aadPrefix; } + @Override + public Long fileLength() { + return fileLength; + } + static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { if (keyMetadata instanceof StandardKeyMetadata) { return (StandardKeyMetadata) keyMetadata; @@ -116,7 +133,12 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - return new StandardKeyMetadata(encryptionKey(), aadPrefix()); + return new StandardKeyMetadata(this, null); + } + + @Override + public NativeEncryptionKeyMetadata copyWithLength(long length) { + return new StandardKeyMetadata(this, length); } @Override @@ -128,6 +150,9 @@ public void put(int i, Object v) { case 1: this.aadPrefix = (ByteBuffer) v; return; + case 2: + this.fileLength = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -140,6 +165,8 @@ public Object get(int i) { return encryptionKey; case 1: return aadPrefix; + case 2: + return fileLength; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -147,6 +174,6 @@ public Object get(int i) { @Override public org.apache.avro.Schema getSchema() { - return avroSchema; + return AVRO_SCHEMA_V1; } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java new file mode 100644 index 000000000000..c3824f66e9de --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/WrappedEncryptionKey.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** + * This class keeps a wrapped (KMS-encrypted) version of the keys used to encrypt manifest list key + * metadata. These keys have an ID and a creation timestamp. + */ +public class WrappedEncryptionKey implements Serializable { + private final String keyID; + private final ByteBuffer wrappedKey; + private final long timestamp; + + public WrappedEncryptionKey(String keyID, ByteBuffer wrappedKey, long timestamp) { + this.keyID = keyID; + this.wrappedKey = wrappedKey; + this.timestamp = timestamp; + } + + public String id() { + return keyID; + } + + public ByteBuffer wrappedKey() { + return wrappedKey; + } + + public long timestamp() { + return timestamp; + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java index 13e8985cdb56..7c32d8a6245d 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java @@ -85,7 +85,7 @@ public class TestManifestEncryption { private static final DataFile DATA_FILE = new GenericDataFile( - 0, + SPEC.specId(), PATH, FORMAT, PARTITION, diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java new file mode 100644 index 000000000000..90feed4789f1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestManifestListEncryption { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + + private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 10); + private static final ByteBuffer FIRST_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 100); + private static final ByteBuffer SECOND_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 20); + private static final ByteBuffer SECOND_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 200); + + private static final List PARTITION_SUMMARIES = + Lists.newArrayList( + new GenericPartitionFieldSummary( + false, FIRST_SUMMARY_LOWER_BOUND, FIRST_SUMMARY_UPPER_BOUND), + new GenericPartitionFieldSummary( + true, false, SECOND_SUMMARY_LOWER_BOUND, SECOND_SUMMARY_UPPER_BOUND)); + private static final ByteBuffer MANIFEST_KEY_METADATA = ByteBuffer.allocate(100); + + private static final ManifestFile TEST_MANIFEST = + new GenericManifestFile( + PATH, + LENGTH, + SPEC_ID, + ManifestContent.DATA, + SEQ_NUM, + MIN_SEQ_NUM, + SNAPSHOT_ID, + ADDED_FILES, + ADDED_ROWS, + EXISTING_FILES, + EXISTING_ROWS, + DELETED_FILES, + DELETED_ROWS, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA); + + private static final EncryptionManager ENCRYPTION_MANAGER = + EncryptionTestHelpers.createEncryptionManager(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadEncryptedManifestList(); + + assertThat(manifest.path()).isEqualTo(PATH); + assertThat(manifest.length()).isEqualTo(LENGTH); + assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM); + assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM); + assertThat((long) manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); + assertThat((int) manifest.addedFilesCount()).isEqualTo(ADDED_FILES); + assertThat((long) manifest.addedRowsCount()).isEqualTo(ADDED_ROWS); + assertThat((int) manifest.existingFilesCount()).isEqualTo(EXISTING_FILES); + assertThat((long) manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS); + assertThat((int) manifest.deletedFilesCount()).isEqualTo(DELETED_FILES); + assertThat((long) manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + } + + private ManifestFile writeAndReadEncryptedManifestList() throws IOException { + OutputFile rawOutput = new InMemoryOutputFile(); + EncryptedOutputFile encryptedOutput = ENCRYPTION_MANAGER.encrypt(rawOutput); + EncryptionKeyMetadata keyMetadata = encryptedOutput.keyMetadata(); + + try (FileAppender writer = + ManifestLists.write( + 2, encryptedOutput.encryptingOutputFile(), SNAPSHOT_ID, SNAPSHOT_ID - 1, SEQ_NUM)) { + writer.add(TEST_MANIFEST); + } + + InputFile rawInput = rawOutput.toInputFile(); + + // First try to read without decryption + assertThatThrownBy(() -> ManifestLists.read(rawInput)) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + EncryptedInputFile encryptedManifestListInput = + EncryptedFiles.encryptedInput(rawInput, keyMetadata); + InputFile manifestListInput = ENCRYPTION_MANAGER.decrypt(encryptedManifestListInput); + + List manifests = ManifestLists.read(manifestListInput); + assertThat(manifests.size()).isEqualTo(1); + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index bcde7a7f31d3..dcfb36b1111c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -54,6 +54,7 @@ import java.util.stream.Stream; import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.TableMetadata.SnapshotLogEntry; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -1605,7 +1606,13 @@ private String createManifestListWithManifestFile( manifestList.deleteOnExit(); try (ManifestListWriter writer = - ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) { + ManifestLists.write( + 1, + PlaintextEncryptionManager.instance(), + Files.localOutput(manifestList), + snapshotId, + parentSnapshotId, + 0)) { writer.addAll( ImmutableList.of(new GenericManifestFile(localInput(manifestFile), SPEC_5.specId()))); } diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index aa49e1c40fe2..cdc8c2e1515b 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -31,11 +31,10 @@ public static EncryptionManager createEncryptionManager() { Map catalogProperties = Maps.newHashMap(); catalogProperties.put( CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); - Map tableProperties = Maps.newHashMap(); - tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); - tableProperties.put(TableProperties.FORMAT_VERSION, "2"); return EncryptionUtil.createEncryptionManager( - tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); + UnitestKMS.MASTER_KEY_NAME1, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT, + EncryptionUtil.createKmsClient(catalogProperties)); } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index 79f30e109f7d..22c3f8a139c7 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -36,6 +36,7 @@ import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; @@ -196,6 +197,9 @@ private static FileIO createMockFileIO(FileIO wrapped) { .thenAnswer( invocation -> wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + Mockito.when(mockIO.newInputFile(Mockito.any(ManifestListFile.class))) + .thenAnswer( + invocation -> wrapped.newInputFile((ManifestListFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class))) .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class)))