Skip to content

Commit

Permalink
Add PlaintextEncryptionManager and add encryption support in Spark (N…
Browse files Browse the repository at this point in the history
  • Loading branch information
yifeih authored and rdblue committed Mar 5, 2019
1 parent 4cbc4f0 commit 8d92f92
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 40 deletions.
5 changes: 1 addition & 4 deletions api/src/main/java/com/netflix/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ default AppendFiles newFastAppend() {
* @return an {@link com.netflix.iceberg.encryption.EncryptionManager} to encrypt and decrypt
* data files.
*/
default EncryptionManager encryption() {
// TODO coming soon
throw new UnsupportedOperationException("Encryption is a work in progress.");
}
EncryptionManager encryption();

/**
* @return a {@link LocationProvider} to provide locations for new data files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,26 @@
*/
public interface EncryptionKeyMetadata {

EncryptionKeyMetadata EMPTY = new EncryptionKeyMetadata() {
@Override
public ByteBuffer buffer() {
return null;
}

@Override
public EncryptionKeyMetadata copy() {
return this;
}
};

static EncryptionKeyMetadata empty() {
return EMPTY;
}

/**
* Opaque blob representing metadata about a file's encryption key.
*/
ByteBuffer keyMetadata();
ByteBuffer buffer();

EncryptionKeyMetadata copy();
}
6 changes: 6 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.netflix.iceberg;

import com.netflix.iceberg.encryption.EncryptionManager;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.LocationProvider;
import java.util.Map;
Expand Down Expand Up @@ -147,6 +148,11 @@ public FileIO io() {
return operations().io();
}

@Override
public EncryptionManager encryption() {
return operations().encryption();
}

@Override
public LocationProvider locationProvider() {
return operations().locationProvider();
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.encryption.EncryptionManager;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.LocationProvider;
Expand Down Expand Up @@ -283,6 +284,11 @@ public FileIO io() {
return ops.io();
}

@Override
public EncryptionManager encryption() {
return ops.encryption();
}

@Override
public String metadataFileLocation(String fileName) {
return ops.metadataFileLocation(fileName);
Expand Down Expand Up @@ -399,6 +405,11 @@ public FileIO io() {
return transactionOps.io();
}

@Override
public EncryptionManager encryption() {
return transactionOps.encryption();
}

@Override
public LocationProvider locationProvider() {
return transactionOps.locationProvider();
Expand Down
32 changes: 31 additions & 1 deletion core/src/main/java/com/netflix/iceberg/DataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package com.netflix.iceberg;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.encryption.EncryptedInputFile;
import com.netflix.iceberg.encryption.EncryptedOutputFile;
import com.netflix.iceberg.encryption.EncryptionKeyMetadata;
import com.netflix.iceberg.hadoop.HadoopInputFile;
import com.netflix.iceberg.io.InputFile;
Expand Down Expand Up @@ -136,13 +138,35 @@ public static DataFile fromInputFile(InputFile file, PartitionData partition, Me
location, format, partition, file.getLength(), DEFAULT_BLOCK_SIZE, metrics);
}

public static DataFile fromEncryptedOutputFile(EncryptedOutputFile encryptedFile, PartitionData partition,
Metrics metrics) {
EncryptionKeyMetadata keyMetadata = encryptedFile.keyMetadata();
InputFile file = encryptedFile.encryptingOutputFile().toInputFile();
if (encryptedFile instanceof HadoopInputFile) {
return fromStat(((HadoopInputFile) file).getStat(), partition, metrics, keyMetadata);
}

String location = file.location();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, file.getLength(), DEFAULT_BLOCK_SIZE, metrics, keyMetadata.buffer());
}

public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, stat.getLen(), stat.getBlockSize(), metrics);
}

public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics,
EncryptionKeyMetadata keyMetadata) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, stat.getLen(), stat.getBlockSize(), metrics, keyMetadata.buffer());
}

public static DataFile fromParquetInputFile(InputFile file,
PartitionData partition,
Metrics metrics) {
Expand Down Expand Up @@ -253,6 +277,12 @@ public Builder withInputFile(InputFile file) {
return this;
}

public Builder withEncryptedOutputFile(EncryptedOutputFile encryptedFile) {
withInputFile(encryptedFile.encryptingOutputFile().toInputFile());
withEncryptionKeyMetadata(encryptedFile.keyMetadata());
return this;
}

public Builder withPath(String filePath) {
this.filePath = filePath;
return this;
Expand Down Expand Up @@ -312,7 +342,7 @@ public Builder withEncryptionKeyMetadata(ByteBuffer keyMetadata) {
}

public Builder withEncryptionKeyMetadata(EncryptionKeyMetadata keyMetadata) {
return withEncryptionKeyMetadata(keyMetadata.keyMetadata());
return withEncryptionKeyMetadata(keyMetadata.buffer());
}

public Builder withEncryptionKeyMetadata(byte[] keyMetadata) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/iceberg/TableOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.netflix.iceberg;

import com.netflix.iceberg.encryption.EncryptionManager;
import com.netflix.iceberg.encryption.PlaintextEncryptionManager;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.LocationProvider;
import java.util.UUID;
Expand Down Expand Up @@ -67,8 +68,7 @@ public interface TableOperations {
* data files.
*/
default EncryptionManager encryption() {
// TODO coming soon
throw new UnsupportedOperationException("Encryption is a work in progress.");
return new PlaintextEncryptionManager();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,29 @@
import java.nio.ByteBuffer;

class BaseEncryptionKeyMetadata implements EncryptionKeyMetadata {

public static EncryptionKeyMetadata fromKeyMetadata(ByteBuffer keyMetadata) {
if (keyMetadata == null) {
return EncryptionKeyMetadata.empty();
}
return new BaseEncryptionKeyMetadata(keyMetadata);
}

public static EncryptionKeyMetadata fromByteArray(byte[] keyMetadata) {
if (keyMetadata == null) {
return EncryptionKeyMetadata.empty();
}
return fromKeyMetadata(ByteBuffer.wrap(keyMetadata));
}

private final ByteBuffer keyMetadata;

BaseEncryptionKeyMetadata(ByteBuffer keyMetadata) {
private BaseEncryptionKeyMetadata(ByteBuffer keyMetadata) {
this.keyMetadata = keyMetadata;
}

@Override
public ByteBuffer keyMetadata() {
public ByteBuffer buffer() {
return keyMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public static EncryptedInputFile encryptedInput(

public static EncryptedInputFile encryptedInput(
InputFile encryptedInputFile, ByteBuffer keyMetadata) {
return encryptedInput(encryptedInputFile, new BaseEncryptionKeyMetadata(keyMetadata));
return encryptedInput(encryptedInputFile, BaseEncryptionKeyMetadata.fromKeyMetadata(keyMetadata));
}

public static EncryptedInputFile encryptedInput(InputFile encryptedInputFile, byte[] keyMetadata) {
return encryptedInput(encryptedInputFile, ByteBuffer.wrap(keyMetadata));
return encryptedInput(encryptedInputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
}

public static EncryptedOutputFile encryptedOutput(
Expand All @@ -47,11 +47,11 @@ public static EncryptedOutputFile encryptedOutput(

public static EncryptedOutputFile encryptedOutput(
OutputFile encryptingOutputFile, ByteBuffer keyMetadata) {
return encryptedOutput(encryptingOutputFile, new BaseEncryptionKeyMetadata(keyMetadata));
return encryptedOutput(encryptingOutputFile, BaseEncryptionKeyMetadata.fromKeyMetadata(keyMetadata));
}

public static EncryptedOutputFile encryptedOutput(OutputFile encryptedOutputFile, byte[] keyMetadata) {
return encryptedOutput(encryptedOutputFile, ByteBuffer.wrap(keyMetadata));
return encryptedOutput(encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
}

private EncryptedFiles() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
public class EncryptionKeyMetadatas {

public static EncryptionKeyMetadata of(ByteBuffer keyMetadata) {
return new BaseEncryptionKeyMetadata(keyMetadata);
return BaseEncryptionKeyMetadata.fromKeyMetadata(keyMetadata);
}

public static EncryptionKeyMetadata of(byte[] keyMetadata) {
return of(ByteBuffer.wrap(keyMetadata));
return BaseEncryptionKeyMetadata.fromByteArray(keyMetadata);
}

private EncryptionKeyMetadatas() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.netflix.iceberg.encryption;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;

public class PlaintextEncryptionManager implements EncryptionManager {
private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class);

@Override
public InputFile decrypt(EncryptedInputFile encrypted) {
if (encrypted.keyMetadata().buffer() != null) {
LOG.warn("File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
}
return encrypted.encryptedInputFile();
}

@Override
public EncryptedOutputFile encrypt(OutputFile rawOutput) {
return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null);
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/com/netflix/iceberg/util/ByteBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public static byte[] toByteArray(ByteBuffer buffer) {
}

public static ByteBuffer copy(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
int size = buffer.remaining();
byte[] copyArray = new byte[size];
ByteBuffer readerBuffer = buffer.asReadOnlyBuffer();
Expand Down

0 comments on commit 8d92f92

Please sign in to comment.