Skip to content

Commit

Permalink
Allow unparsed binary data to be used for ingestion (#508) (#509)
Browse files Browse the repository at this point in the history
Co-authored-by: Sylvain Wallez <sylvain@elastic.co>
  • Loading branch information
github-actions[bot] and swallez committed Feb 13, 2023
1 parent 93e176c commit f33dfe7
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 72 deletions.
Expand Up @@ -512,5 +512,5 @@ public static <TDocument> JsonpDeserializer<CreateRequest<TDocument>> createCrea
}
return params;

}, SimpleEndpoint.emptyMap(), true, CreateResponse._DESERIALIZER);
}, SimpleEndpoint.emptyMap(), r -> r.document(), CreateResponse._DESERIALIZER);
}
Expand Up @@ -663,5 +663,5 @@ public static <TDocument> JsonpDeserializer<IndexRequest<TDocument>> createIndex
}
return params;

}, SimpleEndpoint.emptyMap(), true, IndexResponse._DESERIALIZER);
}, SimpleEndpoint.emptyMap(), r -> r.document(), IndexResponse._DESERIALIZER);
}
54 changes: 54 additions & 0 deletions java-client/src/main/java/co/elastic/clients/json/JsonpUtils.java
Expand Up @@ -150,6 +150,60 @@ public static void skipValue(JsonParser parser, Event event) {
}
}

/**
* Copy the JSON value at the current parser location to a JSON generator.
*/
public static void copy(JsonParser parser, JsonGenerator generator, JsonParser.Event event) {

switch (event) {
case START_OBJECT:
generator.writeStartObject();
while ((event = parser.next()) != Event.END_OBJECT) {
expectEvent(parser, Event.KEY_NAME, event);
generator.writeKey(parser.getString());
copy(parser, generator, parser.next());
}
generator.writeEnd();
break;

case START_ARRAY:
generator.writeStartArray();
generator.writeStartObject();
while ((event = parser.next()) != Event.END_ARRAY) {
copy(parser, generator, event);
}
generator.writeEnd();
break;

case VALUE_STRING:
generator.write(parser.getString());
break;

case VALUE_FALSE:
generator.write(false);
break;

case VALUE_TRUE:
generator.write(true);
break;

case VALUE_NULL:
generator.writeNull();
break;

case VALUE_NUMBER:
if (parser.isIntegralNumber()) {
generator.write(parser.getLong());
} else {
generator.write(parser.getBigDecimal());
}
break;

default:
throw new UnexpectedJsonEventException(parser, event);
}
}

public static <T> void serialize(T value, JsonGenerator generator, @Nullable JsonpSerializer<T> serializer, JsonpMapper mapper) {
if (serializer != null) {
serializer.serialize(value, generator, mapper);
Expand Down
Expand Up @@ -70,7 +70,23 @@ default Map<String, String> headers(RequestT request) {
return Collections.emptyMap();
}

boolean hasRequestBody();
/**
* Get the body for a request. The caller must handle several cases depending on the interface implemented by the result:
* <li>
* {@code null} means the request has no body.
* </li>
* <li>
* {@link co.elastic.clients.json.NdJsonpSerializable} must be serialized as nd-json.
* </li>
* <li>
* {@link co.elastic.clients.util.BinaryData} must be serialized as is.
* </li>
* <li>
* All other objects must be serialized as JSON using a {@link co.elastic.clients.json.JsonpMapper}
* </li>
*/
@Nullable
Object body(RequestT request);

/**
* Is this status code to be considered as an error?
Expand All @@ -90,7 +106,7 @@ default BinaryEndpoint<RequestT> withBinaryResponse() {
this::requestUrl,
this::queryParameters,
this::headers,
this.hasRequestBody(),
this::body,
null
);
}
Expand Down
Expand Up @@ -24,6 +24,19 @@

public class BinaryEndpoint<RequestT> extends EndpointBase<RequestT, BinaryResponse> {

public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT,
Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
Function<RequestT, Object> body,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, body);
}

public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Expand All @@ -34,7 +47,7 @@ public BinaryEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Expand Up @@ -34,7 +34,7 @@ public BooleanEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Expand Up @@ -58,9 +58,10 @@ public Map<String, String> headers(Req request) {
return endpoint.headers(request);
}

@Nullable
@Override
public boolean hasRequestBody() {
return endpoint.hasRequestBody();
public Object body(Req request) {
return endpoint.body(request);
}

@Override
Expand Down
Expand Up @@ -24,13 +24,16 @@
import co.elastic.clients.transport.Endpoint;
import org.apache.http.client.utils.URLEncodedUtils;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class EndpointBase<RequestT, ResponseT> implements Endpoint<RequestT, ResponseT, ErrorResponse> {

private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();
private static final Function<?, Object> RETURN_NULL = x -> null;
private static final Function<?, ?> RETURN_SELF = x -> x;

/**
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
Expand All @@ -41,27 +44,44 @@ public static <T> Function<T, Map<String, String>> emptyMap() {
return (Function<T, Map<String, String>>) EMPTY_MAP;
}

/**
* Returns a function that always returns {@code null}.
*/
@SuppressWarnings("unchecked")
static <T, U> Function<T, U> returnNull() {
return (Function<T, U>) RETURN_NULL;
}

/**
* Returns a function that always returns its parameter. It's similar to {@code Function.identity()} with the difference
* that the input and output generic parameters are different, making it suitable for use in a wider range of use cases.
*/
@SuppressWarnings("unchecked")
static <T, U> Function<T, U> returnSelf() {
return (Function<T, U>) RETURN_SELF;
}

protected final String id;
protected final Function<RequestT, String> method;
protected final Function<RequestT, String> requestUrl;
protected final Function<RequestT, Map<String, String>> queryParameters;
protected final Function<RequestT, Map<String, String>> headers;
protected final boolean hasRequestBody;
protected final Function<RequestT, Object> body;

public EndpointBase(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasRequestBody
Function<RequestT, Object> body
) {
this.id = id;
this.method = method;
this.requestUrl = requestUrl;
this.queryParameters = queryParameters;
this.headers = headers;
this.hasRequestBody = hasRequestBody;
this.body = body;
}

@Override
Expand Down Expand Up @@ -89,9 +109,10 @@ public Map<String, String> headers(RequestT request) {
return this.headers.apply(request);
}

@Nullable
@Override
public boolean hasRequestBody() {
return this.hasRequestBody;
public Object body(RequestT request) {
return this.body.apply(request);
}

// ES-specific
Expand All @@ -114,7 +135,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
requestUrl,
queryParameters,
headers,
hasRequestBody,
body,
newResponseParser
);
}
Expand Down
Expand Up @@ -24,24 +24,12 @@
import co.elastic.clients.transport.JsonEndpoint;
import org.apache.http.client.utils.URLEncodedUtils;

import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class SimpleEndpoint<RequestT, ResponseT> extends EndpointBase<RequestT, ResponseT>
implements JsonEndpoint<RequestT, ResponseT, ErrorResponse> {

private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();

/**
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
* duplicate lambdas in endpoints that don't have headers or parameters.
*/
@SuppressWarnings("unchecked")
public static <T> Function<T, Map<String, String>> emptyMap() {
return (Function<T, Map<String, String>>) EMPTY_MAP;
}

private final JsonpDeserializer<ResponseT> responseParser;

public SimpleEndpoint(
Expand All @@ -50,13 +38,33 @@ public SimpleEndpoint(
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasRequestBody,
Function<RequestT, Object> body,
JsonpDeserializer<ResponseT> responseParser
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, body);
this.responseParser = responseParser;
}

public SimpleEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasResponseBody,
JsonpDeserializer<ResponseT> responseParser
) {
this(
id,
method,
requestUrl,
queryParameters,
headers,
hasResponseBody ? returnSelf() : returnNull(),
responseParser
);
}

@Override
public JsonpDeserializer<ResponseT> responseDeserializer() {
return this.responseParser;
Expand All @@ -76,7 +84,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
requestUrl,
queryParameters,
headers,
hasRequestBody,
body,
newResponseParser
);
}
Expand Down
Expand Up @@ -59,6 +59,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -215,21 +216,41 @@ private <RequestT> org.elasticsearch.client.Request prepareLowLevelRequest(

clientReq.addParameters(params);

if (endpoint.hasRequestBody()) {
// Request has a body and must implement JsonpSerializable or NdJsonpSerializable

if (request instanceof NdJsonpSerializable) {
Object body = endpoint.body(request);
if (body != null) {
// Request has a body
if (body instanceof NdJsonpSerializable) {
List<ByteBuffer> lines = new ArrayList<>();
collectNdJsonLines(lines, (NdJsonpSerializable)request);
collectNdJsonLines(lines, (NdJsonpSerializable) request);
clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType));

} else if (body instanceof BinaryData) {
BinaryData data = (BinaryData)body;

// ES expects the Accept and Content-Type headers to be consistent.
ContentType contentType;
String dataContentType = data.contentType();
if (co.elastic.clients.util.ContentType.APPLICATION_JSON.equals(dataContentType)) {
// Fast path
contentType = JsonContentType;
} else {
contentType = ContentType.parse(dataContentType);
}

clientReq.setEntity(new MultiBufferEntity(
Collections.singletonList(data.asByteBuffer()),
contentType
));

} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
mapper.serialize(request, generator);
mapper.serialize(body, generator);
generator.close();
clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType));
}
}

// Request parameter intercepted by LLRC
clientReq.addParameter("ignore", "400,401,403,404,405");
return clientReq;
Expand Down

0 comments on commit f33dfe7

Please sign in to comment.