Skip to content

Commit

Permalink
Implemented a jna mlock to map and ping index files for RO stores in
Browse files Browse the repository at this point in the history
memory
  • Loading branch information
abh1nay committed Nov 29, 2012
1 parent 349f852 commit 4db8cc3
Show file tree
Hide file tree
Showing 15 changed files with 997 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .classpath
Expand Up @@ -53,9 +53,9 @@
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
<classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="lib/joda-time-1.6.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.1.jar"/>
<classpathentry kind="lib" path="lib/azkaban-common-0.05.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="classes"/>
</classpath>
2 changes: 1 addition & 1 deletion META-INF/MANIFEST.MF
Expand Up @@ -2,6 +2,6 @@ Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.1-b02 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
Implementation-Version: 1.1.3
Implementation-Version: 1.1.4
Implementation-Vendor: LinkedIn

20 changes: 19 additions & 1 deletion config/single_node_cluster/config/stores.xml
Expand Up @@ -35,5 +35,23 @@
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}</schema-info>
</value-serializer>
</store>

<store>
<name>anagpal-test-old</name>
<persistence>read-only</persistence>
<description>"test store"</description>
<owners>anagpal@linkedin.com</owners>
<routing-strategy>consistent-routing</routing-strategy>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>json</type>
<schema-info version="0">"string"</schema-info>
</key-serializer>
<value-serializer>
<type>json</type>
<schema-info version="0">{"cnt":"int32", "country":"string"}</schema-info>
</value-serializer>
</store>
</stores>
52 changes: 49 additions & 3 deletions src/java/voldemort/store/readonly/chunk/ChunkedFileSet.java
Expand Up @@ -22,6 +22,7 @@
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.io.MappedFileReader;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
Expand All @@ -45,6 +46,8 @@ public class ChunkedFileSet {
private final List<Integer> indexFileSizes;
private final List<Integer> dataFileSizes;
private final List<MappedByteBuffer> indexFiles;

private List<MappedFileReader> mappedIndexFileReader;
private final List<FileChannel> dataFiles;
private final HashMap<Object, Integer> chunkIdToChunkStart;
private final HashMap<Object, Integer> chunkIdToNumChunks;
Expand Down Expand Up @@ -76,6 +79,8 @@ public ChunkedFileSet(File directory, RoutingStrategy routingStrategy, int nodeI
this.indexFileSizes = new ArrayList<Integer>();
this.dataFileSizes = new ArrayList<Integer>();
this.indexFiles = new ArrayList<MappedByteBuffer>();
this.mappedIndexFileReader = new ArrayList<MappedFileReader>();

this.dataFiles = new ArrayList<FileChannel>();
this.chunkIdToChunkStart = new HashMap<Object, Integer>();
this.chunkIdToNumChunks = new HashMap<Object, Integer>();
Expand Down Expand Up @@ -148,7 +153,18 @@ else if(index.exists() ^ data.exists())

/* Add the file channel for data */
dataFiles.add(openChannel(data));
indexFiles.add(mapFile(index));

MappedFileReader idxFileReader = null;
try {
idxFileReader = new MappedFileReader(index);
mappedIndexFileReader.add(idxFileReader);
indexFiles.add(idxFileReader.map());
} catch(IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

// indexFiles.add(mapFile(index));
chunkId++;
}
if(chunkId == 0)
Expand Down Expand Up @@ -200,7 +216,18 @@ public void initVersion1() {

/* Add the file channel for data */
dataFiles.add(openChannel(data));
indexFiles.add(mapFile(index));

MappedFileReader idxFileReader = null;
try {
idxFileReader = new MappedFileReader(index);
mappedIndexFileReader.add(idxFileReader);
indexFiles.add(idxFileReader.map());
} catch(IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

// indexFiles.add(mapFile(index));
chunkId++;
globalChunkId++;
}
Expand Down Expand Up @@ -282,7 +309,18 @@ public void initVersion2() {

/* Add the file channel for data */
dataFiles.add(openChannel(data));
indexFiles.add(mapFile(index));

MappedFileReader idxFileReader = null;
try {
idxFileReader = new MappedFileReader(index);
mappedIndexFileReader.add(idxFileReader);
indexFiles.add(idxFileReader.map());
} catch(IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

// indexFiles.add(mapFile(index));
chunkId++;
globalChunkId++;
}
Expand Down Expand Up @@ -348,6 +386,14 @@ public void close() {
} catch(IOException e) {
logger.error("Error while closing file.", e);
}

MappedFileReader idxFileReader = mappedIndexFileReader.get(chunk);
try {
idxFileReader.close();
} catch(IOException e) {

logger.error("Error while closing file.", e);
}
}
}

Expand Down
83 changes: 83 additions & 0 deletions src/java/voldemort/store/readonly/io/BaseCloser.java
@@ -0,0 +1,83 @@
package voldemort.store.readonly.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public abstract class BaseCloser<T> {

protected List<T> delegates = new ArrayList();

private Throwable cause = null;

private boolean executed = false;

public BaseCloser() {}

public BaseCloser(List<T> delegates) {
this.delegates = delegates;
}

public BaseCloser(T... delegates) {
add(delegates);
}

public void add(T delegate) {
delegates.add(delegate);
}

public void add(T... delegates) {
for(T current: delegates) {
this.delegates.add(current);
}
}

public void setCause(Throwable cause) {
this.cause = cause;
}

protected boolean executed() {
return executed;
}

protected void exec() throws GroupIOException {

if(executed)
return;

GroupIOException exc = null;

if(cause != null)
exc = new GroupIOException(cause);

for(T current: delegates) {

if(current == null)
continue;

try {

onDelegate(current);

} catch(Throwable t) {

if(exc == null) {
exc = new GroupIOException(t);
} else {
exc.addSuppressed(t);
}

}

}

executed = true;

if(exc != null)
throw exc;

}

protected abstract void onDelegate(T delegate) throws IOException;

}
42 changes: 42 additions & 0 deletions src/java/voldemort/store/readonly/io/BaseMappedFile.java
@@ -0,0 +1,42 @@
package voldemort.store.readonly.io;

import java.io.File;
import java.nio.channels.FileChannel;

/**
*
*/
public class BaseMappedFile {

protected FileChannel channel;

protected long offset = 0;

protected long length = 0;

protected Closer closer = new Closer();

protected File file;

protected int fd;

protected boolean fadvise = true;

public File getFile() {
return file;
}

public int getFd() {
return fd;
}

public boolean isClosed() {
return closer.isClosed();
}

@Override
public String toString() {
return file.toString();
}

}
29 changes: 29 additions & 0 deletions src/java/voldemort/store/readonly/io/ByteBufferCloser.java
@@ -0,0 +1,29 @@
package voldemort.store.readonly.io;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A closeable which is smart enough to work on byte buffers.
*/
public class ByteBufferCloser implements Closeable {

private ByteBuffer buff;

public ByteBufferCloser(ByteBuffer buff) {
this.buff = buff;
}

@Override
public void close() throws IOException {

sun.misc.Cleaner cl = ((sun.nio.ch.DirectBuffer) buff).cleaner();

if(cl != null) {
cl.clean();
}

}

}
36 changes: 36 additions & 0 deletions src/java/voldemort/store/readonly/io/Closer.java
@@ -0,0 +1,36 @@
package voldemort.store.readonly.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public class Closer extends BaseCloser<Closeable> implements Closeable {

public Closer() {}

public Closer(List delegates) {
this.delegates = (List<Closeable>) delegates;
}

public Closer(Closeable... delegates) {
add(delegates);
}

@Override
public void close() throws IOException {
exec();
}

public boolean closed() {
return executed();
}

public boolean isClosed() {
return executed();
}

protected void onDelegate(Closeable delegate) throws IOException {
delegate.close();
}

}
46 changes: 46 additions & 0 deletions src/java/voldemort/store/readonly/io/GroupIOException.java
@@ -0,0 +1,46 @@
package voldemort.store.readonly.io;

import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

public class GroupIOException extends IOException {

private static final long serialVersionUID = 1L;
List<Throwable> suppressed = new ArrayList<Throwable>();

public GroupIOException(Throwable cause) {
suppressed.add(cause);
}

public void addSuppressed(Throwable t) {
suppressed.add(t);
}

@Override
public void printStackTrace(PrintStream out) {

for(Throwable current: suppressed) {
current.printStackTrace(out);
}

// this will print ourselves AND the cause...
super.printStackTrace(out);

}

@Override
public void printStackTrace(PrintWriter out) {

for(Throwable current: suppressed) {
current.printStackTrace(out);
}

// this will print ourselves AND the cause...
super.printStackTrace(out);

}

}

0 comments on commit 4db8cc3

Please sign in to comment.