Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
deepaksahu562 committed Mar 3, 2023
2 parents 515b7b6 + 567e3bf commit 938be79
Show file tree
Hide file tree
Showing 27 changed files with 2,874 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ public ObjectInputFilter objectInputFilter() {

final String pattern =
"java.lang.Object;" +
"java.util.*;" +
"java.util.Collections*;" +
"java.util.ArrayList*;" +
"java.util.LinkedList*;" +
"java.util.Map*;" +
"java.util.HashMap*;" +
"java.util.LinkedHashMap*;" +
"java.util.HashSet*;" +
"java.util.LinkedHashSet*;" +
"java.util.Date*;" +
"java.time.*;" +
"com.fasterxml.jackson.databind.node.*;" +
"org.opensearch.dataprepper.peerforwarder.model.*;" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -241,7 +249,8 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
arguments(new LinkedBlockingQueue<>()),
arguments(new ArrayBlockingQueue<>(1)),
arguments(Pattern.compile("[1-9]"))
arguments(Pattern.compile("[1-9]")),
arguments(Calendar.getInstance())
);
}
}
Expand All @@ -252,7 +261,18 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
arguments(UUID.randomUUID().toString()),
arguments(Collections.singletonList(UUID.randomUUID().toString())),
arguments(Collections.singleton(UUID.randomUUID().toString())),
arguments(Collections.singletonMap(UUID.randomUUID().toString(), UUID.randomUUID().toString())),
arguments(new ArrayList<>()),
arguments(new LinkedList<>()),
arguments(new HashMap<>()),
arguments(new LinkedHashMap<>()),
arguments(new HashSet<>()),
arguments(new LinkedHashSet<>()),
arguments(Collections.unmodifiableList(new ArrayList<>())),
arguments(Collections.unmodifiableMap(new HashMap<>())),
arguments(Collections.unmodifiableSet(new HashSet<>())),
arguments(new Date()),
arguments(Instant.now()),
arguments(Duration.ofMinutes(5)),
arguments(DefaultEventMetadata.builder().withEventType(UUID.randomUUID().toString()).build())
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ source:
## Configurations

* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```2021```.
* path (Optional) => A `string` which represents the URI path for log ingestion, and it should start with `/`. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`.
* path (Optional) => A `string` which represents the URI path for log ingestion. It should start with `/` and length should be at least 1. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`.
* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false`
* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false`
* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.loghttp;

import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.micrometer.core.instrument.util.StringUtils;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class HTTPSourceConfig {
private int port = DEFAULT_PORT;

@JsonProperty("path")
@Size(min = 1, message = "path length should be at least 1")
private String path = DEFAULT_LOG_INGEST_URI;

@JsonProperty("request_timeout")
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/otel-logs-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ source:
## Configurations

* port(Optional) => An `int` represents the port OTel logs source is running on. Default is ```21892```.
* path(Optional) => A `String` which represents the path for sending unframed HTTP requests. This will add a new service which can be used for supporting unframed gRPC with HTTP idiomatic path to a configurable path other than `/opentelemetry.proto.collector.logs.v1.LogsService/Export` which can still be used. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name.
* path(Optional) => A `String` which represents the path for sending unframed HTTP requests. This can be used for supporting unframed gRPC with HTTP idiomatic path to a configurable path. It should start with `/` and length should be at least 1. `/opentelemetry.proto.collector.logs.v1.LogsService/Export` endpoint will be disabled for both gRPC and HTTP requests if path is configured. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name.
* request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```.
* health_check_service(Optional) => A boolean enables a gRPC health check service under ```grpc.health.v1 / Health / Check```. Default is ```false```.
* proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public void start(Buffer<Record<Object>> buffer) {

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
.builder()
.addService(ServerInterceptors.intercept(oTelLogsGrpcService, serverInterceptors))
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true);

Expand All @@ -103,6 +102,8 @@ public void start(Buffer<Record<Object>> buffer) {
final String transformedOTelLogsSourcePath = oTelLogsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
grpcServiceBuilder.addService(transformedOTelLogsSourcePath,
ServerInterceptors.intercept(oTelLogsGrpcService, serverInterceptors), methodDescriptor);
} else {
grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelLogsGrpcService, serverInterceptors));
}

if (oTelLogsSourceConfig.hasHealthCheck()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@


import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -46,6 +48,7 @@ public class OTelLogsSourceConfig {
private int port = DEFAULT_PORT;

@JsonProperty(PATH)
@Size(min = 1, message = "path length should be at least 1")
private String path;

@JsonProperty(HEALTH_CHECK_SERVICE)
Expand Down Expand Up @@ -92,6 +95,11 @@ public class OTelLogsSourceConfig {
@JsonProperty("authentication")
private PluginModel authentication;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
}

public void validateAndInitializeCertAndKeyFileInS3() {
boolean certAndKeyFileInS3 = false;
if (useAcmCertForSSL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E
}

@Test
void gRPC_request_writes_to_buffer_with_successful_response_with_custom_path() throws Exception {
void gRPC_request_with_custom_path_throws_when_written_to_default_path() {
when(oTelLogsSourceConfig.getPath()).thenReturn(TEST_PATH);
when(oTelLogsSourceConfig.enableUnframedRequests()).thenReturn(true);

Expand All @@ -705,15 +705,10 @@ void gRPC_request_writes_to_buffer_with_successful_response_with_custom_path() t

final LogsServiceGrpc.LogsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(LogsServiceGrpc.LogsServiceBlockingStub.class);
final ExportLogsServiceResponse exportResponse = client.export(createExportLogsRequest());
assertThat(exportResponse, notNullValue());

final ArgumentCaptor<Collection<Record<Object>>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt());

final Collection<Record<Object>> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
assertThat(actualBufferWrites, notNullValue());
assertThat(actualBufferWrites, hasSize(1));
final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportLogsRequest()));
assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.UNIMPLEMENTED.getCode()));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void testInvalidConfigWithEmptyKeyFile() {

@Test
void testValidConfigWithCustomPath() {
final String testPath = "testPath";
final String testPath = "/testPath";
// Prepare
final PluginSetting customPathPluginSetting = completePluginSettingForOtelLogsSource(
DEFAULT_REQUEST_TIMEOUT_MS,
Expand All @@ -224,6 +224,31 @@ void testValidConfigWithCustomPath() {

// When/Then
assertThat(oTelLogsSourceConfig.getPath(), equalTo(testPath));
assertThat(oTelLogsSourceConfig.isPathValid(), equalTo(true));
}

@Test
void testInValidConfigWithCustomPath() {
final String testPath = "invalidPath";
// Prepare
final PluginSetting customPathPluginSetting = completePluginSettingForOtelLogsSource(
DEFAULT_REQUEST_TIMEOUT_MS,
DEFAULT_PORT,
testPath,
false,
false,
false,
true,
TEST_KEY_CERT,
"",
DEFAULT_THREAD_COUNT,
DEFAULT_MAX_CONNECTION_COUNT);

final OTelLogsSourceConfig oTelLogsSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelLogsSourceConfig.class);

// When/Then
assertThat(oTelLogsSourceConfig.getPath(), equalTo(testPath));
assertThat(oTelLogsSourceConfig.isPathValid(), equalTo(false));
}

private PluginSetting completePluginSettingForOtelLogsSource(final int requestTimeoutInMillis,
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/otel-metrics-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ source:
## Configurations

* port(Optional) => An `int` represents the port Otel metrics source is running on. Default is ```21891```.
* path(Optional) => A `String` which represents the path for sending unframed HTTP requests. This will add a new service which can be used for supporting unframed gRPC with HTTP idiomatic path to a configurable path other than `/opentelemetry.proto.collector.metrics.v1.MetricsService/Export` which can still be used. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name.
* path(Optional) => A `String` which represents the path for sending unframed HTTP requests. This can be used for supporting unframed gRPC with HTTP idiomatic path to a configurable path. It should start with `/` and length should be at least 1. `/opentelemetry.proto.collector.metrics.v1.MetricsService/Export` endpoint will be disabled for both gRPC and HTTP requests if path is configured. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name.
* request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```.
* health_check_service(Optional) => A boolean enables health check service. When ```true``` enables a gRPC health check service under ```grpc.health.v1.Health/Check```. Default is ```false```. In order to use the health check service, you must also enable ```proto_reflection_service```.
* proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
.builder()
.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors))
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true);

Expand All @@ -104,6 +103,8 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
final String transformedOTelMetricsSourcePath = oTelMetricsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
grpcServiceBuilder.addService(transformedOTelMetricsSourcePath,
ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors), methodDescriptor);
} else {
grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors));
}

if (oTelMetricsSourceConfig.hasHealthCheck()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.source.otelmetrics;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -46,6 +48,7 @@ public class OTelMetricsSourceConfig {
private int port = DEFAULT_PORT;

@JsonProperty(PATH)
@Size(min = 1, message = "path length should be at least 1")
private String path;

@JsonProperty(HEALTH_CHECK_SERVICE)
Expand Down Expand Up @@ -95,6 +98,11 @@ public class OTelMetricsSourceConfig {
@JsonProperty(UNAUTHENTICATED_HEALTH_CHECK)
private boolean unauthenticatedHealthCheck = false;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
}

public void validateAndInitializeCertAndKeyFileInS3() {
boolean certAndKeyFileInS3 = false;
if (useAcmCertForSSL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,23 +913,19 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E
}

@Test
void gRPC_request_writes_to_buffer_with_successful_response_with_custom_path() throws Exception {
void gRPC_request_with_custom_path_throws_when_written_to_default_path() {
when(oTelMetricsSourceConfig.getPath()).thenReturn(TEST_PATH);
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);

configureObjectUnderTest();
SOURCE.start(buffer);

final MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(MetricsServiceGrpc.MetricsServiceBlockingStub.class);
final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest());
assertThat(exportResponse, notNullValue());

final ArgumentCaptor<Record<ExportMetricsServiceRequest>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt());

final Record<ExportMetricsServiceRequest> actualBufferWrites = bufferWriteArgumentCaptor.getValue();
assertThat(actualBufferWrites, notNullValue());
assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1));
final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest()));
assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.UNIMPLEMENTED.getCode()));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void testInvalidConfigWithEmptyKeyFile() {

@Test
void testValidConfigWithCustomPath() {
final String testPath = "testPath";
final String testPath = "/testPath";
// Prepare
final PluginSetting customPathPluginSetting = completePluginSettingForOtelMetricsSource(
DEFAULT_REQUEST_TIMEOUT_MS,
Expand All @@ -251,6 +251,31 @@ void testValidConfigWithCustomPath() {

// When/Then
assertThat(oTelMetricsSourceConfig.getPath(), equalTo(testPath));
assertThat(oTelMetricsSourceConfig.isPathValid(), equalTo(true));
}

@Test
void testInValidConfigWithCustomPath() {
final String testPath = "invalidPath";
// Prepare
final PluginSetting customPathPluginSetting = completePluginSettingForOtelMetricsSource(
DEFAULT_REQUEST_TIMEOUT_MS,
DEFAULT_PORT,
testPath,
false,
false,
false,
true,
TEST_KEY_CERT,
"",
DEFAULT_THREAD_COUNT,
DEFAULT_MAX_CONNECTION_COUNT);

final OTelMetricsSourceConfig oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelMetricsSourceConfig.class);

// When/Then
assertThat(oTelMetricsSourceConfig.getPath(), equalTo(testPath));
assertThat(oTelMetricsSourceConfig.isPathValid(), equalTo(false));
}

private PluginSetting completePluginSettingForOtelMetricsSource(final int requestTimeoutInMillis,
Expand Down

0 comments on commit 938be79

Please sign in to comment.