Skip to content

Commit

Permalink
Merge pull request #19 from rborer/pool-remove-unused-stats
Browse files Browse the repository at this point in the history
Remove unused stats
  • Loading branch information
ztellman committed Jul 18, 2018
2 parents a71ee4d + 61eab46 commit f8d90d8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
18 changes: 12 additions & 6 deletions src/io/aleph/dirigiste/Pool.java
Expand Up @@ -201,7 +201,6 @@ public boolean take(AcquireCallback<V> c, boolean skipToFront) throws RejectedEx
private final Generator<K,V> _generator;
private final Controller<K> _controller;

private Map<K,Stats> _stats;
private boolean _isShutdown = false;

private final AtomicInteger _numObjects = new AtomicInteger(0);
Expand Down Expand Up @@ -283,7 +282,6 @@ private void addObject(K key) {

private void startControlLoop(int duration, int iterations) {

double samplesPerSecond = 1000.0 / duration;
int iteration = 0;

try {
Expand Down Expand Up @@ -317,8 +315,8 @@ private void startControlLoop(int duration, int iterations) {

// update worker count
if (iteration == 0) {
_stats = updateStats();
Map<K,Integer> adjustment = _controller.adjustment(_stats);
final Map<K,Stats> _stats = updateStats();
final Map<K,Integer> adjustment = _controller.adjustment(_stats);

// clear out any unused queues
_lock.lock();
Expand All @@ -327,6 +325,15 @@ private void startControlLoop(int duration, int iterations) {
if (entry.getValue().getUtilization(1) == 0
&& _queues.get(key).objects.get() == 0) {
_queues.remove(key).shutdown();

// clean up stats so they don't remain in memory forever
_queueLatencies.remove(key);
_taskLatencies.remove(key);
_queueLengths.remove(key);
_utilizations.remove(key);
_taskArrivalRates.remove(key);
_taskCompletionRates.remove(key);
_taskRejectionRates.remove(key);
}
}
_lock.unlock();
Expand All @@ -335,7 +342,7 @@ private void startControlLoop(int duration, int iterations) {
List<K> upward = new ArrayList<K>();

for (Map.Entry<K,Integer> entry : adjustment.entrySet()) {
int n = entry.getValue().intValue();
int n = entry.getValue();
if (n < 0) {
Queue q = queue(entry.getKey());
for (int i = 0; i < -n; i++) {
Expand All @@ -353,7 +360,6 @@ private void startControlLoop(int duration, int iterations) {
// a random subset
Collections.shuffle(upward);
for (K key : upward) {
Queue q = queue(key);
addObject(key);
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/io/aleph/dirigiste/Stats.java
Expand Up @@ -116,6 +116,10 @@ public Map<K,long[]> toMap() {
}
return m;
}

public void remove(K key) {
_reservoirs.remove(key);
}
}

public static class UniformDoubleReservoirMap<K> {
Expand All @@ -139,6 +143,10 @@ public Map<K,double[]> toMap() {
}
return m;
}

public void remove(K key) {
_reservoirs.remove(key);
}
}

public static double lerp(long low, long high, double t) {
Expand Down Expand Up @@ -240,7 +248,7 @@ public static double mean(long[] vals) {
private final long[] _queueLatencies;
private final long[] _taskLatencies;

public static Stats EMPTY = new Stats(EnumSet.noneOf(Metric.class), 0, new double[] {}, new double[] {}, new double[] {}, new double[] {}, new long[] {}, new long[] {}, new long[] {});
public static final Stats EMPTY = new Stats(EnumSet.noneOf(Metric.class), 0, new double[] {}, new double[] {}, new double[] {}, new double[] {}, new long[] {}, new long[] {}, new long[] {});

public Stats(EnumSet<Metric> metrics, int numWorkers, double[] utilizations, double[] taskArrivalRates, double[] taskCompletionRates, double[] taskRejectionRates, long[] queueLengths, long[] queueLatencies, long[] taskLatencies) {
_metrics = metrics;
Expand Down

0 comments on commit f8d90d8

Please sign in to comment.