Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions httpcore5-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand All @@ -76,12 +82,6 @@
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.testing.reactive;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
import org.reactivestreams.Publisher;

public final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
public ReactiveEchoProcessor() {
}

@Override
public void processRequest(
final HttpRequest request,
final EntityDetails entityDetails,
final ResponseChannel responseChannel,
final HttpContext context,
final Publisher<ByteBuffer> requestBody,
final Callback<Publisher<ByteBuffer>> responseBodyFuture
) throws HttpException, IOException {
if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
responseChannel.sendInformation(new BasicHttpResponse(100), context);
}

responseChannel.sendResponse(
new BasicHttpResponse(200),
new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
context);
responseBodyFuture.execute(requestBody);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.testing.reactive;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;

import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.MethodNotSupportedException;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
import org.reactivestreams.Publisher;

import io.reactivex.Flowable;

public class ReactiveRandomProcessor implements ReactiveRequestProcessor {
public ReactiveRandomProcessor() {
}

@Override
public void processRequest(
final HttpRequest request,
final EntityDetails entityDetails,
final ResponseChannel responseChannel,
final HttpContext context,
final Publisher<ByteBuffer> requestBody,
final Callback<Publisher<ByteBuffer>> responseBodyCallback
) throws HttpException, IOException {
final String method = request.getMethod();
if (!"GET".equalsIgnoreCase(method) &&
!"HEAD".equalsIgnoreCase(method) &&
!"POST".equalsIgnoreCase(method) &&
!"PUT".equalsIgnoreCase(method)) {
throw new MethodNotSupportedException(method + " not supported by " + getClass().getName());
}
final URI uri;
try {
uri = request.getUri();
} catch (final URISyntaxException ex) {
throw new ProtocolException(ex.getMessage(), ex);
}
final String path = uri.getPath();
final int slash = path.lastIndexOf('/');
if (slash != -1) {
final String payload = path.substring(slash + 1);
final long n;
if (!payload.isEmpty()) {
try {
n = Long.parseLong(payload);
} catch (final NumberFormatException ex) {
throw new ProtocolException("Invalid request path: " + path);
}
} else {
// random length, but make sure at least something is sent
n = 1 + (int) (Math.random() * 79.0);
}

if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
responseChannel.sendInformation(new BasicHttpResponse(100), context);
}

final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(n);
final String hash = ReactiveTestUtils.getStreamHash(n);
response.addHeader("response-hash-code", hash);
final BasicEntityDetails basicEntityDetails = new BasicEntityDetails(n, ContentType.APPLICATION_OCTET_STREAM);
responseChannel.sendResponse(response, basicEntityDetails, context);
responseBodyCallback.execute(stream);
} else {
throw new ProtocolException("Invalid request path: " + path);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.testing.reactive;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.util.TextUtils;
import org.reactivestreams.Publisher;

import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;

public class ReactiveTestUtils {
/** The range from which to generate random data. */
private final static byte[] RANGE = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
.getBytes(StandardCharsets.US_ASCII);

/**
* Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB.
*
* @param length the number of bytes in the stream
* @return a reactive stream of bytes
*/
public static Flowable<ByteBuffer> produceStream(final long length) {
return produceStream(length, null);
}

/**
* Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB, while computing the hash of
* the random data.
*
* @param length the number of bytes in the stream
* @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
* hash will not be computed
* @return a reactive stream of bytes
*/
public static Flowable<ByteBuffer> produceStream(final long length, final AtomicReference<String> hash) {
return produceStream(length, 128 * 1024, hash);
}

/**
* Produces a deterministic stream of bytes, in randomly sized chunks, while computing the hash of the random data.
*
* @param length the number of bytes in the stream
* @param maximumBlockSize the maximum size of any {@code ByteBuffer in the stream}
* @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
* hash will not be computed
* @return a reactive stream of bytes
*/
public static Flowable<ByteBuffer> produceStream(
final long length,
final int maximumBlockSize,
final AtomicReference<String> hash
) {
return Flowable.generate(new Consumer<Emitter<ByteBuffer>>() {
Random random = new Random(length); // Use the length as the random seed for easy reproducibility
long bytesEmitted = 0;
MessageDigest md = newMessageDigest();

@Override
public void accept(final Emitter<ByteBuffer> emitter) {
final long remainingLength = length - bytesEmitted;
if (remainingLength == 0) {
emitter.onComplete();
if (hash != null) {
hash.set(TextUtils.toHexString(md.digest()));
}
} else {
final int bufferLength = (int) Math.min(remainingLength, 1 + random.nextInt(maximumBlockSize));
final byte[] bs = new byte[bufferLength];
for (int i = 0; i < bufferLength; i++) {
final byte b = RANGE[(int) (Math.random() * RANGE.length)];
bs[i] = b;
}
if (hash != null) {
md.update(bs);
}
emitter.onNext(ByteBuffer.wrap(bs));
bytesEmitted += bufferLength;
}
}
});
}

/**
* Computes the hash of the deterministic stream (as produced by {@link #produceStream(long)}).
*/
public static String getStreamHash(final long length) {
return TextUtils.toHexString(consumeStream(produceStream(length)).blockingGet().md.digest());
}

/**
* Consumes the given stream and returns a data structure containing its length and digest.
*/
public static Single<StreamDescription> consumeStream(final Publisher<ByteBuffer> publisher) {
final StreamDescription seed = new StreamDescription(0, newMessageDigest());
return Flowable.fromPublisher(publisher)
.reduce(seed, new BiFunction<StreamDescription, ByteBuffer, StreamDescription>() {
@Override
public StreamDescription apply(final StreamDescription desc, final ByteBuffer byteBuffer) {
final long length = desc.length + byteBuffer.remaining();
desc.md.update(byteBuffer);
return new StreamDescription(length, desc.md);
}
});
}

private static MessageDigest newMessageDigest() {
try {
return MessageDigest.getInstance("MD5");
} catch (final NoSuchAlgorithmException ex) {
throw new AssertionError(ex);
}
}

public static class StreamDescription {
public final long length;
public final MessageDigest md;

public StreamDescription(final long length, final MessageDigest md) {
this.length = length;
this.md = md;
}
}
}
Loading