From 483caef1276c80f60bcc6c97836c8008d62ec72b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 26 May 2015 00:35:05 +0200 Subject: [PATCH] [FLINK-2053] [ml] Adds automatic type registration of flink-ml types. Adds de-duplication of registered types at ExecutionConfig. Fixes bug in Breeze SparseVector to Flink SparseVector conversion. --- .../flink/api/common/ExecutionConfig.java | 11 ++-- .../flink/api/common/ExecutionConfigTest.java | 47 +++++++++++++++ .../typeutils/runtime/PojoSerializer.java | 3 +- .../runtime/kryo/KryoSerializer.java | 5 +- .../flink/ml/classification/CoCoA.scala | 9 ++- .../{FlinkTools.scala => FlinkMLTools.scala} | 33 +++++++++- .../org/apache/flink/ml/math/Breeze.scala | 4 +- .../flink/ml/math/BreezeVectorConverter.scala | 10 +++- .../apache/flink/ml/pipeline/Estimator.scala | 3 +- .../apache/flink/ml/pipeline/Predictor.scala | 3 +- .../flink/ml/pipeline/Transformer.scala | 3 +- .../ml/preprocessing/StandardScaler.scala | 2 - .../apache/flink/ml/recommendation/ALS.scala | 6 +- .../flink/ml/common/FlinkMLToolsSuite.scala | 60 +++++++++++++++++++ ....scala => PolynomialFeaturesITSuite.scala} | 2 +- .../flink/ml/pipeline/PipelineITSuite.scala | 5 +- 16 files changed, 181 insertions(+), 25 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java rename flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/{FlinkTools.scala => FlinkMLTools.scala} (91%) create mode 100644 flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/common/FlinkMLToolsSuite.scala rename flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/{PolynomialBaseITSuite.scala => PolynomialFeaturesITSuite.scala} (99%) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3af153a395db1..8baedb4fc3d34 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -97,9 +98,9 @@ public class ExecutionConfig implements Serializable { private final List, Class>>> defaultKryoSerializerClasses = new ArrayList, Class>>>(); - private final List> registeredKryoTypes = new ArrayList>(); + private final LinkedHashSet> registeredKryoTypes = new LinkedHashSet>(); - private final List> registeredPojoTypes = new ArrayList>(); + private final LinkedHashSet> registeredPojoTypes = new LinkedHashSet>(); // -------------------------------------------------------------------------------------------- @@ -505,11 +506,11 @@ public List, Class>>> getDefaultKryoSeria /** * Returns the registered Kryo types. */ - public List> getRegisteredKryoTypes() { + public LinkedHashSet> getRegisteredKryoTypes() { if (isForceKryoEnabled()) { // if we force kryo, we must also return all the types that // were previously only registered as POJO - List> result = new ArrayList>(); + LinkedHashSet> result = new LinkedHashSet>(); result.addAll(registeredKryoTypes); for(Class t : registeredPojoTypes) { if (!result.contains(t)) { @@ -525,7 +526,7 @@ public List> getRegisteredKryoTypes() { /** * Returns the registered POJO types. 
*/ - public List> getRegisteredPojoTypes() { + public LinkedHashSet> getRegisteredPojoTypes() { return registeredPojoTypes; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java new file mode 100644 index 0000000000000..ad3ad91f6978a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common; + +import org.junit.Test; +import static org.junit.Assert.*; + +import java.util.Arrays; +import java.util.List; + +public class ExecutionConfigTest { + + @Test + public void testDoubleTypeRegistration() { + ExecutionConfig config = new ExecutionConfig(); + List> types = Arrays.asList((Class)Double.class, Integer.class, Double.class); + List> expectedTypes = Arrays.asList((Class)Double.class, Integer.class); + + for(Class tpe: types) { + config.registerKryoType(tpe); + } + + int counter = 0; + + for(Class tpe: config.getRegisteredKryoTypes()){ + assertEquals(tpe, expectedTypes.get(counter++)); + } + + assertTrue(counter == expectedTypes.size()); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index c61ad8d157b62..5d4553d7d78f4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -78,7 +79,7 @@ public PojoSerializer( this.numFields = fieldSerializers.length; this.executionConfig = executionConfig; - List> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); + LinkedHashSet> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index e14546ed20890..8ae3562feea66 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -42,6 +42,7 @@ import java.io.EOFException; import java.io.IOException; import java.lang.reflect.Modifier; +import java.util.LinkedHashSet; import java.util.List; /** @@ -63,7 +64,7 @@ public class KryoSerializer extends TypeSerializer { private final List, Class>>> registeredTypesWithSerializerClasses; private final List, Serializer>> defaultSerializers; private final List, Class>>> defaultSerializerClasses; - private final List> registeredTypes; + private final LinkedHashSet> registeredTypes; private final Class type; @@ -305,7 +306,7 @@ private void checkKryoInitialized() { // For testing // -------------------------------------------------------------------------------------------- - Kryo getKryo() { + public Kryo getKryo() { checkKryoInitialized(); return this.kryo; } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala index 4ba9299cf8745..fea6be5f64957 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala @@ -26,7 +26,7 @@ import scala.util.Random import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.flink.ml.common.FlinkTools.ModuloKeyPartitioner +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner import org.apache.flink.ml.common._ import org.apache.flink.ml.math.Vector import org.apache.flink.ml.math.Breeze._ @@ -244,6 +244,11 @@ object CoCoA{ case Some(weights) => { input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR) } + + case None => { + throw 
new RuntimeException("The CoCoA model has not been trained. Call first fit " + + "before calling the predict operation.") + } } } } @@ -310,7 +315,7 @@ object CoCoA{ val numberVectors = input map { x => 1 } reduce { _ + _ } // Group the input data into blocks in round robin fashion - val blockedInputNumberElements = FlinkTools.block( + val blockedInputNumberElements = FlinkMLTools.block( input, blocks, Some(ModuloKeyPartitioner)). diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala similarity index 91% rename from flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala rename to flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala index 57bf98e9b6860..553ec0065490a 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala @@ -40,7 +40,38 @@ import scala.reflect.ClassTag * Takes a DataSet of elements T and groups them in n blocks. 
* */ -object FlinkTools { +object FlinkMLTools { + + /** Registers the different FlinkML related types for Kryo serialization + * + * @param env + */ + def registerFlinkMLTypes(env: ExecutionEnvironment): Unit = { + + // Vector types + env.registerType(classOf[org.apache.flink.ml.math.DenseVector]) + env.registerType(classOf[org.apache.flink.ml.math.SparseVector]) + + // Matrix types + env.registerType(classOf[org.apache.flink.ml.math.DenseMatrix]) + env.registerType(classOf[org.apache.flink.ml.math.SparseMatrix]) + + // Breeze Vector types + env.registerType(classOf[breeze.linalg.DenseVector[_]]) + env.registerType(classOf[breeze.linalg.SparseVector[_]]) + + // Breeze specialized types + env.registerType(breeze.linalg.DenseVector.zeros[Double](0).getClass) + env.registerType(breeze.linalg.SparseVector.zeros[Double](0).getClass) + + // Breeze Matrix types + env.registerType(classOf[breeze.linalg.DenseMatrix[Double]]) + env.registerType(classOf[breeze.linalg.CSCMatrix[Double]]) + + // Breeze specialized types + env.registerType(breeze.linalg.DenseMatrix.zeros[Double](0, 0).getClass) + env.registerType(breeze.linalg.CSCMatrix.zeros[Double](0, 0).getClass) + } /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent * operations. 
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala index fbe35d4109bce..74d4d8fa04e31 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala @@ -78,10 +78,10 @@ object Breeze { def asBreeze: BreezeVector[Double] = { vector match { case dense: DenseVector => - new BreezeDenseVector[Double](dense.data) + new breeze.linalg.DenseVector(dense.data) case sparse: SparseVector => - new BreezeSparseVector[Double](sparse.indices, sparse.data, sparse.size) + new BreezeSparseVector(sparse.indices, sparse.data, sparse.size) } } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala index 687772ed4bc65..f5f74699f4bac 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala @@ -56,7 +56,10 @@ object BreezeVectorConverter{ dense.length, dense.iterator.toIterable) case sparse: BreezeSparseVector[Double] => - new SparseVector(sparse.length, sparse.index, sparse.data) + new SparseVector( + sparse.used, + sparse.index.take(sparse.used), + sparse.data.take(sparse.used)) } } } @@ -68,7 +71,10 @@ object BreezeVectorConverter{ case dense: BreezeDenseVector[Double] => new DenseVector(dense.data) case sparse: BreezeSparseVector[Double] => - new SparseVector(sparse.length, sparse.index, sparse.data) + new SparseVector( + sparse.used, + sparse.index.take(sparse.used), + sparse.data.take(sparse.used)) } } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala index 6acac8f3af40c..088b18452efa3 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala @@ -21,7 +21,7 @@ package org.apache.flink.ml.pipeline import scala.reflect.ClassTag import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{ParameterMap, WithParameters} +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} /** Base trait for Flink's pipeline operators. * @@ -50,6 +50,7 @@ trait Estimator[Self] extends WithParameters with Serializable { training: DataSet[Training], fitParameters: ParameterMap = ParameterMap.Empty)(implicit fitOperation: FitOperation[Self, Training]): Unit = { + FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment) fitOperation.fit(this, fitParameters, training) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index ebfa7871dfe33..c0e66a077e0b4 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -21,7 +21,7 @@ package org.apache.flink.ml.pipeline import scala.reflect.ClassTag import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{ParameterMap, WithParameters} +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} /** Predictor trait for Flink's pipeline operators. 
* @@ -53,6 +53,7 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters with Serializa predictParameters: ParameterMap = ParameterMap.Empty)(implicit predictor: PredictOperation[Self, Testing, Prediction]) : DataSet[Prediction] = { + FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) predictor.predict(this, predictParameters, testing) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala index 52e3f7f322d6a..02360bcfeb686 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala @@ -21,7 +21,7 @@ package org.apache.flink.ml.pipeline import scala.reflect.ClassTag import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{ParameterMap, WithParameters} +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} /** Transformer trait for Flink's pipeline operators. 
* @@ -60,6 +60,7 @@ trait Transformer[Self <: Transformer[Self]] transformParameters: ParameterMap = ParameterMap.Empty) (implicit transformOperation: TransformOperation[Self, Input, Output]) : DataSet[Output] = { + FlinkMLTools.registerFlinkMLTypes(input.getExecutionEnvironment) transformOperation.transform(that, transformParameters, input) } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index bd952c32fe335..2e3ed9528fd4e 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -61,7 +61,6 @@ import scala.reflect.ClassTag */ class StandardScaler extends Transformer[StandardScaler] { - var metricsOption: Option[DataSet[(linalg.Vector[Double], linalg.Vector[Double])]] = None /** Sets the target mean of the transformed data @@ -183,7 +182,6 @@ object StandardScaler { varianceVector.update(i, 1.0) } } - (metric._2 / metric._1, varianceVector) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index d8efdaf8ef5ef..c5db6e49e207d 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -471,12 +471,12 @@ object ALS { blockIDPartitioner) val (userIn, userOut) = persistencePath match { - case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut") + case Some(path) => FlinkMLTools.persist(uIn, uOut, path + "userIn", path + "userOut") case None => (uIn, uOut) } val (itemIn, itemOut) = persistencePath match { - case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + 
"itemOut") + case Some(path) => FlinkMLTools.persist(iIn, iOut, path + "itemIn", path + "itemOut") case None => (iIn, iOut) } @@ -502,7 +502,7 @@ object ALS { } val pItems = persistencePath match { - case Some(path) => FlinkTools.persist(items, path + "items") + case Some(path) => FlinkMLTools.persist(items, path + "items") case None => items } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/common/FlinkMLToolsSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/common/FlinkMLToolsSuite.scala new file mode 100644 index 0000000000000..525ba4d8f2290 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/common/FlinkMLToolsSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common + +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class FlinkMLToolsSuite extends FlatSpec with Matchers with FlinkTestBase { + behavior of "FlinkMLTools" + + it should "register the required types" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + FlinkMLTools.registerFlinkMLTypes(env) + + val executionConfig = env.getConfig + + val serializer = new KryoSerializer[Nothing](classOf[Nothing], executionConfig) + + val kryo = serializer.getKryo() + + kryo.getRegistration(classOf[org.apache.flink.ml.math.DenseVector]).getId > 0 should be(true) + kryo.getRegistration(classOf[org.apache.flink.ml.math.SparseVector]).getId > 0 should be(true) + kryo.getRegistration(classOf[org.apache.flink.ml.math.DenseMatrix]).getId > 0 should be(true) + kryo.getRegistration(classOf[org.apache.flink.ml.math.SparseMatrix]).getId > 0 should be(true) + + kryo.getRegistration(classOf[breeze.linalg.DenseMatrix[_]]).getId > 0 should be(true) + kryo.getRegistration(classOf[breeze.linalg.CSCMatrix[_]]).getId > 0 should be(true) + kryo.getRegistration(classOf[breeze.linalg.DenseVector[_]]).getId > 0 should be(true) + kryo.getRegistration(classOf[breeze.linalg.SparseVector[_]]).getId > 0 should be(true) + + kryo.getRegistration(breeze.linalg.DenseVector.zeros[Double](0).getClass).getId > 0 should + be(true) + kryo.getRegistration(breeze.linalg.SparseVector.zeros[Double](0).getClass).getId > 0 should + be(true) + kryo.getRegistration(breeze.linalg.DenseMatrix.zeros[Double](0, 0).getClass).getId > 0 should + be(true) + kryo.getRegistration(breeze.linalg.CSCMatrix.zeros[Double](0, 0).getClass).getId > 0 should + be(true) + } + +} diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala 
b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala similarity index 99% rename from flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala rename to flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala index 0f045aba61789..674c1c4a79ee3 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.{Matchers, FlatSpec} import org.apache.flink.api.scala._ import org.apache.flink.test.util.FlinkTestBase -class PolynomialBaseITSuite +class PolynomialFeaturesITSuite extends FlatSpec with Matchers with FlinkTestBase { diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala index 880319555aabb..9909a18ef89b4 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala @@ -18,9 +18,12 @@ package org.apache.flink.ml.pipeline +import breeze.linalg +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala._ import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.math._ import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler} import org.apache.flink.ml.regression.MultipleLinearRegression import org.apache.flink.test.util.FlinkTestBase