Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Key metadata in Avro format #6450

Merged
merged 15 commits into from Jun 9, 2023
Expand Up @@ -30,13 +30,17 @@
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.data.avro.DecoderResolver;

class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note about this to myself for later...

This needs to use GenericAvroReader for a couple reasons. First, GenericAvroReader is the only one that supports creating records with a specific class. We could extend support so that DataReader can create StructLike instances, but that hits the second reason: using Iceberg generics will use different representations for some types, like using LocalDate instead of int for date type. The representation differences don't affect the KeyMetadata class right now, but it's still more correct to use the internal representation.


private final Schema readSchema;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
private Schema fileSchema = null;
private ValueReader<T> reader = null;

/**
 * Creates a reader for the given expected (read) schema.
 *
 * @param schema the Avro schema used to construct datum instances
 * @return a new {@link GenericAvroReader} for the schema
 */
public static <D> GenericAvroReader<D> create(Schema schema) {
  GenericAvroReader<D> reader = new GenericAvroReader<>(schema);
  return reader;
}

/** Package-private constructor; external callers should use {@link #create(Schema)}. */
GenericAvroReader(Schema readSchema) {
this.readSchema = readSchema;
}
Expand Down
Expand Up @@ -28,9 +28,13 @@
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
public class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

/**
 * Creates a writer for the given Avro schema.
 *
 * @param schema the Avro schema that written datums must conform to
 * @return a new {@link GenericAvroWriter} for the schema
 */
public static <D> GenericAvroWriter<D> create(Schema schema) {
  GenericAvroWriter<D> writer = new GenericAvroWriter<>(schema);
  return writer;
}

/** Package-private constructor; external callers should use {@link #create(Schema)}. */
GenericAvroWriter(Schema schema) {
setSchema(schema);
}
Expand Down
Expand Up @@ -20,22 +20,18 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.message.BadHeaderException;
import org.apache.avro.message.MessageDecoder;
import org.apache.avro.message.MissingSchemaException;
import org.apache.avro.message.SchemaStore;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.ProjectionDatumReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;

public class IcebergDecoder<D> extends MessageDecoder.BaseDecoder<D> {
Expand Down Expand Up @@ -106,7 +102,10 @@ public void addSchema(org.apache.iceberg.Schema writeSchema) {

/**
 * Registers a decoder for the given write schema, keyed by its Avro parsing fingerprint.
 *
 * @param writeSchema an Avro schema that was used to encode buffers read by this decoder
 */
private void addSchema(Schema writeSchema) {
  long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
  // parameterize the decoder type; a raw RawDecoder causes an unchecked assignment
  // when stored into the Map<Long, RawDecoder<D>> of decoders
  RawDecoder<D> decoder =
      new RawDecoder<>(
          readSchema, avroSchema -> DataReader.create(readSchema, avroSchema), writeSchema);
  decoders.put(fp, decoder);
}

private RawDecoder<D> getDecoder(long fp) {
Expand Down Expand Up @@ -144,44 +143,10 @@ public D decode(InputStream stream, D reuse) throws IOException {

RawDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));

return decoder.decode(stream, reuse);
}

private static class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();

private final DatumReader<D> reader;

/**
* Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link
* Schema readSchema}.
*
* <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
* schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
* encode all buffers decoded by this class.
*
* @param readSchema the schema used to construct datum instances
* @param writeSchema the schema used to decode buffers
*/
private RawDecoder(org.apache.iceberg.Schema readSchema, Schema writeSchema) {
this.reader =
new ProjectionDatumReader<>(
avroSchema -> DataReader.create(readSchema, avroSchema),
readSchema,
ImmutableMap.of(),
null);
this.reader.setSchema(writeSchema);
}

@Override
public D decode(InputStream stream, D reuse) {
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
DECODER.set(decoder);
try {
return reader.read(reuse, decoder);
} catch (IOException e) {
throw new AvroRuntimeException("Decoding datum failed", e);
}
try {
return decoder.decode(stream, reuse);
} catch (UncheckedIOException e) {
throw new AvroRuntimeException(e);
}
}

Expand Down
64 changes: 64 additions & 0 deletions core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.avro;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.message.MessageDecoder;
import org.apache.iceberg.avro.ProjectionDatumReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
  // one BinaryDecoder per thread so repeated decode calls reuse the same instance
  private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();

  private final DatumReader<D> reader;

  /**
   * Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link
   * Schema readSchema}.
   *
   * <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
   * schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
   * encode all buffers decoded by this class.
   */
  public RawDecoder(
      org.apache.iceberg.Schema readSchema,
      Function<Schema, DatumReader<?>> readerFunction,
      Schema writeSchema) {
    ProjectionDatumReader<D> projectionReader =
        new ProjectionDatumReader<>(readerFunction, readSchema, ImmutableMap.of(), null);
    projectionReader.setSchema(writeSchema);
    this.reader = projectionReader;
  }

  @Override
  public D decode(InputStream stream, D reuse) {
    BinaryDecoder binaryDecoder =
        DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
    DECODER.set(binaryDecoder);

    try {
      return reader.read(reuse, binaryDecoder);
    } catch (IOException e) {
      throw new UncheckedIOException("Decoding datum failed", e);
    }
  }
}
132 changes: 132 additions & 0 deletions core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
private static final byte V1 = 1;
private static final Schema SCHEMA_V1 =
new Schema(
required(0, "encryption_key", Types.BinaryType.get()),
optional(1, "aad_prefix", Types.BinaryType.get()));
rdblue marked this conversation as resolved.
Show resolved Hide resolved
private static final org.apache.avro.Schema AVRO_SCHEMA_V1 =
AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName());

private static final Map<Byte, Schema> schemaVersions = ImmutableMap.of(V1, SCHEMA_V1);
private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be ALL_CAPS as well.

ImmutableMap.of(V1, AVRO_SCHEMA_V1);

private static final KeyMetadataEncoder KEY_METADATA_ENCODER = new KeyMetadataEncoder(V1);
private static final KeyMetadataDecoder KEY_METADATA_DECODER = new KeyMetadataDecoder(V1);

private ByteBuffer encryptionKey;
private ByteBuffer aadPrefix;
private org.apache.avro.Schema avroSchema;

/** Used by Avro reflection to instantiate this class * */
KeyMetadata() {}
rdblue marked this conversation as resolved.
Show resolved Hide resolved

KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) {
this.encryptionKey = encryptionKey;
this.aadPrefix = aadPrefix;
this.avroSchema = AVRO_SCHEMA_V1;
}

static Map<Byte, Schema> supportedSchemaVersions() {
return schemaVersions;
}

static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
Copy link
Contributor

@rdblue rdblue Jun 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this exposed? I don't think that we want to expose Avro schemas in the API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for package-internal usage by KeyMetadataEncoder and KeyMetadataDecoder - they need an access to the map of supported schema versions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least we can remove it if we refactor.

return avroSchemaVersions;
}

ByteBuffer encryptionKey() {
return encryptionKey;
}

ByteBuffer aadPrefix() {
return aadPrefix;
}

static KeyMetadata parse(ByteBuffer buffer) {
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please adhere to style guidelines throughout. There should not be code blocks that aren't separated by whitespace.

return KEY_METADATA_DECODER.decode(buffer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to parse envelope encryption metadata", e);
}
}

@Override
public ByteBuffer buffer() {
rdblue marked this conversation as resolved.
Show resolved Hide resolved
try {
return KEY_METADATA_ENCODER.encode(this);
} catch (IOException e) {
throw new UncheckedIOException("Failed to serialize envelope key metadata", e);
}
}

@Override
public EncryptionKeyMetadata copy() {
KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix());
return metadata;
}

@Override
public void put(int i, Object v) {
rdblue marked this conversation as resolved.
Show resolved Hide resolved
switch (i) {
case 0:
this.encryptionKey = (ByteBuffer) v;
return;
case 1:
this.aadPrefix = (ByteBuffer) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
}

@Override
public Object get(int i) {
switch (i) {
case 0:
return encryptionKey;
case 1:
return aadPrefix;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
}

@Override
public org.apache.avro.Schema getSchema() {
return avroSchema;
}
}
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.message.MessageDecoder;
import org.apache.iceberg.avro.GenericAvroReader;
import org.apache.iceberg.data.avro.RawDecoder;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;

class KeyMetadataDecoder extends MessageDecoder.BaseDecoder<KeyMetadata> {
private final org.apache.iceberg.Schema readSchema;
private final Map<Byte, RawDecoder<KeyMetadata>> decoders = new MapMaker().makeMap();

/**
* Creates a new decoder that constructs key metadata instances described by schema version.
*
* <p>The {@code readSchemaVersion} is as used the version of the expected (read) schema. Datum
* instances created by this class will are described by the expected schema.
*/
KeyMetadataDecoder(byte readSchemaVersion) {
this.readSchema = KeyMetadata.supportedSchemaVersions().get(readSchemaVersion);
}

@Override
public KeyMetadata decode(InputStream stream, KeyMetadata reuse) {
byte writeSchemaVersion;

try {
writeSchemaVersion = (byte) stream.read();
} catch (IOException e) {
throw new UncheckedIOException("Failed to read the version byte", e);
}

if (writeSchemaVersion < 0) {
throw new RuntimeException("Version byte - end of stream reached");
}

Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(writeSchemaVersion);

if (writeSchema == null) {
throw new UnsupportedOperationException(
"Cannot resolve schema for version: " + writeSchemaVersion);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This decoder method looks good, other than using a buffer to read the version byte.


RawDecoder<KeyMetadata> decoder = decoders.get(writeSchemaVersion);

if (decoder == null) {
decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema);

decoders.put(writeSchemaVersion, decoder);
}

return decoder.decode(stream, reuse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch IOException and wrap with UncheckedIOException here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the decoder.decode method doesn't throw an IOException

}
}