diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/linalg/SparseVector.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/linalg/SparseVector.java index 572226ee692ba..1e2bb02e6a67b 100644 --- a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/linalg/SparseVector.java +++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/linalg/SparseVector.java @@ -326,6 +326,13 @@ public double normL2Square() { return d; } + /** + * If the size of sparse vector is not given, it will be treated as + * a sparse vector with infinite size. + * Extract parts of a sparse vector and return the extracted parts + * in a new sparse vector. If the given indices exceed the + * maximum length of the vector, it will throw exception. + */ @Override public SparseVector slice(int[] indices) { SparseVector sliced = new SparseVector(indices.length); @@ -334,6 +341,9 @@ public SparseVector slice(int[] indices) { sliced.values = new double[indices.length]; for (int i = 0; i < indices.length; i++) { + if (this.n >= 0 && indices[i] >= this.n) { + throw new IllegalArgumentException("Index is larger than vector size."); + } int pos = Arrays.binarySearch(this.indices, indices[i]); if (pos >= 0) { sliced.indices[nnz] = i; diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapper.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapper.java new file mode 100644 index 0000000000000..ed8782d4f51bb --- /dev/null +++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapper.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.operator.common.dataproc.vector; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.common.linalg.Vector; +import org.apache.flink.ml.common.mapper.Mapper; +import org.apache.flink.ml.common.utils.OutputColsHelper; +import org.apache.flink.ml.common.utils.TableUtil; +import org.apache.flink.ml.params.dataproc.vector.VectorToColumnsParams; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; + +/** + * This mapper maps vector to table columns, and the table is created with the first + * colSize value of the vector. + * For sparse vector without given size, it will be treated as vector with infinite size. + * If the colSize is larger than the vector size, we'll throw exception; + * If the colSize is not larger than the vector size, we'll select the first + * colSize value of the vector. + */ +public class VectorToColumnsMapper extends Mapper { + private int colSize; + private int idx; + private OutputColsHelper outputColsHelper; + + public VectorToColumnsMapper(TableSchema dataSchema, Params params) { + super(dataSchema, params); + String selectedColName = this.params.get(VectorToColumnsParams.SELECTED_COL); + idx = TableUtil.findColIndex(dataSchema.getFieldNames(), selectedColName); + Preconditions.checkArgument(idx >= 0, "Can not find column: " + selectedColName); + String[] outputColNames = this.params.get(VectorToColumnsParams.OUTPUT_COLS); + Preconditions.checkArgument(null != outputColNames, + "VectorToTable: outputColNames must set."); + this.colSize = outputColNames.length; + TypeInformation[] types = new TypeInformation[colSize]; + Arrays.fill(types, Types.DOUBLE); + this.outputColsHelper = new OutputColsHelper(dataSchema, outputColNames, types, + this.params.get(VectorToColumnsParams.RESERVED_COLS)); + } + + @Override + public Row map(Row row) { + Row result = new Row(colSize); + Object obj = row.getField(idx); + if (null == obj) { + for (int i = 0; i < colSize; i++) { + result.setField(i, null); + } + return outputColsHelper.getResultRow(row, result); + } + + Vector vec = (Vector) obj; + + int vectorSize = vec.size(); + if (vectorSize >= 0 && colSize > vectorSize) { + throw new RuntimeException("colSize is larger than vector size! colSize: " + + colSize + ", vectorSize: " + vectorSize); + } + if (vec instanceof SparseVector) { + for (int i = 0; i < colSize; ++i) { + result.setField(i, 0.0); + } + SparseVector sparseVector = (SparseVector) vec; + int nnz = sparseVector.numberOfValues(); + int[] indices = sparseVector.getIndices(); + double[] values = sparseVector.getValues(); + for (int i = 0; i < nnz; ++i) { + if (indices[i] < colSize) { + result.setField(indices[i], values[i]); + } else { + break; + } + } + } else { + DenseVector denseVector = (DenseVector) vec; + for (int i = 0; i < colSize; ++i) { + result.setField(i, denseVector.get(i)); + } + } + return outputColsHelper.getResultRow(row, result); + } + + @Override + public TableSchema getOutputSchema() { + return outputColsHelper.getResultSchema(); + } +} diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/params/dataproc/vector/VectorToColumnsParams.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/params/dataproc/vector/VectorToColumnsParams.java new file mode 100644 index 0000000000000..fbdefa5f6e19a --- /dev/null +++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/params/dataproc/vector/VectorToColumnsParams.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.params.dataproc.vector; + +import org.apache.flink.ml.params.shared.colname.HasOutputCols; +import org.apache.flink.ml.params.shared.colname.HasReservedCols; +import org.apache.flink.ml.params.shared.colname.HasSelectedCol; + +/** + * parameters of vector to columns. + */ +public interface VectorToColumnsParams extends + HasSelectedCol, + HasOutputCols, + HasReservedCols { +} diff --git a/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapperTest.java b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapperTest.java new file mode 100644 index 0000000000000..7b4556faca305 --- /dev/null +++ b/flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/operator/common/dataproc/vector/VectorToColumnsMapperTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.operator.common.dataproc.vector; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.params.dataproc.vector.VectorToColumnsParams; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for VectorToColumnsMapper. + */ +public class VectorToColumnsMapperTest { + @Test + public void testDenseReversed() throws Exception { + TableSchema schema = new TableSchema(new String[]{"vec"}, new TypeInformation[]{Types.STRING}); + + Params params = new Params() + .set(VectorToColumnsParams.SELECTED_COL, "vec") + .set(VectorToColumnsParams.OUTPUT_COLS, new String[]{"f0", "f1"}); + + VectorToColumnsMapper mapper = new VectorToColumnsMapper(schema, params); + Row row = mapper.map(Row.of(new DenseVector(new double[]{3.0, 4.0}))); + assertEquals(row.getField(1), 3.0); + assertEquals(row.getField(2), 4.0); + assertEquals(mapper.getOutputSchema(), new TableSchema(new String[]{"vec", "f0", "f1"}, + new TypeInformation[]{Types.STRING, Types.DOUBLE, Types.DOUBLE})); + + } + + @Test + public void testDense() throws Exception { + TableSchema schema = new TableSchema(new String[]{"vec"}, new TypeInformation[]{Types.STRING}); + + Params params = new Params() + .set(VectorToColumnsParams.SELECTED_COL, "vec") + .set(VectorToColumnsParams.RESERVED_COLS, new String[]{}) + .set(VectorToColumnsParams.OUTPUT_COLS, new String[]{"f0", "f1"}); + + VectorToColumnsMapper mapper = new VectorToColumnsMapper(schema, params); + + Row row = mapper.map(Row.of(new DenseVector(new double[]{3.0, 4.0}))); + assertEquals(row.getField(0), 3.0); + assertEquals(row.getField(1), 4.0); + assertEquals(mapper.getOutputSchema(), new TableSchema(new String[]{"f0", "f1"}, + new TypeInformation[]{Types.DOUBLE, Types.DOUBLE})); + } + + @Test + public void testSparse() throws Exception { + TableSchema schema = new TableSchema(new String[]{"vec"}, new TypeInformation[]{Types.STRING}); + Params params = new Params() + .set(VectorToColumnsParams.SELECTED_COL, "vec") + .set(VectorToColumnsParams.OUTPUT_COLS, new String[]{"f0", "f1", "f2"}); + + VectorToColumnsMapper mapper = new VectorToColumnsMapper(schema, params); + + Row row = mapper.map(Row.of(new SparseVector(3, new int[]{1, 2}, new double[]{3.0, 4.0}))); + assertEquals(row.getField(0), new SparseVector(3, new int[]{1, 2}, new double[]{3.0, 4.0})); + assertEquals(row.getField(1), 0.0); + assertEquals(row.getField(2), 3.0); + assertEquals(row.getField(3), 4.0); + assertEquals(mapper.getOutputSchema(), new TableSchema(new String[]{"vec", "f0", "f1", "f2"}, + new TypeInformation[]{Types.STRING, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE})); + } + + @Test + public void testNull() throws Exception { + TableSchema schema = new TableSchema(new String[]{"vec"}, new TypeInformation[]{Types.STRING}); + + Params params = new Params() + .set(VectorToColumnsParams.SELECTED_COL, "vec") + .set(VectorToColumnsParams.RESERVED_COLS, new String[]{}) + .set(VectorToColumnsParams.OUTPUT_COLS, new String[]{"f0", "f1"}); + + VectorToColumnsMapper mapper = new VectorToColumnsMapper(schema, params); + + Row row = mapper.map(Row.of((Object) null)); + assertEquals(row.getField(0), null); + assertEquals(row.getField(1), null); + assertEquals(mapper.getOutputSchema(), new TableSchema(new String[]{"f0", "f1"}, + new TypeInformation[]{Types.DOUBLE, Types.DOUBLE})); + } +}