Skip to content

Commit

Permalink
Support multiple implementations of StorageBuilder in different stora…
Browse files Browse the repository at this point in the history
…ge implementations - stage 2 (#6336)
  • Loading branch information
wu-sheng committed Feb 7, 2021
1 parent fc23dab commit 5e8f1eb
Show file tree
Hide file tree
Showing 19 changed files with 192 additions and 274 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException;
import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
Expand All @@ -80,7 +81,6 @@ public class OALRuntime implements OALEngine {
private static final String CLASS_FILE_CHARSET = "UTF-8";
private static final String METRICS_FUNCTION_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.metrics.";
private static final String WITH_METADATA_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata";
private static final String STORAGE_BUILDER_INTERFACE = "org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder";
private static final String DISPATCHER_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.SourceDispatcher";
private static final String METRICS_STREAM_PROCESSOR = "org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor";
private static final String[] METRICS_CLASS_METHODS = {
Expand All @@ -107,6 +107,7 @@ public class OALRuntime implements OALEngine {
private AllDispatcherContext allDispatcherContext;
private StreamAnnotationListener streamAnnotationListener;
private DispatcherDetectorListener dispatcherDetectorListener;
private StorageBuilderFactory storageBuilderFactory;
private final List<Class> metricsClasses;
private final List<Class> dispatcherClasses;
private final boolean openEngineDebug;
Expand All @@ -133,6 +134,11 @@ public void setDispatcherListener(DispatcherDetectorListener listener) throws Mo
dispatcherDetectorListener = listener;
}

@Override
public void setStorageBuilderFactory(final StorageBuilderFactory factory) {
storageBuilderFactory = factory;
}

@Override
public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException {
if (!IS_RT_TEMP_FOLDER_INIT_COMPLETED) {
Expand Down Expand Up @@ -318,7 +324,7 @@ private void generateMetricsBuilderClass(AnalysisResult metricsStmt) throws OALC
String className = metricsBuilderClassName(metricsStmt, false);
CtClass metricsBuilderClass = classPool.makeClass(metricsBuilderClassName(metricsStmt, true));
try {
metricsBuilderClass.addInterface(classPool.get(STORAGE_BUILDER_INTERFACE));
metricsBuilderClass.addInterface(classPool.get(storageBuilderFactory.builderTemplate().getSuperClass()));
} catch (NotFoundException e) {
log.error("Can't find StorageBuilder interface for " + className + ".", e);
throw new OALCompileException(e.getMessage(), e);
Expand All @@ -342,7 +348,9 @@ private void generateMetricsBuilderClass(AnalysisResult metricsStmt) throws OALC
for (String method : METRICS_BUILDER_CLASS_METHODS) {
StringWriter methodEntity = new StringWriter();
try {
configuration.getTemplate("metrics-builder/" + method + ".ftl").process(metricsStmt, methodEntity);
configuration
.getTemplate(storageBuilderFactory.builderTemplate().getTemplatePath() + "/" + method + ".ftl")
.process(metricsStmt, methodEntity);
metricsBuilderClass.addMethod(CtNewMethod.make(methodEntity.toString(), metricsBuilderClass));
} catch (Exception e) {
log.error("Can't generate method " + method + " for " + className + ".", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public void notify(Class aClass) throws StorageException {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class);

if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}

if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

package org.apache.skywalking.oap.server.core.analysis.worker;

import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
Expand All @@ -60,15 +62,16 @@ public void in(final ManagementData managementData) {

@Override
public void create(final ModuleDefineHolder moduleDefineHolder, final Stream stream, final Class<? extends ManagementData> streamClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(streamClass, stream.builder());

StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IManagementDAO managementDAO;
try {
managementDAO = storageDAO.newManagementDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
managementDAO = storageDAO.newManagementDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.builder()
.getSimpleName() + " none stream record DAO failure.", e);
}
Expand Down
Loading

0 comments on commit 5e8f1eb

Please sign in to comment.