[SYSTEMDS-2699] CLA IO Compressed Matrix
This commit adds the basic building blocks for writing a compressed matrix to
disk, and adds a basic test that writes a matrix and reads it back
from disk.

Further testing and full integration into DML are still needed, as is a
mechanism to detect whether the format of the compression groups has changed.
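
As a rough illustration of the intended round trip through the new static helpers (the output path and the constant input block below are placeholders; a constant block is used only because it is sure to compress):

import org.apache.sysds.runtime.compress.io.ReaderCompressed;
import org.apache.sysds.runtime.compress.io.WriterCompressed;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

// Constant 1000 x 20 block, which should compress to a small constant column group.
MatrixBlock mb = new MatrixBlock(1000, 20, 3.0);
// Compresses the block (if not already compressed) and writes it to the given path.
WriterCompressed.writeCompressedMatrixToHDFS(mb, "/tmp/example.cla");
// Reads the compressed block back; dimensions and non-zero count should match the input.
MatrixBlock back = ReaderCompressed.readCompressedMatrixFromHDFS("/tmp/example.cla");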
Baunsgaard committed Oct 12, 2022
1 parent 37520e8 commit 9241488
Showing 8 changed files with 365 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
@@ -56,6 +56,7 @@ docs/_site
# Test Artifacts
src/test/scripts/**/*.dmlt
src/test/scripts/functions/mlcontextin/
src/test/java/org/apache/sysds/test/component/compress/io/files
.factorypath

# Excluded sources
1 change: 1 addition & 0 deletions src/main/java/org/apache/sysds/common/Types.java
@@ -539,6 +539,7 @@ public enum FileFormat {
TEXT, // text cell IJV representation (mm w/o header)
MM, // text matrix market IJV representation
CSV, // text dense representation
COMPRESSED, // Internal SYSTEMDS compressed format
LIBSVM, // text libsvm sparse row representation
JSONL, // text nested JSON (Line) representation
BINARY, // binary block representation (dense/sparse/ultra-sparse)
src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -168,6 +168,25 @@ protected CompressedMatrixBlock(MatrixBlock uncompressedMatrixBlock) {
decompressedVersion = new SoftReference<>(uncompressedMatrixBlock);
}

/**
* Direct constructor with everything.
*
* @param rl Number of rows in the block
* @param cl Number of columns in the block
* @param nnz Number of non-zero values
* @param overlapping Whether the column groups are overlapping
* @param groups The list of column groups
*/
protected CompressedMatrixBlock(int rl, int cl, long nnz, boolean overlapping, List<AColGroup> groups) {
super(true);
this.rlen = rl;
this.clen = cl;
this.sparse = false;
this.nonZeros = nnz;
this.overlappingColGroups = overlapping;
this._colGroups = groups;
}

@Override
public void reset(int rl, int cl, boolean sp, long estnnz, double val) {
throw new DMLCompressionException("Invalid to reset a Compressed MatrixBlock");
@@ -370,6 +389,15 @@ public void readFields(DataInput in) throws IOException {
_colGroups = ColGroupIO.readGroups(in, rlen);
}

public static CompressedMatrixBlock read(DataInput in) throws IOException {
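// Header fields first (dimensions, non-zeros, overlapping flag), then the serialized column groups.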
int rlen = in.readInt();
int clen = in.readInt();
long nonZeros = in.readLong();
boolean overlappingColGroups = in.readBoolean();
List<AColGroup> groups = ColGroupIO.readGroups(in, rlen);
return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
}

@Override
public void write(DataOutput out) throws IOException {
if(nonZeros > 0 && getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) {
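
For context, a minimal sketch of how the new static read pairs with the existing write, here over local streams instead of HDFS (the path is illustrative, and the constant input is assumed to compress so that write emits the compressed format rather than falling back):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;

import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

// Compress a constant block; the cast assumes compression succeeded.
MatrixBlock mb = CompressedMatrixBlockFactory.compress(new MatrixBlock(1000, 20, 3.0)).getLeft();
CompressedMatrixBlock cmb = (CompressedMatrixBlock) mb;

try(DataOutputStream out = new DataOutputStream(new FileOutputStream("/tmp/example.cla"))) {
cmb.write(out); // header fields followed by the serialized column groups
}
try(DataInputStream in = new DataInputStream(new FileInputStream("/tmp/example.cla"))) {
CompressedMatrixBlock back = CompressedMatrixBlock.read(in); // uses the new direct constructor
}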
src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.compress.io;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.io.MatrixReader;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

public class ReaderCompressed extends MatrixReader {

public static ReaderCompressed create() {
return new ReaderCompressed();
}

public static MatrixBlock readCompressedMatrixFromHDFS(String fname) throws IOException {
return create().readMatrixFromHDFS(fname, 0, 0, 0, 0);
}

@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
throws IOException, DMLRuntimeException {

JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);

checkValidInputFile(fs, path);

MatrixBlock cmb = readCompressedMatrix(path, job, fs);

if(cmb.getNumRows() != rlen)
LOG.warn("Metadata file does not correlate with compressed file, NRows : " + cmb.getNumRows() + " vs " + rlen);
if(cmb.getNumColumns() != clen)
LOG.warn(
"Metadata file does not correlate with compressed file, NCols : " + cmb.getNumColumns() + " vs " + clen);

return cmb;
}

@Override
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
throws IOException, DMLRuntimeException {
throw new NotImplementedException("Not implemented reading compressedMatrix from input stream");
}

private static MatrixBlock readCompressedMatrix(Path path, JobConf job, FileSystem fs) throws IOException {
if(fs.getFileStatus(path).isDirectory())
return readCompressedMatrixFolder(path, job, fs);
else
return readCompressedMatrixSingleFile(path, job, fs);
}

private static MatrixBlock readCompressedMatrixFolder(Path path, JobConf job, FileSystem fs) {
throw new NotImplementedException();
}

private static MatrixBlock readCompressedMatrixSingleFile(Path path, JobConf job, FileSystem fs) throws IOException {
final InputStream is = fs.open(path);
final DataInput in = new DataInputStream(is);
MatrixBlock ret;
try {
ret = CompressedMatrixBlock.read(in);
}
finally {
is.close();
}
return ret;
}

}
src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.compress.io;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.HDFSTool;

public class WriterCompressed extends MatrixWriter {

public static WriterCompressed create(FileFormatProperties props) {
return new WriterCompressed();
}

public static void writeCompressedMatrixToHDFS(MatrixBlock src, String fname) throws IOException {
create(null).writeMatrixToHDFS(src, fname, 0, 0, 0, 0, false);
}

@Override
public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
throws IOException {
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);

HDFSTool.deleteFileIfExistOnHDFS(fname);
try {
writeCompressedMatrixToHDFS(path, job, fs, src);
}
catch(DMLCompressionException ce) {
fs.delete(path, true);
throw ce;
}
finally {
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
}
}

@Override
public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) throws IOException {
throw new NotImplementedException();
}

private void writeCompressedMatrixToHDFS(Path path, JobConf conf, FileSystem fs, MatrixBlock src)
throws IOException {
final OutputStream os = fs.create(path, true);
final DataOutput out = new DataOutputStream(os);
try {
final MatrixBlock mb = src instanceof CompressedMatrixBlock ? // If compressed
src : // Do not compress
CompressedMatrixBlockFactory.compress(src).getLeft(); // otherwise compress

if(!(mb instanceof CompressedMatrixBlock))
throw new DMLCompressionException("Input was not compressed, therefore the file was not saved to disk");

CompressedMatrixBlock cmb = (CompressedMatrixBlock) mb;
cmb.write(out);
}
finally {
os.close();
}
}
}
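
Note that the write path does not fall back silently: if the input is not already compressed and compression does not succeed, the writer throws a DMLCompressionException and writeMatrixToHDFS deletes the partially written file. A hedged sketch of handling that case (the dense block with all-distinct values is only illustrative of an input that is unlikely to compress):

import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.io.WriterCompressed;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

// Dense block with all-distinct values; CLA compression is unlikely to pay off here.
MatrixBlock dense = new MatrixBlock(100, 100, false);
for(int i = 0; i < 100; i++)
for(int j = 0; j < 100; j++)
dense.setValue(i, j, i * 100 + j + 0.5);

try {
WriterCompressed.writeCompressedMatrixToHDFS(dense, "/tmp/dense.cla");
}
catch(DMLCompressionException e) {
// Compression did not pay off; fall back to another writer (e.g. binary block) or keep the block in memory.
}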
src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
@@ -25,6 +25,7 @@
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.io.ReaderCompressed;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

@@ -66,6 +67,10 @@ public static MatrixReader createMatrixReader(FileFormat fmt) {
reader = (par & mcsr) ? new ReaderHDF5Parallel(
new FileFormatPropertiesHDF5()) : new ReaderHDF5(new FileFormatPropertiesHDF5());
break;

case COMPRESSED:
reader = ReaderCompressed.create();
break;

default:
throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString());
}
src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
@@ -19,10 +19,11 @@

package org.apache.sysds.runtime.io;

import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.io.WriterCompressed;

public class MatrixWriterFactory
{
@@ -85,6 +86,9 @@ else if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE
else
return new WriterHDF5((FileFormatPropertiesHDF5) props);

case COMPRESSED:
return WriterCompressed.create(props);

default:
throw new DMLRuntimeException("Failed to create matrix writer for unknown format: " + fmt.toString());
}
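
With both factories aware of FileFormat.COMPRESSED, the new reader and writer can be obtained like any other format, roughly as follows (mb stands for an existing MatrixBlock, the path and the block length of 1000 are placeholders, and the writer factory entry point is assumed to be the usual createMatrixWriter):

import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.runtime.io.MatrixReader;
import org.apache.sysds.runtime.io.MatrixReaderFactory;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.io.MatrixWriterFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.COMPRESSED);
writer.writeMatrixToHDFS(mb, "/tmp/example.cla", mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros(), false);

MatrixReader reader = MatrixReaderFactory.createMatrixReader(FileFormat.COMPRESSED);
MatrixBlock back = reader.readMatrixFromHDFS("/tmp/example.cla", mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros());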