Skip to content

Commit

Permalink
[SYSTEMDS-3462] FrameBlock Iterators Factory Pattern
Browse files Browse the repository at this point in the history
This commit moves the iterators out of the FrameBlock and into a factory pattern.
In the future, this will allow customized column allocations to be
iterated over cleanly. That requires specialized code that returns different
iterator instances depending on the allocation, and to avoid bloating the code
inside the FrameBlock, this logic is moved out.
  • Loading branch information
Baunsgaard committed Nov 9, 2022
1 parent 5ad0e54 commit b0cdf5a
Show file tree
Hide file tree
Showing 18 changed files with 452 additions and 292 deletions.
267 changes: 10 additions & 257 deletions src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata;
import org.apache.sysds.runtime.frame.data.columns.StringArray;
import org.apache.sysds.runtime.frame.data.iterators.IteratorFactory;
import org.apache.sysds.runtime.functionobjects.ValueComparisonFunction;
import org.apache.sysds.runtime.instructions.cp.BooleanObject;
import org.apache.sysds.runtime.instructions.cp.DoubleObject;
Expand Down Expand Up @@ -681,137 +682,6 @@ public void setColumn(int c, Array column) {
_msize = -1;
}

/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields every row as a
 * string array regardless of the value types of the columns.
 *
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator() {
	final int firstRow = 0;
	final int endRow = _numRows;
	return new StringRowIterator(firstRow, endRow);
}

/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields only the selected
 * columns of every row as a string array.
 *
 * @param cols column selection, 1-based
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator(int[] cols) {
	final int firstRow = 0;
	final int endRow = _numRows;
	return new StringRowIterator(firstRow, endRow, cols);
}

/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields a one-element string
 * array holding the value of the single selected column.
 *
 * @param colID column selection, 1-based
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator(int colID) {
	// wrap the single column id so the general column-selection constructor can be used
	final int[] singleColumn = {colID};
	return new StringRowIterator(0, _numRows, singleColumn);
}

/**
 * Create an iterator over the given row range of this frame that yields
 * every row as a string array regardless of the value types of the columns.
 *
 * @param rl lower row index
 * @param ru upper row index
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator(int rl, int ru) {
	final Iterator<String[]> rangeIterator = new StringRowIterator(rl, ru);
	return rangeIterator;
}

/**
 * Create an iterator over the given row range of this frame that yields
 * only the selected columns of every row as a string array.
 *
 * @param rl   lower row index
 * @param ru   upper row index
 * @param cols column selection, 1-based
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
	final Iterator<String[]> rangeIterator = new StringRowIterator(rl, ru, cols);
	return rangeIterator;
}

/**
 * Create an iterator over the given row range of this frame that yields a
 * one-element string array holding the value of the single selected column.
 *
 * @param rl    lower row index
 * @param ru    upper row index
 * @param colID columnID, 1-based
 * @return string array iterator
 */
public Iterator<String[]> getStringRowIterator(int rl, int ru, int colID) {
	// wrap the single column id so the general column-selection constructor can be used
	final int[] singleColumn = {colID};
	return new StringRowIterator(rl, ru, singleColumn);
}


/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields every row as an
 * object array holding the cell values as returned by the frame.
 *
 * @return object array iterator
 */
public Iterator<Object[]> getObjectRowIterator() {
	final int firstRow = 0;
	final int endRow = _numRows;
	return new ObjectRowIterator(firstRow, endRow);
}

/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields every row as an
 * object array whose values are converted according to the provided target
 * schema.
 *
 * @param schema target schema of objects
 * @return object array iterator
 */
public Iterator<Object[]> getObjectRowIterator(ValueType[] schema) {
	final ObjectRowIterator converting = new ObjectRowIterator(0, _numRows);
	// the target schema is attached after construction; the iterator converts values only when it is set
	converting.setSchema(schema);
	return converting;
}

/**
 * Create an iterator that walks all rows of this frame, starting at row 0
 * and stopping before row {@code _numRows}, and yields only the selected
 * columns of every row as an object array.
 *
 * @param cols column selection, 1-based
 * @return object array iterator
 */
public Iterator<Object[]> getObjectRowIterator(int[] cols) {
	final int firstRow = 0;
	final int endRow = _numRows;
	return new ObjectRowIterator(firstRow, endRow, cols);
}

/**
 * Create an iterator over the given row range of this frame that yields
 * every row as an object array holding the cell values as returned by the
 * frame.
 *
 * @param rl lower row index
 * @param ru upper row index
 * @return object array iterator
 */
public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
	final Iterator<Object[]> rangeIterator = new ObjectRowIterator(rl, ru);
	return rangeIterator;
}

/**
 * Create an iterator over the given row range of this frame that yields
 * only the selected columns of every row as an object array.
 *
 * @param rl   lower row index
 * @param ru   upper row index
 * @param cols column selection, 1-based
 * @return object array iterator
 */
public Iterator<Object[]> getObjectRowIterator(int rl, int ru, int[] cols) {
	final Iterator<Object[]> rangeIterator = new ObjectRowIterator(rl, ru, cols);
	return rangeIterator;
}

///////
// serialization / deserialization (implementation of writable and externalizable)
// FIXME for FrameBlock fix write and readFields, it does not work if the Arrays are not yet
Expand Down Expand Up @@ -1310,7 +1180,7 @@ public FrameBlock append( FrameBlock that, FrameBlock ret, boolean cbind ) {
ret._coldata = new Array[getNumColumns()];
for( int j=0; j<getNumColumns(); j++ )
ret._coldata[j] = _coldata[j].clone();
Iterator<Object[]> iter = that.getObjectRowIterator(_schema);
Iterator<Object[]> iter = IteratorFactory.getObjectRowIterator(that, _schema);
while( iter.hasNext() )
ret.appendRow(iter.next());
}
Expand Down Expand Up @@ -1480,123 +1350,6 @@ public FrameBlock getSchemaTypeOf() {
return fb;
}

///////
// row iterators (over strings and boxed objects)

/**
 * Common base of the row iterators over the enclosing frame. It keeps the
 * column selection, a row buffer that is allocated once in the constructor,
 * and the current and end row positions. Subclasses implement
 * {@code next()} and provide the typed row buffer through
 * {@link #createRow(int)}.
 *
 * @param <T> element type of the row arrays handed out by the iterator
 */
private abstract class RowIterator<T> implements Iterator<T[]> {
	/** Selected column ids, in the order they appear in the produced rows. */
	protected final int[] _cols;
	/** Row buffer with one slot per selected column, allocated once. */
	protected final T[] _curRow;
	/** End position: iteration stops as soon as {@code _curPos} reaches it. */
	protected final int _maxPos;
	/** Position of the row that the next call to {@code next()} reads. */
	protected int _curPos;

	/**
	 * Iterate over the given row range with the column selection produced by
	 * {@code UtilFunctions.getSeqArray(1, getNumColumns(), 1)}.
	 *
	 * @param rl lower row index
	 * @param ru upper row index
	 */
	protected RowIterator(int rl, int ru) {
		this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
	}

	/**
	 * Iterate over the given row range and the given column selection.
	 *
	 * @param rl   lower row index, first position visited
	 * @param ru   upper row index, iteration ends before this position
	 * @param cols column selection
	 */
	protected RowIterator(int rl, int ru, int[] cols) {
		// NOTE: createRow is invoked from the constructor, so implementations
		// must not depend on any subclass state
		_curRow = createRow(cols.length);
		_cols = cols;
		_maxPos = ru;
		_curPos = rl;
	}

	@Override
	public boolean hasNext() {
		return (_curPos < _maxPos);
	}

	/**
	 * Removal is not supported by row iterators.
	 *
	 * @throws UnsupportedOperationException always, as specified by the
	 *         {@link Iterator#remove()} contract for unsupported removal
	 */
	@Override
	public void remove() {
		throw new UnsupportedOperationException("RowIterator.remove is unsupported!");
	}

	/**
	 * Allocate the typed row buffer.
	 *
	 * @param size number of selected columns
	 * @return new array with {@code size} slots
	 */
	protected abstract T[] createRow(int size);
}

/**
 * Row iterator that converts every selected cell to its string
 * representation via {@code toString()}; null cells stay null.
 */
private class StringRowIterator extends RowIterator<String> {
	public StringRowIterator(int rl, int ru) {
		super(rl, ru);
	}

	public StringRowIterator(int rl, int ru, int[] cols) {
		super(rl, ru, cols);
	}

	@Override
	protected String[] createRow(int size) {
		final String[] buffer = new String[size];
		return buffer;
	}

	/**
	 * Fill the shared row buffer with the current row and advance by one row.
	 * The same array instance is returned on every call.
	 */
	@Override
	public String[] next( ) {
		int slot = 0;
		for( int colID : _cols ) {
			// column ids are 1-based, the frame is addressed 0-based
			final Object cell = get(_curPos, colID - 1);
			if( cell == null )
				_curRow[slot] = null;
			else
				_curRow[slot] = cell.toString();
			slot++;
		}
		_curPos++;
		return _curRow;
	}
}

/**
 * Row iterator that hands out the selected cells as objects. When a target
 * schema has been set, every value is additionally passed through
 * {@code UtilFunctions.objectToObject} with the matching schema entry.
 */
private class ObjectRowIterator extends RowIterator<Object> {
	/** Optional target schema; no conversion takes place while it is null. */
	private ValueType[] _tgtSchema = null;

	public ObjectRowIterator(int rl, int ru) {
		super(rl, ru);
	}

	public ObjectRowIterator(int rl, int ru, int[] cols) {
		super(rl, ru, cols);
	}

	/**
	 * Set the target schema used to convert the values of subsequent rows.
	 *
	 * @param schema target value types
	 */
	public void setSchema(ValueType[] schema) {
		_tgtSchema = schema;
	}

	@Override
	protected Object[] createRow(int size) {
		final Object[] buffer = new Object[size];
		return buffer;
	}

	/**
	 * Fill the shared row buffer with the current row and advance by one row.
	 * The same array instance is returned on every call.
	 */
	@Override
	public Object[] next( ) {
		int slot = 0;
		for( int colID : _cols ) {
			// column ids are 1-based, the frame is addressed 0-based
			_curRow[slot] = getValue(_curPos, colID - 1);
			slot++;
		}
		_curPos++;
		return _curRow;
	}

	/**
	 * Read one cell and convert it if a target schema is present.
	 * The target schema is indexed with the 0-based frame column {@code j}.
	 */
	private Object getValue(int i, int j) {
		final Object raw = get(i, j);
		if( _tgtSchema == null )
			return raw;
		return UtilFunctions.objectToObject(_tgtSchema[j], raw);
	}
}

/**
 * Classify a string by the narrowest value type whose textual pattern it
 * matches. The input is trimmed, lower-cased and stripped of double quotes
 * before matching. The checks run in this order:
 * boolean literals (true, false, t, f, 0, 1), signed integers (INT32 when
 * the value fits an int, INT64 otherwise), decimal or exponent numbers
 * (FP32 when the parsed double lies within the float range, FP64
 * otherwise), the words infinity, -infinity and nan (FP64), and finally
 * STRING for everything else.
 *
 * @param val the string to classify, must not be null
 * @return the detected value type
 */
private static ValueType isType(String val) {
	// Locale.ROOT keeps the lower-casing independent of the JVM default locale
	// (e.g. a Turkish locale would turn "INFINITY" into a non-matching string)
	val = val.trim().toLowerCase(java.util.Locale.ROOT).replace("\"", "");
	if (val.matches("(true|false|t|f|0|1)"))
		return ValueType.BOOLEAN;
	if (val.matches("[-+]?\\d+")) {
		try {
			long maxValue = Long.parseLong(val);
			if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
				return ValueType.INT32;
			return ValueType.INT64;
		}
		catch (NumberFormatException ignored) {
			// the digit string does not fit into a long; it still matches the
			// floating point pattern below and is classified there instead of failing
		}
	}
	if (val.matches("[-+]?[0-9]*\\.?[0-9]+([eE][-+]?[0-9]+)?")) {
		double maxValue = Double.parseDouble(val);
		if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
			return ValueType.FP32;
		return ValueType.FP64;
	}
	if (val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
		return ValueType.FP64;
	return ValueType.STRING;
}

public FrameBlock detectSchemaFromRow(double sampleFraction) {
int rows = this.getNumRows();
int cols = this.getNumColumns();
Expand Down Expand Up @@ -1648,7 +1401,7 @@ public String call() {
int randomIndex = ThreadLocalRandom.current().nextInt(0, _rows - 1);
String dataValue = ((_obj.get(randomIndex) != null)?_obj.get(randomIndex).toString().trim().replace("\"", "").toLowerCase():null);
if(dataValue != null){
ValueType current = isType(dataValue);
ValueType current = FrameUtil.isType(dataValue);
if (current == ValueType.STRING) {
state = ValueType.STRING;
break;
Expand Down Expand Up @@ -1683,7 +1436,7 @@ public FrameBlock dropInvalidType(FrameBlock schema) {
if(this.getNumColumns() != schema.getNumColumns())
throw new DMLException("mismatch in number of columns in frame and its schema "+this.getNumColumns()+" != "+schema.getNumColumns());

String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
// extract the schema in String array; it has to be read from the schema frame,
// not from this data frame, whose first row holds data values
String[] schemaString = IteratorFactory.getStringRowIterator(schema).next();
for (int i = 0; i < this.getNumColumns(); i++) {
Array obj = this.getColumn(i);
String schemaCol = schemaString[i];
Expand All @@ -1705,7 +1458,7 @@ public FrameBlock dropInvalidType(FrameBlock schema) {
continue;
String dataValue = obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;

ValueType dataType = isType(dataValue);
ValueType dataType = FrameUtil.isType(dataValue);

if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && type.equals("INT")) &&
!(dataType == ValueType.BOOLEAN && type.equals("FP"))){
Expand Down Expand Up @@ -1752,8 +1505,8 @@ public FrameBlock invalidByLength(MatrixBlock feaLen) {
}

public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock temp2) {
String[] rowTemp1 = temp1.getStringRowIterator().next();
String[] rowTemp2 = temp2.getStringRowIterator().next();
String[] rowTemp1 = IteratorFactory.getStringRowIterator(temp1).next();
String[] rowTemp2 = IteratorFactory.getStringRowIterator(temp2).next();

if(rowTemp1.length != rowTemp2.length)
throw new DMLRuntimeException("Schema dimension "
Expand Down Expand Up @@ -1821,7 +1574,7 @@ public FrameBlock frameRowReplication(FrameBlock rowToreplicate) {
}

public FrameBlock valueSwap(FrameBlock schema) {
String[] schemaString = schema.getStringRowIterator().next();
String[] schemaString = IteratorFactory.getStringRowIterator(schema).next();
String dataValue2 = null;
double minSimScore = 0;
int bestIdx = 0;
Expand All @@ -1846,7 +1599,7 @@ public FrameBlock valueSwap(FrameBlock schema) {
if(this.get(j, i) == null)
continue;
String dataValue = this.get(j, i).toString().trim().replace("\"", "").toLowerCase();
ValueType dataType = isType(dataValue);
ValueType dataType = FrameUtil.isType(dataValue);

String type = dataType.toString().replaceAll("\\d", "");
// get the avergae column length
Expand All @@ -1861,7 +1614,7 @@ public FrameBlock valueSwap(FrameBlock schema) {
Object item = this.get(j, w);
String dataValueProb = (item != null) ? item.toString().trim().replace("\"", "")
.toLowerCase() : "0";
ValueType dataTypeProb = isType(dataValueProb);
ValueType dataTypeProb = FrameUtil.isType(dataValueProb);
if(!dataTypeProb.toString().equals(schemaString[w])) {
bestIdx = w;
break;
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/org/apache/sysds/runtime/frame/data/FrameUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

package org.apache.sysds.runtime.frame.data;

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.columns.Array;

@SuppressWarnings({"rawtypes"})
public interface FrameUtil {
@SuppressWarnings({"rawtypes"})
public static Array[] add(Array[] ar, Array e) {
if(ar == null)
return new Array[] {e};
Expand All @@ -31,4 +32,28 @@ public static Array[] add(Array[] ar, Array e) {
ret[ar.length] = e;
return ret;
}

/**
 * Classify a string by the narrowest value type whose textual pattern it
 * matches. The input is trimmed, lower-cased and stripped of double quotes
 * before matching. The checks run in this order:
 * boolean literals (true, false, t, f, 0, 1), signed integers (INT32 when
 * the value fits an int, INT64 otherwise), decimal or exponent numbers
 * (FP32 when the parsed double lies within the float range, FP64
 * otherwise), the words infinity, -infinity and nan (FP64), and finally
 * STRING for everything else.
 *
 * @param val the string to classify, must not be null
 * @return the detected value type
 */
public static ValueType isType(String val) {
	// Locale.ROOT keeps the lower-casing independent of the JVM default locale
	// (e.g. a Turkish locale would turn "INFINITY" into a non-matching string)
	val = val.trim().toLowerCase(java.util.Locale.ROOT).replace("\"", "");
	if(val.matches("(true|false|t|f|0|1)"))
		return ValueType.BOOLEAN;
	if(val.matches("[-+]?\\d+")) {
		try {
			long maxValue = Long.parseLong(val);
			if((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
				return ValueType.INT32;
			return ValueType.INT64;
		}
		catch(NumberFormatException ignored) {
			// the digit string does not fit into a long; it still matches the
			// floating point pattern below and is classified there instead of failing
		}
	}
	if(val.matches("[-+]?[0-9]*\\.?[0-9]+([eE][-+]?[0-9]+)?")) {
		double maxValue = Double.parseDouble(val);
		if((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
			return ValueType.FP32;
		return ValueType.FP64;
	}
	if(val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
		return ValueType.FP64;
	return ValueType.STRING;
}
}

0 comments on commit b0cdf5a

Please sign in to comment.