Skip to content

Commit

Permalink
spring-projectsGH-3103: Introduce CloudEvents transformers
Browse files Browse the repository at this point in the history
Fixes spring-projects#3103

* Add an `io.cloudevents:cloudevents-api` optional dependency
* Introduce a `HeaderMapper` and `Marshallers` in the `support.cloudevents`
to marshal `CloudEvent` instances
* Introduce a `ToCloudEventTransformer` to build a `CloudEvent` instance
from a `Message` and optional marshaling logic if necessary.
Such a transformer could be used as a general purpose CE protocol binder
before sending a result message into the target protocol channel adapter
  • Loading branch information
artembilan committed Apr 16, 2020
1 parent 47d7bf3 commit 962ba1a
Show file tree
Hide file tree
Showing 6 changed files with 488 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ext {
assertjVersion = '3.15.0'
assertkVersion = '0.22'
awaitilityVersion = '4.0.2'
cloudEventsVersion = '1.3.0'
commonsDbcp2Version = '2.7.0'
commonsIoVersion = '2.6'
commonsNetVersion = '3.6'
Expand Down Expand Up @@ -419,6 +420,7 @@ project('spring-integration-core') {
optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion"
optionalApi "org.apache.avro:avro:$avroVersion"
optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
optionalApi "io.cloudevents:cloudevents-api:$cloudEventsVersion"

testImplementation ("org.aspectj:aspectjweaver:$aspectjVersion")
testImplementation ('com.fasterxml.jackson.datatype:jackson-datatype-jsr310')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

import java.util.AbstractMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

import io.cloudevents.v1.ContextAttributes;

/**
* A Cloud Event header mapper.
*
* @author Artem Bilan
*
* @since 5.3
*/
public class HeaderMapper {

/**
* Cloud event headers prefix as a {@value HEADER_PREFIX}.
*/
public static final String HEADER_PREFIX = "ce_";

/**
* Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper}
* @param attributes The map of attributes
* @param extensions The map of extensions
* @return The map of headers
*/
public static Map<String, String> map(Map<String, String> attributes, Map<String, String> extensions) {
Assert.notNull(attributes, "'attributes' must noy be null");
Assert.notNull(extensions, "'extensions' must noy be null");

Map<String, String> result =
attributes.entrySet()
.stream()
.filter(attribute ->
attribute.getValue() != null
&& !ContextAttributes.datacontenttype.name().equals(attribute.getKey()))
.map(header ->
new AbstractMap.SimpleEntry<>(
HEADER_PREFIX + header.getKey().toLowerCase(Locale.US),
header.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

result.putAll(
extensions.entrySet()
.stream()
.filter(extension -> extension.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);

Optional.ofNullable(attributes
.get(ContextAttributes.datacontenttype.name()))
.ifPresent((dataContentType) -> {
result.put(MessageHeaders.CONTENT_TYPE, dataContentType);
});

return result;
}

private HeaderMapper() {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

import java.util.HashMap;
import java.util.Map;

import org.springframework.messaging.MessageHeaders;

import io.cloudevents.extensions.ExtensionFormat;
import io.cloudevents.format.BinaryMarshaller;
import io.cloudevents.format.StructuredMarshaller;
import io.cloudevents.format.Wire;
import io.cloudevents.format.builder.EventStep;
import io.cloudevents.json.Json;
import io.cloudevents.v1.Accessor;
import io.cloudevents.v1.AttributesImpl;

/**
* A Cloud Events general purpose marshallers factory.
*
* @author Artem Bilan
*
* @since 5.3
*/
public final class Marshallers {

private static final Map<String, String> NO_HEADERS = new HashMap<>();

/**
* Builds a Binary Content Mode marshaller to marshal cloud events as JSON for
* any Transport Binding.
* @param <T> The data type
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
* @see BinaryMarshaller
*/
public static <T> EventStep<AttributesImpl, T, byte[], String> binary() {
return BinaryMarshaller.<AttributesImpl, T, byte[], String>builder()
.map(AttributesImpl::marshal)
.map(Accessor::extensionsOf)
.map(ExtensionFormat::marshal)
.map(HeaderMapper::map)
.map(Json::binaryMarshal)
.builder(Wire::new);
}

/**
* Builds a Structured Content Mode marshaller to marshal cloud event as JSON for
* any Transport Binding.
* @param <T> The data type
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
* @see StructuredMarshaller
*/
public static <T> EventStep<AttributesImpl, T, byte[], String> structured() {
return StructuredMarshaller.<AttributesImpl, T, byte[], String>
builder()
.mime(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
.map((event) -> Json.binaryMarshal(event, NO_HEADERS))
.skip();
}

private Marshallers() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides classes supporting for Cloud Events.
*/
package org.springframework.integration.support.cloudevents;
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.transformer;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.UUID;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.support.cloudevents.Marshallers;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

import io.cloudevents.CloudEvent;
import io.cloudevents.extensions.ExtensionFormat;
import io.cloudevents.format.Wire;
import io.cloudevents.format.builder.EventStep;
import io.cloudevents.v1.AttributesImpl;
import io.cloudevents.v1.CloudEventBuilder;
import io.cloudevents.v1.CloudEventImpl;

/**
* An {@link AbstractTransformer} implementation to build a cloud event
* from the request message.
* <p>
* This transformer may produce a message according a {@link ToCloudEventTransformer.Result} option.
* By default it is a {@link ToCloudEventTransformer.Result#RAW}
* with the meaning to produce a {@link io.cloudevents.CloudEvent}
* instance as a reply message payload.
* <p>
* A {@link ToCloudEventTransformer.Result#BINARY} mode produces a marshalled into a {@code byte[]}
* a built {@link io.cloudevents.CloudEvent} body and respective cloud event headers.
* <p>
* A {@link ToCloudEventTransformer.Result#STRUCTURED} mode produces a marshalled into a {@code byte[]}
* a whole {@link io.cloudevents.CloudEvent} and respective content type header
* with the {@code "application/cloudevents+json"} value.
*
* @author Artem Bilan
*
* @since 5.3
*/
public class ToCloudEventTransformer extends AbstractTransformer {

public enum Result {

RAW, BINARY, STRUCTURED

}

private final URI source;

@Nullable
private final EventStep<AttributesImpl, Object, byte[], String> wireBuilder;

private Expression typeExpression =
new FunctionExpression<Message<?>>((message) -> message.getPayload().getClass().getName());

@Nullable
private Expression subjectExpression;

@Nullable
private Expression dataSchemaExpression;

@Nullable
private Expression extensionExpression;

private EvaluationContext evaluationContext;

public ToCloudEventTransformer(URI source) {
this(source, Result.RAW);
}

public ToCloudEventTransformer(URI source, Result resultMode) {
Assert.notNull(source, "'source' must not be null");
Assert.notNull(resultMode, "'resultMode' must not be null");
this.source = source;
switch (resultMode) {
case BINARY:
this.wireBuilder = Marshallers.binary();
break;
case STRUCTURED:
this.wireBuilder = Marshallers.structured();
break;
default:
this.wireBuilder = null;
}
}

public void setTypeExpression(Expression typeExpression) {
Assert.notNull(source, "'typeExpression' must not be null");
this.typeExpression = typeExpression;
}

public void setSubjectExpression(@Nullable Expression subjectExpression) {
this.subjectExpression = subjectExpression;
}

public void setDataSchemaExpression(@Nullable Expression dataSchemaExpression) {
this.dataSchemaExpression = dataSchemaExpression;
}

public void setExtensionExpression(@Nullable Expression extensionExpression) {
this.extensionExpression = extensionExpression;
}

@Override
protected void onInit() {
super.onInit();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}

@Override
protected Object doTransform(Message<?> message) {
CloudEventImpl<Object> cloudEvent = buildCloudEvent(message);

if (this.wireBuilder != null) {
Wire<byte[], String, String> wire =
this.wireBuilder.withEvent(() -> cloudEvent)
.marshal();

return getMessageBuilderFactory()
.withPayload(wire.getPayload().orElse(new byte[0]))
.copyHeaders(wire.getHeaders())
.copyHeadersIfAbsent(message.getHeaders())
.build();
}
else {
return cloudEvent;
}
}

@SuppressWarnings("unchecked")
private CloudEventImpl<Object> buildCloudEvent(Message<?> message) {
MessageHeaders headers = message.getHeaders();
Object payload = message.getPayload();

CloudEventBuilder<Object> cloudEventBuilder =
payload instanceof CloudEvent
? CloudEventBuilder.builder((CloudEvent<AttributesImpl, Object>) payload)
: CloudEventBuilder.builder();

cloudEventBuilder.withId(headers.getId() != null
? headers.getId().toString()
: UUID.randomUUID().toString())
.withTime(ZonedDateTime.now())
.withSource(this.source)
.withType(this.typeExpression.getValue(this.evaluationContext, message, String.class));

if (!(payload instanceof CloudEvent)) {
if (payload instanceof byte[]) {
cloudEventBuilder.withDataBase64((byte[]) payload);
}
else {
cloudEventBuilder.withData(payload);
}
}

MimeType contentType = StaticMessageHeaderAccessor.getContentType(message);

if (contentType != null) {
cloudEventBuilder.withDataContentType(contentType.toString());
}

if (this.subjectExpression != null) {
cloudEventBuilder.withSubject(
this.subjectExpression.getValue(this.evaluationContext, message, String.class));
}

if (this.dataSchemaExpression != null) {
cloudEventBuilder.withDataschema(
this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class));
}

if (this.extensionExpression != null) {
cloudEventBuilder.withExtension(
this.extensionExpression.getValue(this.evaluationContext, message, ExtensionFormat.class));
}

return cloudEventBuilder.build();
}

}
Loading

0 comments on commit 962ba1a

Please sign in to comment.