Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation "software.amazon.kinesis:amazon-kinesis-client:2.3.4", excludeNetty
implementation library.java.netty_all // force version of netty used by Beam
permitUnusedDeclared library.java.netty_all
implementation library.java.byte_buddy
implementation library.java.jackson_core
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.utils.ImmutableMap;

/** {@link Coder}s for common AWS SDK objects. */
/**
* {@link Coder}s for common AWS SDK objects.
*
* @deprecated {@link org.apache.beam.sdk.schemas.SchemaCoder SchemaCoders} for {@link
* software.amazon.awssdk.core.SdkPojo AWS model classes} will be automatically inferred by
* means of {@link org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider AwsSchemaProvider}.
*/
@Deprecated
public final class AwsCoders {

private AwsCoders() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.schemas;

import java.io.Serializable;
import java.util.List;
import software.amazon.awssdk.core.SdkField;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/** Builder factory for AWS {@link SdkPojo} to avoid using reflection to instantiate a builder. */
public abstract class AwsBuilderFactory<
PojoT extends SdkPojo, BuilderT extends SdkBuilder<BuilderT, PojoT> & SdkPojo>
implements Serializable {
protected List<SdkField<?>> sdkFields() {
return get().sdkFields();
}

protected abstract BuilderT get();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.schemas;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.getter;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets.difference;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets.newHashSet;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.SdkBuilderSetter;
import org.apache.beam.sdk.io.aws2.schemas.AwsTypes.ConverterFactory;
import org.apache.beam.sdk.schemas.CachingFactory;
import org.apache.beam.sdk.schemas.Factory;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkField;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Schema provider for AWS {@link SdkPojo} models using the provided field metadata (@see {@link
* SdkPojo#sdkFields()}) rather than reflection.
*
* <p>Note: Beam doesn't support self-referential schemas. Some AWS models are not compatible with
* schemas for that reason and require a dedicated coder, such as {@link
* software.amazon.awssdk.services.dynamodb.model.AttributeValue DynamoDB AttributeValue} ({@link
* org.apache.beam.sdk.io.aws2.dynamodb.AttributeValueCoder coder}).
*/
public class AwsSchemaProvider extends GetterBasedSchemaProvider {
/** Byte-code generated {@link SdkBuilder} factories. */
@SuppressWarnings("rawtypes") // Crashes checker otherwise
private static final Map<Class, AwsBuilderFactory> FACTORIES = Maps.newConcurrentMap();

@Override
public @Nullable <T> Schema schemaFor(TypeDescriptor<T> type) {
if (!SdkPojo.class.isAssignableFrom(type.getRawType())) {
return null;
}
return AwsTypes.schemaFor(sdkFields((Class<? extends SdkPojo>) type.getRawType()));
}

@SuppressWarnings("rawtypes")
@Override
public List<FieldValueGetter> fieldValueGetters(Class<?> clazz, Schema schema) {
ConverterFactory fromAws = ConverterFactory.fromAws();
Map<String, SdkField<?>> sdkFields = sdkFieldsByName((Class<? extends SdkPojo>) clazz);
List<FieldValueGetter> getters = new ArrayList<>(schema.getFieldCount());
for (String field : schema.getFieldNames()) {
SdkField<?> sdkField = checkStateNotNull(sdkFields.get(field), "Unknown field");
getters.add(getter(field, fromAws.create(sdkField::getValueOrDefault, sdkField)));
}
return getters;
}

// Overriding `fromRowFunction` to instead use the generated builder factories with SDK provided
// setters from `SdkField`s.
@Override
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> type) {
checkState(SdkPojo.class.isAssignableFrom(type.getRawType()), "Unsupported type %s", type);
return FromRowFactory.create(type.getRawType());
}

private static class FromRowWithBuilder<T extends SdkPojo>
implements SerializableFunction<Row, T> {
private final Class<T> cls;
private final Factory<List<SdkBuilderSetter>> factory;

FromRowWithBuilder(Class<T> cls, Factory<List<SdkBuilderSetter>> factory) {
this.cls = cls;
this.factory = factory;
}

@Override
@SuppressWarnings("nullness") // checker doesn't recognize the builder type
public T apply(Row row) {
if (row instanceof RowWithGetters) {
Object target = ((RowWithGetters) row).getGetterTarget();
if (target.getClass().equals(cls)) {
return (T) target; // simply extract the underlying object instead of creating a new one.
}
}
SdkBuilder<?, T> builder = sdkBuilder(cls);
List<SdkBuilderSetter> setters = factory.create(cls, row.getSchema());
for (SdkBuilderSetter set : setters) {
if (!row.getSchema().hasField(set.name())) {
continue;
}
set.set(builder, row.getValue(set.name()));
}
return builder.build();
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FromRowWithBuilder<?> that = (FromRowWithBuilder<?>) o;
return cls.equals(that.cls);
}

@Override
public int hashCode() {
return Objects.hash(cls);
}
}

private static class FromRowFactory implements Factory<SerializableFunction<Row, ?>> {
@SuppressWarnings("nullness") // circular initialization
private final Factory<SerializableFunction<Row, ?>> cachingFactory = new CachingFactory<>(this);

private final Factory<List<SdkBuilderSetter>> settersFactory =
new CachingFactory<>(new SettersFactory());

@SuppressWarnings("nullness") // schema nullable for this factory
static <T> SerializableFunction<Row, T> create(Class<? super T> clazz) {
checkState(SdkPojo.class.isAssignableFrom(clazz), "Unsupported clazz %s", clazz);
return (SerializableFunction<Row, T>) new FromRowFactory().cachingFactory.create(clazz, null);
}

@Override
public SerializableFunction<Row, ?> create(Class<?> clazz, Schema ignored) {
return new FromRowWithBuilder<>((Class<? extends SdkPojo>) clazz, settersFactory);
}

private class SettersFactory implements Factory<List<SdkBuilderSetter>> {
private final ConverterFactory toAws;

private SettersFactory() {
this.toAws = ConverterFactory.toAws(cachingFactory);
}

@Override
public List<SdkBuilderSetter> create(Class<?> clazz, Schema schema) {
Map<String, SdkField<?>> fields = sdkFieldsByName((Class<? extends SdkPojo>) clazz);
checkForUnknownFields(schema, fields);

List<SdkBuilderSetter> setters = new ArrayList<>(schema.getFieldCount());
for (Entry<String, SdkField<?>> entry : fields.entrySet()) {
SdkField<?> sdkField = entry.getValue();
BiConsumer<SdkBuilder<?, ?>, Object> setter =
toAws.needsConversion(sdkField)
? ConverterFactory.createSetter(sdkField::set, toAws.create(sdkField))
: sdkField::set;
setters.add(AwsSchemaUtils.setter(entry.getKey(), setter));
}
return setters;
}
}

private void checkForUnknownFields(Schema schema, Map<String, SdkField<?>> fields) {
Set<String> unknowns = difference(newHashSet(schema.getFieldNames()), fields.keySet());
checkState(unknowns.isEmpty(), "Row schema contains unknown fields: %s", unknowns);
}
}

@Override
public List<FieldValueTypeInformation> fieldValueTypeInformations(Class<?> cls, Schema schema) {
throw new UnsupportedOperationException("FieldValueTypeInformation not available");
}

@Override
public SchemaUserTypeCreator schemaTypeCreator(Class<?> cls, Schema schema) {
throw new UnsupportedOperationException("SchemaUserTypeCreator not available");
}

private static <T extends SdkPojo> AwsBuilderFactory<T, ?> builderFactory(Class<T> cls) {
return FACTORIES.computeIfAbsent(cls, c -> AwsSchemaUtils.builderFactory(cls));
}

private static <T extends SdkPojo> List<SdkField<?>> sdkFields(Class<T> cls) {
return builderFactory(cls).sdkFields();
}

private static <T extends SdkPojo> SdkBuilder<?, T> sdkBuilder(Class<T> cls) {
return builderFactory(cls).get();
}

private static <T extends SdkPojo> Map<String, SdkField<?>> sdkFieldsByName(Class<T> cls) {
return sdkFields(cls).stream().collect(toMap(AwsTypes::normalizedNameOf, identity()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;
package org.apache.beam.sdk.io.aws2.schemas;

import com.google.auto.service.AutoService;
import java.util.List;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviderRegistrar;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaProviderRegistrar;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import software.amazon.awssdk.services.sqs.model.Message;

/** A {@link CoderProviderRegistrar} for standard types used with {@link SqsIO}. */
@AutoService(CoderProviderRegistrar.class)
public class MessageCoderRegistrar implements CoderProviderRegistrar {
@AutoService(SchemaProviderRegistrar.class)
public class AwsSchemaRegistrar implements SchemaProviderRegistrar {
@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
CoderProviders.forCoder(TypeDescriptor.of(Message.class), MessageCoder.of()));
public List<SchemaProvider> getSchemaProviders() {
return ImmutableList.of(new AwsSchemaProvider());
}
}
Loading