[SYSTEMDS-3592] Frame Compress Sample based
This commit changes the frame compression to be sample based;
it also changes the schema detection back to be sample based.

Closes #1970
Baunsgaard committed Jan 5, 2024
1 parent a9c2980 commit b3aac0d
Showing 26 changed files with 663 additions and 189 deletions.
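The core of the change is that per-column compression and type decisions are now driven by a sample instead of a full scan. As a rough standalone sketch of the estimation step (not part of the commit; the frequencies, row count, and sample size are made-up illustration values), the sample-based distinct-count estimate used by the updated Array.statistics(nSamples) boils down to:

import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;

public class SampleDistinctSketch {
	public static void main(String[] args) {
		// frequencies of the distinct values observed in a sample of the column
		int[] sampleFreq = new int[] {620, 250, 80, 30, 20};
		int nRows = 1_000_000; // total rows in the column
		int nSamples = 1000;   // rows actually inspected

		// extrapolate the number of distinct values in the full column from the sample
		int estDistinct = SampleEstimatorFactory.distinctCount(sampleFreq, nRows, nSamples);
		System.out.println("estimated distinct values: " + estDistinct);
	}
}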
src/main/java/org/apache/sysds/runtime/frame/data/columns/ABooleanArray.java
@@ -19,6 +19,9 @@

package org.apache.sysds.runtime.frame.data.columns;

import java.util.HashMap;
import java.util.Map;

public abstract class ABooleanArray extends Array<Boolean> {

public ABooleanArray(int size) {
@@ -43,4 +46,19 @@ public ABooleanArray(int size) {
public boolean possiblyContainsNaN(){
return false;
}

@Override
protected Map<Boolean, Long> createRecodeMap() {
Map<Boolean, Long> map = new HashMap<>();
long id = 1;
for(int i = 0; i < size() && id <= 2; i++) {
Boolean val = get(i);
if(val != null) {
Long v = map.putIfAbsent(val, id);
if(v == null)
id++;
}
}
return map;
}
}
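For illustration, a small standalone sketch (not from the commit) of what the boolean recode map above produces for a hypothetical column: IDs are assigned in order of first appearance starting at 1, nulls are skipped, and the scan can stop once both boolean values have been seen.

import java.util.HashMap;
import java.util.Map;

public class BooleanRecodeMapSketch {
	public static void main(String[] args) {
		// hypothetical column content; nulls are ignored by the recode map
		Boolean[] column = {true, null, true, false, true};

		// same assignment scheme as ABooleanArray.createRecodeMap above
		Map<Boolean, Long> recode = new HashMap<>();
		long id = 1;
		for(int i = 0; i < column.length && id <= 2; i++) {
			Boolean val = column[i];
			if(val != null && recode.putIfAbsent(val, id) == null)
				id++;
		}
		System.out.println(recode); // {false=2, true=1}
	}
}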
124 changes: 114 additions & 10 deletions src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.io.Writable;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
@@ -79,7 +81,8 @@ public final void setCache(SoftReference<Map<T, Long>> m) {

/**
* Get a recode map that maps each unique value in the array, to a long ID. Null values are ignored, and not included
* in the mapping. The resulting recode map is stored in a soft reference to speed up repeated calls to the same column.
* in the mapping. The resulting recode map is stored in a soft reference to speed up repeated calls to the same
* column.
*
* @return A recode map
*/
@@ -128,7 +131,8 @@ protected Map<T, Long> createRecodeMap() {
protected Map<T, Integer> getDictionary() {
final Map<T, Integer> dict = new HashMap<>();
Integer id = 0;
for(int i = 0; i < size(); i++) {
final int s = size();
for(int i = 0; i < s; i++) {
final T val = get(i);
final Integer v = dict.get(val);
if(v == null)
@@ -138,6 +142,30 @@ protected Map<T, Integer> getDictionary() {
return dict;
}

/**
* Get the dictionary of contained values, including null, with a threshold.
*
* If the number of distinct values is found to reach the threshold, construction of the dictionary is aborted.
*
* @param threshold maximum number of distinct values to collect before aborting
* @return a dictionary containing all unique values, or null if the threshold of distinct values is exceeded
*/
protected Map<T, Integer> tryGetDictionary(int threshold) {
final Map<T, Integer> dict = new HashMap<>();
Integer id = 0;
final int s = size();
for(int i = 0; i < s && id < threshold; i++) {
final T val = get(i);
final Integer v = dict.get(val);
if(v == null)
dict.put(val, id++);
}
if(id >= threshold)
return null;
else
return dict;
}

/**
* Get the number of elements in the array; this does not necessarily reflect the current allocated size.
*
@@ -233,7 +261,7 @@ public double getAsNaNDouble(int i) {
* @param ru row upper (inclusive)
* @param value value array to take values from (same type)
*/
public void set(int rl, int ru, Array<T> value){
public void set(int rl, int ru, Array<T> value) {
for(int i = rl; i <= ru; i++)
set(i, value.get(i));
}
@@ -246,7 +274,7 @@ public void set(int rl, int ru, Array<T> value){
* @param value value array to take values from
* @param rlSrc the offset into the value array to take values from
*/
public void set(int rl, int ru, Array<T> value, int rlSrc){
public void set(int rl, int ru, Array<T> value, int rlSrc) {
for(int i = rl, off = rlSrc; i <= ru; i++, off++)
set(i, value.get(off));
}
@@ -354,7 +382,18 @@ public final void setFromOtherTypeNz(Array<?> value) {
*
* @return A better or equivalent value type to represent the column, including null information.
*/
public abstract Pair<ValueType, Boolean> analyzeValueType();
public final Pair<ValueType, Boolean> analyzeValueType() {
return analyzeValueType(size());
}

/**
* Analyze the column to figure out if the value type can be refined to a better type. The return is in two parts,
* first the type it can be, second if it contains nulls.
*
* @param maxCells maximum number of cells to analyze
* @return A better or equivalent value type to represent the column, including null information.
*/
public abstract Pair<ValueType, Boolean> analyzeValueType(int maxCells);

/**
* Get the internal FrameArrayType, to specify the encoding of the Types, note there are more Frame Array Types than
@@ -405,7 +444,22 @@ public boolean containsNull() {

public abstract boolean possiblyContainsNaN();

public Array<?> safeChangeType(ValueType t, boolean containsNull){
try{
return changeType(t, containsNull);
}
catch(Exception e){
Pair<ValueType, Boolean> ct = analyzeValueType(); // full analysis
return changeType(ct.getKey(), ct.getValue());
}
}

public Array<?> changeType(ValueType t, boolean containsNull) {
return containsNull ? changeTypeWithNulls(t) : changeType(t);
}

public Array<?> changeTypeWithNulls(ValueType t) {

final ABooleanArray nulls = getNulls();
if(nulls == null)
return changeType(t);
@@ -520,7 +574,7 @@ public final Array<?> changeType(ValueType t) {
/**
* Change type to a Hash64 array type
*
* @return A Hash64 array
* @return A Hash64 array
*/
protected abstract Array<Object> changeTypeHash64();

@@ -653,6 +707,12 @@ public boolean equals(Object other) {

}

public double[] extractDouble(double[] ret, int rl, int ru) {
for(int i = rl; i < ru; i++)
ret[i - rl] = getAsDouble(i);
return ret;
}

public abstract boolean equals(Array<T> other);

public ArrayCompressionStatistics statistics(int nSamples) {
@@ -666,25 +726,69 @@ public ArrayCompressionStatistics statistics(int nSamples) {
else
d.put(key, 1);
}
Pair<ValueType, Boolean> vt = analyzeValueType(nSamples);
if(vt.getKey() == ValueType.UNKNOWN)
vt = analyzeValueType();

if(vt.getKey() == ValueType.UNKNOWN)
vt = new Pair<>(ValueType.STRING, false);

final int[] freq = new int[d.size()];
int id = 0;
for(Entry<T, Integer> e : d.entrySet())
freq[id++] = e.getValue();

int estDistinct = SampleEstimatorFactory.distinctCount(freq, size(), nSamples);
long memSize = getInMemorySize(); // uncompressed size
int memSizePerElement = (int) ((memSize * 8L) / size());

// memory size is different depending on valuetype.
long memSize = vt.getKey() != getValueType() ? //
ArrayFactory.getInMemorySize(vt.getKey(), size(), containsNull()) : //
getInMemorySize(); // uncompressed size

int memSizePerElement;
switch(vt.getKey()) {
case UINT4:
case UINT8:
case INT32:
case FP32:
memSizePerElement = 4;
break;
case INT64:
case FP64:
case HASH64:
memSizePerElement = 8;
break;
case CHARACTER:
memSizePerElement = 2;
break;
case BOOLEAN:
memSizePerElement = 1;
break;
case UNKNOWN:
case STRING:
default:
memSizePerElement = (int) (memSize / size());
}

long ddcSize = DDCArray.estimateInMemorySize(memSizePerElement, estDistinct, size());

if(ddcSize < memSize)
return new ArrayCompressionStatistics(memSizePerElement, //
estDistinct, true, getValueType(), FrameArrayType.DDC, memSize, ddcSize);

estDistinct, true, vt.getKey(), vt.getValue(), FrameArrayType.DDC, getInMemorySize(), ddcSize);
else if(vt.getKey() != getValueType() )
return new ArrayCompressionStatistics(memSizePerElement, //
estDistinct, true, vt.getKey(), vt.getValue(), null, getInMemorySize(), memSize);
return null;
}
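To summarize the new decision logic in statistics(nSamples) above, a simplified standalone sketch (the sizes are illustrative constants, not part of the commit): the distinct count extrapolated from the sample is turned into a DDC size estimate, which is recommended only if it beats the uncompressed size.

public class CompressionDecisionSketch {
	public static void main(String[] args) {
		int nRows = 1_000_000;      // rows in the column
		int memSizePerElement = 8;  // e.g. an FP64 or INT64 column
		long uncompressedSize = (long) memSizePerElement * nRows;

		// in the real code this comes from DDCArray.estimateInMemorySize(memSizePerElement, estDistinct, nRows)
		long ddcSize = 2_500_000L;  // made-up estimate for illustration

		// recommend DDC only if the dictionary-encoded estimate is smaller
		System.out.println(ddcSize < uncompressedSize ? "recommend DDC compression" : "keep uncompressed");
	}
}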

public AMapToData createMapping(Map<T, Integer> d) {
final int s = size();
final AMapToData m = MapToFactory.create(s, d.size());

for(int i = 0; i < s; i++)
m.set(i, d.get(get(i)));
return m;
}

public class ArrayIterator implements Iterator<T> {
int index = -1;

src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
import org.apache.sysds.utils.MemoryEstimates;

public interface ArrayFactory {
@@ -309,8 +310,12 @@ else if(target != null && target.size() < rlen)
if(src.getFrameArrayType() == FrameArrayType.OPTIONAL)
target = allocateOptional(src.getValueType(), rlen);
else if(src.getFrameArrayType() == FrameArrayType.DDC) {
Array<?> ddcDict = ((DDCArray<?>) src).getDict();
if(ddcDict.getFrameArrayType() == FrameArrayType.OPTIONAL) {
final DDCArray<?> ddcA = ((DDCArray<?>) src);
final Array<?> ddcDict = ddcA.getDict();
if(ddcDict == null){ // read empty dict.
target = new DDCArray<>(null, MapToFactory.create(rlen, ddcA.getMap().getUnique()));
}
else if(ddcDict.getFrameArrayType() == FrameArrayType.OPTIONAL) {
target = allocateOptional(src.getValueType(), rlen);
}
else {
@@ -390,7 +390,7 @@ public ValueType getValueType() {
}

@Override
public Pair<ValueType, Boolean> analyzeValueType() {
public Pair<ValueType, Boolean> analyzeValueType(int maxCells) {
return new Pair<>(ValueType.BOOLEAN, false);
}

@@ -512,7 +512,7 @@ public boolean isShallowSerialize() {

@Override
public boolean isEmpty() {
for(int i = 0; i < _data.length; i++)
for(int i = 0; i < _size / 64 + 1; i++)
if(_data[i] != 0L)
return false;
return true;
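The isEmpty change above, like the similar loop-bound changes further down, scans only the portion of the backing array that covers the logical _size entries, presumably because the allocation can be larger than the logical size. A standalone sketch of the scanned word count for the bit-set backed case (illustrative values, not from the commit):

public class BitSetWordCountSketch {
	public static void main(String[] args) {
		int size = 100;              // logical number of boolean entries
		long[] data = new long[8];   // backing array, possibly over-allocated

		// only the first size / 64 + 1 words can hold live bits
		int liveWords = size / 64 + 1;
		boolean empty = true;
		for(int i = 0; i < liveWords && empty; i++)
			empty = data[i] == 0L;
		System.out.println("empty: " + empty + ", words scanned: " + liveWords + " of " + data.length);
	}
}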
@@ -198,7 +198,7 @@ public ValueType getValueType() {
}

@Override
public Pair<ValueType, Boolean> analyzeValueType() {
public Pair<ValueType, Boolean> analyzeValueType(int maxCells) {
return new Pair<>(ValueType.BOOLEAN, false);
}

@@ -312,15 +312,15 @@ public double getAsDouble(int i) {

@Override
public boolean isEmpty() {
for(int i = 0; i < _data.length; i++)
for(int i = 0; i < _size; i++)
if(_data[i])
return false;
return true;
}

@Override
public boolean isAllTrue() {
for(int i = 0; i < _data.length; i++)
for(int i = 0; i < _size; i++)
if(!_data[i])
return false;
return true;
@@ -375,7 +375,7 @@ public boolean equals(Array<Boolean> other) {

@Override
public String toString() {
StringBuilder sb = new StringBuilder(_data.length * 2 + 10);
StringBuilder sb = new StringBuilder(_size * 2 + 10);
sb.append(super.toString() + ":[");
for(int i = 0; i < _size - 1; i++)
sb.append((_data[i] ? 1 : 0) + ",");
@@ -181,7 +181,7 @@ public ValueType getValueType() {
}

@Override
public Pair<ValueType, Boolean> analyzeValueType() {
public Pair<ValueType, Boolean> analyzeValueType(int maxCells) {
return new Pair<>(ValueType.CHARACTER, false);
}

@@ -308,7 +308,7 @@ else if(FrameUtil.isIntType(value, value.length()) != null)

@Override
public boolean isEmpty() {
for(int i = 0; i < _data.length; i++)
for(int i = 0; i < _size; i++)
if(_data[i] != 0)
return false;
return true;
@@ -357,7 +357,7 @@ public boolean possiblyContainsNaN(){

@Override
public String toString() {
StringBuilder sb = new StringBuilder(_data.length * 2 + 15);
StringBuilder sb = new StringBuilder(_size * 2 + 15);
sb.append(super.toString());
sb.append(":[");
for(int i = 0; i < _size - 1; i++) {