Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.generated.${packageName};

import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
<#if (metrics?size>0)>
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
<#list metrics as metrics>
<#if metrics.filterExpressions??>
import org.apache.skywalking.oap.server.core.analysis.metrics.expression.*;
Expand Down Expand Up @@ -66,7 +66,8 @@ public class ${source}Dispatcher implements SourceDispatcher<${source}> {
metrics.${field.fieldSetter}(source.${field.fieldGetter}());
</#list>
metrics.${metrics.entryMethod.methodName}(<#list metrics.entryMethod.argsExpressions as arg>${arg}<#if arg_has_next>, </#if></#list>);
MetricsProcess.INSTANCE.in(metrics);

MetricsStreamProcessor.getInstance().in(metrics);
}
</#list>
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,19 @@ import org.apache.skywalking.oap.server.core.Const;
<#break>
</#if>
</#list>
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;

/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@MetricsType
@StreamData
@StorageEntity(name = "${tableName}", builder = ${metricsName}Metrics.Builder.class, sourceScopeId = ${sourceScopeId})
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, storage = @Storage(builder = ${metricsName}Metrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata {

<#list fieldsFromSource as sourceField>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,19 @@
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;

/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@MetricsType
@StreamData
@StorageEntity(name = "service_avg", builder = ServiceAvgMetrics.Builder.class, sourceScopeId = 1)
@Stream(name = "service_avg", scopeId = 1, storage = @Storage(builder = ServiceAvgMetrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata {

@Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.generated.service;

import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.expression.*;
import org.apache.skywalking.oap.server.core.source.*;

Expand Down Expand Up @@ -47,6 +47,7 @@ private void doServiceAvg(Service source) {
metrics.setTimeBucket(source.getTimeBucket());
metrics.setEntityId(source.getEntityId());
metrics.combine(source.getLatency(), 1);
MetricsProcess.INSTANCE.in(metrics);

MetricsStreamProcessor.getInstance().in(metrics);
}
}
1 change: 0 additions & 1 deletion oap-server/generate-tool/src/test/resources/oal_test.oal
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,4 @@ instance_jvm_old_gc_time = from(ServiceInstanceJVMGC.time).filter(phrase == GCPh
instance_jvm_young_gc_count = from(ServiceInstanceJVMGC.count).filter(phrase == GCPhrase.NEW).sum();
instance_jvm_old_gc_count = from(ServiceInstanceJVMGC.count).filter(phrase == GCPhrase.OLD).sum();


// endpoint_Avg_for_prod_serv = from(Endpoint.latency).filter(name == "/product/service").longAvg();
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.skywalking.oap.server.library.util.ResourceUtils;

public class AlarmModuleProvider extends ModuleProvider {

private NotifyHandler notifyHandler;

@Override public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.*;
Expand Down Expand Up @@ -79,9 +79,11 @@ private void addServerInterface(List<Class> classes) {
}

private void addInsideService(List<Class> classes) {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(IModelOverride.class);
classes.add(StreamDataClassGetter.class);
classes.add(StreamDataMappingGetter.class);
classes.add(StreamDataMappingSetter.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,21 @@
package org.apache.skywalking.oap.server.core;

import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageModels;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
Expand All @@ -46,32 +42,27 @@
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.slf4j.*;

/**
* @author peng-yongsheng
*/
public class CoreModuleProvider extends ModuleProvider {

private static final Logger logger = LoggerFactory.getLogger(CoreModuleProvider.class);

private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
private final StreamDataAnnotationContainer streamDataAnnotationContainer;
private final StorageModels storageModels;
private final StreamDataMapping streamDataMapping;
private final SourceReceiverImpl receiver;

public CoreModuleProvider() {
super();
this.moduleConfig = new CoreModuleConfig();
this.annotationScan = new AnnotationScan();
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
this.streamDataMapping = new StreamDataMapping();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}

Expand Down Expand Up @@ -120,15 +111,17 @@ public CoreModuleProvider() {

this.registerServiceImplementation(SourceReceiver.class, receiver);

this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
this.registerServiceImplementation(StreamDataMappingGetter.class, streamDataMapping);
this.registerServiceImplementation(StreamDataMappingSetter.class, streamDataMapping);

WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);

this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener);
this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener);
this.registerServiceImplementation(IModelSetter.class, storageModels);
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);

this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
Expand All @@ -151,12 +144,7 @@ public CoreModuleProvider() {
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));

annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new MetricsTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(new RecordTypeListener(getManager()));
annotationScan.registerListener(new TopNTypeListener(getManager()));
annotationScan.registerListener(new StreamAnnotationListener(getManager()));

this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
Expand All @@ -171,7 +159,6 @@ public CoreModuleProvider() {
receiver.scan();

annotationScan.scan(() -> {
streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
});
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
Expand All @@ -34,9 +35,8 @@
*/
@Getter
@Setter
@RecordType
@ScopeDeclaration(id = ALARM, name = "Alarm")
@StorageEntity(name = AlarmRecord.INDEX_NAME, builder = AlarmRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ALARM)
@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, storage = @Storage(builder = AlarmRecord.Builder.class), processor = RecordStreamProcessor.class)
public class AlarmRecord extends Record {

public static final String INDEX_NAME = "alarm_record";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.alarm;

import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.slf4j.*;

Expand Down Expand Up @@ -47,7 +47,7 @@ public class AlarmStandardPersistence implements AlarmCallback {
record.setStartTime(message.getStartTime());
record.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(message.getStartTime()));

RecordProcess.INSTANCE.in(record);
RecordStreamProcessor.getInstance().in(record);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
/**
* @author peng-yongsheng
*/
public interface SourceDispatcher<S extends Source> {
void dispatch(S source);
public interface SourceDispatcher<SOURCE extends Source> {
void dispatch(SOURCE source);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
*
*/

package org.apache.skywalking.oap.server.core.remote.annotation;
package org.apache.skywalking.oap.server.core.analysis;

import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;

/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamData {
public @interface Stream {

String name();

int scopeId();

Storage storage();

Class<? extends StreamProcessor> processor();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis;

import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
* @author peng-yongsheng
*/
public class StreamAnnotationListener implements AnnotationListener {

private final ModuleDefineHolder moduleDefineHolder;

public StreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}

@Override public Class<? extends Annotation> annotation() {
return Stream.class;
}

@SuppressWarnings("unchecked")
@Override public void notify(Class aClass) {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream)aClass.getAnnotation(Stream.class);

if (stream.processor().equals(InventoryStreamProcessor.class)) {
InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, true, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else {
throw new UnexpectedException("Unknown stream processor.");
}
} else {
throw new UnexpectedException("Stream annotation listener could only parse the class present stream annotation.");
}
}
}
Loading