Github-issue#1048 : s3-sink with local-file buffer implementation. #2

Closed
wants to merge 5 commits into from
32 changes: 31 additions & 1 deletion data-prepper-plugins/aggregate-processor/README.md
@@ -44,6 +44,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
* [histogram](#histogram)
* [rate_limiter](#rate_limiter)
* [percent_sampler](#percent_sampler)
* [tail_sampler](#tail_sampler)
### <a name="group_duration"></a>
* `group_duration` (Optional): A `String` that represents the amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation Strings ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms"). Default value is `180s`.

@@ -177,7 +178,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
* `percent_sampler`: Processes the events and controls the number of events aggregated based on the configuration. Only specified `percent` of the events are allowed and the rest are dropped.
* It supports the following config options
* `percent`: percent of events to be allowed during aggregation window
* When the following three events arrive with in one second and the `percent` is set 50
* When the following four events arrive within one aggregation period and `percent` is set to 50
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
@@ -190,6 +191,35 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
```
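The percent-based allowance described above can be sketched as follows. This is a hypothetical stand-alone illustration (the class name and counting approach are assumptions, not the plugin's actual implementation), showing how roughly `percent` of the events in a window can be let through:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of percent-based sampling within one aggregation window:
// allow an event only if doing so keeps the allowed ratio at or below `percent`.
public class PercentSamplerSketch {
    private final double percent;   // e.g. 50 means allow roughly 50% of events
    private long allowed = 0;
    private long total = 0;

    public PercentSamplerSketch(double percent) {
        this.percent = percent;
    }

    // Returns true if the next event should be allowed through.
    public boolean allow() {
        total++;
        if ((allowed + 1) * 100.0 / total <= percent) {
            allowed++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PercentSamplerSketch sampler = new PercentSamplerSketch(50);
        List<Boolean> decisions = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            decisions.add(sampler.allow());
        }
        // With four events and percent=50, exactly two are allowed.
        System.out.println(decisions); // prints [false, true, false, true]
    }
}
```

This mirrors the example above: four events in the window with `percent` set to 50 yields two allowed events.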

### <a name="tail_sampler"></a>
* `tail_sampler`: Processes incoming events and decides whether to allow them based on two criteria. The first is whether an error condition is present: if any event in the aggregated group meets the `error_condition`, all events in the group are allowed through. The second applies when no error condition is specified or when it evaluates to false: only a subset of the events is allowed through, chosen probabilistically according to the configured `percent` value. Because it is difficult to determine exactly when "tail sampling" should occur, the `wait_period` configuration parameter determines when to conduct the sampling, based on the idle time after the last received event. When this action is used, the aggregate `group_duration` is not relevant, since the group is concluded based on `wait_period` rather than on the group duration.
* It supports the following config options
* `percent`: percent of events to be allowed during aggregation window
* `wait_period`: minimum idle time before tail sampling is triggered
* `error_condition`: optional condition to indicate the error case for tail sampling
* When the following three events arrive, `percent` is set to 33, and no error condition is specified (or the error condition evaluates to false)
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
```
The following events may be allowed through, and no additional event is generated when the group is concluded (since this is probabilistic sampling, the exact output is not deterministic)
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
```
* When the following three events arrive within one second and the `error_condition` is set to `/bytes > 3000`
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
```
All of the following events will be allowed through, and no additional event is generated when the group is concluded
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
```
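The two-criterion decision above can be sketched as a small stand-alone class. This is a hypothetical illustration under stated assumptions: the `Predicate` stands in for the `error_condition` expression (e.g. `/bytes > 3000`), plain integers stand in for events, and only the decision logic is shown:

```java
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch of the tail_sampler decision when a group concludes:
// if any event matches the error condition, keep the whole group; otherwise
// keep each event with probability percent/100 (so output varies run to run).
public class TailSamplerSketch {
    private final double percent;
    private final Predicate<Integer> errorCondition; // stand-in for e.g. /bytes > 3000
    private final Random random;

    public TailSamplerSketch(double percent, Predicate<Integer> errorCondition, Random random) {
        this.percent = percent;
        this.errorCondition = errorCondition;
        this.random = random;
    }

    public List<Integer> concludeGroup(List<Integer> bytesValues) {
        // Criterion 1: an error condition anywhere in the group lets everything through.
        if (errorCondition != null && bytesValues.stream().anyMatch(errorCondition)) {
            return bytesValues;
        }
        // Criterion 2: otherwise each event passes probabilistically.
        return bytesValues.stream()
                .filter(b -> random.nextDouble() * 100.0 < percent)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TailSamplerSketch sampler = new TailSamplerSketch(33, b -> b > 3000, new Random());
        // 3100 matches the error condition, so all three events are allowed through.
        System.out.println(sampler.concludeGroup(List.of(2500, 500, 3100)).size()); // prints 3
    }
}
```

Note how this reproduces the two examples above: with the error condition matched, all events pass deterministically; without it, the output depends on the random draw.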

## Creating New Aggregate Actions

It is easy to create custom Aggregate Actions to be used by the Aggregate Processor. To do so, create a new class that implements the [AggregateAction interface](src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java).
@@ -7,7 +7,7 @@

import org.opensearch.dataprepper.model.event.Event;

import java.util.Optional;
import java.util.List;

/**
* Interface for creating custom actions to be used with the {@link AggregateProcessor}.
@@ -37,7 +37,8 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
* should not pass an event
* @since 1.3
*/
default Optional<Event> concludeGroup(final AggregateActionInput aggregateActionInput) {
return Optional.empty();
default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
return new AggregateActionOutput(List.of());
}

}
@@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;

/**
* Implementing classes are able to be passed to the functions of {@link AggregateAction}
@@ -25,4 +27,13 @@ public interface AggregateActionInput {
* @since 2.1
*/
Map<Object, Object> getIdentificationKeys();

/**
* Sets custom shouldConclude function
*
* @param customShouldConclude function doing custom check
* @since 2.2
*/
default void setCustomShouldConclude(Function<Duration, Boolean> customShouldConclude) {
}
}
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.Event;
import java.util.List;

public class AggregateActionOutput {

private final List<Event> events;

public AggregateActionOutput(List<Event> events) {
this.events = events;
}

public List<Event> getEvents() {
return events;
}

}
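The new `AggregateActionOutput` return type replaces `Optional<Event>`, letting an action emit zero, one, or many events when a group concludes. A minimal stand-alone illustration of that contract (using strings in place of Data Prepper's `Event` type, purely as an assumption for demonstration):

```java
import java.util.List;

// Stand-in for the new concludeGroup contract: instead of Optional<Event>
// (at most one event), an action returns a container of zero or more events.
public class ActionOutputSketch {
    // Simplified stand-in for AggregateActionOutput, holding strings instead of Events.
    record Output(List<String> events) {}

    // An empty output means the group concluded without emitting anything,
    // which the old API expressed as Optional.empty().
    static Output emptyOutput() {
        return new Output(List.of());
    }

    public static void main(String[] args) {
        Output multi = new Output(List.of("event-a", "event-b"));
        System.out.println(multi.events().size());           // prints 2
        System.out.println(emptyOutput().events().isEmpty()); // prints true
    }
}
```

The list-based design is what allows actions such as `tail_sampler` to release an entire group of events at conclusion time, which a single `Optional<Event>` could not express.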

@@ -11,7 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.List;
import java.util.concurrent.locks.Lock;

/**
@@ -49,18 +49,18 @@ private AggregateActionSynchronizer(final AggregateAction aggregateAction, final
this.actionConcludeGroupEventsProcessingErrors = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_PROCESSING_ERRORS);
}

Optional<Event> concludeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock();
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock();

Optional<Event> concludeGroupEvent = Optional.empty();
AggregateActionOutput actionOutput = new AggregateActionOutput(List.of());
if (concludeGroupLock.tryLock()) {
handleEventForGroupLock.lock();

try {
if (aggregateGroup.shouldConcludeGroup(aggregateGroupManager.getGroupDuration()) || forceConclude) {
LOG.debug("Start critical section in concludeGroup");
concludeGroupEvent = aggregateAction.concludeGroup(aggregateGroup);
actionOutput = aggregateAction.concludeGroup(aggregateGroup);
aggregateGroupManager.closeGroup(hash, aggregateGroup);
}
} catch (final Exception e) {
@@ -71,7 +71,7 @@ Optional<Event> concludeGroup(final AggregateIdentificationKeysHasher.Identifica
concludeGroupLock.unlock();
}
}
return concludeGroupEvent;
return actionOutput;
}

AggregateActionResponse handleEventForGroup(final Event event, final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) {
@@ -7,6 +7,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -17,6 +18,7 @@ class AggregateGroup implements AggregateActionInput {
private final Lock concludeGroupLock;
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
@@ -42,11 +44,19 @@ Lock getConcludeGroupLock() {
return concludeGroupLock;
}

@Override
public void setCustomShouldConclude(Function<Duration, Boolean> shouldConclude) {
customShouldConclude = shouldConclude;
}

Lock getHandleEventForGroupLock() {
return handleEventForGroupLock;
}

boolean shouldConcludeGroup(final Duration groupDuration) {
if (customShouldConclude != null) {
return customShouldConclude.apply(groupDuration);
}
return Duration.between(groupStart, Instant.now()).compareTo(groupDuration) >= 0;
}
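The `customShouldConclude` hook above lets an action replace the duration-based check. For example, a tail-sampling action could (hypothetically) conclude a group once it has been idle longer than `wait_period`, ignoring the `group_duration` argument entirely. A sketch of such a check, with the helper name and wiring assumed for illustration:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

// Hypothetical sketch of an idle-time conclusion check, as a tail sampler
// might install via setCustomShouldConclude: conclude once no event has
// arrived for at least `waitPeriod`, regardless of the group duration passed in.
public class IdleConcludeSketch {
    static Function<Duration, Boolean> idleCheck(Instant lastEventTime, Duration waitPeriod) {
        // The groupDuration argument is deliberately ignored; only idle time matters.
        return groupDuration ->
                Duration.between(lastEventTime, Instant.now()).compareTo(waitPeriod) >= 0;
    }

    public static void main(String[] args) {
        // Last event arrived 10 seconds ago; wait_period is 5 seconds, so conclude.
        Function<Duration, Boolean> check =
                idleCheck(Instant.now().minusSeconds(10), Duration.ofSeconds(5));
        System.out.println(check.apply(Duration.ofSeconds(180))); // prints true
    }
}
```

This matches the README's note that `group_duration` is not relevant for `tail_sampler`: once a custom check is installed, the default `Duration.between(groupStart, now)` comparison is bypassed.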

@@ -23,7 +23,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.time.Instant;

@DataPrepperPlugin(name = "aggregate", pluginType = Processor.class, pluginConfigurationType = AggregateProcessorConfig.class)
@@ -85,11 +84,14 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
for (final Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
final Optional<Event> concludeGroupEvent = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);

if (concludeGroupEvent.isPresent()) {
recordsOut.add(new Record(concludeGroupEvent.get()));
actionConcludeGroupEventsOutCounter.increment();
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);

final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) {
concludeGroupEvents.stream().forEach((event) -> {
recordsOut.add(new Record(event));
actionConcludeGroupEventsOutCounter.increment();
});
} else {
actionConcludeGroupEventsDroppedCounter.increment();
}
@@ -11,13 +11,13 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* An AggregateAction that combines multiple Events into a single Event. This
@@ -79,12 +79,12 @@ private void consumeEvent(GroupState groupState, Event event) {
}

@Override
public Optional<Event> concludeGroup(final AggregateActionInput aggregateActionInput) {
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {

final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.build();
return Optional.of(event);
return new AggregateActionOutput(List.of(event));
}
}
@@ -13,16 +13,17 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

import java.time.Instant;
import java.util.List;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;

/**
* An AggregateAction that combines multiple Events into a single Event. This action will count the number of events with same keys and will create a combined event
@@ -70,7 +71,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}

@Override
public Optional<Event> concludeGroup(final AggregateActionInput aggregateActionInput) {
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
GroupState groupState = aggregateActionInput.getGroupState();
Event event;
Instant startTime = (Instant)groupState.get(startTimeKey);
@@ -102,6 +103,6 @@ public Optional<Event> concludeGroup(final AggregateActionI
event = (Event)sum;
}

return Optional.of(event);
return new AggregateActionOutput(List.of(event));
}
}
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;
import static org.opensearch.dataprepper.plugins.processor.aggregate.AggregateProcessor.getTimeNanos;
@@ -28,7 +29,6 @@
import java.util.HashMap;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Optional;

/**
* An AggregateAction that combines multiple Events into a single Event. This action will create a combined event with histogram buckets of the values
@@ -151,7 +151,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}

@Override
public Optional<Event> concludeGroup(final AggregateActionInput aggregateActionInput) {
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
GroupState groupState = aggregateActionInput.getGroupState();
Event event;
Instant startTime = (Instant)groupState.get(startTimeKey);
@@ -208,6 +208,6 @@ public Optional<Event> concludeGroup(final AggregateActionI
event = (Event)histogram;
}

return Optional.of(event);
return new AggregateActionOutput(List.of(event));
}
}
@@ -10,10 +10,11 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;

import java.util.Optional;
import java.util.List;

/**
* An AggregateAction that combines multiple Events into a single Event. This action will add the unique keys of each smaller Event to the overall groupState,
@@ -33,13 +34,13 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}

@Override
public Optional<Event> concludeGroup(final AggregateActionInput aggregateActionInput) {
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {

final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.build();

return Optional.of(event);
return new AggregateActionOutput(List.of(event));
}
}