Skip to content

Commit

Permalink
port libsvm parser to mxnet as libsvm iter (#55)
Browse files Browse the repository at this point in the history
* copied csv iter to libsvm iter

test

libsvm iter draft

handle round batch == false for csr batch loader

code refactoring

add get stype, shape interface to iiter

separate class for sparse iter

add missing file

fix mem corruption'

rename variables

add comments

also read label from libsvm

add test. update docs. update submodule

Conflicts:
	python/mxnet/sparse_ndarray.py

* update submodule

* fix lint

* update test

* revert naming change

add benchmark scritp for dot (#59)

* add benchmark scritp for dot

add gpu option for bench

add get_data funciton for benchmark

print t_sparse, too;

add comment

change nnz to dnesity

add backward

* add comment

update fm test (#62)

introduce CSRNDarray and rowsparseNDarray to python frontend api (#58)

* introduce CSRNDarray and rowsparseNDarray to python frontend api

* temporarily disable fm_module test

fix lint (#64)

fix typo. disable libsvm io test (#65)

Improve dot (#61)

* Init checkin

* Fix

* Adjust dot parallelization methods

* Set num_omp_threads for benchmark from command line

* Fix omp thread number

* Clean up

* Add scipy as dot baseline

* Fix format

sparse_retain op (#66)

* Initial checkin

* Fix bugs

* Add unit test for sparse_retain

* Add example and modify test

add storage cast for outputs that have non-default storage (#67)

fix gpu build (#69)

Fix test_sparse_retain python3 issue (#68)

revert nnvm version
  • Loading branch information
eric-haibin-lin committed Jun 9, 2017
1 parent 805cad4 commit 6d240a5
Show file tree
Hide file tree
Showing 27 changed files with 1,490 additions and 211 deletions.
191 changes: 191 additions & 0 deletions benchmark/python/sparse_op.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import ctypes

from mxnet.test_utils import *
import scipy.sparse as sp
import os
import time
import argparse

from mxnet.base import check_call, _LIB

parser = argparse.ArgumentParser(description="Benchmark sparse operators",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--num-omp-threads', type=int, default=1, help='number of omp threads to set in MXNet')
args = parser.parse_args()


def get_avazu(data_dir):
if not os.path.isdir(data_dir):
os.system("mkdir " + data_dir)
os.chdir(data_dir)
if (not os.path.exists('avazu-app.t')):
import urllib
zippath = os.path.join(data_dir, "avazu-app.t.bz2")
url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2"
urllib.urlretrieve(url, zippath)
# decompress
os.system("bzip2 -d avazu-app.t.bz2")
os.chdir("..")


def test_dot_real():
def get_iter(path, data_shape, batch_size):
data_train = mx.io.LibSVMIter(data_libsvm=path,
data_shape=data_shape,
batch_size=batch_size)
data_iter = iter(data_train)
return data_iter
data_dir = os.path.join(os.getcwd(), 'data')
get_avazu(data_dir)
path = os.path.join(data_dir, 'avazu-app.t')
# TODO(haibin) get file size automatically
size = 336490781 >> 20

# model
batch_size = 512
feature_dim = 1000000
data_shape = (feature_dim, )
train_iter = get_iter(path, data_shape, batch_size)

k = 500
weight = mx.nd.random_uniform(low=0, high=1, shape=(feature_dim, k))
weight.wait_to_read()

# start workload
start = time.time()
results = []
num_batch = 0
for batch in train_iter:
data = train_iter.getdata()
results.append(mx.nd.dot(data, weight))
num_batch += 1
for result in results:
result.wait_to_read()

end = time.time()
cost = end - start
print(size / cost, cost, num_batch, num_batch / cost)


def test_dot_synthetic():
"""benchmark mx.nd.dot(sparse_ndarray, dense_ndarray) with given density.
`t_sparse` is the time cost of dot(csr, dns), while `t_dense` is the time cost
of dot(dns, dns), with the same matrix except that it is in default storage type.
"""
def measure_cost_forward_baseline(repeat, dot, lhs, rhs):
start = time.time()
for i in range(repeat):
dot(lhs, rhs)
end = time.time()
diff = end - start
return diff / repeat

def measure_cost_backward_baseline(repeat, dot, transpose, lhs, rhs):
start = time.time()
for i in range(repeat):
dot(transpose(lhs), rhs)
end = time.time()
diff = end -start
return diff / repeat

def measure_cost(repeat, f, *args, **kwargs):
# start bench
start = time.time()
results = []
for i in range(repeat):
results.append(f(*args, **kwargs))
for result in results:
result.wait_to_read()
end = time.time()
diff = end - start
return diff / repeat

def bench_dot_forward(m, k, n, density, ctx, repeat):
set_default_context(ctx)
dns = mx.nd.random_uniform(shape=(k, n)).copyto(ctx)
data_shape = (m, k)
csr_data = rand_ndarray(data_shape, 'csr', density)
dns_data = csr_data.to_dense()
rhs_dns_np = dns.asnumpy()
lhs_csr_sp = sp.csr_matrix(dns_data.asnumpy()) # csr in scipy
lhs_dns_np = lhs_csr_sp.todense()

data = [dns_data, csr_data]
costs = []
for d in data:
dns.wait_to_read()
d.wait_to_read()
cost = measure_cost(repeat, mx.nd.dot, d, dns)
costs.append(cost / repeat)
ratio = costs[1] / costs[0]

costs_baseline = []
cost = measure_cost_forward_baseline(repeat, np.dot, lhs_dns_np, rhs_dns_np)
costs_baseline.append(cost)
cost = measure_cost_forward_baseline(repeat, sp.spmatrix.dot, lhs_csr_sp, rhs_dns_np)
costs_baseline.append(cost)
ratio_baseline = costs_baseline[1] / costs_baseline[0]
fmt = "%0.1f\t\t%s\t%d\t%d\t%d\t%0.6f\t%0.5f\t%0.2f\t\t\t%0.6f\t%0.5f\t\t%0.2f"
print(fmt % (density * 100, str(ctx), n, m, k, costs[1], costs[0], ratio,
costs_baseline[1], costs_baseline[0], ratio_baseline))

def bench_dot_backward(m, k, n, density, ctx, repeat):
set_default_context(ctx)
dns = mx.nd.random_uniform(shape=(m, n)).copyto(ctx)
data_shape = (m, k)
csr_data = rand_ndarray(data_shape, 'csr', density)
dns_data = csr_data.to_dense()
rhs_dns_np = dns.asnumpy()
lhs_csr_sp = sp.csr_matrix(dns_data.asnumpy())
lhs_dns_np = lhs_csr_sp.todense()

data = [dns_data, csr_data]
costs = []
for d in data:
dns.wait_to_read()
d.wait_to_read()
cost = measure_cost(repeat, mx.nd.dot, d, dns, transpose_a=True)
costs.append(cost)
ratio = costs[1] / costs[0]

costs_baseline = []
cost = measure_cost_backward_baseline(repeat, np.dot, np.transpose, lhs_dns_np, rhs_dns_np)
costs_baseline.append(cost)
cost = measure_cost_backward_baseline(repeat, sp.spmatrix.dot, sp.spmatrix.transpose, lhs_csr_sp, rhs_dns_np)
costs_baseline.append(cost)
ratio_baseline = costs_baseline[1] / costs_baseline[0]
fmt = "%0.1f\t\t%s\t%d\t%d\t%d\t%0.6f\t%0.5f\t%0.2f\t\t\t%0.6f\t%0.5f\t\t%0.2f"
print(fmt % (density * 100, str(ctx), n, m, k, costs[1], costs[0], ratio,
costs_baseline[1], costs_baseline[0], ratio_baseline))

print("A = sparse NDArray of shape(m, k)")
print("B = dense NDArray of shape(k, n)")
print("dot_forward\tdot(csr, dns)")
print('density(%)\tcontext\tn\tm\tk\tt_sparse\tt_dense\tt_sparse/t_dense'
'\tt_scipy_sparse\tt_scipy_dense\tt_scipy_sparse/t_scipy_dense')

check_call(_LIB.MXSetNumOMPThreads(ctypes.c_int(args.num_omp_threads)))
# TODO(haibin) make these runtime options
m = 512
k = [50000, 100000]
n = [50, 100]
density = [0.05, 0.02, 0.01, 0.005, 0.001]
num_repeat = 10
# contexts = [mx.cpu(), mx.gpu(0)]
contexts = [mx.cpu()]
for i in range(2):
for ctx in contexts:
for den in density:
bench_dot_forward(m, k[i], n[i], den, ctx, num_repeat)

print("dot_backward\tdot(csr.T, dns)")
print('density(%)\tcontext\tn\tm\tk\tt_sparse\tt_dense\tt_sparse/t_dense'
'\tt_scipy_sparse\tt_scipy_dense\tt_scipy_sparse/t_scipy_dense')
for i in range(2):
for ctx in contexts:
for den in density:
bench_dot_backward(m, k[i], n[i], den, ctx, num_repeat)

if __name__ == "__main__":
test_dot_real()
test_dot_synthetic()
2 changes: 1 addition & 1 deletion dmlc-core
13 changes: 13 additions & 0 deletions include/mxnet/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ class IIterator : public dmlc::DataIter<DType> {
}
}; // class IIterator

/*!
* \brief iterator type
* \param DType data type
*/
template<typename DType>
class SparseIIterator : public IIterator<DType> {
public:
/*! \brief storage type of the data or label */
virtual const NDArrayStorageType GetStorageType(bool is_data) const = 0;
/*! \brief shape of the data or label */
virtual const TShape GetShape(bool is_data) const = 0;
}; // class SparseIIterator

/*! \brief a single data instance */
struct DataInst {
/*! \brief unique id for instance */
Expand Down
36 changes: 25 additions & 11 deletions include/mxnet/ndarray.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,44 +115,44 @@ class NDArray {
}
/*! \brief constructor for NDArray with storage type
*/
NDArray(const NDArrayStorageType storage_type, const TShape &shape, Context ctx,
NDArray(const NDArrayStorageType stype, const TShape &shape, Context ctx,
bool delay_alloc = true, int dtype = mshadow::default_type_flag,
std::vector<int> aux_types = {}, std::vector<TShape> aux_shapes = {},
TShape storage_shape = TShape(mshadow::Shape1(0)))
: shape_(shape), offset_(0), dtype_(dtype), entry_({nullptr, 0, 0}) {
// Assign default aux types if not given
if (aux_types.size() == 0) {
if (storage_type == kRowSparseStorage) {
if (stype == kRowSparseStorage) {
aux_types = {ROW_SPARSE_IDX_TYPE};
} else if (storage_type == kCSRStorage) {
} else if (stype == kCSRStorage) {
aux_types = {CSR_IND_PTR_TYPE, CSR_IDX_DTYPE};
} else {
LOG(FATAL) << "Unknown storage type" << storage_type;
LOG(FATAL) << "Unknown storage type " << stype;
}
}
// Assign default shapes if not given
// unknown shapes are intialized as {0} such that Size() would return 0
if (aux_shapes.size() == 0) {
if (storage_type == kRowSparseStorage) {
if (stype == kRowSparseStorage) {
aux_shapes = {TShape(mshadow::Shape1(0))};
} else if (storage_type == kCSRStorage) {
} else if (stype == kCSRStorage) {
// aux shapes for indptr and indices
aux_shapes = {TShape(mshadow::Shape1(0)), TShape(mshadow::Shape1(0))};
} else {
LOG(FATAL) << "Unknown storage type" << storage_type;
LOG(FATAL) << "Unknown storage type " << stype;
}
}
if (storage_shape.Size() == 0) {
if (storage_type == kRowSparseStorage) {
if (stype == kRowSparseStorage) {
storage_shape = shape;
storage_shape[0] = aux_shapes[rowsparse::kIdx][0];
} else if (storage_type == kCSRStorage) {
} else if (stype == kCSRStorage) {
storage_shape = aux_shapes[csr::kIdx];
} else {
LOG(FATAL) << "Unknown storage type" << storage_type;
LOG(FATAL) << "Unknown storage type " << stype;
}
}
ptr_ = std::make_shared<Chunk>(storage_type, storage_shape, ctx, delay_alloc,
ptr_ = std::make_shared<Chunk>(stype, storage_shape, ctx, delay_alloc,
dtype, aux_types, aux_shapes);
#if MKL_EXPERIMENTAL == 1
Mkl_mem_ = std::make_shared<MKLMemHolder>();
Expand Down Expand Up @@ -306,6 +306,20 @@ class NDArray {
CHECK(!is_none());
return ptr_->aux_types[i];
}
/*!
* \return the number of aux data used for given storage type
*/
static size_t NumAuxData(NDArrayStorageType stype) {
size_t num = 0;
switch (stype) {
case kDefaultStorage: num = 0; break;
case kCSRStorage: num = 2; break;
case kRowSparseStorage: num = 1; break;
default: LOG(FATAL) << "Unknown storage type" << stype; break;
}
return num;
}

inline NDArrayStorageType storage_type() const {
if (is_none()) return kUndefinedStorage;
return ptr_->storage_type;
Expand Down
2 changes: 1 addition & 1 deletion nnvm
Submodule nnvm updated 0 files
13 changes: 2 additions & 11 deletions python/mxnet/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .base import mx_uint, NDArrayHandle, ExecutorHandle
from .base import check_call, c_array, py_str
from .ndarray import NDArray
from .sparse_ndarray import SparseNDArray, _STORAGE_TYPE_STR_TO_ID
from .sparse_ndarray import _ndarray_cls
from . import ndarray as nd

# those functions are not used here, we just import them to keep backward compatibility
Expand Down Expand Up @@ -92,16 +92,7 @@ def _get_outputs(self):
check_call(_LIB.MXExecutorOutputs(self.handle,
ctypes.byref(out_size), ctypes.byref(handles)))
num_output = out_size.value
outputs = []
for i in range(num_output):
storage_type = ctypes.c_int(0)
check_call(_LIB.MXNDArrayGetStorageType(ctypes.cast(handles[i], NDArrayHandle),
ctypes.byref(storage_type)))
assert(storage_type != _STORAGE_TYPE_STR_TO_ID['undefined'])
output = NDArray(NDArrayHandle(handles[i])) \
if storage_type.value == _STORAGE_TYPE_STR_TO_ID['default'] \
else SparseNDArray(NDArrayHandle(handles[i]))
outputs.append(output)
outputs = [_ndarray_cls(NDArrayHandle(handles[i])) for i in range(num_output)]
return outputs

def forward(self, is_train=False, **kwargs):
Expand Down
5 changes: 3 additions & 2 deletions python/mxnet/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .base import mx_real_t
from .base import check_call, build_param_doc as _build_param_doc
from .ndarray import NDArray
from .sparse_ndarray import _ndarray_cls
from .ndarray import array
from .ndarray import concatenate

Expand Down Expand Up @@ -752,12 +753,12 @@ def iter_next(self):
def getdata(self):
hdl = NDArrayHandle()
check_call(_LIB.MXDataIterGetData(self.handle, ctypes.byref(hdl)))
return NDArray(hdl, False)
return _ndarray_cls(hdl, False)

def getlabel(self):
hdl = NDArrayHandle()
check_call(_LIB.MXDataIterGetLabel(self.handle, ctypes.byref(hdl)))
return NDArray(hdl, False)
return _ndarray_cls(hdl, False)

def getindex(self):
index_size = ctypes.c_uint64(0)
Expand Down
Loading

0 comments on commit 6d240a5

Please sign in to comment.