Skip to content

Commit

Permalink
update the patch
Browse files Browse the repository at this point in the history
  • Loading branch information
ggershinsky committed Mar 27, 2024
1 parent 0e209f7 commit a2d7b10
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 91 deletions.
35 changes: 17 additions & 18 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

/**
Expand Down Expand Up @@ -73,11 +72,6 @@ public interface Snapshot extends Serializable {
*/
List<ManifestFile> allManifests(FileIO io);

default List<ManifestFile> allManifests(FileIO fileIO, EncryptionManager encryption) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement allManifests with encryption");
}

/**
* Return a {@link ManifestFile} for each data manifest in this snapshot.
*
Expand All @@ -86,19 +80,13 @@ default List<ManifestFile> allManifests(FileIO fileIO, EncryptionManager encrypt
*/
List<ManifestFile> dataManifests(FileIO io);

default List<ManifestFile> dataManifests(FileIO io, EncryptionManager encryption) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement dataManifests with encryption");
}

/**
* Return a {@link ManifestFile} for each delete manifest in this snapshot.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return a list of ManifestFile
*/
List<ManifestFile> deleteManifests(FileIO io);
// TODO add encryption manager

/**
* Return the name of the {@link DataOperations data operation} that produced this snapshot.
Expand Down Expand Up @@ -126,7 +114,6 @@ default List<ManifestFile> dataManifests(FileIO io, EncryptionManager encryption
* @return all data files added to the table in this snapshot.
*/
Iterable<DataFile> addedDataFiles(FileIO io);
// TODO add encryption manager

/**
* Return all data files removed from the table in this snapshot.
Expand All @@ -139,7 +126,6 @@ default List<ManifestFile> dataManifests(FileIO io, EncryptionManager encryption
* @return all data files removed from the table in this snapshot.
*/
Iterable<DataFile> removedDataFiles(FileIO io);
// TODO add encryption manager

/**
* Return all delete files added to the table in this snapshot.
Expand All @@ -154,7 +140,6 @@ default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement addedDeleteFiles");
}
// TODO add encryption manager

/**
* Return all delete files removed from the table in this snapshot.
Expand All @@ -169,7 +154,6 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement removedDeleteFiles");
}
// TODO add encryption manager

/**
* Return the location of this snapshot's manifest list, or null if it is not separate.
Expand All @@ -178,6 +162,15 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
*/
String manifestListLocation();

/**
* Return the size of this snapshot's manifest list. For encrypted tables, a verified plaintext
* size must be used.
*/
default long manifestListSize() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement manifestListSize");
}

/**
* Return the id of the schema used when this snapshot was created, or null if this information is
* not available.
Expand All @@ -188,7 +181,13 @@ default Integer schemaId() {
return null;
}

default String manifestKeyMetadata() {
return null;
/**
* Key metadata for encrypted manifest lists.
*
* @return base64-encoded key metadata for the manifest list file encryption key
*/
default String manifestListKeyMetadata() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement manifestKeyMetadata");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) {
}

public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) {
// TODO: is the length correct for the encrypted file? It may be the length of the plaintext
// stream
return em.decrypt(wrap(io.newInputFile(path, length), buffer));
}

Expand Down Expand Up @@ -157,7 +155,7 @@ private static SimpleEncryptedInputFile wrap(InputFile encryptedInputFile, ByteB
}

private static EncryptionKeyMetadata toKeyMetadata(ByteBuffer buffer) {
return buffer != null ? new SimpleKeyMetadata(buffer) : EmptyKeyMetadata.get();
return buffer != null ? new SimpleKeyMetadata(buffer) : EncryptionKeyMetadata.empty();
}

private static class SimpleEncryptedInputFile implements EncryptedInputFile {
Expand Down Expand Up @@ -198,22 +196,4 @@ public EncryptionKeyMetadata copy() {
return new SimpleKeyMetadata(metadataBuffer.duplicate());
}
}

private static class EmptyKeyMetadata implements EncryptionKeyMetadata {
private static final EmptyKeyMetadata INSTANCE = new EmptyKeyMetadata();

private static EmptyKeyMetadata get() {
return INSTANCE;
}

@Override
public ByteBuffer buffer() {
return null;
}

@Override
public EncryptionKeyMetadata copy() {
return this;
}
}
}
77 changes: 38 additions & 39 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
import org.apache.iceberg.encryption.StandardEncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
Expand All @@ -48,6 +47,7 @@ class BaseSnapshot implements Snapshot {
private final long sequenceNumber;
private final long timestampMillis;
private final String manifestListLocation;
private final long manifestListSize;
private final String operation;
private final Map<String, String> summary;
private final Integer schemaId;
Expand All @@ -63,6 +63,7 @@ class BaseSnapshot implements Snapshot {
private transient List<DeleteFile> addedDeleteFiles = null;
private transient List<DeleteFile> removedDeleteFiles = null;

/** Tests only */
BaseSnapshot(
long sequenceNumber,
long snapshotId,
Expand All @@ -81,6 +82,7 @@ class BaseSnapshot implements Snapshot {
summary,
schemaId,
manifestList,
-1L,
null);
}

Expand All @@ -92,7 +94,8 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList,
String manifestListLocation,
long manifestListSize,
String manifestListKeyMetadata) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
Expand All @@ -101,7 +104,8 @@ class BaseSnapshot implements Snapshot {
this.operation = operation;
this.summary = summary;
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.manifestListLocation = manifestListLocation;
this.manifestListSize = manifestListSize;
this.v1ManifestLocations = null;
this.manifestListKeyMetadata = manifestListKeyMetadata;
}
Expand All @@ -123,6 +127,7 @@ class BaseSnapshot implements Snapshot {
this.summary = summary;
this.schemaId = schemaId;
this.manifestListLocation = null;
this.manifestListSize = -1L;
this.v1ManifestLocations = v1ManifestLocations;
this.manifestListKeyMetadata = null;
}
Expand Down Expand Up @@ -163,15 +168,16 @@ public Integer schemaId() {
}

@Override
public String manifestKeyMetadata() {
public String manifestListKeyMetadata() {
return manifestListKeyMetadata;
}

private void cacheManifests(FileIO fileIO) {
cacheManifests(fileIO, null); // TODO remove
@Override
public long manifestListSize() {
return manifestListSize;
}

private void cacheManifests(FileIO fileIO, EncryptionManager encryption) {
private void cacheManifests(FileIO fileIO) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
}
Expand All @@ -186,29 +192,38 @@ private void cacheManifests(FileIO fileIO, EncryptionManager encryption) {

if (allManifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list

// TODO GG
InputFile manifestListFile = fileIO.newInputFile(manifestListLocation);
if (encryption != null && manifestListKeyMetadata != null) {
if (manifestListKeyMetadata != null) { // encrypted manifest list file
Preconditions.checkArgument(
fileIO instanceof EncryptingFileIO,
"No encryption in FileIO class " + fileIO.getClass());
EncryptingFileIO encryptingFileIO = (EncryptingFileIO) fileIO;
Preconditions.checkArgument(
encryption instanceof StandardEncryptionManager,
encryptingFileIO.encryptionManager() instanceof StandardEncryptionManager,
"Encryption manager for encrypted manifest list files can currently only be an instance of "
+ StandardEncryptionManager.class);
+ StandardEncryptionManager.class
+ ". Not "
+ encryptingFileIO.encryptionManager().getClass());
StandardEncryptionManager standardEncryptionManager =
(StandardEncryptionManager) encryptingFileIO.encryptionManager();

ByteBuffer keyMetadataBytes =
ByteBuffer.wrap(Base64.getDecoder().decode(manifestListKeyMetadata));
ByteBuffer unwrappedManfestListKey = null;

// Unwrap manifest list key
unwrappedManfestListKey =
((StandardEncryptionManager) encryption)
.unwrapKey(EncryptionUtil.fileEncryptionKey(keyMetadataBytes));
NativeEncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBytes);
ByteBuffer unwrappedManfestListKey =
standardEncryptionManager.unwrapKey(keyMetadata.encryptionKey());

EncryptionKeyMetadata keyMetadata =
EncryptionUtil.createKeyMetadata(
unwrappedManfestListKey, EncryptionUtil.aadPrefix(keyMetadataBytes));
EncryptionKeyMetadata unwrappedKeyMetadata =
EncryptionUtil.createKeyMetadata(unwrappedManfestListKey, keyMetadata.aadPrefix());

EncryptedInputFile encryptedInputFile =
EncryptedFiles.encryptedInput(manifestListFile, keyMetadata);
manifestListFile = encryption.decrypt(encryptedInputFile);
manifestListFile =
encryptingFileIO.newDecryptingInputFile(
manifestListLocation, manifestListSize, unwrappedKeyMetadata.buffer());
}

this.allManifests = ManifestLists.read(manifestListFile);
}

Expand All @@ -232,14 +247,6 @@ public List<ManifestFile> allManifests(FileIO fileIO) {
return allManifests;
}

@Override
public List<ManifestFile> allManifests(FileIO fileIO, EncryptionManager encryption) {
if (allManifests == null) {
cacheManifests(fileIO, encryption);
}
return allManifests;
}

@Override
public List<ManifestFile> dataManifests(FileIO fileIO) {
if (dataManifests == null) {
Expand All @@ -248,14 +255,6 @@ public List<ManifestFile> dataManifests(FileIO fileIO) {
return dataManifests;
}

@Override
public List<ManifestFile> dataManifests(FileIO fileIO, EncryptionManager encryption) {
if (dataManifests == null) {
cacheManifests(fileIO, encryption);
}
return dataManifests;
}

@Override
public List<ManifestFile> deleteManifests(FileIO fileIO) {
if (deleteManifests == null) {
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -66,9 +65,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
Snapshot snapshot = snapshot();

FileIO io = table().io();
EncryptionManager encryption = table().encryption();
List<ManifestFile> dataManifests = snapshot.dataManifests(io, encryption);
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io); // TODO add encryption
List<ManifestFile> dataManifests = snapshot.dataManifests(io);
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
scanMetrics().totalDataManifests().increment((long) dataManifests.size());
scanMetrics().totalDeleteManifests().increment((long) deleteManifests.size());
ManifestGroup manifestGroup =
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/java/org/apache/iceberg/ManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -78,7 +77,6 @@ MetadataTableType metadataTableType() {

protected DataTask task(TableScan scan) {
FileIO io = table().io();
EncryptionManager encryption = table().encryption();
String location = scan.snapshot().manifestListLocation();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

Expand All @@ -87,7 +85,7 @@ protected DataTask task(TableScan scan) {
location != null ? location : table().operations().current().metadataFileLocation()),
schema(),
scan.schema(),
scan.snapshot().allManifests(io, encryption),
scan.snapshot().allManifests(io),
manifest -> {
PartitionSpec spec = specs.get(manifest.partitionSpecId());
return ManifestsTable.manifestFileToRow(spec, manifest);
Expand Down
Loading

0 comments on commit a2d7b10

Please sign in to comment.