From d0662ca44bb5a2cc5fbfcfe41213a9665892d278 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Tue, 3 Apr 2018 22:36:37 +0100 Subject: [PATCH 1/3] JENA-1516: Remove alloc-write. --- .../jena/tdb/base/objectfile/ObjectFile.java | 21 +------ .../tdb/base/objectfile/ObjectFileLogger.java | 23 -------- .../base/objectfile/ObjectFileReadonly.java | 6 +- .../base/objectfile/ObjectFileWrapper.java | 10 ---- .../jena/tdb/base/objectfile/StringFile.java | 12 ++-- .../transaction/ObjectFileTransComplex.java | 26 --------- .../objectfile/AbstractTestObjectFile.java | 36 +----------- .../objectfile/TestObjectFileBuffering.java | 57 +++++++++---------- 8 files changed, 39 insertions(+), 152 deletions(-) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFile.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFile.java index 6d2a37003dd..2bceaa4ff14 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFile.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFile.java @@ -24,7 +24,6 @@ import org.apache.jena.atlas.lib.Closeable ; import org.apache.jena.atlas.lib.Pair ; import org.apache.jena.atlas.lib.Sync ; -import org.apache.jena.tdb.base.block.Block ; /** * An ObjectFile is an append-read file, that is you can append data @@ -37,19 +36,6 @@ public interface ObjectFile extends Sync, Closeable /** A label to identify this ObjectFile */ public String getLabel() ; - - /** Allocate space for a write - pass this buffer to completeWrite . - * The data to be written can be smaller than that requested but - * the data must be in position 0 -> limit. - */ - public Block allocWrite(int bytesSpace) ; - - /** Announce that a write is complete (buffer must come from allocWrite) - */ - public void completeWrite(Block buffer) ; - - /** Decide not to perform the write */ - public void abortWrite(Block buffer) ; /** Write out the buffer - return the accessor number */ public long write(ByteBuffer buffer) ; @@ -65,14 +51,13 @@ public interface ObjectFile extends Sync, Closeable /** Reset the "append" point; may only be moved earlier. * The new position must correspond to a position returned by - * {@link #write(ByteBuffer)} or an id in a {@link Block Block} from {@link #completeWrite(Block)} + * {@link #write(ByteBuffer)}. */ public void reposition(long id) ; - /** - */ + /** Truncate the file */ public void truncate(long size) ; - /** All the bytebuffers - debugging aid */ + /** All the contents as ByteBuffers - debugging aid */ public Iterator> all() ; } diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileLogger.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileLogger.java index f4a54b5fa72..5a39cb05dba 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileLogger.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileLogger.java @@ -22,7 +22,6 @@ import java.util.Iterator ; import org.apache.jena.atlas.lib.Pair ; -import org.apache.jena.tdb.base.block.Block ; import org.slf4j.Logger ; import org.slf4j.LoggerFactory ; @@ -40,28 +39,6 @@ public ObjectFileLogger(String label, ObjectFile other) log = defaultLogger ; } - @Override - public Block allocWrite(int maxBytes) - { - Block blk = other.allocWrite(maxBytes) ; - info("allocWrite("+maxBytes+") -> "+blk.getId()) ; - return blk ; - } - - @Override - public void completeWrite(Block buffer) - { - info("completeWrite("+buffer.getId()+")") ; - other.completeWrite(buffer) ; - } - - @Override - public void abortWrite(Block buffer) - { - info("abortWrite("+buffer.getId()+")") ; - other.abortWrite(buffer) ; - } - @Override public long write(ByteBuffer buffer) { diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileReadonly.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileReadonly.java index 9044b825caa..4ac759cd819 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileReadonly.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileReadonly.java @@ -21,17 +21,13 @@ import java.nio.ByteBuffer ; import org.apache.jena.tdb.TDBException; -import org.apache.jena.tdb.base.block.Block ; public class ObjectFileReadonly extends ObjectFileWrapper { public ObjectFileReadonly(ObjectFile other) { super(other) ; } - @Override public Block allocWrite(int maxBytes) { throw new TDBException("Read-only object file") ; } - @Override public void completeWrite(Block buffer) { throw new TDBException("Read-only object file") ; } - @Override public void abortWrite(Block buffer) { throw new TDBException("Read-only object file") ; } @Override public long write(ByteBuffer buffer) { throw new TDBException("Read-only object file") ; } @Override public void reposition(long id) { throw new TDBException("Read-only object file") ; } @Override public void truncate(long size) { throw new TDBException("Read-only object file") ; } - @Override public String toString() { return "RO:"+super.toString() ; } + @Override public String toString() { return "RO:"+super.toString() ; } } diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileWrapper.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileWrapper.java index 851840634f9..0108afde788 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileWrapper.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileWrapper.java @@ -22,7 +22,6 @@ import java.util.Iterator ; import org.apache.jena.atlas.lib.Pair ; -import org.apache.jena.tdb.base.block.Block ; /** * An ObjectFile is an append-read file, that is you can append data @@ -35,15 +34,6 @@ public class ObjectFileWrapper implements ObjectFile public ObjectFileWrapper(ObjectFile other) { this.other = other ; } - @Override - public Block allocWrite(int maxBytes) { return other.allocWrite(maxBytes) ; } - - @Override - public void completeWrite(Block buffer) { other.completeWrite(buffer) ; } - - @Override - public void abortWrite(Block buffer) { other.abortWrite(buffer) ; } - @Override public long write(ByteBuffer buffer) { return other.write(buffer) ; } diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/StringFile.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/StringFile.java index a0a49dd97c3..bdd3b286fbb 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/StringFile.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/StringFile.java @@ -23,7 +23,6 @@ import org.apache.jena.atlas.lib.Bytes ; import org.apache.jena.atlas.lib.Closeable ; import org.apache.jena.atlas.lib.Sync ; -import org.apache.jena.tdb.base.block.Block ; import org.apache.jena.tdb.lib.StringAbbrev ; /** Wrap a {@link ObjectFile} with a string encoder/decoder. @@ -47,11 +46,12 @@ public StringFile(ObjectFile file) public long write(String str) { str = compress(str) ; - Block block = file.allocWrite(4*str.length()) ; - int len = Bytes.toByteBuffer(str, block.getByteBuffer()) ; - block.getByteBuffer().flip() ; - file.completeWrite(block) ; - return block.getId() ; + + ByteBuffer bb = ByteBuffer.allocate(4*str.length()); + int len = Bytes.toByteBuffer(str, bb) ; + bb.flip() ; + long x = file.write(bb) ; + return x; } public String read(long id) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/ObjectFileTransComplex.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/ObjectFileTransComplex.java index cca941ef19e..adf88e89702 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/ObjectFileTransComplex.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/ObjectFileTransComplex.java @@ -25,7 +25,6 @@ import org.apache.jena.atlas.lib.Pair ; import org.apache.jena.atlas.lib.StrUtils ; import org.apache.jena.atlas.logging.Log ; -import org.apache.jena.tdb.base.block.Block ; import org.apache.jena.tdb.base.file.FileException ; import org.apache.jena.tdb.base.objectfile.ObjectFile ; @@ -163,31 +162,6 @@ public void truncate(long id) otherAllocOffset = base.length() ; } - @Override - public Block allocWrite(int maxBytes) - { - if ( passthrough ) return base.allocWrite(maxBytes) ; - Block block = transObjects.allocWrite(maxBytes) ; - block = new Block(block.getId()+otherAllocOffset, block.getByteBuffer()) ; - return block ; - } - - @Override - public void completeWrite(Block block) - { - if ( passthrough ) { base.completeWrite(block) ; return ; } - block = new Block(block.getId()-otherAllocOffset, block.getByteBuffer()) ; - transObjects.completeWrite(block) ; - } - - @Override - public void abortWrite(Block block) - { - if ( passthrough ) { base.abortWrite(block) ; return ; } - block = new Block(block.getId()-otherAllocOffset, block.getByteBuffer()) ; - transObjects.abortWrite(block) ; - } - /** Convert from a id to the id in the "other" file */ private long mapToOther(long x) { return x-otherAllocOffset ; } /** Convert from a id in other to an external id */ diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/AbstractTestObjectFile.java b/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/AbstractTestObjectFile.java index 424c337970d..322e86da76c 100644 --- a/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/AbstractTestObjectFile.java +++ b/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/AbstractTestObjectFile.java @@ -23,8 +23,6 @@ import java.nio.ByteBuffer ; import org.apache.jena.atlas.junit.BaseTest ; -import org.apache.jena.tdb.base.block.Block ; -import org.apache.jena.tdb.base.objectfile.ObjectFile ; import org.junit.After ; import org.junit.Before ; import org.junit.Test ; @@ -35,28 +33,14 @@ public abstract class AbstractTestObjectFile extends BaseTest @Before public void before() { file = make() ; } @After public void after() { release(file); } + + // Test 02 and 04 were for alloc-write. @Test public void objectfile_01() { - assertEquals(0, file.length()) ; } - @Test public void objectfile_02() - { - Block block = file.allocWrite(10) ; - fill(block.getByteBuffer()) ; - file.completeWrite(block) ; - long x1 = block.getId() ; - assertEquals(0, x1) ; - - ByteBuffer bb = file.read(x1) ; - - // position - - assertTrue(sameValue(block.getByteBuffer(), bb)) ; - } - @Test public void objectfile_03() { ByteBuffer bb = ByteBuffer.allocate(10) ; @@ -65,22 +49,6 @@ public abstract class AbstractTestObjectFile extends BaseTest assertEquals(0, x1) ; } - @Test public void objectfile_04() - { - Block block1 = file.allocWrite(10) ; - fill(block1.getByteBuffer()) ; - file.completeWrite(block1) ; - - Block block2 = file.allocWrite(20) ; - fill(block2.getByteBuffer()) ; - file.completeWrite(block2) ; - - long x1 = block1.getId() ; - long x2 = block2.getId() ; - - assertFalse(x1 == x2) ; - } - @Test public void objectfile_05() { ByteBuffer bb1 = ByteBuffer.allocate(10) ; diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/TestObjectFileBuffering.java b/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/TestObjectFileBuffering.java index 3265d939e23..99e3c0f31ef 100644 --- a/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/TestObjectFileBuffering.java +++ b/jena-tdb/src/test/java/org/apache/jena/tdb/base/objectfile/TestObjectFileBuffering.java @@ -24,11 +24,8 @@ import java.nio.ByteBuffer ; import org.apache.jena.atlas.junit.BaseTest ; -import org.apache.jena.tdb.base.block.Block ; import org.apache.jena.tdb.base.file.BufferChannel ; import org.apache.jena.tdb.base.file.BufferChannelMem ; -import org.apache.jena.tdb.base.objectfile.ObjectFile ; -import org.apache.jena.tdb.base.objectfile.ObjectFileStorage ; import org.junit.Test ; public class TestObjectFileBuffering extends BaseTest @@ -62,41 +59,41 @@ private void write(int sizeOfBuffer, int... sizes) } } - private void writePrealloc(int sizeOfBuffer, int... sizes) - { - ObjectFile file = make(sizeOfBuffer) ; - int N = sizes.length ; - Block blocks[] = new Block[N] ; - ByteBuffer read[] = new ByteBuffer[N] ; - - for ( int i = 0 ; i < N ; i++ ) - { - blocks[i] = file.allocWrite(sizes[i]) ; - fill(blocks[i].getByteBuffer()) ; - file.completeWrite(blocks[i]) ; - } - - for ( int i = 0 ; i < N ; i++ ) - { - read[i] = file.read(blocks[i].getId()) ; - assertNotSame(blocks[i].getByteBuffer(), read[i]) ; - sameValue(blocks[i].getByteBuffer(), read[i]) ; - } - } +// private void writePrealloc(int sizeOfBuffer, int... sizes) +// { +// ObjectFile file = make(sizeOfBuffer) ; +// int N = sizes.length ; +// Block blocks[] = new Block[N] ; +// ByteBuffer read[] = new ByteBuffer[N] ; +// +// for ( int i = 0 ; i < N ; i++ ) +// { +// blocks[i] = file.allocWrite(sizes[i]) ; +// fill(blocks[i].getByteBuffer()) ; +// file.completeWrite(blocks[i]) ; +// } +// +// for ( int i = 0 ; i < N ; i++ ) +// { +// read[i] = file.read(blocks[i].getId()) ; +// assertNotSame(blocks[i].getByteBuffer(), read[i]) ; +// sameValue(blocks[i].getByteBuffer(), read[i]) ; +// } +// } @Test public void objectfile_50() { write(5, 10) ; } - @Test public void objectfile_51() { writePrealloc(5, 10) ; } + //@Test public void objectfile_51() { writePrealloc(5, 10) ; } @Test public void objectfile_52() { write(12, 10) ; } - @Test public void objectfile_53() { writePrealloc(12, 10) ; } + //@Test public void objectfile_53() { writePrealloc(12, 10) ; } @Test public void objectfile_54() { write(12, 10, 8) ; } // 10 is too big - @Test public void objectfile_55() { writePrealloc(12, 10, 8) ; } // 10 is too big + //@Test public void objectfile_55() { writePrealloc(12, 10, 8) ; } // 10 is too big @Test public void objectfile_56() { write(12, 6, 10) ; } - @Test public void objectfile_57() { writePrealloc(12, 6, 10) ; } + //@Test public void objectfile_57() { writePrealloc(12, 6, 10) ; } @Test public void objectfile_58() { write(20, 6, 10, 5) ; } - @Test public void objectfile_59() { writePrealloc(20, 6, 10, 5) ; } + //@Test public void objectfile_59() { writePrealloc(20, 6, 10, 5) ; } @Test public void objectfile_60() { write(20, 4, 4, 8) ; } - @Test public void objectfile_61() { writePrealloc(20, 4, 4, 8) ; } + //@Test public void objectfile_61() { writePrealloc(20, 4, 4, 8) ; } } From ab8d0f4e71039e1049d355ad37109c2bb515b803 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Tue, 3 Apr 2018 22:52:24 +0100 Subject: [PATCH 2/3] JENA-1516: Simplify write. Sync writer buffer; protect length read. Remove alloc-write. --- .../base/objectfile/ObjectFileStorage.java | 242 +++++------------- .../java/org/apache/jena/tdb/lib/NodeLib.java | 35 +-- 2 files changed, 79 insertions(+), 198 deletions(-) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java index 8fcd06b64b3..3ad126bfede 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java @@ -27,8 +27,6 @@ import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.atlas.iterator.IteratorSlotted ; import org.apache.jena.atlas.lib.Pair ; -import org.apache.jena.atlas.logging.Log ; -import org.apache.jena.tdb.base.block.Block ; import org.apache.jena.tdb.base.file.BufferChannel ; import org.apache.jena.tdb.base.file.FileException ; import org.apache.jena.tdb.sys.SystemTDB ; @@ -50,7 +48,8 @@ private void log(String fmt, Object... args) } /* - * No synchronization - assumes that the caller has some appropriate lock + * No synchronization excpet for the write buffer. + * This code assumes that the caller has some appropriate lock * because the combination of file and cache operations needs to be thread safe. * * The position of the channel is assumed to be the end of the file always. @@ -60,25 +59,12 @@ private void log(String fmt, Object... args) * Writing is buffered. */ - // The object length slot. - private ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ; - - // Delayed write buffer. + private final Object lockWriteBuffer = new Object(); private final ByteBuffer writeBuffer ; private final BufferChannel file ; // Access to storage - private long filesize ; // Size of on-disk. + private volatile long filesize ; // Size of on-disk. - // Two-step write - alloc, write - private boolean inAllocWrite = false ; - private Block allocBlock = null ; - private long allocLocation = -1 ; - - // Old values for abort. - int oldBufferPosn = -1 ; - int oldBufferLimit = -1 ; - - public ObjectFileStorage(BufferChannel file) { this(file, ObjectFileWriteCacheSize) ; @@ -94,13 +80,11 @@ public ObjectFileStorage(BufferChannel file, int bufferSize) } @Override + synchronized public long write(ByteBuffer bb) { log("W") ; - if ( inAllocWrite ) - Log.error(this, "In the middle of an alloc-write") ; - inAllocWrite = false ; if ( writeBuffer == null ) { long x = rawWrite(bb) ; @@ -111,35 +95,40 @@ public long write(ByteBuffer bb) int len = bb.limit() - bb.position() ; int spaceNeeded = len + SizeOfInt ; - if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) - // No room - flush. - flushOutputBuffer() ; - if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) - { - long x = rawWrite(bb) ; + synchronized(lockWriteBuffer) { + if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) + // No room - flush. + flushOutputBuffer() ; + if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) + { + long x = rawWrite(bb) ; + if ( logging ) + log("W -> 0x%X", x); + return x ; + } + + long loc = writeBuffer.position()+filesize ; + writeBuffer.putInt(len) ; + writeBuffer.put(bb) ; if ( logging ) - log("W -> 0x%X", x); - return x ; + log("W -> 0x%X", loc); + return loc ; } - - long loc = writeBuffer.position()+filesize ; - writeBuffer.putInt(len) ; - writeBuffer.put(bb) ; - if ( logging ) - log("W -> 0x%X", loc); - return loc ; } + // The object length slot. + private ByteBuffer writeLengthBuffer = ByteBuffer.allocate(SizeOfInt) ; + private long rawWrite(ByteBuffer bb) { if ( logging ) log("RW %s", bb) ; int len = bb.limit() - bb.position() ; - lengthBuffer.rewind() ; - lengthBuffer.putInt(len) ; - lengthBuffer.flip() ; + writeLengthBuffer.rewind() ; + writeLengthBuffer.putInt(len) ; + writeLengthBuffer.flip() ; long location = file.position() ; - file.write(lengthBuffer) ; + file.write(writeLengthBuffer) ; int x = file.write(bb) ; if ( x != len ) throw new FileException() ; @@ -153,140 +142,25 @@ private long rawWrite(ByteBuffer bb) return location ; } - @Override - public Block allocWrite(int bytesSpace) - { - //log.info("AW("+bytesSpace+"):"+state()) ; - if ( inAllocWrite ) - Log.error(this, "In the middle of an alloc-write") ; - - // Include space for length. - int spaceRequired = bytesSpace + SizeOfInt ; - - // Find space. - if ( writeBuffer != null && spaceRequired > writeBuffer.remaining() ) - flushOutputBuffer() ; - - if ( writeBuffer == null || spaceRequired > writeBuffer.remaining() ) - { - // Too big. Have flushed buffering if buffering. - inAllocWrite = true ; - ByteBuffer bb = ByteBuffer.allocate(bytesSpace) ; - allocBlock = new Block(filesize, bb) ; - allocLocation = -1 ; - //log.info("AW:"+state()+"-> ----") ; - return allocBlock ; - } - - // Will fit. - inAllocWrite = true ; - int start = writeBuffer.position() ; - // Old values for restoration - oldBufferPosn = start ; - oldBufferLimit = writeBuffer.limit() ; - - // id (but don't tell the caller yet). - allocLocation = filesize+start ; - - // Slice it. - writeBuffer.putInt(bytesSpace) ; - writeBuffer.position(start + SizeOfInt) ; - writeBuffer.limit(start+spaceRequired) ; - ByteBuffer bb = writeBuffer.slice() ; - - allocBlock = new Block(allocLocation, bb) ; - - if ( logging ) - log("AW: %s->0x%X", state(), allocLocation) ; - return allocBlock ; - } - - @Override - public void completeWrite(Block block) - { - if ( logging ) - log("CW: %s @0x%X",block, allocLocation) ; - if ( ! inAllocWrite ) - throw new FileException("Not in the process of an allocated write operation pair") ; - if ( allocBlock != null && ( allocBlock.getByteBuffer() != block.getByteBuffer() ) ) - throw new FileException("Wrong byte buffer in an allocated write operation pair") ; - - inAllocWrite = false ; - - ByteBuffer buffer = block.getByteBuffer() ; - - if ( allocLocation == -1 ) - { - // It was too big to use the buffering. - rawWrite(buffer) ; - return ; - } - // Write area is 0 -> limit - if ( 0 != buffer.position() ) - log.warn("ObjectFleStorage: position != 0") ; - buffer.position(0) ; - int actualLength = buffer.limit()-buffer.position() ; - // Insert object length - int idx = (int)(allocLocation-filesize) ; - writeBuffer.putInt(idx, actualLength) ; - // And bytes to idx+actualLength+4 are used - allocBlock = null ; - int newLen = idx+actualLength+4 ; - writeBuffer.position(newLen); - writeBuffer.limit(writeBuffer.capacity()) ; - allocLocation = -1 ; - oldBufferPosn = -1 ; - oldBufferLimit = -1 ; - } - - @Override - public void abortWrite(Block block) - { - allocBlock = null ; - int oldstart = (int)(allocLocation-filesize) ; - if ( oldstart != oldBufferPosn) - throw new FileException("Wrong reset point: calc="+oldstart+" : expected="+oldBufferPosn) ; - - writeBuffer.position(oldstart) ; - writeBuffer.limit(oldBufferLimit) ; - allocLocation = -1 ; - oldBufferPosn = -1 ; - oldBufferLimit = -1 ; - inAllocWrite = false ; - } - private void flushOutputBuffer() { if ( logging ) log("Flush") ; - if ( writeBuffer == null ) return ; - if ( writeBuffer.position() == 0 ) return ; - - if ( false ) - { - String x = getLabel() ; - if ( x.contains("nodes") ) - { - long x1 = filesize ; - long x2 = writeBuffer.position() ; - long x3 = x1 + x2 ; - System.out.printf("Flush(%s) : %d/0x%04X (%d/0x%04X) %d/0x%04X\n", getLabel(), x1, x1, x2, x2, x3, x3) ; - } - } - - long location = filesize ; + if ( writeBuffer == null ) + return; + if ( writeBuffer.position() == 0 ) + return; + long location = filesize; writeBuffer.flip(); - int x = file.write(writeBuffer) ; - filesize += x ; - writeBuffer.clear() ; + int x = file.write(writeBuffer); + filesize += x; + writeBuffer.clear(); } @Override public void reposition(long posn) { - if ( inAllocWrite ) - throw new FileException("In the middle of an alloc-write") ; if ( posn < 0 || posn > length() ) throw new IllegalArgumentException("reposition: Bad location: "+posn) ; flushOutputBuffer() ; @@ -307,38 +181,40 @@ public ByteBuffer read(long loc) if ( logging ) log("R(0x%X)", loc) ; - if ( inAllocWrite ) - throw new FileException("In the middle of an alloc-write") ; if ( loc < 0 ) throw new IllegalArgumentException("ObjectFile.read["+file.getLabel()+"]: Bad read: "+loc) ; // Maybe it's in the in the write buffer. - // Maybe the write buffer should keep more structure? if ( loc >= filesize ) { - if ( loc >= filesize+writeBuffer.position() ) - throw new IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ; - - int x = writeBuffer.position() ; - int y = writeBuffer.limit() ; - - int offset = (int)(loc-filesize) ; - int len = writeBuffer.getInt(offset) ; - int posn = offset + SizeOfInt ; - // Slice the data bytes, - writeBuffer.position(posn) ; - writeBuffer.limit(posn+len) ; - ByteBuffer bb = writeBuffer.slice() ; - writeBuffer.limit(y) ; - writeBuffer.position(x) ; - return bb ; + // This path should be uncommon. + synchronized(lockWriteBuffer) { + if ( loc >= filesize+writeBuffer.position() ) + throw new IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ; + int offset = (int)(loc-filesize) ; + int len = writeBuffer.getInt(offset) ; + int posn = offset + SizeOfInt ; + ByteBuffer bb1 = ByteBuffer.allocate(len) ; + for (int i = 0; i < len; i++) + bb1.put(i, writeBuffer.get(posn+i)); + return bb1 ; + } } // No - it's in the underlying file storage. + // XXX Need to make this safe. + // XXX Length buffer + ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ; + lengthBuffer.clear() ; int x = file.read(lengthBuffer, loc) ; - if ( x != 4 ) + if ( x != 4 ) { + String msg = "ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes"; + System.err.println(msg) ; + lengthBuffer.clear() ; + int x1 = file.read(lengthBuffer, loc) ; throw new FileException("ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes") ; + } int len = lengthBuffer.getInt(0) ; // Sanity check. if ( len > filesize-(loc+SizeOfInt) ) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java index a4f69392937..240a12ef572 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java @@ -36,7 +36,6 @@ import org.apache.jena.riot.out.NodeFmtLib ; import org.apache.jena.sparql.util.NodeUtils ; import org.apache.jena.tdb.TDBException ; -import org.apache.jena.tdb.base.block.Block ; import org.apache.jena.tdb.base.objectfile.ObjectFile ; import org.apache.jena.tdb.base.record.Record ; import org.apache.jena.tdb.store.Hash ; @@ -53,26 +52,32 @@ public class NodeLib // Characters in IRIs that are illegal and cause SSE problems, but we wish to keep. final private static char MarkerChar = '_' ; final private static char[] invalidIRIChars = { MarkerChar , ' ' } ; + final private static int SIZE = 1024; + // Marshalling space. + final private static ByteBuffer workspace = ByteBuffer.allocate(SIZE); + /** Encode and write a {@link Node} to the {@link ObjectFile}. + * Returns the location, suitable for use with {@link #fetchDecode}. + *

+ * Callers must synchonize to ensure writing is not concurrent. + */ public static long encodeStore(Node node, ObjectFile file) { - // Buffer pool? - - // Nodes can be writtern during reads. - // Make sure this operation is sync'ed. int maxSize = nodec.maxSize(node) ; - Block block = file.allocWrite(maxSize) ; - try { - int len = nodec.encode(node, block.getByteBuffer(), null) ; - file.completeWrite(block) ; - return block.getId() ; - } catch (TDBException ex) - { - file.abortWrite(block) ; - throw ex ; - } + ByteBuffer bb = workspace; + if ( maxSize >= SIZE ) + // Large object. Special buffer. + bb = ByteBuffer.allocate(maxSize); + else + bb.clear(); + int len = nodec.encode(node, bb, null) ; + long x = file.write(bb); + return x; } + /** Read and decode a {@link Node} from the {@link ObjectFile}. + * The {@code id} must have originally been generated by {@link #encodeStore}. + */ public static Node fetchDecode(long id, ObjectFile file) { ByteBuffer bb = file.read(id) ; From 80c1d026e02d5a87cf9dc31df0f9ba98b4ce212a Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Wed, 4 Apr 2018 16:01:43 +0100 Subject: [PATCH 3/3] Formatting changes and remove dev code --- .../base/objectfile/ObjectFileStorage.java | 419 ++++++++---------- 1 file changed, 197 insertions(+), 222 deletions(-) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java index 3ad126bfede..f0e8266eca1 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java @@ -18,20 +18,20 @@ package org.apache.jena.tdb.base.objectfile; -import static org.apache.jena.tdb.sys.SystemTDB.ObjectFileWriteCacheSize ; -import static org.apache.jena.tdb.sys.SystemTDB.SizeOfInt ; +import static org.apache.jena.tdb.sys.SystemTDB.ObjectFileWriteCacheSize; +import static org.apache.jena.tdb.sys.SystemTDB.SizeOfInt; -import java.nio.ByteBuffer ; -import java.util.Iterator ; +import java.nio.ByteBuffer; +import java.util.Iterator; -import org.apache.jena.atlas.iterator.Iter ; -import org.apache.jena.atlas.iterator.IteratorSlotted ; -import org.apache.jena.atlas.lib.Pair ; -import org.apache.jena.tdb.base.file.BufferChannel ; -import org.apache.jena.tdb.base.file.FileException ; -import org.apache.jena.tdb.sys.SystemTDB ; -import org.slf4j.Logger ; -import org.slf4j.LoggerFactory ; +import org.apache.jena.atlas.iterator.Iter; +import org.apache.jena.atlas.iterator.IteratorSlotted; +import org.apache.jena.atlas.lib.Pair; +import org.apache.jena.tdb.base.file.BufferChannel; +import org.apache.jena.tdb.base.file.FileException; +import org.apache.jena.tdb.sys.SystemTDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Variable length ByteBuffer file on disk. * Buffering for delayed writes. @@ -39,12 +39,11 @@ public class ObjectFileStorage implements ObjectFile { - private static Logger log = LoggerFactory.getLogger(ObjectFileStorage.class) ; - public static boolean logging = false ; - private void log(String fmt, Object... args) - { - if ( ! logging ) return ; - log.debug(state()+" "+String.format(fmt, args)) ; + private static Logger log = LoggerFactory.getLogger(ObjectFileStorage.class); + public static boolean logging = false; + private void log(String fmt, Object... args) { + if ( ! logging ) return; + log.debug(state()+" "+String.format(fmt, args)); } /* @@ -60,93 +59,84 @@ private void log(String fmt, Object... args) */ private final Object lockWriteBuffer = new Object(); - private final ByteBuffer writeBuffer ; + private final ByteBuffer writeBuffer; - private final BufferChannel file ; // Access to storage - private volatile long filesize ; // Size of on-disk. + private final BufferChannel file; // Access to storage + private volatile long filesize; // Size of on-disk. - public ObjectFileStorage(BufferChannel file) - { - this(file, ObjectFileWriteCacheSize) ; + public ObjectFileStorage(BufferChannel file) { + this(file, ObjectFileWriteCacheSize); } - - public ObjectFileStorage(BufferChannel file, int bufferSize) - { - this.file = file ; - filesize = file.size() ; - this.file.position(filesize) ; // End of file. - log("File size: 0x%X, posn: 0x%X", filesize, file.position()) ; - writeBuffer = (bufferSize >= 0) ? ByteBuffer.allocate(bufferSize) : null ; + + public ObjectFileStorage(BufferChannel file, int bufferSize) { + this.file = file; + filesize = file.size(); + this.file.position(filesize); // End of file. + log("File size: 0x%X, posn: 0x%X", filesize, file.position()); + writeBuffer = (bufferSize >= 0) ? ByteBuffer.allocate(bufferSize) : null; } - + @Override - synchronized - public long write(ByteBuffer bb) - { - log("W") ; - - if ( writeBuffer == null ) - { - long x = rawWrite(bb) ; + synchronized public long write(ByteBuffer bb) { + log("W"); + + if ( writeBuffer == null ) { + long x = rawWrite(bb); log("W -> 0x%X", x); - return x ; + return x; } - - int len = bb.limit() - bb.position() ; - int spaceNeeded = len + SizeOfInt ; - - synchronized(lockWriteBuffer) { - if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) + + int len = bb.limit() - bb.position(); + int spaceNeeded = len + SizeOfInt; + + synchronized (lockWriteBuffer) { + if ( writeBuffer.position() + spaceNeeded > writeBuffer.capacity() ) // No room - flush. - flushOutputBuffer() ; - if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() ) - { - long x = rawWrite(bb) ; - if ( logging ) + flushOutputBuffer(); + if ( writeBuffer.position() + spaceNeeded > writeBuffer.capacity() ) { + long x = rawWrite(bb); + if ( logging ) log("W -> 0x%X", x); - return x ; + return x; } - - long loc = writeBuffer.position()+filesize ; - writeBuffer.putInt(len) ; - writeBuffer.put(bb) ; - if ( logging ) + + long loc = writeBuffer.position() + filesize; + writeBuffer.putInt(len); + writeBuffer.put(bb); + if ( logging ) log("W -> 0x%X", loc); - return loc ; + return loc; } } - - // The object length slot. - private ByteBuffer writeLengthBuffer = ByteBuffer.allocate(SizeOfInt) ; - - private long rawWrite(ByteBuffer bb) - { - if ( logging ) - log("RW %s", bb) ; - int len = bb.limit() - bb.position() ; - writeLengthBuffer.rewind() ; - writeLengthBuffer.putInt(len) ; - writeLengthBuffer.flip() ; - long location = file.position() ; - file.write(writeLengthBuffer) ; - int x = file.write(bb) ; - if ( x != len ) - throw new FileException() ; - filesize = filesize+x+SizeOfInt ; - + + // The object length slot. + private ByteBuffer writeLengthBuffer = ByteBuffer.allocate(SizeOfInt); + + private long rawWrite(ByteBuffer bb) { if ( logging ) - { + log("RW %s", bb); + int len = bb.limit() - bb.position(); + writeLengthBuffer.rewind(); + writeLengthBuffer.putInt(len); + writeLengthBuffer.flip(); + long location = file.position(); + file.write(writeLengthBuffer); + int x = file.write(bb); + if ( x != len ) + throw new FileException(); + filesize = filesize + x + SizeOfInt; + + if ( logging ) { log("Posn: %d", file.position()); - log("RW ->0x%X",location) ; + log("RW ->0x%X", location); } - return location ; + return location; } - - private void flushOutputBuffer() - { + + private void flushOutputBuffer() { if ( logging ) - log("Flush") ; - + log("Flush"); + if ( writeBuffer == null ) return; if ( writeBuffer.position() == 0 ) @@ -159,199 +149,184 @@ private void flushOutputBuffer() } @Override - public void reposition(long posn) - { + public void reposition(long posn) { if ( posn < 0 || posn > length() ) - throw new IllegalArgumentException("reposition: Bad location: "+posn) ; - flushOutputBuffer() ; - file.truncate(posn) ; - filesize = posn ; + throw new IllegalArgumentException("reposition: Bad location: " + posn); + flushOutputBuffer(); + file.truncate(posn); + filesize = posn; } @Override - public void truncate(long size) - { - //System.out.println("truncate: "+size+" ("+filesize+","+writeBuffer.position()+")") ; - reposition(size) ; + public void truncate(long size) { + // System.out.println("truncate: "+size+" + // ("+filesize+","+writeBuffer.position()+")"); + reposition(size); } @Override - public ByteBuffer read(long loc) - { - if ( logging ) - log("R(0x%X)", loc) ; - + public ByteBuffer read(long loc) { + if ( logging ) + log("R(0x%X)", loc); + if ( loc < 0 ) - throw new IllegalArgumentException("ObjectFile.read["+file.getLabel()+"]: Bad read: "+loc) ; - + throw new IllegalArgumentException("ObjectFile.read[" + file.getLabel() + "]: Bad read: " + loc); + // Maybe it's in the in the write buffer. - if ( loc >= filesize ) - { - // This path should be uncommon. - synchronized(lockWriteBuffer) { - if ( loc >= filesize+writeBuffer.position() ) - throw new IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ; - int offset = (int)(loc-filesize) ; - int len = writeBuffer.getInt(offset) ; - int posn = offset + SizeOfInt ; - ByteBuffer bb1 = ByteBuffer.allocate(len) ; - for (int i = 0; i < len; i++) - bb1.put(i, writeBuffer.get(posn+i)); - return bb1 ; + if ( loc >= filesize ) { + // This path should be uncommon. + synchronized (lockWriteBuffer) { + if ( loc >= filesize + writeBuffer.position() ) + throw new IllegalArgumentException("ObjectFileStorage.read[" + file.getLabel() + "]: Bad read: location=" + loc + + " >= max=" + (filesize + writeBuffer.position())); + int offset = (int)(loc - filesize); + int len = writeBuffer.getInt(offset); + int posn = offset + SizeOfInt; + ByteBuffer bb1 = ByteBuffer.allocate(len); + for ( int i = 0; i < len; i++ ) + bb1.put(i, writeBuffer.get(posn + i)); + return bb1; } } - + // No - it's in the underlying file storage. - // XXX Need to make this safe. - // XXX Length buffer - ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ; - - lengthBuffer.clear() ; - int x = file.read(lengthBuffer, loc) ; + ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt); + + lengthBuffer.clear(); + int x = file.read(lengthBuffer, loc); if ( x != 4 ) { - String msg = "ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes"; - System.err.println(msg) ; - lengthBuffer.clear() ; - int x1 = file.read(lengthBuffer, loc) ; - throw new FileException("ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes") ; + String msg = "ObjectFileStorage.read[" + file.getLabel() + "](" + loc + ")[filesize=" + filesize + "]" + + "[file.size()=" + file.size() + "]: Failed to read the length : got " + x + " bytes"; + lengthBuffer.clear(); + int x1 = file.read(lengthBuffer, loc); + throw new FileException(msg); } - int len = lengthBuffer.getInt(0) ; - // Sanity check. - if ( len > filesize-(loc+SizeOfInt) ) - { - String msg = "ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Impossibly large object : "+len+" bytes > filesize-(loc+SizeOfInt)="+(filesize-(loc+SizeOfInt)) ; - SystemTDB.errlog.error(msg) ; - throw new FileException(msg) ; + int len = lengthBuffer.getInt(0); + // Sanity check. + if ( len > filesize - (loc + SizeOfInt) ) { + String msg = "ObjectFileStorage.read[" + file.getLabel() + "](" + loc + ")[filesize=" + filesize + "][file.size()=" + + file.size() + "]: Impossibly large object : " + len + " bytes > filesize-(loc+SizeOfInt)=" + + (filesize - (loc + SizeOfInt)); + throw new FileException(msg); } - - ByteBuffer bb = ByteBuffer.allocate(len) ; + + ByteBuffer bb = ByteBuffer.allocate(len); if ( len == 0 ) // Zero bytes. - return bb ; - x = file.read(bb, loc+SizeOfInt) ; - bb.flip() ; + return bb; + x = file.read(bb, loc + SizeOfInt); + bb.flip(); if ( x != len ) - throw new FileException("ObjectFileStorage.read: Failed to read the object ("+len+" bytes) : got "+x+" bytes") ; - return bb ; + throw new FileException("ObjectFileStorage.read: Failed to read the object (" + len + " bytes) : got " + x + " bytes"); + return bb; } - + @Override - public long length() - { - if ( writeBuffer == null ) return filesize ; - return filesize+writeBuffer.position() ; + public long length() { + if ( writeBuffer == null ) + return filesize; + return filesize + writeBuffer.position(); } - + @Override - public boolean isEmpty() - { - if ( writeBuffer == null ) return filesize == 0 ; - return writeBuffer.position() == 0 && filesize == 0 ; + public boolean isEmpty() { + if ( writeBuffer == null ) + return filesize == 0; + return writeBuffer.position() == 0 && filesize == 0; } - @Override - public void close() { flushOutputBuffer() ; file.close() ; } + public void close() { flushOutputBuffer(); file.close(); } @Override - public void sync() { flushOutputBuffer() ; file.sync() ; } + public void sync() { flushOutputBuffer(); file.sync(); } @Override - public String getLabel() { return file.getLabel() ; } + public String getLabel() { return file.getLabel(); } @Override - public String toString() { return file.getLabel() ; } + public String toString() { return file.getLabel(); } @Override - public Iterator> all() - { - flushOutputBuffer() ; - //file.position(0) ; - ObjectIterator iter = new ObjectIterator(0, filesize) ; - //return iter ; - - if ( writeBuffer == null || writeBuffer.position() == 0 ) return iter ; - return Iter.concat(iter, new BufferIterator(writeBuffer)) ; + public Iterator> all() { + flushOutputBuffer(); + // file.position(0); + ObjectIterator iter = new ObjectIterator(0, filesize); + if ( writeBuffer == null || writeBuffer.position() == 0 ) + return iter; + return Iter.concat(iter, new BufferIterator(writeBuffer)); } - - private String state() - { + + private String state() { if ( writeBuffer == null ) - return String.format(getLabel()+": filesize=0x%X, file=(0x%X, 0x%X)", filesize, file.position(), file.size()) ; + return String.format(getLabel() + ": filesize=0x%X, file=(0x%X, 0x%X)", filesize, file.position(), file.size()); else - return String.format(getLabel()+": filesize=0x%X, file=(0x%X, 0x%X), writeBuffer=(0x%X,0x%X)", filesize, file.position(), file.size(), writeBuffer.position(), writeBuffer.limit()) ; - + return String.format(getLabel() + ": filesize=0x%X, file=(0x%X, 0x%X), writeBuffer=(0x%X,0x%X)", filesize, file.position(), + file.size(), writeBuffer.position(), writeBuffer.limit()); + } - private class BufferIterator extends IteratorSlotted> implements Iterator> - { - private ByteBuffer buffer ; - private int posn ; + private class BufferIterator extends IteratorSlotted> implements Iterator> { + private ByteBuffer buffer; + private int posn; - public BufferIterator(ByteBuffer buffer) - { - this.buffer = buffer ; - this.posn = 0 ; + public BufferIterator(ByteBuffer buffer) { + this.buffer = buffer; + this.posn = 0; } @Override - protected Pair moveToNext() - { + protected Pair moveToNext() { if ( posn >= buffer.limit() ) - return null ; - - int x = buffer.getInt(posn) ; - posn += SystemTDB.SizeOfInt ; - ByteBuffer bb = ByteBuffer.allocate(x) ; - int p = buffer.position() ; - buffer.position(posn) ; - buffer.get(bb.array()) ; + return null; + + int x = buffer.getInt(posn); + posn += SystemTDB.SizeOfInt; + ByteBuffer bb = ByteBuffer.allocate(x); + int p = buffer.position(); + buffer.position(posn); + buffer.get(bb.array()); buffer.position(p); - posn += x ; - return new Pair<>((long)x, bb) ; + posn += x; + return new Pair<>((long)x, bb); } @Override - protected boolean hasMore() - { + protected boolean hasMore() { return posn < buffer.limit(); } - } - - private class ObjectIterator implements Iterator> - { - final private long start ; - final private long finish ; - private long current ; - - public ObjectIterator(long start, long finish) - { - this.start = start ; - this.finish = finish ; - this.current = start ; + + private class ObjectIterator implements Iterator> { + final private long start; + final private long finish; + private long current; + + public ObjectIterator(long start, long finish) { + this.start = start; + this.finish = finish; + this.current = start; } - + @Override - public boolean hasNext() - { - return ( current < finish ) ; + public boolean hasNext() { + return (current < finish); } @Override - public Pair next() - { + public Pair next() { // read, but reserving the file position. - long x = current ; - long filePosn = file.position() ; - ByteBuffer bb = read(current) ; - file.position(filePosn) ; - current = current + bb.limit() + 4 ; - return new Pair<>(x, bb) ; + long x = current; + long filePosn = file.position(); + ByteBuffer bb = read(current); + file.position(filePosn); + current = current + bb.limit() + 4; + return new Pair<>(x, bb); } @Override - public void remove() - { throw new UnsupportedOperationException() ; } + public void remove() { + throw new UnsupportedOperationException(); + } } }