Skip to content

Commit

Permalink
Copy the Spring ReactorClientHttpConnector and adapt it to be used wi…
Browse files Browse the repository at this point in the history
…thout spring-context
  • Loading branch information
filiphr committed Feb 9, 2024
1 parent 214f503 commit 20f4031
Show file tree
Hide file tree
Showing 4 changed files with 439 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.flowable.http.common.impl.spring.reactive;

import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.util.Assert;

import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;

// This is a copy of org.springframework.http.client.reactive.ReactorClientHttpConnector
// the name has been changed to indicate that this is an adaptation of the Spring ReactorClientHttpConnector
// The difference is the removal of the SmartLifecycle and the not used methods by Flowable.
// The reason for the adaptation is due to https://github.com/spring-projects/spring-framework/issues/31180#issuecomment-1934453468

/**
* Reactor-Netty implementation of {@link ClientHttpConnector}.
*
* @author Brian Clozel
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @see reactor.netty.http.client.HttpClient
* @since 5.0
*/
class FlowableReactorClientHttpConnector implements ClientHttpConnector {

private final HttpClient httpClient;

/**
* Constructor with a pre-configured {@code HttpClient} instance.
*
* @param httpClient the client to use
* @since 5.1
*/
public FlowableReactorClientHttpConnector(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient is required");
this.httpClient = httpClient;
}

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

AtomicReference<FlowableReactorClientHttpResponse> responseRef = new AtomicReference<>();

HttpClient.RequestSender requestSender = this.httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));

requestSender = setUri(requestSender, uri);

return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new FlowableReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
})
.next()
.doOnCancel(() -> {
FlowableReactorClientHttpResponse response = responseRef.get();
if (response != null) {
response.releaseAfterCancel(method);
}
});
}

private static HttpClient.RequestSender setUri(HttpClient.RequestSender requestSender, URI uri) {
if (uri.isAbsolute()) {
try {
return requestSender.uri(uri);
} catch (Exception ex) {
// Fall back on passing it in as a String
}
}
return requestSender.uri(uri.toString());
}

private FlowableReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
NettyOutbound nettyOutbound) {

return new FlowableReactorClientHttpRequest(method, uri, request, nettyOutbound);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.flowable.http.common.impl.spring.reactive;

import java.net.URI;
import java.nio.file.Path;

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.client.reactive.AbstractClientHttpRequest;
import org.springframework.http.support.Netty4HeadersAdapter;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClientRequest;

// This is a copy of org.springframework.http.client.reactive.ReactorClientHttpRequest
// the name has been changed to indicate that this is an adaptation of the Spring ReactorClientHttpRequest
// This is needed since the Spring class is package protected, and we needed a custom FlowableReactorClientHttpConnector.
// See comments in for FlowableReactorClientHttpConnector more information

/**
* {@link org.springframework.http.client.reactive.ClientHttpRequest ClientHttpRequest} implementation for the Reactor-Netty HTTP client.
*
* @author Brian Clozel
* @author Rossen Stoyanchev
* @see reactor.netty.http.client.HttpClient
* @since 5.0
*/
class FlowableReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage {

private final HttpMethod httpMethod;

private final URI uri;

private final HttpClientRequest request;

private final NettyOutbound outbound;

private final NettyDataBufferFactory bufferFactory;

public FlowableReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
this.httpMethod = method;
this.uri = uri;
this.request = request;
this.outbound = outbound;
this.bufferFactory = new NettyDataBufferFactory(outbound.alloc());
}

@Override
public HttpMethod getMethod() {
return this.httpMethod;
}

@Override
public URI getURI() {
return this.uri;
}

@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
}

@Override
@SuppressWarnings("unchecked")
public <T> T getNativeRequest() {
return (T) this.request;
}

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
// Send as Mono if possible as an optimization hint to Reactor Netty
if (body instanceof Mono) {
Mono<ByteBuf> byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf);
return this.outbound.send(byteBufMono).then();

} else {
Flux<ByteBuf> byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
return this.outbound.send(byteBufFlux).then();
}
});
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(FlowableReactorClientHttpRequest::toByteBufs);
return doCommit(() -> this.outbound.sendGroups(byteBufs).then());
}

private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf);
}

@Override
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() -> this.outbound.sendFile(file, position, count).then());
}

@Override
public Mono<Void> setComplete() {
return doCommit(this.outbound::then);
}

@Override
protected void applyHeaders() {
getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value));
}

@Override
protected void applyCookies() {
getCookies().values().forEach(values -> values.forEach(value -> {
DefaultCookie cookie = new DefaultCookie(value.getName(), value.getValue());
this.request.addCookie(cookie);
}));
}

@Override
protected HttpHeaders initReadOnlyHeaders() {
return HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(this.request.requestHeaders()));
}

}
Loading

0 comments on commit 20f4031

Please sign in to comment.