Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -22,14 +22,18 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Chaos monkey that given multiple policies will run actions against the cluster.
Expand All @@ -38,14 +42,16 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey {

private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
private static final long ONE_SEC = 1000;
private static final long FIVE_SEC = 5 * ONE_SEC;
private static final long ONE_MIN = 60 * ONE_SEC;

public static final long TIMEOUT = ONE_MIN;

final IntegrationTestingUtility util;
final Properties monkeyProps;

private final Policy[] policies;
private final ExecutorService monkeyThreadPool;

/**
* Construct a new ChaosMonkey
* @param util the HBaseIntegrationTestingUtility already configured
Expand All @@ -60,19 +66,30 @@ public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy>
}

public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
Policy... policies) {
this.monkeyProps = monkeyProps;
this.util = util;
this.policies = policies;
Collection<Policy> policies) {
this(monkeyProps, util, policies.toArray(new Policy[0]));
}

public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
Collection<Policy> policies) {
Policy... policies) {
this.monkeyProps = monkeyProps;
this.util = util;
this.policies = policies.toArray(new Policy[policies.size()]);
this.util = Objects.requireNonNull(util);
this.policies = Objects.requireNonNull(policies);
if (policies.length == 0) {
throw new IllegalArgumentException("policies may not be empty");
}
this.monkeyThreadPool = buildMonkeyThreadPool(policies.length);
}

private static ExecutorService buildMonkeyThreadPool(final int size) {
return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("ChaosMonkey-%d")
.setUncaughtExceptionHandler((t, e) -> {
throw new RuntimeException(e);
})
.build());
}

/** Selects a random item from the given items */
public static <T> T selectRandomItem(T[] items) {
Expand Down Expand Up @@ -114,50 +131,33 @@ public static <T> List<T> selectRandomItems(T[] items, float ratio) {
return originalItems.subList(startIndex, startIndex + selectedNumber);
}

private Policy[] policies;
private Thread[] monkeyThreads;

@Override
public void start() throws Exception {
monkeyThreads = new Thread[policies.length];
Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, this.util);
for (int i=0; i<policies.length; i++) {
policies[i].init(context);
Thread monkeyThread = new Thread(policies[i], "ChaosMonkey");
monkeyThread.start();
monkeyThreads[i] = monkeyThread;
final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util);
for (final Policy policy : policies) {
policy.init(context);
monkeyThreadPool.execute(policy);
}
}

@Override
public void stop(String why) {
if (policies == null) {
return;
}

// stop accepting new work (shouldn't be any with a fixed-size pool)
monkeyThreadPool.shutdown();
// notify all executing policies that it's time to halt.
for (Policy policy : policies) {
policy.stop(why);
}
}

@Override
public boolean isStopped() {
return policies[0].isStopped();
return monkeyThreadPool.isTerminated();
}

/**
* Wait for ChaosMonkey to stop.
* @throws InterruptedException
*/
@Override
public void waitForStop() throws InterruptedException {
if (monkeyThreads == null) {
return;
}
for (Thread monkeyThread : monkeyThreads) {
// TODO: bound the wait time per policy
monkeyThread.join();
}
monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES);
}

@Override
Expand Down