Skip to content

Commit

Permalink
[SYSTEMDS-3592] Frame Compress
Browse files Browse the repository at this point in the history
This commit adds a compression pipeline for frames that first analyzes a
sample, which is then used to determine the compression of individual columns.
The distinct-count estimation tools of the matrix compression framework
are used.
Next step is parallelization of the compression.

Closes #1856
  • Loading branch information
Baunsgaard committed Jul 5, 2023
1 parent 52aa431 commit f0f8f0c
Show file tree
Hide file tree
Showing 19 changed files with 558 additions and 60 deletions.
6 changes: 5 additions & 1 deletion src/main/java/org/apache/sysds/api/mlcontext/MLContext.java
Expand Up @@ -73,6 +73,9 @@ public class MLContext implements ConfigurableAPI
*/
private static MLContext activeMLContext = null;

/** Welcome message */
public static boolean welcomePrint = false;

/**
* Contains cleanup methods used by MLContextProxy.
*/
Expand Down Expand Up @@ -262,8 +265,9 @@ private void initMLContext(SparkSession spark) {
}
}

if (activeMLContext == null) {
if (!welcomePrint) {
System.out.println(MLContextUtil.welcomeMessage());
welcomePrint = true;
}

this.spark = spark;
Expand Down
Expand Up @@ -30,8 +30,20 @@ public interface SampleEstimatorFactory {

/** Supported estimator algorithms for sample-based distinct-count estimation. */
public enum EstimationType {
	HassAndStokes, ShlosserEstimator, //
	ShlosserJackknifeEstimator, SmoothedJackknifeEstimator, HassAndStokesNoSolveCache,
}

/**
 * Estimate the number of distinct values in the full data based on sampled frequencies,
 * using the default {@link EstimationType#HassAndStokes} estimator and no solve cache.
 *
 * @param frequencies A list of frequencies of unique values; note all values contained should be larger than zero
 * @param nRows       The total number of rows to consider; note this should always be larger than or equal to
 *                    sum(frequencies)
 * @param sampleSize  The size of the sample; note this should ideally be scaled to match sum(frequencies) and
 *                    should always be lower than or equal to nRows
 * @return An estimated number of unique values
 */
public static int distinctCount(int[] frequencies, int nRows, int sampleSize) {
	return distinctCount(frequencies, nRows, sampleSize, EstimationType.HassAndStokes, null);
}

/**
Expand Down
Expand Up @@ -86,7 +86,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>, Externalizable {
/** Buffer size variable: 1M elements, size of default matrix block */
public static final int BUFFER_SIZE = 1 * 1000 * 1000;

/** If debugging is enabled for the FrameBlocks in stable state*/
/** If debugging is enabled for the FrameBlocks in stable state */
public static boolean debug = false;

/** The schema of the data frame as an ordered list of value types */
Expand Down Expand Up @@ -197,6 +197,55 @@ public FrameBlock(ValueType[] schema, String[] colNames, ColumnMetadata[] meta,
_nRow = data[0].size();
}

/**
 * Create a FrameBlock containing columns of the specified arrays.
 *
 * The schema is derived from the value types of the given columns, column names are left unset,
 * and the number of rows is taken from the first column.
 *
 * @param data The column data contained; all arrays must be of equal size (fully verified only when debug is enabled)
 */
public FrameBlock(Array<?>[] data) {
	// Delegate to the name-aware constructor to avoid duplicating the allocation and debug-validation logic.
	this(data, (String[]) null);
}

/**
 * Create a FrameBlock containing columns of the specified arrays and names.
 *
 * @param data     The column data contained; must contain at least one column, and all columns must be of equal size
 *                 (fully verified only when debug is enabled)
 * @param colnames The column names of the contained columns, or null to fall back to default column names
 */
public FrameBlock(Array<?>[] data, String[] colnames) {
	// Guard explicitly instead of failing below with a raw ArrayIndexOutOfBoundsException on data[0].
	if(data == null || data.length == 0)
		throw new DMLRuntimeException("Invalid Frame allocation with no columns");

	_schema = new ValueType[data.length];
	for(int i = 0; i < data.length; i++)
		_schema[i] = data[i].getValueType();

	_colnames = colnames;
	ensureAllocateMeta();
	_coldata = data;
	_nRow = data[0].size();

	if(debug) {
		// Consistency of column lengths is only verified in debug mode to keep allocation cheap.
		for(int i = 0; i < data.length; i++) {
			if(data[i].size() != getNumRows())
				throw new DMLRuntimeException(
					"Invalid Frame allocation with different size arrays " + data[i].size() + " vs " + getNumRows());
		}
	}
}

/**
* Get the number of rows of the frame block.
*
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.sysds.runtime.frame.data.columns;

import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;

/**
* A Compressed Array, in general does not allow us to set or modify the array.
Expand Down Expand Up @@ -102,4 +103,10 @@ public void reset(int size) {
throw new DMLCompressionException("Invalid to reset compressed array");
}

@Override
public ArrayCompressionStatistics statistics(int nSamples) {
	// This array is already compressed, so there is no beneficial scheme to
	// report; null signals "do not (re)compress".
	return null;
}

}
Expand Up @@ -23,14 +23,17 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
import org.apache.sysds.runtime.matrix.data.Pair;

/**
Expand Down Expand Up @@ -97,14 +100,15 @@ public Map<T, Long> getRecodeMap() {

/**
* Recreate the recode map from what is already there.
*
* @return
*/
protected Map<T, Long> createRecodeMap(){
protected Map<T, Long> createRecodeMap() {
Map<T, Long> map = new HashMap<>();
long id = 0;
for(int i = 0; i < size(); i++) {
T val = get(i);
if(val != null){
if(val != null) {
Long v = map.putIfAbsent(val, id);
if(v == null)
id++;
Expand All @@ -113,19 +117,18 @@ protected Map<T, Long> createRecodeMap(){
return map;
}


/**
* Get the dictionary of the contained values, including null.
*
* @return a dictionary containing all unique values.
*/
protected Map<T, Integer> getDictionary(){
protected Map<T, Integer> getDictionary() {
Map<T, Integer> dict = new HashMap<>();
int id = 0;
for(int i = 0 ; i < size(); i ++){
for(int i = 0; i < size(); i++) {
T val = get(i);
Integer v = dict.putIfAbsent(val, id);
if(v== null)
if(v == null)
id++;
}

Expand Down Expand Up @@ -371,7 +374,7 @@ public ABooleanArray getNulls() {
*
* @return If the array contains null.
*/
public boolean containsNull(){
public boolean containsNull() {
return false;
}

Expand Down Expand Up @@ -424,7 +427,7 @@ public final Array<?> changeType(ValueType t) {
return changeTypeFloat();
case FP64:
return changeTypeDouble();
case UINT4:
case UINT4:
case UINT8:
throw new NotImplementedException();
case INT32:
Expand Down Expand Up @@ -556,7 +559,7 @@ public Pair<Integer, Integer> getMinMaxLength() {
*
* @param select Modify this to true in indexes that are not empty.
*/
public final void findEmpty(boolean[] select){
public final void findEmpty(boolean[] select) {
for(int i = 0; i < select.length; i++)
if(isNotEmpty(i))
select[i] = true;
Expand Down Expand Up @@ -592,28 +595,57 @@ public String toString() {
}

/**
* Hash the given index of the array.
* It is allowed to return NaN on null elements.
* Hash the given index of the array. It is allowed to return NaN on null elements.
*
* @param idx The index to hash
* @return The hash value of that index.
*/
public abstract double hashDouble(int idx);

public ArrayIterator getIterator(){
public ArrayIterator getIterator() {
return new ArrayIterator();
}

/**
 * Analyze a sample of this array and estimate whether compression (currently only DDC) would reduce its in-memory
 * size.
 *
 * @param nSamples The number of leading values to use as the sample; should be lower than or equal to size()
 * @return Statistics describing the estimated beneficial compression, or null if compression is not expected to
 *         pay off
 */
public ArrayCompressionStatistics statistics(int nSamples) {
	// Count value frequencies over the first nSamples entries (note: a prefix, not a random sample).
	final Map<T, Integer> d = new HashMap<>();
	for(int i = 0; i < nSamples; i++)
		d.merge(get(i), 1, Integer::sum);

	final int[] freq = new int[d.size()];
	int id = 0;
	for(Integer f : d.values())
		freq[id++] = f;

	// Extrapolate the sample's distinct count to the full column.
	final int estDistinct = SampleEstimatorFactory.distinctCount(freq, size(), nSamples);
	final long memSize = getInMemorySize(); // uncompressed size
	// NOTE(review): memSize is multiplied by 8, which reads as bits per element — confirm intended unit.
	final int memSizePerElement = (int) ((memSize * 8L) / size());

	final long ddcSize = DDCArray.estimateInMemorySize(memSizePerElement, estDistinct, size());

	// Only report a scheme if the estimated compressed size actually beats the uncompressed size.
	if(ddcSize < memSize)
		return new ArrayCompressionStatistics(memSizePerElement, //
			estDistinct, true, FrameArrayType.DDC, memSize, ddcSize);

	return null;
}

public class ArrayIterator implements Iterator<T> {
int index = -1;

public int getIndex(){
public int getIndex() {
return index;
}

@Override
public boolean hasNext() {
	// More elements remain while the position has not reached the last index.
	return index < size() - 1;
}

@Override
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MemoryEstimates;
Expand Down Expand Up @@ -539,10 +540,16 @@ public static String longToBits(long l) {
}

@Override
public double hashDouble(int idx){
public double hashDouble(int idx) {
return get(idx) ? 1.0 : 0.0;
}

@Override
public ArrayCompressionStatistics statistics(int nSamples) {
	// Boolean columns are already compact and unlikely to benefit from further
	// compression, so report no beneficial scheme (null).
	return null;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(_size + 10);
Expand Down
Expand Up @@ -28,6 +28,7 @@

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MemoryEstimates;
Expand Down Expand Up @@ -339,10 +340,16 @@ public static boolean parseBoolean(String value) {
}

@Override
public double hashDouble(int idx){
public double hashDouble(int idx) {
return get(idx) ? 1.0 : 0.0;
}

@Override
public ArrayCompressionStatistics statistics(int nSamples) {
	// Boolean columns are already compact and unlikely to benefit from further
	// compression, so report no beneficial scheme (null).
	return null;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(_data.length * 2 + 10);
Expand Down

0 comments on commit f0f8f0c

Please sign in to comment.