Skip to content

Commit

Permalink
Create a SteadyTime type in the Manager
Browse files Browse the repository at this point in the history
This replaces Manager.getSteadyTime() long value with a concrete type to
make it more apparent what time is being used. The serialization
and deserialization logic have been encapsulated as methods to make the
conversion consistent.

This closes apache#4482
  • Loading branch information
cshannon committed Apr 26, 2024
1 parent 65dd34f commit 5ed2eff
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.accumulo.manager.ManagerTime.SteadyTime;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.state.TableCounts;
Expand Down Expand Up @@ -1730,7 +1731,7 @@ public void removeBulkImportStatus(String directory) {
* approximately monotonic clock, which will be approximately consistent between different
* managers or different runs of the same manager.
*/
public Long getSteadyTime() {
public SteadyTime getSteadyTime() {
return timeKeeper.getTime();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.Constants;
Expand All @@ -35,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
* Keep a persistent roughly monotone view of how long a manager has been overseeing this cluster.
*/
Expand All @@ -58,8 +62,7 @@ public ManagerTime(Manager manager, AccumuloConfiguration conf) throws IOExcepti

try {
zk.putPersistentData(zPath, "0".getBytes(UTF_8), NodeExistsPolicy.SKIP);
skewAmount =
new AtomicLong(Long.parseLong(new String(zk.getData(zPath), UTF_8)) - System.nanoTime());
skewAmount = new AtomicLong(updatedSkew(zk.getData(zPath)));
} catch (Exception ex) {
throw new IOException("Error updating manager time", ex);
}
Expand All @@ -74,8 +77,8 @@ public ManagerTime(Manager manager, AccumuloConfiguration conf) throws IOExcepti
*
* @return Approximate total duration this cluster has had a Manager, in milliseconds.
*/
public long getTime() {
return NANOSECONDS.toMillis(System.nanoTime() + skewAmount.get());
public SteadyTime getTime() {
return fromSkew(skewAmount.get());
}

public void run() {
Expand All @@ -86,8 +89,7 @@ public void run() {
case INITIAL:
case STOP:
try {
long zkTime = Long.parseLong(new String(zk.getData(zPath), UTF_8));
skewAmount.set(zkTime - System.nanoTime());
skewAmount.set(updatedSkew(zk.getData(zPath)));
} catch (Exception ex) {
if (log.isDebugEnabled()) {
log.debug("Failed to retrieve manager tick time", ex);
Expand All @@ -101,8 +103,7 @@ public void run() {
case UNLOAD_METADATA_TABLETS:
case UNLOAD_ROOT_TABLET:
try {
zk.putPersistentData(zPath,
Long.toString(System.nanoTime() + skewAmount.get()).getBytes(UTF_8),
zk.putPersistentData(zPath, fromSkew(skewAmount.get()).serialize(),
NodeExistsPolicy.OVERWRITE);
} catch (Exception ex) {
if (log.isDebugEnabled()) {
Expand All @@ -111,4 +112,71 @@ public void run() {
}
}
}

@VisibleForTesting
static long updatedSkew(byte[] steadyTime) {
return SteadyTime.deserialize(steadyTime).getTimeNs() - System.nanoTime();
}

@VisibleForTesting
static SteadyTime fromSkew(long skewAmount) {
return new SteadyTime(System.nanoTime() + skewAmount);
}

public static class SteadyTime implements Comparable<SteadyTime> {
public static final Comparator<SteadyTime> STEADY_TIME_COMPARATOR =
Comparator.comparingLong(SteadyTime::getTimeNs);

private final long time;

public SteadyTime(long time) {
this.time = time;
}

public long getTimeMillis() {
return NANOSECONDS.toMillis(time);
}

public long getTimeNs() {
return time;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SteadyTime that = (SteadyTime) o;
return time == that.time;
}

@Override
public int hashCode() {
return Objects.hashCode(time);
}

@Override
public int compareTo(SteadyTime that) {
return STEADY_TIME_COMPARATOR.compare(this, that);
}

byte[] serialize() {
return serialize(this);
}

static SteadyTime deserialize(byte[] steadyTime) {
return from(Long.parseLong(new String(steadyTime, UTF_8)));
}

static byte[] serialize(SteadyTime steadyTime) {
return Long.toString(steadyTime.getTimeNs()).getBytes(UTF_8);
}

public static SteadyTime from(long time) {
return new SteadyTime(time);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public void run() {
Manager.log.trace("[{}] Requesting TabletServer {} unload {} {}", store.name(),
location.getServerInstance(), tls.extent, goal.howUnload());
client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(),
manager.getSteadyTime());
manager.getSteadyTime().getTimeMillis());
unloaded++;
totalUnloaded++;
} else {
Expand Down Expand Up @@ -449,7 +449,7 @@ private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet,

private void hostSuspendedTablet(TabletLists tLists, TabletLocationState tls, Location location,
TableConfiguration tableConf) {
if (manager.getSteadyTime() - tls.suspend.suspensionTime
if (manager.getSteadyTime().getTimeMillis() - tls.suspend.suspensionTime
< tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
// Tablet is suspended. See if its tablet server is back.
TServerInstance returnInstance = null;
Expand Down Expand Up @@ -1381,7 +1381,7 @@ private void handleDeadTablets(TabletLists tLists, WalStateManager wals)
deadTablets.subList(0, maxServersToShow));
Manager.log.debug("logs for dead servers: {}", deadLogs);
if (canSuspendTablets()) {
store.suspend(deadTablets, deadLogs, manager.getSteadyTime());
store.suspend(deadTablets, deadLogs, manager.getSteadyTime().getTimeMillis());
} else {
store.unassign(deadTablets, deadLogs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.TimeUnit;

import org.apache.accumulo.manager.ManagerTime.SteadyTime;
import org.junit.jupiter.api.Test;

public class ManagerTimeTest {

@Test
public void testSteadyTime() {
long time = System.nanoTime();
var steadyTime = SteadyTime.from(time);
assertEquals(time, steadyTime.getTimeNs());
assertEquals(TimeUnit.NANOSECONDS.toMillis(time), steadyTime.getTimeMillis());

// make sure calling serialize on instance matches static helper
byte[] serialized = steadyTime.serialize();
assertArrayEquals(serialized, SteadyTime.serialize(steadyTime));

// Verify deserialization matches original object
var deserialized = SteadyTime.deserialize(serialized);
assertEquals(steadyTime, deserialized);
assertEquals(0, steadyTime.compareTo(deserialized));
}

@Test
public void testSteadyTimeSkew() {
var time = System.nanoTime();
long skewAmount = 200000;
var skewed = ManagerTime.fromSkew(skewAmount);
assertTrue(skewed.getTimeNs() > time + skewAmount);
assertTrue(skewed.compareTo(SteadyTime.from(time + skewAmount)) > 0);

var steadyTime = SteadyTime.from(System.nanoTime());
var newSkew = ManagerTime.updatedSkew(steadyTime.serialize());
assertTrue(skewed.getTimeNs() > newSkew);
assertTrue(skewed.compareTo(SteadyTime.from(newSkew)) > 0);
}
}

0 comments on commit 5ed2eff

Please sign in to comment.