Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -131,11 +130,6 @@ public void fetchSubscriptionList() {
}
}

@Override
public void init(final Properties properties) {

}

@Override
public void consume(List<ExportData> data) {
GRPCStreamStatus status = new GRPCStreamStatus();
Expand Down Expand Up @@ -239,11 +233,6 @@ public void onError(List<ExportData> data, Throwable t) {
log.error(t.getMessage(), t);
}

@Override
public void onExit() {

}

private boolean eventTypeMatch(ExportEvent.EventType eventType,
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -94,11 +93,6 @@ public boolean isEnabled() {
return setting.isEnableKafkaLog();
}

@Override
public void init(final Properties properties) {

}

@Override
public void consume(final List<LogRecord> data) {
for (LogRecord logRecord : data) {
Expand Down Expand Up @@ -131,11 +125,6 @@ public void onError(final List<LogRecord> data, final Throwable t) {

}

@Override
public void onExit() {

}

private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException {
LogData.Builder builder = LogData.newBuilder();
LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -85,11 +84,6 @@ public boolean isEnabled() {
return setting.isEnableKafkaTrace();
}

@Override
public void init(final Properties properties) {

}

@Override
public void consume(final List<SegmentRecord> data) {
for (SegmentRecord segmentRecord : data) {
Expand Down Expand Up @@ -122,9 +116,4 @@ public void consume(final List<SegmentRecord> data) {
public void onError(final List<SegmentRecord> data, final Throwable t) {

}

@Override
public void onExit() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
package org.apache.skywalking.oap.server.core.analysis.worker;

import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
Expand Down Expand Up @@ -130,10 +129,6 @@ private void flush() {
}

private class AggregatorConsumer implements IConsumer<Metrics> {
@Override
public void init(final Properties properties) {
}

@Override
public void consume(List<Metrics> data) {
MetricsAggregateWorker.this.onWork(data);
Expand All @@ -144,10 +139,6 @@ public void onError(List<Metrics> data, Throwable t) {
log.error(t.getMessage(), t);
}

@Override
public void onExit() {
}

@Override
public void nothingToConsume() {
flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
Expand Down Expand Up @@ -401,11 +400,6 @@ private Metrics requireInitialization(Metrics metrics) {
* ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
*/
private class PersistentConsumer implements IConsumer<Metrics> {
@Override
public void init(final Properties properties) {

}

@Override
public void consume(List<Metrics> data) {
MetricsPersistentWorker.this.onWork(data);
Expand All @@ -415,9 +409,5 @@ public void consume(List<Metrics> data) {
public void onError(List<Metrics> data, Throwable t) {
log.error(t.getMessage(), t);
}

@Override
public void onExit() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
Expand Down Expand Up @@ -97,10 +96,6 @@ public void in(TopN n) {
}

private class TopNConsumer implements IConsumer<TopN> {
@Override
public void init(final Properties properties) {
}

@Override
public void consume(List<TopN> data) {
TopNWorker.this.onWork(data);
Expand All @@ -110,10 +105,5 @@ public void consume(List<TopN> data) {
public void onError(List<TopN> data, Throwable t) {
log.error(t.getMessage(), t);
}

@Override
public void onExit() {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@
import io.netty.handler.ssl.SslContext;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
Expand Down Expand Up @@ -154,10 +153,6 @@ public void push(String nextWorkerName, StreamData streamData) {
}

class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@Override
public void init(final Properties properties) {
}

@Override
public void consume(List<RemoteMessage> remoteMessages) {
try {
Expand All @@ -177,10 +172,6 @@ public void consume(List<RemoteMessage> remoteMessages) {
public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
log.error(t.getMessage(), t);
}

@Override
public void onExit() {
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@
import java.util.Properties;

public interface IConsumer<T> {
void init(final Properties properties);
default void init(final Properties properties) {
}

void consume(List<T> data);

void onError(List<T> data, Throwable t);

void onExit();
default void onExit() {
}

/**
* Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
* reaction if the queue has no element.
*/
default void nothingToConsume() {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.Channels;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.datacarrier.partition.ProducerThreadPartitioner;
import org.apache.skywalking.oap.server.library.datacarrier.partition.SimpleRollingPartitioner;

import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
Expand Down Expand Up @@ -119,13 +117,6 @@ public void testBlockingProduce() throws IllegalAccessException {
e.printStackTrace();
}
IConsumer<SampleData> consumer = new IConsumer<SampleData>() {
int i = 0;

@Override
public void init(final Properties properties) {

}

@Override
public void consume(List<SampleData> data) {

Expand All @@ -135,11 +126,6 @@ public void consume(List<SampleData> data) {
public void onError(List<SampleData> data, Throwable t) {

}

@Override
public void onExit() {

}
};
carrier.consume(consumer, 1);
}).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.SampleData;
Expand Down Expand Up @@ -104,11 +103,6 @@ class SampleConsumer2 implements IConsumer<SampleData> {

public boolean onError = false;

@Override
public void init(final Properties properties) {

}

@Override
public void consume(List<SampleData> data) {
if (onError) {
Expand All @@ -120,11 +114,6 @@ public void consume(List<SampleData> data) {
public void onError(List<SampleData> data, Throwable t) {
IS_OCCUR_ERROR = true;
}

@Override
public void onExit() {

}
}

private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@
package org.apache.skywalking.oap.server.library.datacarrier.consumer;

import java.util.List;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.datacarrier.SampleData;

public class SampleConsumer implements IConsumer<SampleData> {
public int i = 1;

@Override
public void init(final Properties properties) {

}

@Override
public void consume(List<SampleData> data) {
for (SampleData one : data) {
Expand All @@ -42,9 +36,4 @@ public void consume(List<SampleData> data) {
public void onError(List<SampleData> data, Throwable t) {

}

@Override
public void onExit() {

}
}
Loading