Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect the background processes. #741

Merged
merged 2 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.FileUtils;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,7 +71,10 @@ public synchronized void initialize() throws IOException {
offset.deserialize(offsetRecord);
initialized = true;

Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::flush,
t -> logger.error("flush offset file in background failure.", t)
), 10, 3, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,7 +51,9 @@ public enum SegmentBufferReader {

public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::preRead,
t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
}

public void setSegmentParserListenerManager(SegmentParserListenerManager listenerManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,7 +35,7 @@
*/
public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<SegmentStandardization, SegmentStandardization> {

private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);

public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager);
Expand Down Expand Up @@ -70,7 +71,9 @@ public Factory(ModuleManager moduleManager) {
}

private void startTimer(SegmentStandardizationWorker standardizationWorker) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(standardizationWorker::flushAndSwitch, 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(standardizationWorker::flushAndSwitch,
t -> logger.error("Segment standardization failure.", t)), 10, 3, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,7 +50,9 @@ public void start(ModuleManager moduleManager, List<PersistenceWorker> persisten
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO, persistenceWorkers),
t -> logger.error("Extract data and save failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
}

private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@
import org.apache.skywalking.apm.collector.storage.dao.memorymp.IMemorySecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpoolmp.IMemoryPoolSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class DataTTLKeeperTimer {
// Log under this class's own name so background-deletion failures are attributed to DataTTLKeeperTimer.
private static final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);

private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener;
Expand All @@ -55,7 +59,9 @@ public DataTTLKeeperTimer(ModuleManager moduleManager,
}

public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::delete, 1, 8, TimeUnit.HOURS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::delete,
t -> logger.error("Remove data in background failure.", t)), 1, 8, TimeUnit.HOURS);
}

private void delete() {
Expand Down
5 changes: 5 additions & 0 deletions apm-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
</modules>

<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.util;

/**
 * Wraps a {@link Runnable} so that nothing thrown while it runs escapes from {@link #run()}.
 * <p>
 * Anything thrown by the wrapped task is passed to the supplied {@link CallbackWhenException}.
 * This matters for periodic tasks: a scheduled executor suppresses all later executions of a
 * task once a single execution throws.
 * <p>
 * A failure inside the callback itself (including a missing callback) is contained as well,
 * otherwise a broken handler would cancel the very task this wrapper is meant to keep alive.
 *
 * @author wusheng
 */
public class RunnableWithExceptionProtection implements Runnable {
    private final Runnable run;
    private final CallbackWhenException callback;

    /**
     * @param run      the task to execute on every call to {@link #run()}
     * @param callback receives whatever the task throws
     */
    public RunnableWithExceptionProtection(Runnable run, CallbackWhenException callback) {
        this.run = run;
        this.callback = callback;
    }

    /**
     * Executes the wrapped task once and reports any failure to the callback.
     * Never propagates a {@link Throwable} to the caller.
     */
    @Override
    public void run() {
        try {
            run.run();
        } catch (Throwable t) {
            try {
                callback.handle(t);
            } catch (Throwable ignored) {
                // Deliberately dropped: the handler was the last resort for reporting, and
                // letting its failure escape would stop every future scheduled execution.
            }
        }
    }

    /**
     * Receives the failure of a protected task.
     */
    public interface CallbackWhenException {
        /**
         * @param t whatever the wrapped task threw
         */
        void handle(Throwable t);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/


package org.apache.skywalking.apm.agent.core.jvm;

import io.grpc.ManagedChannel;
Expand All @@ -43,6 +42,7 @@
import org.apache.skywalking.apm.network.proto.JVMMetric;
import org.apache.skywalking.apm.network.proto.JVMMetrics;
import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;

/**
* The <code>JVMService</code> represents a timer,
Expand All @@ -57,6 +57,7 @@ public class JVMService implements BootService, Runnable {
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private Sender sender;

@Override
public void beforeBoot() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
Expand All @@ -68,10 +69,19 @@ public void beforeBoot() throws Throwable {
public void boot() throws Throwable {
collectMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService produces metrics failure.", t);
}
}), 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
.scheduleAtFixedRate(sender, 0, 1, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService consumes and upload failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;

/**
* @author wusheng
Expand Down Expand Up @@ -87,7 +88,11 @@ public void beforeBoot() throws Throwable {
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(this, 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/


package org.apache.skywalking.apm.agent.core.remote;

import java.util.concurrent.Executors;
Expand All @@ -25,13 +24,17 @@
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;

/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
*
* @author wusheng
*/
public class CollectorDiscoveryService implements BootService {
private static final ILog logger = LogManager.getLogger(CollectorDiscoveryService.class);
private ScheduledFuture<?> future;

@Override
Expand All @@ -42,7 +45,12 @@ public void beforeBoot() throws Throwable {
@Override
public void boot() throws Throwable {
future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
.scheduleAtFixedRate(new RunnableWithExceptionProtection(new DiscoveryRestServiceClient(),
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;

/**
* @author wusheng
Expand All @@ -60,7 +61,11 @@ public void beforeBoot() throws Throwable {
public void boot() throws Throwable {
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(this, 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;

/**
* The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
Expand Down Expand Up @@ -66,12 +67,16 @@ public void boot() throws Throwable {
this.resetSamplingFactor();
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
@Override
public void run() {
resetSamplingFactor();
}
}, 0, 3, TimeUnit.SECONDS);
}, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, 3, TimeUnit.SECONDS);
// The sampling factor is reset every 3 seconds, so the quota logged here is per 3 seconds.
logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
}
}
Expand Down