Skip to content

Commit

Permalink
MONDRIAN: Introduce SegmentCacheManager, and actor (aka active object…
Browse files Browse the repository at this point in the history
…) that

    manages the global (JVM) and external cache.

    This is a development checkpoint. The code builds and runs, but fails many
    tests; in particular, it runs out of memory because we need to limit the
    number of cell requests passed from the statement to the actor in each load
    request.

    Also, there are many clean-ups left to do. See the long list in the javadoc
    of SegmentCacheManager.

    Details:

    1. Move functionality Aggregation to Segment. Long-term, Aggregation
    should not be used as a 'gatekeeper' to Segment, and maybe should not exist
    at all. Remove Aggregation fields columns and axes.

    2. Rename Aggregation.Axis to SegmentAxis.

    3. Remove Segment.setData and instead split out subclass
    SegmentWithData. Now segment is immutable. You don't have to wait for its
    state to change. You wait for a Future<SegmentWithData> to become
    ready.

    4. Remove methods: RolapCube.checkAggregateModifications,
    RolapStar.checkAggregateModifications,
    RolapSchema.checkAggregateModifications,
    RolapStar.pushAggregateModificationsToGlobalCache,
    RolapSchema.pushAggregateModificationsToGlobalCache,
    RolapCube.pushAggregateModificationsToGlobalCache.

    5. Add new implementations of Future: CompletedFuture and SlotFuture.

[git-p4: depot-paths = "//open/mondrian-release/pacino/": change = 14762]
  • Loading branch information
julianhyde committed Nov 14, 2011
1 parent c1ce101 commit 600e17f
Show file tree
Hide file tree
Showing 42 changed files with 3,123 additions and 1,744 deletions.
172 changes: 161 additions & 11 deletions src/main/mondrian/olap/Util.java
Expand Up @@ -31,6 +31,7 @@
import org.olap4j.mdx.*;

import java.io.*;
import java.lang.ref.Reference;
import java.lang.reflect.*;
import java.math.BigDecimal;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -182,19 +183,34 @@ public static <T> boolean isSorted(List<T> list) {

/**
* Parses a string and returns a SHA-256 checksum of it.
* @param source The source string to parse.
*
* @param value The source string to parse.
* @return A checksum of the source string.
*/
public static byte[] checksumSha256(String source) {
MessageDigest algorithm;
public static byte[] digestSha256(String value) {
final MessageDigest algorithm;
try {
algorithm = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
algorithm.reset();
algorithm.update(source.getBytes());
return algorithm.digest();
return algorithm.digest(value.getBytes());
}

/**
* Creates an MD5 hash of a String.
*
* @param value String to create one way hash upon.
* @return MD5 hash.
*/
public static byte[] digestMd5(final String value) {
final MessageDigest algorithm;
try {
algorithm = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
return algorithm.digest(value.getBytes());
}

/**
Expand Down Expand Up @@ -2021,6 +2037,32 @@ public static RuntimeException newElementNotFoundException(
return newError(type + " '" + identifierNode + "' not found");
}

/**
* Calls {@link java.util.concurrent.Future#get()} and converts any
* throwable into a non-checked exception.
*
* @param future Future
* @param message Message to qualify wrapped exception
* @param <T> Result type
* @return Result
*/
public static <T> T safeGet(Future<T> future, String message) {
try {
return future.get();
} catch (InterruptedException e) {
throw newError(e, message);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw newError(cause, message);
}
}
}

public static class ErrorCellValue {
public String toString() {
return "#ERR";
Expand Down Expand Up @@ -2817,15 +2859,41 @@ public static String readFully(final Reader rdr, final int bufferSize)
final char[] buffer = new char[bufferSize];
final StringBuilder buf = new StringBuilder(bufferSize);

int len = rdr.read(buffer);
while (len != -1) {
int len;
while ((len = rdr.read(buffer)) != -1) {
buf.append(buffer, 0, len);
len = rdr.read(buffer);
}

return buf.toString();
}

/**
* Reads an input stream until it returns EOF and returns the contents as an
* array of bytes.
*
* @param in Input stream
* @param bufferSize size of buffer to allocate for reading.
* @return content of stream as an array of bytes
* @throws IOException on I/O error
*/
public static byte[] readFully(final InputStream in, final int bufferSize)
throws IOException
{
if (bufferSize <= 0) {
throw new IllegalArgumentException(
"Buffer size must be greater than 0");
}

final byte[] buffer = new byte[bufferSize];
final ByteArrayOutputStream baos =
new ByteArrayOutputStream(bufferSize);

int len;
while ((len = in.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
return baos.toByteArray();
}

/**
* Returns the contents of a URL, substituting tokens.
*
Expand Down Expand Up @@ -2899,7 +2967,7 @@ public static String readURL(
*
* @param url String
* @return Apache VFS FileContent for further processing
* @throws FileSystemException
* @throws FileSystemException on error
*/
public static InputStream readVirtualFile(String url)
throws FileSystemException
Expand Down Expand Up @@ -2973,6 +3041,25 @@ public static InputStream readVirtualFile(String url)
return fileContent.getInputStream();
}

public static String readVirtualFileAsString(
String catalogUrl)
throws IOException
{
InputStream in = readVirtualFile(catalogUrl);
try {
final byte[] bytes = Util.readFully(in, 1024);
final char[] chars = new char[bytes.length];
for (int i = 0; i < chars.length; i++) {
chars[i] = (char) bytes[i];
}
return new String(chars);
} finally {
if (in != null) {
in.close();
}
}
}

/**
* Converts a {@link Properties} object to a string-to-string {@link Map}.
*
Expand Down Expand Up @@ -3462,6 +3549,7 @@ public static Cube getDimensionCube(Dimension dimension) {
}
return null;
}

public static abstract class AbstractFlatList<T>
implements List<T>, RandomAccess
{
Expand Down Expand Up @@ -3745,6 +3833,68 @@ public Object[] toArray() {
}
}

/**
* Garbage-collecting iterator. Iterates over a collection of references,
* and if any of the references has been garbage-collected, removes it from
* the collection.
*
* @param <T> Element type
*/
public static class GcIterator<T> implements Iterator<T> {
private final Iterator<? extends Reference<T>> iterator;
private boolean hasNext;
private T next;

public GcIterator(Iterator<? extends Reference<T>> iterator) {
this.iterator = iterator;
this.hasNext = true;
moveToNext();
}

/**
* Creates an iterator over a collection of references.
*
* @param referenceIterable Collection of references
* @param <T2> element type
* @return iterable over collection
*/
public static <T2> Iterable<T2> over(
final Iterable<? extends Reference<T2>> referenceIterable)
{
return new Iterable<T2>() {
public Iterator<T2> iterator() {
return new GcIterator<T2>(referenceIterable.iterator());
}
};
}

private void moveToNext() {
while (iterator.hasNext()) {
final Reference<T> ref = iterator.next();
next = ref.get();
if (next != null) {
return;
}
iterator.remove();
}
hasNext = false;
}

public boolean hasNext() {
return hasNext;
}

public T next() {
final T next1 = next;
moveToNext();
return next1;
}

public void remove() {
throw new UnsupportedOperationException();
}
}

public static interface Functor1<RT, PT> {
RT apply(PT param);
}
Expand Down

0 comments on commit 600e17f

Please sign in to comment.