Skip to content

Commit

Permalink
Update style org.corfudb.runtime.view.* (#743)
Browse files Browse the repository at this point in the history
Update to new style guidelines package org.corfudb.runtime.view.*
  • Loading branch information
annym authored and no2chem committed Jun 20, 2017
1 parent 15ed9d3 commit a2cc666
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 315 deletions.
52 changes: 30 additions & 22 deletions runtime/src/main/java/org/corfudb/runtime/view/AbstractView.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package org.corfudb.runtime.view;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.annotation.Nonnull;

import lombok.extern.slf4j.Slf4j;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.WrongEpochException;

import javax.annotation.Nonnull;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
* All views inherit from AbstractView.
* <p>
* AbstractView requires a runtime, and provides a layoutHelper function.
* <p>
* The layoutHelper function is called whenever a view tries to access a layout.
*
* <p>AbstractView requires a runtime, and provides a layoutHelper function.</p>
*
* <p>The layoutHelper function is called whenever a view tries to access a layout.
* If the layoutHelper catches an exception which is due to connection issues
* or an incorrect epoch, it asks the runtime to invalidate that layout
* by reporting it to a layout server, and retries the function.
* <p>
* Created by mwei on 12/10/15.
* by reporting it to a layout server, and retries the function.</p>
*
* <p>Created by mwei on 12/10/15.</p>
*/
@Slf4j
public abstract class AbstractView {
Expand All @@ -40,20 +41,22 @@ public Layout getCurrentLayout() {
try {
return runtime.layout.get();
} catch (Exception ex) {
log.warn("Error executing remote call, invalidating view and retrying in {}s", runtime.retryRate, ex);
log.warn("Error executing remote call, invalidating view and retrying in {}s",
runtime.retryRate, ex);
runtime.invalidateLayout();
try {
Thread.sleep(runtime.retryRate * 1000);
} catch (InterruptedException ie) {
log.warn("Interrupted Exception when getting current layout.", ie);
}
}
}
}

/**
* Helper function for view to retrieve layouts.
* This function will retry the given function indefinitely, invalidating the view if there was a exception
* contacting the endpoint.
* This function will retry the given function indefinitely,
* invalidating the view if there was a exception contacting the endpoint.
*
* @param function The function to execute.
* @param <T> The return type of the function.
Expand All @@ -63,43 +66,48 @@ public Layout getCurrentLayout() {
* @param <D> Any exception the function may throw.
* @return The return value of the function.
*/
public <T, A extends RuntimeException, B extends RuntimeException, C extends RuntimeException, D extends RuntimeException>
T layoutHelper(LayoutFunction<Layout, T, A, B, C, D> function)
public <T, A extends RuntimeException, B extends RuntimeException, C extends RuntimeException,
D extends RuntimeException> T layoutHelper(LayoutFunction<Layout, T, A, B, C, D>
function)
throws A, B, C, D {
while (true) {
try {
return function.apply(runtime.layout.get());
} catch (RuntimeException re) {
if (re.getCause() instanceof TimeoutException) {
log.warn("Timeout executing remote call, invalidating view and retrying in {}s", runtime.retryRate);
log.warn("Timeout executing remote call, invalidating view and retrying in {}s",
runtime.retryRate);
runtime.invalidateLayout();
try {
Thread.sleep(runtime.retryRate * 1000);
} catch (InterruptedException ie) {
log.warn("Interrupted Exception in layout helper.", ie);
}
} else if (re instanceof WrongEpochException){
} else if (re instanceof WrongEpochException) {
WrongEpochException we = (WrongEpochException) re;
log.warn("Got a wrong epoch exception, updating epoch to {} and invalidate view",
we.getCorrectEpoch());
log.warn("Got a wrong epoch exception, updating epoch to {} and "
+ "invalidate view", we.getCorrectEpoch());
Long newEpoch = (we.getCorrectEpoch());
runtime.nodeRouters.values().forEach(x -> x.setEpoch(newEpoch));
runtime.invalidateLayout();
} else {
throw re;
}
} catch (InterruptedException | ExecutionException ex) {
log.warn("Error executing remote call, invalidating view and retrying in {}s", runtime.retryRate, ex);
log.warn("Error executing remote call, invalidating view and retrying in {}s",
runtime.retryRate, ex);
runtime.invalidateLayout();
try {
Thread.sleep(runtime.retryRate * 1000);
} catch (InterruptedException ie) {
log.warn("Interrupted Exception in layout helper.", ie);
}
}
}
}

@FunctionalInterface
public interface LayoutFunction<v, R, A extends Throwable,
public interface LayoutFunction<V, R, A extends Throwable,
B extends Throwable, C extends Throwable, D extends Throwable> {
R apply(Layout l) throws A, B, C, D;
}
Expand Down
30 changes: 23 additions & 7 deletions runtime/src/main/java/org/corfudb/runtime/view/Address.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,46 @@
* to represent addresses. In the future, it may replace the
* use of long depending on performance.
*
* Created by mwei on 1/6/17.
* <p>Created by mwei on 1/6/17.</p>
*/
public class Address {

/**
* @param addr
* Returns whether a given address is a valid address or not.
*
* @param addr log address
* @return true for all flag non-address constants
*/
public static boolean nonAddress(long addr) { return addr < 0; }
public static boolean nonAddress(long addr) {
return addr < 0;
}

/**
* @param addr
* Returns whether a given address is a valid address or not.
*
* @param addr log address
* @return true is addr is a legitimate address; false for all flag non-address constants
*/
public static boolean isAddress(long addr) { return addr >= 0; }
public static boolean isAddress(long addr) {
return addr >= 0;
}

/**
* Returns minimum address (base value for iterations).
*
* @return a constant which can be used as the base for address iterations
*/
@Getter
private static long minAddress = 0L;
public static boolean isMinAddress(long addr) { return addr == minAddress; }

public static boolean isMinAddress(long addr) {
return addr == minAddress;
}

// TODO should clean this up soon
public static long maxNonAddress() { return -1L; }
public static long maxNonAddress() {
return -1L;
}

/**
* @return A constant which can be used in loops going down up to hitting a non-address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.logprotocol.CheckpointEntry;

import org.corfudb.protocols.wireprotocol.DataType;
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.IToken;
Expand All @@ -16,21 +24,12 @@
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.util.CFUtils;
import org.corfudb.util.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;


/**
* A view of the address space implemented by Corfu.
* <p>
* Created by mwei on 12/10/15.
*
* <p>Created by mwei on 12/10/15.</p>
*/
@Slf4j
public class AddressSpaceView extends AbstractView {
Expand All @@ -46,26 +45,33 @@ public class AddressSpaceView extends AbstractView {
.recordStats()
.build(new CacheLoader<Long, ILogData>() {
@Override
public ILogData load(Long aLong) throws Exception {
return cacheFetch(aLong);
public ILogData load(Long value) throws Exception {
return cacheFetch(value);
}

@Override
public Map<Long, ILogData>
loadAll(Iterable<? extends Long> keys) throws Exception {
public Map<Long, ILogData> loadAll(Iterable<? extends Long> keys) throws Exception {
return cacheFetch((Iterable<Long>) keys);
}
});

/**
* Constructor for the Address Space View.
*/
public AddressSpaceView(@Nonnull final CorfuRuntime runtime) {
super(runtime);

final String pfx = String.format("%s0x%x.cache.", runtime.getMpASV(), this.hashCode());
runtime.getMetrics().register(pfx + "cache-size", (Gauge<Long>) () -> readCache.estimatedSize());
runtime.getMetrics().register(pfx + "evictions", (Gauge<Long>) () -> readCache.stats().evictionCount());
runtime.getMetrics().register(pfx + "hit-rate", (Gauge<Double>) () -> readCache.stats().hitRate());
runtime.getMetrics().register(pfx + "hits", (Gauge<Long>) () -> readCache.stats().hitCount());
runtime.getMetrics().register(pfx + "misses", (Gauge<Long>) () -> readCache.stats().missCount());
runtime.getMetrics().register(pfx + "cache-size",
(Gauge<Long>) () -> readCache.estimatedSize());
runtime.getMetrics().register(pfx + "evictions",
(Gauge<Long>) () -> readCache.stats().evictionCount());
runtime.getMetrics().register(pfx + "hit-rate",
(Gauge<Double>) () -> readCache.stats().hitRate());
runtime.getMetrics().register(pfx + "hits",
(Gauge<Long>) () -> readCache.stats().hitCount());
runtime.getMetrics().register(pfx + "misses",
(Gauge<Long>) () -> readCache.stats().missCount());
}

/**
Expand All @@ -88,8 +94,7 @@ public void resetCaches() {
* another value.
* @throws WrongEpochException If the token epoch is invalid.
*/
public void write(IToken token, Object data)
throws OverwriteException {
public void write(IToken token, Object data) throws OverwriteException {
final ILogData ld = new LogData(DataType.DATA, data);

layoutHelper(l -> {
Expand All @@ -108,7 +113,7 @@ public void write(IToken token, Object data)
.getReplicationProtocol(runtime)
.write(l, ld);
return null;
});
});

// Cache the successful write
if (!runtime.isCacheDisabled()) {
Expand Down Expand Up @@ -141,7 +146,8 @@ public void write(IToken token, Object data)
if (!runtime.isCacheDisabled()) {
ILogData data = readCache.get(address);
if (data == null || data.getType() == DataType.EMPTY) {
throw new RuntimeException("Unexpected return of empty data at address " + address + " on read");
throw new RuntimeException("Unexpected return of empty data at address "
+ address + " on read");
}
return data;
}
Expand All @@ -164,16 +170,16 @@ public Map<Long, ILogData> read(Iterable<Long> addresses) {
/**
* Prefix trim the address space.
*
* At the end of a prefix trim, all addresses equal to or
* <p>At the end of a prefix trim, all addresses equal to or
* less than the address given will be marked for trimming,
* which means that they may return either the original
* data, or a trimmed exception.
* data, or a trimmed exception.</p>
*
* @param address
* @param address log address
*/
public void prefixTrim(final long address) {
log.debug("PrefixTrim[{}]", address);
layoutHelper(l ->{
layoutHelper(l -> {
l.getPrefixSegments(address).stream()
.flatMap(seg -> seg.getStripes().stream())
.flatMap(stripe -> stripe.getLogServers().stream())
Expand Down Expand Up @@ -234,7 +240,7 @@ public void invalidateClientCache() {
*
* @param address An address to read from.
* @return A result to be cached. If the readresult is empty,
* This entry will be scheduled to self invalidate.
* This entry will be scheduled to self invalidate.
*/
private @Nonnull ILogData cacheFetch(long address) {
log.trace("CacheMiss[{}]", address);
Expand All @@ -250,7 +256,7 @@ public void invalidateClientCache() {
*
* @param addresses An address to read from.
* @return A result to be cached. If the readresult is empty,
* This entry will be scheduled to self invalidate.
* This entry will be scheduled to self invalidate.
*/
private @Nonnull Map<Long, ILogData> cacheFetch(Iterable<Long> addresses) {
return StreamSupport.stream(addresses.spliterator(), true)
Expand Down

0 comments on commit a2cc666

Please sign in to comment.