Skip to content

Commit

Permalink
Update style org.corfudb.infrastructure.log (#687) (#738)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjames88 authored and no2chem committed Jun 20, 2017
1 parent b55987f commit 15ed9d3
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ public class AddressMetaData {
public final int length;
public final long offset;

/**
* Returns a metadata object for an address.
*
* @param checksum checksum of log data
* @param length length of log data
* @param offset file channel offset
**/
public AddressMetaData(int checksum, int length, long offset) {
this.checksum = checksum;
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
package org.corfudb.infrastructure.log;

import io.netty.util.internal.ConcurrentSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.internal.ConcurrentSet;
import org.corfudb.protocols.wireprotocol.LogData;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.TrimmedException;

/**
* This class implements the StreamLog interface using a Java hash map. The stream log is only stored in-memory and not
* persisted and thus should only be used for testing.
*
* This class implements the StreamLog interface using a Java hash map.
* The stream log is only stored in-memory and not persisted.
* This should only be used for testing.
* Created by maithem on 7/21/16.
*/
public class InMemoryStreamLog implements StreamLog, StreamLogWithRankedAddressSpace {

private final AtomicLong globalTail = new AtomicLong(0L);
private Map<Long, LogData> logCache;
private Map<UUID, Map<Long, LogData>> streamCache;
private Set<Long> trimmed;
final private AtomicLong globalTail = new AtomicLong(0L);
private volatile long startingAddress;

/**
* Returns an object that stores a stream log in memory.
*/
public InMemoryStreamLog() {
logCache = new ConcurrentHashMap();
streamCache = new HashMap();
Expand All @@ -47,11 +50,12 @@ public synchronized void append(long address, LogData entry) {
logCache.put(address, entry);


globalTail.getAndUpdate(maxTail -> entry.getGlobalAddress() > maxTail ? entry.getGlobalAddress() : maxTail);
globalTail.getAndUpdate(maxTail -> entry.getGlobalAddress() > maxTail
? entry.getGlobalAddress() : maxTail);
}

private void checkRange(long address) {
if(address < startingAddress) {
if (address < startingAddress) {
throw new TrimmedException();
}
}
Expand All @@ -68,7 +72,7 @@ public long getGlobalTail() {
}

private void throwLogUnitExceptionsIfNecessary(long address, LogData entry) {
if (entry.getRank()==null) {
if (entry.getRank() == null) {
throw new OverwriteException();
} else {
// the method below might throw DataOutrankedException or ValueAdoptedException
Expand All @@ -84,7 +88,7 @@ public synchronized void trim(long address) {
@Override
public LogData read(long address) {
checkRange(address);
if(trimmed.contains(address)) {
if (trimmed.contains(address)) {
throw new TrimmedException();
}

Expand All @@ -110,7 +114,7 @@ public void release(long address, LogData entry) {
@Override
public synchronized void compact() {
// Prefix Trim
for(long address : logCache.keySet()){
for (long address : logCache.keySet()) {
if (address < startingAddress) {
logCache.remove(address);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/

package org.corfudb.infrastructure.log;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.*;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Allows acquiring different read/write locks for different addresses
* Allows acquiring different read/write locks for different addresses.
*
* Created by Konstantin Spirov on 1/22/2015
* <p>Created by Konstantin Spirov on 1/22/2015
*/
public class MultiReadWriteLock {

// all used locks
private ConcurrentHashMap<Long, LockReference> locks = new ConcurrentHashMap<>();

// lock references per thread
private final ThreadLocal<LinkedList<LockMetadata>> threadLockReferences = new ThreadLocal<>();
// all used locks
private ConcurrentHashMap<Long, LockReference> locks = new ConcurrentHashMap<>();

/**
* Acquire a read lock. The recommended use of this method is in try-with-resources statement.
*
* @param address id of the lock to acquire.
* @return A closable that will free the allocations for this lock if necessary
*/
Expand All @@ -50,6 +53,7 @@ public AutoCloseableLock acquireReadLock(final Long address) {

/**
* Acquire a write lock. The recommended use of this method is in try-with-resources statement.
*
* @param address id of the lock to acquire.
* @return A closable that will free the allocations for this lock if necessary
*/
Expand All @@ -73,22 +77,23 @@ public AutoCloseableLock acquireWriteLock(final Long address) {

private ReentrantReadWriteLock constructLockFor(Long name) {
return locks.compute(name, (key, ref) -> {
if (ref == null) {
ref = new LockReference(new ReentrantReadWriteLock());
if (ref == null) {
ref = new LockReference(new ReentrantReadWriteLock());
}
ref.referenceCount++;
return ref;
}
ref.referenceCount++;
return ref;
}
).getLock();
}

private void clearEventuallyLockFor(Long name) {
locks.compute(name, (aLong, ref) -> {
locks.compute(name, (unusedLong, ref) -> {
if (ref == null) {
throw new IllegalStateException("Lock is wrongly used " + ref+" "+System.identityHashCode(Thread.currentThread()));
throw new IllegalStateException("Lock is wrongly used " + ref + " "
+ System.identityHashCode(Thread.currentThread()));
}
ref.referenceCount--;
if (ref.getReferenceCount()==0) {
if (ref.getReferenceCount() == 0) {
return null;
} else {
return ref;
Expand All @@ -97,20 +102,21 @@ private void clearEventuallyLockFor(Long name) {
}



private void registerLockReference(long address, boolean writeLock) {
LinkedList<LockMetadata> threadLocks = threadLockReferences.get();
if (threadLocks==null) {
if (threadLocks == null) {
threadLocks = new LinkedList<>();
threadLockReferences.set(threadLocks);
} else {
LockMetadata last = threadLocks.getLast();
if (last.getAddress()>address) {
throw new IllegalStateException("Wrong lock acquisition order "+last.getAddress()+" > "+address);
if (last.getAddress() > address) {
throw new IllegalStateException("Wrong lock acquisition order " + last.getAddress()
+ " > " + address);
}
if (writeLock) {
if (last.getAddress()==address && !last.isWriteLock()) {
throw new IllegalStateException("Write lock in the scope of read lock for "+address);
if (last.getAddress() == address && !last.isWriteLock()) {
throw new IllegalStateException("Write lock in the scope of read lock for "
+ address);
}
}
}
Expand All @@ -128,6 +134,12 @@ private void deregisterLockReference(long address, boolean writeLock) {
}
}

public interface AutoCloseableLock extends AutoCloseable {
@Override
void close();

}

@Data
@AllArgsConstructor
private class LockMetadata {
Expand All @@ -137,16 +149,11 @@ private class LockMetadata {

@Data
private class LockReference {
public LockReference(ReentrantReadWriteLock lock) {
this.lock = lock;
}
private ReentrantReadWriteLock lock;
private int referenceCount;
}

public interface AutoCloseableLock extends AutoCloseable {
@Override
void close();

public LockReference(ReentrantReadWriteLock lock) {
this.lock = lock;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package org.corfudb.infrastructure.log;

import org.corfudb.protocols.wireprotocol.LogData;

import java.io.IOException;

import org.corfudb.protocols.wireprotocol.LogData;

/**
* An interface definition that specifies an api to interact with a StreamLog.
*
* Created by maithem on 7/15/16.
* <p>Created by maithem on 7/15/16.
*/

public interface StreamLog {

/**
* Append an entry to the stream log.
* @param address
* @param entry
* @param address address of append entry
* @param entry entry to append to the log
*/
void append(long address, LogData entry);

/**
* Given an address, read the corresponding stream entry.
* @param address
* @param address address to read from the log
* @return Stream entry if it exists, otherwise return null
*/
LogData read(long address);

/**
* Mark a StreamLog address as trimmed.
* @param address
* @param address address to trim from the log
*/
void trim(long address);

/**
* Prefix trim the global log
* Prefix trim the global log.
* @param address address to trim the log up to
*/
void prefixTrim(long address);
Expand Down Expand Up @@ -61,9 +61,9 @@ public interface StreamLog {
void close();

/**
* unmap/release the memory for entry
* unmap/release the memory for entry.
*
* @param address
* @param address address to release
*/
void release(long address, LogData entry);
}

0 comments on commit 15ed9d3

Please sign in to comment.