Skip to content

Commit

Permalink
done. AbstractBTree#writeTuple now uses both KEYS and VALS since
Browse files Browse the repository at this point in the history
otherwise insert() and remove() are unable to reconstruct an
application object which is partly encoded within the key.

continued work on a refactor of the performance counter system.

done. Introduced "hot splits" as complementary to moves and taken  When a service is highly utilized (as identified by the LBS) but not
resource constrained.  The purpose of the "hot split" is to split hot indices and thereby increase the potential concurrency.  Moves are complementary because they are also taken when the LBS identifies the service as "highly utilized" if the service IS resource constrained.

done. Fixed references to resources in the file system which were no
longer valid when running the unit tests from the master directory for the project (these are still file references, so the tests will fail if you check out an individual module and run the tests within that module).


git-svn-id: svn+ssh://svn.code.sf.net/p/bigdata/code/trunk@1441 8f7bc0f5-282e-0410-95e3-8d296e9bb460
  • Loading branch information
thompsonbry committed Apr 8, 2009
1 parent 7d4270e commit a9e98eb
Show file tree
Hide file tree
Showing 46 changed files with 5,553 additions and 2,396 deletions.
32 changes: 16 additions & 16 deletions bigdata-rdf/src/test/com/bigdata/rdf/rio/TestLoadAndVerify.java
Expand Up @@ -74,9 +74,9 @@ public TestLoadAndVerify(String name) {
*/
public void test_loadAndVerify_small() throws Exception {

final String file = "src/test/com/bigdata/rdf/rio/small.rdf";
final String resource = "src/test/com/bigdata/rdf/rio/small.rdf";

doLoadAndVerifyTest( file );
doLoadAndVerifyTest( resource );

}

Expand All @@ -87,9 +87,9 @@ public void test_loadAndVerify_small() throws Exception {
*/
public void test_loadAndVerify_sampleData() throws Exception {

final String file = "src/test/com/bigdata/rdf/rio/sample data.rdf";
final String resource = "src/test/com/bigdata/rdf/rio/sample data.rdf";

doLoadAndVerifyTest( file );
doLoadAndVerifyTest( resource );

}

Expand All @@ -101,11 +101,11 @@ public void test_loadAndVerify_modest() throws Exception {

// final String file = "../rdf-data/nciOncology.owl";
final String file = "../rdf-data/alibaba_v41.rdf";
if(!new File(file).exists()) {
fail("Resource not found: "+file+", test skipped: "+getName());

if (!new File(file).exists()) {

fail("Resource not found: " + file + ", test skipped: " + getName());

return;

}
Expand All @@ -119,13 +119,13 @@ public void test_loadAndVerify_modest() throws Exception {
* re-parse that all expected statements were made persistent in the
* database.
*
* @param file
* @param resource
*
* @throws Exception
*/
protected void doLoadAndVerifyTest(String file) throws Exception {
protected void doLoadAndVerifyTest(final String resource) throws Exception {

assertTrue("File not found? file=" + file, new File(file).exists());
// assertTrue("File not found? file=" + resource, new File(resource).exists());

AbstractTripleStore store;
{
Expand Down Expand Up @@ -179,7 +179,7 @@ protected void doLoadAndVerifyTest(String file) throws Exception {
final DataLoader dataLoader = new DataLoader(properties, store);

// load into the datbase.
dataLoader.loadData(file, "" /* baseURI */, RDFFormat.RDFXML);
dataLoader.loadData(resource, "" /* baseURI */, RDFFormat.RDFXML);

// // database-at-once closure (optional for this test).
// store.getInferenceEngine().computeClosure(null/*focusStore*/);
Expand Down Expand Up @@ -207,7 +207,7 @@ protected void doLoadAndVerifyTest(String file) throws Exception {
final int maxerrors = 20;
{

log.info("Verifying all statements found using reparse: file="+file);
log.info("Verifying all statements found using reparse: file="+resource);

// buffer capacity (#of statements per batch).
final int capacity = 100000;
Expand All @@ -216,10 +216,10 @@ protected void doLoadAndVerifyTest(String file) throws Exception {
capacity, nerrs, maxerrors);

loader.loadRdf(new BufferedReader(new InputStreamReader(
new FileInputStream(file))), ""/* baseURI */,
new FileInputStream(resource))), ""/* baseURI */,
RDFFormat.RDFXML, false/* verify */);

log.info("End of reparse: nerrors="+nerrs+", file="+file);
log.info("End of reparse: nerrors="+nerrs+", file="+resource);

}

Expand Down
7 changes: 5 additions & 2 deletions bigdata/src/java/com/bigdata/btree/AbstractBTree.java
Expand Up @@ -1370,9 +1370,12 @@ protected AbstractNode getRootOrFinger(final byte[] key) {

/**
* Private instance used for mutation operations (insert, remove) which are
* single threaded.
* single threaded. Both {@link IRangeQuery#KEYS} and
* {@link IRangeQuery#VALS} are requested so that indices which encode part
* of the application object within the key can recover the application
* object in {@link #insert(Object, Object)} and {@link #remove(Object)}.
*/
public final Tuple writeTuple = new Tuple(this,/*KEYS|*/VALS);
public final Tuple writeTuple = new Tuple(this, KEYS | VALS);

/**
* A {@link ThreadLocal} {@link Tuple} that is used to copy the value
Expand Down
103 changes: 85 additions & 18 deletions bigdata/src/java/com/bigdata/counters/DefaultInstrumentFactory.java
Expand Up @@ -39,32 +39,99 @@
*/
public class DefaultInstrumentFactory implements IInstrumentFactory {

public static final DefaultInstrumentFactory INSTANCE = new DefaultInstrumentFactory();

private DefaultInstrumentFactory() {

/**
* Instance supports overwrite, so the ring buffer will eventually overwrite
* old values as new values arrive. Minutes are migrated onto hours and
* hours are migrated onto days. Up to 30 days of data will be retained.
*/
public static final DefaultInstrumentFactory OVERWRITE_60M = new DefaultInstrumentFactory(
60/* slots */, PeriodEnum.Minutes, true/* overwrite */);

public static final DefaultInstrumentFactory NO_OVERWRITE_60M = new DefaultInstrumentFactory(
60/* slots */, PeriodEnum.Minutes, false/* overwrite */);

private final int nslots;

private final PeriodEnum period;

private final boolean overwrite;

/**
*
* @param nslots
* The #of units of the period to be retained by the
* {@link History}.
* @param period
* The period in which those units are measured.
* @param overwrite
* If the {@link History} may serve as a ring buffer.
*/
public DefaultInstrumentFactory(final int nslots, final PeriodEnum period,
final boolean overwrite) {

if (nslots <= 0)
throw new IllegalArgumentException();

if (period == null)
throw new IllegalArgumentException();

this.nslots = nslots;

this.period = period;

this.overwrite = overwrite;

}

public IInstrument newInstance(Class type) {
public IInstrument newInstance(final Class type) {

if (type == null)
throw new IllegalArgumentException();

if(type==null) throw new IllegalArgumentException();

if (type == Double.class || type == Float.class) {

return new HistoryInstrument<Double>(new Double[] {});

final History<Double> minutes = new History<Double>(
new Double[nslots], period.getPeriodMillis(), overwrite);

if (overwrite) {

// 24 hours in a day
final History<Double> hours = new History<Double>(24, minutes);

// 30 days of history
// final History<Double> days =
new History<Double>(30, hours);

}

return new HistoryInstrument<Double>(minutes);

} else if (type == Long.class || type == Integer.class) {

return new HistoryInstrument<Long>(new Long[] {});

} else if( type == String.class ) {


final History<Long> minutes = new History<Long>(
new Long[nslots], period.getPeriodMillis(), overwrite);

if (overwrite) {

// 24 hours in a day
final History<Long> hours = new History<Long>(24, minutes);

// 30 days of history
// final History<Long> days =
new History<Long>(30, hours);

}

return new HistoryInstrument<Long>(minutes);

} else if (type == String.class) {

return new StringInstrument();

} else {
throw new UnsupportedOperationException("type: "+type);

throw new UnsupportedOperationException("type: " + type);

}

}
Expand Down
53 changes: 39 additions & 14 deletions bigdata/src/java/com/bigdata/counters/History.java
Expand Up @@ -8,10 +8,7 @@


/**
* Retains history for N periods, where the period is expressed in
* milliseconds. When the history would overflow, the average for the last N
* periods is optionally added to another {@link History} which aggregates
* this one.
* Retains history for N periods, where the period is expressed in milliseconds.
* <p>
* This class is thread-safe.
*
Expand Down Expand Up @@ -389,7 +386,7 @@ public IHistoryEntry<T> next() {

current++;

return (IHistoryEntry) entry.clone();
return (IHistoryEntry<T>) entry.clone();

}

Expand Down Expand Up @@ -516,8 +513,6 @@ synchronized public SampleIterator iterator() {
*/
synchronized public String toString() {

final T average = getAverage(); //@todo move to end

final StringBuilder sb = new StringBuilder();

sb.append("{");
Expand All @@ -540,6 +535,8 @@ synchronized public String toString() {

}

final T average = getAverage();

sb.append("},average=" + average + ",n=" + n);

return sb.toString();
Expand Down Expand Up @@ -897,6 +894,16 @@ synchronized public void add(final long timestamp, final T value) {

if (data[ps] != null) {

if(!overwrite) {
/*
* Note: Overwrite is not always desirable - there is a
* ctor option to disable it.
*/
throw new RuntimeException("Would overwrite data: ps="
+ ps + ", capacity=" + capacity + ", size="
+ size);
}

// clear old slot.

size--;
Expand Down Expand Up @@ -977,15 +984,20 @@ synchronized public void add(final long timestamp, final T value) {
* Constructor used at the base collection period.
*
* @param data
* An array whose size is the capacity of the history buffer.
* The contents of the array will be used to store the data. (This
* API requirement arises since generics are fixed at compile time
* rather than runtime.)
* An array whose size is the capacity of the history buffer. The
* contents of the array will be used to store the data. (This
* API requirement arises since generics are fixed at compile
* time rather than runtime.)
* @param period
* The period covered by each slot in milliseconds.
* @param overwrite
* <code>true</code> iff overwrite of slots in the buffer is
* allowed (when <code>false</code> the buffer will fill up and
* then refuse additional samples if they would overwrite slots
* which are in use).
*/
@SuppressWarnings("unchecked")
protected History(final T[] data, final long period) {
public History(final T[] data, final long period, boolean overwrite) {

if (data == null)
throw new IllegalArgumentException();
Expand All @@ -1002,7 +1014,7 @@ protected History(final T[] data, final long period) {

this.period = period;

// this.source = null;
this.overwrite = overwrite;

this.timestamps = new long[capacity];

Expand Down Expand Up @@ -1044,7 +1056,7 @@ protected History(final int capacity, final History<T> source) {

this.period = source.period * capacity;

// this.source = source;
this.overwrite = true;

this.timestamps = new long[capacity];

Expand Down Expand Up @@ -1073,13 +1085,26 @@ protected History(final int capacity, final History<T> source) {

private final long period;

private final boolean overwrite;

private final boolean _numeric;

private final boolean _long;

private final boolean _double;

private History<T> sink;

/**
* The sink on which the history writes when it overflows -or-
* <code>null</code> if no sink has been assigned (it is assigned by the
* alternate ctor).
*/
protected History<T> getSink() {

return sink;

}

/**
* The timestamp of the last sample reported in a given period.
Expand Down

0 comments on commit a9e98eb

Please sign in to comment.