Skip to content

Commit

Permalink
[SYSTEMDS-3488] Compressed Frame Write and Read Binary
Browse files Browse the repository at this point in the history
This commit adds the infrastructure to write and read binary
compressed frames. Note that the compression framework for frames
is not finished yet, but this code already uses its basic interface,
with the binary block code as the base.
  • Loading branch information
Baunsgaard committed Jan 19, 2023
1 parent 2f0294e commit 51dbaf3
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 54 deletions.
Expand Up @@ -25,6 +25,11 @@
import org.apache.sysds.runtime.frame.data.compress.FrameCompressionStatistics;

public class FrameLibCompress {

/**
 * Compress a frame block without supplying a {@code WTreeRoot}.
 *
 * Convenience overload that forwards to {@code compress(in, k, null)}.
 *
 * @param in The frame block to compress
 * @param k  Forwarded unchanged to the three-argument overload (presumably the parallelization degree; confirm
 *           against callers)
 * @return The pair returned by the three-argument overload
 */
public static Pair<FrameBlock, FrameCompressionStatistics> compress(FrameBlock in, int k) {
	final Pair<FrameBlock, FrameCompressionStatistics> result = compress(in, k, null);
	return result;
}

/**
 * Compress a frame block.
 *
 * Currently a placeholder: the input block is returned as-is, paired with a freshly constructed statistics object.
 * Neither {@code k} nor {@code root} is read by this implementation.
 *
 * @param in   The frame block to compress
 * @param k    Currently unused
 * @param root Currently unused, may be null
 * @return A pair of the (unmodified) input block and new, empty compression statistics
 */
public static Pair<FrameBlock, FrameCompressionStatistics> compress(FrameBlock in, int k, WTreeRoot root) {
	final FrameCompressionStatistics stats = new FrameCompressionStatistics();
	return new ImmutablePair<>(in, stats);
}
Expand Down
36 changes: 8 additions & 28 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java
Expand Up @@ -30,49 +30,29 @@ public class FrameReaderFactory {
/** Logger of this factory; used for debug output whenever a frame reader is created. */
protected static final Log LOG = LogFactory.getLog(FrameReaderFactory.class.getName());

/**
 * Create a frame reader for the given format using default format properties.
 *
 * For CSV a default-constructed {@code FileFormatPropertiesCSV} is used; every other format is created with
 * {@code null} properties.
 *
 * @param fmt The file format to read
 * @return A frame reader for the given format
 */
public static FrameReader createFrameReader(FileFormat fmt) {
	if(LOG.isDebugEnabled()) {
		LOG.debug("Creating Frame Reader " + fmt);
	}

	// Only the CSV reader gets default properties; all other formats are created without any.
	final FileFormatProperties defaults;
	if(fmt == FileFormat.CSV) {
		defaults = new FileFormatPropertiesCSV();
	}
	else {
		defaults = null;
	}
	return createFrameReader(fmt, defaults);
}

/**
 * Create a frame reader for the given format and format properties.
 *
 * Whether a parallel or a sequential reader is returned is decided by the compiler configuration flags
 * {@code PARALLEL_CP_READ_TEXTFORMATS} (TEXT, CSV) and {@code PARALLEL_CP_READ_BINARYFORMATS} (BINARY,
 * COMPRESSED). PROTO always uses the sequential reader.
 *
 * @param fmt   The file format to read
 * @param props The file format properties; only consulted for CSV, where they must be {@code null} or an
 *              instance of {@code FileFormatPropertiesCSV}
 * @return A frame reader for the given format
 * @throws DMLRuntimeException If the properties have the wrong type for CSV, or if the format is not supported
 */
public static FrameReader createFrameReader(FileFormat fmt, FileFormatProperties props) {
	if( LOG.isDebugEnabled() )
		LOG.debug("Creating Frame Reader " + fmt + props);

	boolean textParallel = ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS);
	boolean binaryParallel = ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS);
	switch(fmt) {
		case TEXT:
			return textParallel ? new FrameReaderTextCellParallel() : new FrameReaderTextCell();
		case CSV:
			if(props != null && !(props instanceof FileFormatPropertiesCSV))
				throw new DMLRuntimeException("Wrong type of file format properties for CSV reader.");
			FileFormatPropertiesCSV fp = (FileFormatPropertiesCSV) props;
			return textParallel ? new FrameReaderTextCSVParallel(fp) : new FrameReaderTextCSV(fp);
		case COMPRESSED: // use same logic as a binary read
		case BINARY:
			return binaryParallel ? new FrameReaderBinaryBlockParallel() : new FrameReaderBinaryBlock();
		case PROTO:
			// TODO performance improvement: add parallel reader
			return new FrameReaderProto();
		default:
			throw new DMLRuntimeException("Failed to create frame reader for unknown format: " + fmt.toString());
	}
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.io;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.lib.FrameLibCompress;

/**
 * Frame writer for the compressed format.
 *
 * The frame is first passed through {@link FrameLibCompress} and the resulting block is then written with the
 * binary block write logic inherited from {@link FrameWriterBinaryBlockParallel}.
 */
public class FrameWriterCompressed extends FrameWriterBinaryBlockParallel {

	/** If true, the compression step is given the configured binary write parallelism, otherwise 1. */
	private final boolean parallel;

	/**
	 * Create a compressed frame writer.
	 *
	 * @param parallel Whether the compression step should be called with a parallelism greater than one
	 */
	public FrameWriterCompressed(boolean parallel) {
		this.parallel = parallel;
	}

	/**
	 * Compress the given frame and delegate the actual write to the super class.
	 *
	 * NOTE(review): the {@code parallel} flag only selects the parallelism handed to the compression call; the
	 * write itself always goes through the inherited implementation.
	 */
	@Override
	protected void writeBinaryBlockFrameToHDFS(Path path, JobConf job, FrameBlock src, long rlen, long clen)
		throws IOException, DMLRuntimeException {
		final int threads;
		if(parallel) {
			threads = OptimizerUtils.getParallelBinaryWriteParallelism();
		}
		else {
			threads = 1;
		}

		// Only the compressed block (left element of the returned pair) is written; the statistics are dropped.
		final FrameBlock toWrite = FrameLibCompress.compress(src, threads).getLeft();
		super.writeBinaryBlockFrameToHDFS(path, job, toWrite, rlen, clen);
	}

}
37 changes: 11 additions & 26 deletions src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java
Expand Up @@ -33,40 +33,25 @@ public static FrameWriter createFrameWriter(FileFormat fmt) {
return createFrameWriter(fmt, null);
}

public static FrameWriter createFrameWriter( FileFormat fmt, FileFormatProperties props ) {
FrameWriter writer = null;
public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties props) {
boolean textParallel = ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS);
boolean binaryParallel = ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS);
switch(fmt) {
case TEXT:
if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) )
writer = new FrameWriterTextCellParallel();
else
writer = new FrameWriterTextCell();
break;

return textParallel ? new FrameWriterTextCellParallel() : new FrameWriterTextCell();
case CSV:
if( props!=null && !(props instanceof FileFormatPropertiesCSV) )
if(props != null && !(props instanceof FileFormatPropertiesCSV))
throw new DMLRuntimeException("Wrong type of file format properties for CSV writer.");
if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) )
writer = new FrameWriterTextCSVParallel((FileFormatPropertiesCSV)props);
else
writer = new FrameWriterTextCSV((FileFormatPropertiesCSV)props);
break;

FileFormatPropertiesCSV fp = (FileFormatPropertiesCSV) props;
return textParallel ? new FrameWriterTextCSVParallel(fp) : new FrameWriterTextCSV(fp);
case COMPRESSED:
return new FrameWriterCompressed(binaryParallel);
case BINARY:
if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS) )
writer = new FrameWriterBinaryBlockParallel();
else
writer = new FrameWriterBinaryBlock();
break;

return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock();
case PROTO:
// TODO performance improvement: add parallel reader
writer = new FrameWriterProto();
break;

return new FrameWriterProto();
default:
throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString());
}
return writer;
}
}

0 comments on commit 51dbaf3

Please sign in to comment.