From 88514a98642763c5ad962efecc44bef887b84110 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:00:33 +0200 Subject: [PATCH 01/14] Added an imputer class with Strategy class The imputer imputes missing values into a sparse DataSet of Vectors with different strategies which can be chosen out of the existing ones in the strategy enum class (mean, median or most frequent value) in a row or column --- Imputer.java | 374 ++++++++++++++++++++++++++++++++++++++++++++++++++ Strategy.java | 5 + 2 files changed, 379 insertions(+) create mode 100644 Imputer.java create mode 100644 Strategy.java diff --git a/Imputer.java b/Imputer.java new file mode 100644 index 0000000000000..69e4246e01b46 --- /dev/null +++ b/Imputer.java @@ -0,0 +1,374 @@ +package Imputer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +//import java.util.Set; +//import java.util.SortedSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections.ListUtils; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.AggregateOperator; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.ml.math.DenseVector; +import org.apache.flink.shaded.com.google.common.collect.Lists; +import org.apache.flink.util.Collector; +import org.apache.hadoop.mapreduce.Reducer; + +import scala.collection.mutable.LinkedList; +import scala.reflect.internal.Trees.New; + +public class Imputer { + + + 
static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); + static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); + static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); + static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); + static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); + + static DataSet ds = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5); +// static DataSet ds = env.fromElements( testvec2, testvec3); + private static double[] meansA; + private static double[] medians; + private static double[] mostValues; +// final static ConcurrentHashMap meansHM= new ConcurrentHashMap<>(); + + + public static void main(String[] args){ + try { +// DataSet dsMean = impute(ds, Strategy.MEAN, 1); +// System.out.println("data set mean "); +// dsMean.print(); +// + DataSet dsMedian = impute(ds, Strategy.MEDIAN, 1); + System.out.println("data set median "); + dsMedian.print(); +// +// DataSet dsMost = impute(ds, Strategy.MOST_FREQUENT, 1); +// System.out.println("data set most frequent "); +// dsMost.print(); +// +// DataSet dsMean0 = impute(ds, Strategy.MEAN, 0); +// System.out.println("data set mean "); +// dsMean0.print(); +// +// DataSet dsMedian0 = impute(ds, Strategy.MEDIAN, 0); +// System.out.println("data set median "); +// dsMedian0.print(); +// +// DataSet dsMax0 = impute(ds, Strategy.MOST_FREQUENT, 0); +// System.out.println("data set max "); +// dsMax0.print(); + + } catch (Exception e) { + System.out.println("here happened an exception"); + e.printStackTrace(); + } + } + + /** + * + * @param sparseData + * @param strategy use MEAN, MEDIAN or the MOST_FREQUENT value to impute missing values + * @param axis 0: impute along columns, 1: imput along rows + * @return dataset without zeroes / missing values + * @throws 
Exception + */ + public static DataSet impute(DataSet sparseData, Strategy strategy, int axis) throws Exception{ + double val; + DataSet ret = sparseData; + if(axis==0){ //columnwise + switch (strategy){ + case MEAN: + ret=sparseData.map(new MapFunction() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + double v; + @Override + public DenseVector map(DenseVector vec) { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i numArray = new ArrayList<>(); + double val; + for(int i =0; i< vec.size(); i++){ + val=vec.apply(i); + if(Double.compare(Double.NaN, val)!=0){ + numArray.add(val); + } + } + Collections.sort(numArray); + int middle = numArray.size() / 2; + if(numArray.size() % 2 == 0){ + double medianA = numArray.get(middle); + double medianB = numArray.get(middle-1); + ret = (medianA + medianB) / 2d; + } else{ + ret = numArray.get(middle); + } + return ret; + } + public static double getValueMOST(DenseVector vec){ + double ret=0; + HashMap frequencies= new HashMap<>(); + for(int i =0; imax){ + max=frequencies.get(key); + maxKey=key; + } + } + ret=maxKey; + return ret; + + } + + public static int numOfElementsNotZero(DenseVector vec){ + int zeros=0; + for(int i=0; i ds, Strategy strategy) throws Exception{ + //(entry, 1, dimension) + DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction>() { + @Override + public void flatMap(DenseVector vec, Collector> col) throws Exception { + double[] entries= vec.data(); + double entry; + for(int i = 0; i (entry, 1, i) ); + } + } + } + }); + + List>> lists; + DataSet>> nonZeros2= nonZeroTuples.map(new MapFunction, Tuple2> >() { + 
@Override + public Tuple2> map(Tuple3 t) throws Exception { + return new Tuple2>(t.f2, Lists.newArrayList(t.f0)); + } + }); + + lists= nonZeros2.groupBy(0).reduce(new ReduceFunction>>() { + List ret= new java.util.LinkedList<>(); + @Override + public Tuple2> reduce(Tuple2> t1, + Tuple2> t2) throws Exception { + ret=ListUtils.union(t1.f1,t2.f1); + return new Tuple2>(t1.f0, ret); + } + }).collect(); + + switch(strategy){ + case MEAN: + DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); + meansA= new double[lists.size()]; + final String s = "hello"; + List> means= infos.map(new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple3 t) throws Exception { + double mean= (double) t.f0/(double) t.f1; + return new Tuple2(t.f2, mean); + } + }).collect(); + for(Tuple2 t: means){ + meansA[t.f0]=t.f1; + } + break; + case MEDIAN: + double median; + int size; + //lists contains a list for every dimension in which all the values of the data set are written. + // we will later sort every of those lists in order to determine the median. 
+ + medians= new double[lists.size()]; + List l; + for(Tuple2> t: lists){ + l= t.f1; + Collections.sort(l); + System.out.println("list " + t.f0 + " ist "+ l.toString()); + size=l.size(); + if(size%2==0){ + median=(l.get(size/2) + l.get(size/2-1))/2d; + }else{ + median=l.get(size/2); + } + medians[t.f0]=median; + } + for(int i = 0; i frequencies= new HashMap<>(); + mostValues= new double[lists.size()]; + for(Tuple2> t: lists){ + int max=0; + double maxKey=0; + // calculate frequencie hashmap for each dimension + List list=t.f1; + frequencies.clear(); + for(int j=0; jmax){ + max=frequencies.get(k); + maxKey=k; + } + } + mostValues[t.f0]=maxKey; + + } + break; + } + } + + +} diff --git a/Strategy.java b/Strategy.java new file mode 100644 index 0000000000000..e29d389a0cbd6 --- /dev/null +++ b/Strategy.java @@ -0,0 +1,5 @@ +package Imputer; + +public enum Strategy { +MEAN, MEDIAN, MOST_FREQUENT; +} From d17c6de2ad9456a58d24ac4cda44b5ef5ce5c216 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:01:47 +0200 Subject: [PATCH 02/14] deleted class in false destination --- Imputer.java | 374 --------------------------------------------------- 1 file changed, 374 deletions(-) delete mode 100644 Imputer.java diff --git a/Imputer.java b/Imputer.java deleted file mode 100644 index 69e4246e01b46..0000000000000 --- a/Imputer.java +++ /dev/null @@ -1,374 +0,0 @@ -package Imputer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -//import java.util.Set; -//import java.util.SortedSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.collections.ListUtils; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; 
-import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.AggregateOperator; -import org.apache.flink.api.java.operators.MapOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.ml.math.DenseVector; -import org.apache.flink.shaded.com.google.common.collect.Lists; -import org.apache.flink.util.Collector; -import org.apache.hadoop.mapreduce.Reducer; - -import scala.collection.mutable.LinkedList; -import scala.reflect.internal.Trees.New; - -public class Imputer { - - - static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); - static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); - static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); - static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); - static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); - - static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); - - static DataSet ds = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5); -// static DataSet ds = env.fromElements( testvec2, testvec3); - private static double[] meansA; - private static double[] medians; - private static double[] mostValues; -// final static ConcurrentHashMap meansHM= new ConcurrentHashMap<>(); - - - public static void main(String[] args){ - try { -// DataSet dsMean = impute(ds, Strategy.MEAN, 1); -// System.out.println("data set mean "); -// dsMean.print(); -// - DataSet dsMedian = impute(ds, Strategy.MEDIAN, 1); - System.out.println("data set median "); - dsMedian.print(); -// -// DataSet dsMost = impute(ds, Strategy.MOST_FREQUENT, 1); -// System.out.println("data set most frequent "); -// dsMost.print(); -// -// DataSet dsMean0 = impute(ds, Strategy.MEAN, 0); -// System.out.println("data set mean "); -// 
dsMean0.print(); -// -// DataSet dsMedian0 = impute(ds, Strategy.MEDIAN, 0); -// System.out.println("data set median "); -// dsMedian0.print(); -// -// DataSet dsMax0 = impute(ds, Strategy.MOST_FREQUENT, 0); -// System.out.println("data set max "); -// dsMax0.print(); - - } catch (Exception e) { - System.out.println("here happened an exception"); - e.printStackTrace(); - } - } - - /** - * - * @param sparseData - * @param strategy use MEAN, MEDIAN or the MOST_FREQUENT value to impute missing values - * @param axis 0: impute along columns, 1: imput along rows - * @return dataset without zeroes / missing values - * @throws Exception - */ - public static DataSet impute(DataSet sparseData, Strategy strategy, int axis) throws Exception{ - double val; - DataSet ret = sparseData; - if(axis==0){ //columnwise - switch (strategy){ - case MEAN: - ret=sparseData.map(new MapFunction() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i() { - double v; - @Override - public DenseVector map(DenseVector vec) { - for(int i = 0; i() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i numArray = new ArrayList<>(); - double val; - for(int i =0; i< vec.size(); i++){ - val=vec.apply(i); - if(Double.compare(Double.NaN, val)!=0){ - numArray.add(val); - } - } - Collections.sort(numArray); - int middle = numArray.size() / 2; - if(numArray.size() % 2 == 0){ - double medianA = numArray.get(middle); - double medianB = numArray.get(middle-1); - ret = (medianA + medianB) / 2d; - } else{ - ret = numArray.get(middle); - } - return ret; - } - public static double getValueMOST(DenseVector vec){ - double ret=0; - HashMap frequencies= new 
HashMap<>(); - for(int i =0; imax){ - max=frequencies.get(key); - maxKey=key; - } - } - ret=maxKey; - return ret; - - } - - public static int numOfElementsNotZero(DenseVector vec){ - int zeros=0; - for(int i=0; i ds, Strategy strategy) throws Exception{ - //(entry, 1, dimension) - DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction>() { - @Override - public void flatMap(DenseVector vec, Collector> col) throws Exception { - double[] entries= vec.data(); - double entry; - for(int i = 0; i (entry, 1, i) ); - } - } - } - }); - - List>> lists; - DataSet>> nonZeros2= nonZeroTuples.map(new MapFunction, Tuple2> >() { - @Override - public Tuple2> map(Tuple3 t) throws Exception { - return new Tuple2>(t.f2, Lists.newArrayList(t.f0)); - } - }); - - lists= nonZeros2.groupBy(0).reduce(new ReduceFunction>>() { - List ret= new java.util.LinkedList<>(); - @Override - public Tuple2> reduce(Tuple2> t1, - Tuple2> t2) throws Exception { - ret=ListUtils.union(t1.f1,t2.f1); - return new Tuple2>(t1.f0, ret); - } - }).collect(); - - switch(strategy){ - case MEAN: - DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); - meansA= new double[lists.size()]; - final String s = "hello"; - List> means= infos.map(new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple3 t) throws Exception { - double mean= (double) t.f0/(double) t.f1; - return new Tuple2(t.f2, mean); - } - }).collect(); - for(Tuple2 t: means){ - meansA[t.f0]=t.f1; - } - break; - case MEDIAN: - double median; - int size; - //lists contains a list for every dimension in which all the values of the data set are written. - // we will later sort every of those lists in order to determine the median. 
- - medians= new double[lists.size()]; - List l; - for(Tuple2> t: lists){ - l= t.f1; - Collections.sort(l); - System.out.println("list " + t.f0 + " ist "+ l.toString()); - size=l.size(); - if(size%2==0){ - median=(l.get(size/2) + l.get(size/2-1))/2d; - }else{ - median=l.get(size/2); - } - medians[t.f0]=median; - } - for(int i = 0; i frequencies= new HashMap<>(); - mostValues= new double[lists.size()]; - for(Tuple2> t: lists){ - int max=0; - double maxKey=0; - // calculate frequencie hashmap for each dimension - List list=t.f1; - frequencies.clear(); - for(int j=0; jmax){ - max=frequencies.get(k); - maxKey=k; - } - } - mostValues[t.f0]=maxKey; - - } - break; - } - } - - -} From e4b336fdbf93084c30a8ee0067efcd7a4729c0e1 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:02:07 +0200 Subject: [PATCH 03/14] deleted class in false destination --- Strategy.java | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 Strategy.java diff --git a/Strategy.java b/Strategy.java deleted file mode 100644 index e29d389a0cbd6..0000000000000 --- a/Strategy.java +++ /dev/null @@ -1,5 +0,0 @@ -package Imputer; - -public enum Strategy { -MEAN, MEDIAN, MOST_FREQUENT; -} From ee6d57cfa669876f983cbf10eb6ffdd02b5c3052 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:04:04 +0200 Subject: [PATCH 04/14] added imputer class with strategy class the imputer imputes values into a sparse DataSet of vectors with different strategies (mean, median or most frequent value as listed in the strategy class) --- .../flink-ml/src/main/java/Imputer.java | 374 ++++++++++++++++++ .../flink-ml/src/main/java/Strategy.java | 5 + 2 files changed, 379 insertions(+) create mode 100644 flink-libraries/flink-ml/src/main/java/Imputer.java create mode 100644 flink-libraries/flink-ml/src/main/java/Strategy.java diff --git a/flink-libraries/flink-ml/src/main/java/Imputer.java b/flink-libraries/flink-ml/src/main/java/Imputer.java new file mode 100644 index 0000000000000..69e4246e01b46 --- 
/dev/null +++ b/flink-libraries/flink-ml/src/main/java/Imputer.java @@ -0,0 +1,374 @@ +package Imputer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +//import java.util.Set; +//import java.util.SortedSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections.ListUtils; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.AggregateOperator; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.ml.math.DenseVector; +import org.apache.flink.shaded.com.google.common.collect.Lists; +import org.apache.flink.util.Collector; +import org.apache.hadoop.mapreduce.Reducer; + +import scala.collection.mutable.LinkedList; +import scala.reflect.internal.Trees.New; + +public class Imputer { + + + static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); + static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); + static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); + static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); + static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); + + static DataSet ds = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5); +// static DataSet ds = env.fromElements( testvec2, testvec3); 
+ private static double[] meansA; + private static double[] medians; + private static double[] mostValues; +// final static ConcurrentHashMap meansHM= new ConcurrentHashMap<>(); + + + public static void main(String[] args){ + try { +// DataSet dsMean = impute(ds, Strategy.MEAN, 1); +// System.out.println("data set mean "); +// dsMean.print(); +// + DataSet dsMedian = impute(ds, Strategy.MEDIAN, 1); + System.out.println("data set median "); + dsMedian.print(); +// +// DataSet dsMost = impute(ds, Strategy.MOST_FREQUENT, 1); +// System.out.println("data set most frequent "); +// dsMost.print(); +// +// DataSet dsMean0 = impute(ds, Strategy.MEAN, 0); +// System.out.println("data set mean "); +// dsMean0.print(); +// +// DataSet dsMedian0 = impute(ds, Strategy.MEDIAN, 0); +// System.out.println("data set median "); +// dsMedian0.print(); +// +// DataSet dsMax0 = impute(ds, Strategy.MOST_FREQUENT, 0); +// System.out.println("data set max "); +// dsMax0.print(); + + } catch (Exception e) { + System.out.println("here happened an exception"); + e.printStackTrace(); + } + } + + /** + * + * @param sparseData + * @param strategy use MEAN, MEDIAN or the MOST_FREQUENT value to impute missing values + * @param axis 0: impute along columns, 1: imput along rows + * @return dataset without zeroes / missing values + * @throws Exception + */ + public static DataSet impute(DataSet sparseData, Strategy strategy, int axis) throws Exception{ + double val; + DataSet ret = sparseData; + if(axis==0){ //columnwise + switch (strategy){ + case MEAN: + ret=sparseData.map(new MapFunction() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i() { + double v; + @Override + public DenseVector map(DenseVector vec) { + for(int i = 0; i() { + @Override + public DenseVector 
map(DenseVector vec) throws Exception { + for(int i = 0; i() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i numArray = new ArrayList<>(); + double val; + for(int i =0; i< vec.size(); i++){ + val=vec.apply(i); + if(Double.compare(Double.NaN, val)!=0){ + numArray.add(val); + } + } + Collections.sort(numArray); + int middle = numArray.size() / 2; + if(numArray.size() % 2 == 0){ + double medianA = numArray.get(middle); + double medianB = numArray.get(middle-1); + ret = (medianA + medianB) / 2d; + } else{ + ret = numArray.get(middle); + } + return ret; + } + public static double getValueMOST(DenseVector vec){ + double ret=0; + HashMap frequencies= new HashMap<>(); + for(int i =0; imax){ + max=frequencies.get(key); + maxKey=key; + } + } + ret=maxKey; + return ret; + + } + + public static int numOfElementsNotZero(DenseVector vec){ + int zeros=0; + for(int i=0; i ds, Strategy strategy) throws Exception{ + //(entry, 1, dimension) + DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction>() { + @Override + public void flatMap(DenseVector vec, Collector> col) throws Exception { + double[] entries= vec.data(); + double entry; + for(int i = 0; i (entry, 1, i) ); + } + } + } + }); + + List>> lists; + DataSet>> nonZeros2= nonZeroTuples.map(new MapFunction, Tuple2> >() { + @Override + public Tuple2> map(Tuple3 t) throws Exception { + return new Tuple2>(t.f2, Lists.newArrayList(t.f0)); + } + }); + + lists= nonZeros2.groupBy(0).reduce(new ReduceFunction>>() { + List ret= new java.util.LinkedList<>(); + @Override + public Tuple2> reduce(Tuple2> t1, + Tuple2> t2) throws Exception { + ret=ListUtils.union(t1.f1,t2.f1); + return new Tuple2>(t1.f0, ret); + } + }).collect(); + + switch(strategy){ + case MEAN: + DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); + meansA= new double[lists.size()]; + final String s = "hello"; + List> means= infos.map(new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple3 t) throws Exception { 
+ double mean= (double) t.f0/(double) t.f1; + return new Tuple2(t.f2, mean); + } + }).collect(); + for(Tuple2 t: means){ + meansA[t.f0]=t.f1; + } + break; + case MEDIAN: + double median; + int size; + //lists contains a list for every dimension in which all the values of the data set are written. + // we will later sort every of those lists in order to determine the median. + + medians= new double[lists.size()]; + List l; + for(Tuple2> t: lists){ + l= t.f1; + Collections.sort(l); + System.out.println("list " + t.f0 + " ist "+ l.toString()); + size=l.size(); + if(size%2==0){ + median=(l.get(size/2) + l.get(size/2-1))/2d; + }else{ + median=l.get(size/2); + } + medians[t.f0]=median; + } + for(int i = 0; i frequencies= new HashMap<>(); + mostValues= new double[lists.size()]; + for(Tuple2> t: lists){ + int max=0; + double maxKey=0; + // calculate frequencie hashmap for each dimension + List list=t.f1; + frequencies.clear(); + for(int j=0; jmax){ + max=frequencies.get(k); + maxKey=k; + } + } + mostValues[t.f0]=maxKey; + + } + break; + } + } + + +} diff --git a/flink-libraries/flink-ml/src/main/java/Strategy.java b/flink-libraries/flink-ml/src/main/java/Strategy.java new file mode 100644 index 0000000000000..e29d389a0cbd6 --- /dev/null +++ b/flink-libraries/flink-ml/src/main/java/Strategy.java @@ -0,0 +1,5 @@ +package Imputer; + +public enum Strategy { +MEAN, MEDIAN, MOST_FREQUENT; +} From 57524586cbd63e2f0dfdc70cb34df82e6451c3be Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:04:47 +0200 Subject: [PATCH 05/14] added a test class for the new imputer class --- .../src/test/Imputer/columnwiseTest.java | 120 ++++++++++++++++ .../src/test/Imputer/rowwiseTest.java | 134 ++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java create mode 100644 flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java diff --git a/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java 
b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java new file mode 100644 index 0000000000000..71fa77418ee5c --- /dev/null +++ b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java @@ -0,0 +1,120 @@ +package Imputer; + +import static org.junit.Assert.*; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.ml.math.DenseVector; +import org.junit.Test; + +public class columnwiseTest { + + static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); + static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); + static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); + static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); + static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); + static DataSet ds = env.fromElements( testvec3, testvec4); + + + @Test + public void testMEAN() throws Exception { + DataSet dsMean = ds; + +// DenseVector testvec1e= new DenseVector(new double[]{4.0,3.0,1.0, 3.0}); +// DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,6.0, 1.0}); + DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,(5.0+2.0)/3, 2.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,2.5,0.5, 0.5}); +// DenseVector testvec5e= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements( testvec3e, testvec4e); + + try { + dsMean = Imputer.impute(ds, Strategy.MEAN, 0); + } catch (Exception e) { + fail("MEAN could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMean.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + fail("MEAN could not be calculated columnwise"); + System.out.println("dsmean: "); dsMean.print(); + System.out.println("dsexpected: " 
);dsExpected.print(); + } + } + + + + @Test + public void testMEDIAN() throws Exception { + DataSet dsMedian = ds; + + DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,2.0, 2.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,0.5,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements( testvec3e, testvec4e); + + try { + dsMedian = Imputer.impute(ds, Strategy.MEDIAN, 0); + } catch (Exception e) { + fail("MEDIAN could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMedian.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + fail("MEDIAN could not be calculated columnwise"); + System.out.println("dsmedian: "); dsMedian.print(); + System.out.println("dsexpected: " );dsExpected.print(); + } + } + + + @Test + public void testMOSTFREQUENT() throws Exception { + DataSet dsMost = env.fromElements( testvec2, testvec1, testvec4);; + + DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,1.0, 1.0}); + DenseVector testvec1e= new DenseVector(new double[]{3.0,3.0,1.0, 3.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,0.5,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements( testvec2e, testvec1e, testvec4e); + + try { + dsMost = Imputer.impute(dsMost, Strategy.MOST_FREQUENT, 0); + } catch (Exception e) { + fail("MOSTFREQUENT could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMost.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + System.out.println("dsmostfrequent: "); dsMost.print(); + System.out.println("dsexpected: " );dsExpected.print(); + fail("MOSTFREQUENT could not be calculated columnwise"); + } + } + + +} diff --git a/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java new file mode 100644 index 0000000000000..0d98e23aab0e3 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java @@ -0,0 +1,134 @@ 
+package Imputer; + +import static org.junit.Assert.*; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.ml.math.DenseVector; +import org.junit.Test; + +public class rowwiseTest { + + + static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); + static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); + static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); + static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); + static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); + static DataSet ds = env.fromElements( testvec1, testvec2, testvec3, testvec4, testvec5); + + + @Test + public void testMEAN() throws Exception { + DataSet dsMean = ds; + + DenseVector testvec1e= new DenseVector(new double[]{14.0/4.0,3.0,1.0, 3.0}); + DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,2.0/3.0, 1.0}); + DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,2.0/3.0, 2.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,4.0,0.5, 0.5}); + DenseVector testvec5e= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements(testvec1e, testvec2e, testvec3e, testvec4e, testvec5e); + + try { + dsMean = Imputer.impute(ds, Strategy.MEAN, 1); + } catch (Exception e) { + fail("MEAN could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMean.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + fail("MEAN could not be calculated rowwise"); + System.out.println("dsmean: "); dsMean.print(); + System.out.println("dsexpected: " );dsExpected.print(); + } + } + + + + @Test + public void testMEDIAN() throws Exception { + DataSet dsMedian = ds; + + DenseVector testvec1e= new DenseVector(new 
double[]{(7.5/2.0),3.0,1.0, 3.0}); + DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,0.5, 1.0}); + DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,0.5, 2.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,4.0,0.5, 0.5}); + DenseVector testvec5e= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements(testvec1e, testvec2e, testvec3e, testvec4e, testvec5e); + + try { + dsMedian = Imputer.impute(ds, Strategy.MEDIAN, 1); + } catch (Exception e) { + fail("MEDIAN could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMedian.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + System.out.println("dsmedian: "); dsMedian.print(); + System.out.println("dsexpected: " );dsExpected.print(); + fail("MEDIAN could not be calculated rowwise"); + } + } + + + + @Test + public void testMOSTFREQUENT() throws Exception { + + DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, Double.NaN}); + DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); + DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); + DenseVector testvec4= new DenseVector(new double[]{6.5,2.0,0.5, 0.5}); + DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + DataSet dsMost = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5);; + + + DenseVector testvec1e= new DenseVector(new double[]{6.5,3.0,1.0, 0.5}); + DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,0.5, 1.0}); + DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,0.5, 2.0}); + DenseVector testvec4e= new DenseVector(new double[]{6.5,2.0,0.5, 0.5}); + DenseVector testvec5e= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); + + + DataSet dsExpected = env.fromElements(testvec1e, testvec2e, testvec3e, testvec4e, testvec5e); + + try { + dsMost = Imputer.impute(dsMost, Strategy.MOST_FREQUENT, 1); + } catch (Exception 
e) { + fail("MOSTFREQUENT could not be calculated"); + + } + boolean fails=false; + for(DenseVector v: dsExpected.collect()){ + if(!dsMost.collect().contains(v)){ + fails=true; + }; + } + + if(fails){ + System.out.println("dsMost: "); dsMost.print(); + System.out.println("dsexpected: " );dsExpected.print(); + fail("MOSTFREQUENT could not be calculated rowwise"); + } + } + + +} From 72ebd5e210f583cd7e8df21ea8d73c06e835e198 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:08:49 +0200 Subject: [PATCH 06/14] [FLINK-5785] Add an Imputer for preparing data, removed unnecessary things and comments, added license --- .../flink-ml/src/main/java/Imputer.java | 53 ++++++------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/flink-libraries/flink-ml/src/main/java/Imputer.java b/flink-libraries/flink-ml/src/main/java/Imputer.java index 69e4246e01b46..6d68704ff3e43 100644 --- a/flink-libraries/flink-ml/src/main/java/Imputer.java +++ b/flink-libraries/flink-ml/src/main/java/Imputer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package Imputer; import java.util.ArrayList; import java.util.Arrays; @@ -5,8 +22,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -//import java.util.Set; -//import java.util.SortedSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -41,45 +56,11 @@ public class Imputer { static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); static DataSet ds = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5); -// static DataSet ds = env.fromElements( testvec2, testvec3); private static double[] meansA; private static double[] medians; private static double[] mostValues; -// final static ConcurrentHashMap meansHM= new ConcurrentHashMap<>(); - public static void main(String[] args){ - try { -// DataSet dsMean = impute(ds, Strategy.MEAN, 1); -// System.out.println("data set mean "); -// dsMean.print(); -// - DataSet dsMedian = impute(ds, Strategy.MEDIAN, 1); - System.out.println("data set median "); - dsMedian.print(); -// -// DataSet dsMost = impute(ds, Strategy.MOST_FREQUENT, 1); -// System.out.println("data set most frequent "); -// dsMost.print(); -// -// DataSet dsMean0 = impute(ds, Strategy.MEAN, 0); -// System.out.println("data set mean "); -// dsMean0.print(); -// -// DataSet dsMedian0 = impute(ds, Strategy.MEDIAN, 0); -// System.out.println("data set median "); -// dsMedian0.print(); -// -// DataSet dsMax0 = impute(ds, Strategy.MOST_FREQUENT, 0); -// System.out.println("data set max "); -// dsMax0.print(); - - } catch (Exception e) { - System.out.println("here happened an exception"); - e.printStackTrace(); - } - } - /** * * @param sparseData From 31dbfc704247b0c4723d6d3091a16759fbe18041 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:09:26 +0200 Subject: [PATCH 07/14] [FLINK-5785] Add an Imputer for preparing data added license --- .../flink-ml/src/main/java/Strategy.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git 
a/flink-libraries/flink-ml/src/main/java/Strategy.java b/flink-libraries/flink-ml/src/main/java/Strategy.java index e29d389a0cbd6..5e86d3fe81aba 100644 --- a/flink-libraries/flink-ml/src/main/java/Strategy.java +++ b/flink-libraries/flink-ml/src/main/java/Strategy.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package Imputer; public enum Strategy { From d0f7b816bea49090633b4bc85762bbf70b192b27 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:10:03 +0200 Subject: [PATCH 08/14] [FLINK-5785] Add an Imputer for preparing data added license --- .../src/test/Imputer/columnwiseTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java index 71fa77418ee5c..a13b4f3d3cf20 100644 --- a/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java +++ b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package Imputer; import static org.junit.Assert.*; From 76f996e2ddc5d912c947f20e2109bd53973c8091 Mon Sep 17 00:00:00 2001 From: p4nna Date: Thu, 30 Mar 2017 10:10:33 +0200 Subject: [PATCH 09/14] [FLINK-5785] Add an Imputer for preparing data added license --- .../flink-ml/src/test/Imputer/rowwiseTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java index 0d98e23aab0e3..490b59c91d501 100644 --- a/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java +++ b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package Imputer; import static org.junit.Assert.*; From d533805c7b37888632238ce87e73e6ef9d081d02 Mon Sep 17 00:00:00 2001 From: p4nna Date: Fri, 31 Mar 2017 17:54:37 +0200 Subject: [PATCH 10/14] [FLINK-5785] Add an Imputer for preparing data should work now. --- Imputer.java | 328 ++++++++++++++++++++++++++++++++++++++++++++++++++ Strategy.java | 21 ++++ 2 files changed, 349 insertions(+) create mode 100644 Imputer.java create mode 100644 Strategy.java diff --git a/Imputer.java b/Imputer.java new file mode 100644 index 0000000000000..60c00782e778b --- /dev/null +++ b/Imputer.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.collections.ListUtils; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Collector; +import com.google.common.collect.Lists; +import breeze.linalg.DenseVector; + +public class Imputer { + + private static double[] meansA; + private static double[] medians; + private static double[] mostValues; + + /** + * + * @param sparseData + * @param strategy use MEAN, MEDIAN or the MOST_FREQUENT value to impute missing values + * @param axis 0: impute along columns, 1: imput along rows + * @return dataset without zeroes / missing values + * @throws Exception + */ + public static DataSet> impute(DataSet> sparseData, Strategy strategy, int axis) throws Exception{ + DataSet> ret = sparseData; + if(axis==0){ //columnwise + switch (strategy){ + case MEAN: + ret=sparseData.map(new MapFunction, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) { + for(int i = 0; i, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i, DenseVector>() { + @Override + public DenseVector map(DenseVector vec) throws Exception { + for(int i = 0; i vec){ + int num=0; + double sum=0; + double v; + for(int i = 0; i vec){ + 
double ret=0; + List numArray = new ArrayList<>(); + double val; + for(int i =0; i< vec.size(); i++){ + val=(double)vec.apply(i); + if(Double.compare(Double.NaN, val)!=0){ + numArray.add(val); + } + } + Collections.sort(numArray); + int middle = numArray.size() / 2; + if(numArray.size() % 2 == 0){ + double medianA = numArray.get(middle); + double medianB = numArray.get(middle-1); + ret = (medianA + medianB) / 2d; + }else{ + ret = numArray.get(middle); + } + return ret; + } + public static double getValueMOST(DenseVector vec){ + double ret=0; + HashMap frequencies= new HashMap<>(); + for(int i =0; imax){ + max=frequencies.get(key); + maxKey=key; + } + } + ret=maxKey; + return ret; + + } + + public static int numOfElementsNotZero(DenseVector vec){ + int zeros=0; + for(int i=0; i> ds, Strategy strategy) throws Exception{ + //(entry, 1, dimension) + DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction,Tuple3>() { + @Override + public void flatMap(DenseVector vec, Collector> col) throws Exception { + double[] entries= (double[]) vec.data(); + double entry; + for(int i = 0; i (entry, 1, i) ); + } + } + } + }); + + List>> lists; + DataSet>> nonZeros2= nonZeroTuples.map(new MapFunction, Tuple2> >() { + @Override + public Tuple2> map(Tuple3 t) throws Exception { + return new Tuple2>(t.f2, Lists.newArrayList(t.f0)); + } + }); + + lists= nonZeros2.groupBy(0).reduce(new ReduceFunction>>() { + List ret= new java.util.LinkedList<>(); + @Override + public Tuple2> reduce(Tuple2> t1, + Tuple2> t2) throws Exception { + ret=ListUtils.union(t1.f1,t2.f1); + return new Tuple2>(t1.f0, ret); + } + }).collect(); + + switch(strategy){ + case MEAN: + DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); + meansA= new double[lists.size()]; + List> means= infos.map(new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple3 t) throws Exception { + double mean= (double) t.f0/(double) t.f1; + return new Tuple2(t.f2, mean); + } + }).collect(); + for(Tuple2 t: means){ + 
meansA[t.f0]=t.f1; + } + break; + case MEDIAN: + double median; + int size; + //lists contains a list for every dimension in which all the values of the data set are written. + // we will later sort every of those lists in order to determine the median. + + medians= new double[lists.size()]; + List l; + for(Tuple2> t: lists){ + l= t.f1; + Collections.sort(l); + System.out.println("list " + t.f0 + " ist "+ l.toString()); + size=l.size(); + if(size%2==0){ + median=(l.get(size/2) + l.get(size/2-1))/2d; + }else{ + median=l.get(size/2); + } + medians[t.f0]=median; + } + for(int i = 0; i frequencies= new HashMap<>(); + mostValues= new double[lists.size()]; + for(Tuple2> t: lists){ + int max=0; + double maxKey=0; + // calculate frequencie hashmap for each dimension + List list=t.f1; + frequencies.clear(); + for(int j=0; jmax){ + max=frequencies.get(k); + maxKey=k; + } + } + mostValues[t.f0]=maxKey; + + } + break; + } + } + + +} diff --git a/Strategy.java b/Strategy.java new file mode 100644 index 0000000000000..6a0930213beac --- /dev/null +++ b/Strategy.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing; +public enum Strategy { +MEAN, MEDIAN, MOST_FREQUENT; +} From 10dcdfab0ea27e6191cf6d0efad05a563f389ba4 Mon Sep 17 00:00:00 2001 From: p4nna Date: Fri, 31 Mar 2017 17:56:04 +0200 Subject: [PATCH 11/14] [FLINK-5785] Add an Imputer for preparing data was in wrong place --- Imputer.java | 328 --------------------------------------------------- 1 file changed, 328 deletions(-) delete mode 100644 Imputer.java diff --git a/Imputer.java b/Imputer.java deleted file mode 100644 index 60c00782e778b..0000000000000 --- a/Imputer.java +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.ml.preprocessing; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - -import org.apache.commons.collections.ListUtils; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.util.Collector; -import com.google.common.collect.Lists; -import breeze.linalg.DenseVector; - -public class Imputer { - - private static double[] meansA; - private static double[] medians; - private static double[] mostValues; - - /** - * - * @param sparseData - * @param strategy use MEAN, MEDIAN or the MOST_FREQUENT value to impute missing values - * @param axis 0: impute along columns, 1: imput along rows - * @return dataset without zeroes / missing values - * @throws Exception - */ - public static DataSet> impute(DataSet> sparseData, Strategy strategy, int axis) throws Exception{ - DataSet> ret = sparseData; - if(axis==0){ //columnwise - switch (strategy){ - case MEAN: - ret=sparseData.map(new MapFunction, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) { - for(int i = 0; i, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i, DenseVector>() { - @Override - public DenseVector map(DenseVector vec) throws Exception { - for(int i = 0; i vec){ - int num=0; - double sum=0; - double v; - for(int i = 0; i vec){ - 
double ret=0; - List numArray = new ArrayList<>(); - double val; - for(int i =0; i< vec.size(); i++){ - val=(double)vec.apply(i); - if(Double.compare(Double.NaN, val)!=0){ - numArray.add(val); - } - } - Collections.sort(numArray); - int middle = numArray.size() / 2; - if(numArray.size() % 2 == 0){ - double medianA = numArray.get(middle); - double medianB = numArray.get(middle-1); - ret = (medianA + medianB) / 2d; - }else{ - ret = numArray.get(middle); - } - return ret; - } - public static double getValueMOST(DenseVector vec){ - double ret=0; - HashMap frequencies= new HashMap<>(); - for(int i =0; imax){ - max=frequencies.get(key); - maxKey=key; - } - } - ret=maxKey; - return ret; - - } - - public static int numOfElementsNotZero(DenseVector vec){ - int zeros=0; - for(int i=0; i> ds, Strategy strategy) throws Exception{ - //(entry, 1, dimension) - DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction,Tuple3>() { - @Override - public void flatMap(DenseVector vec, Collector> col) throws Exception { - double[] entries= (double[]) vec.data(); - double entry; - for(int i = 0; i (entry, 1, i) ); - } - } - } - }); - - List>> lists; - DataSet>> nonZeros2= nonZeroTuples.map(new MapFunction, Tuple2> >() { - @Override - public Tuple2> map(Tuple3 t) throws Exception { - return new Tuple2>(t.f2, Lists.newArrayList(t.f0)); - } - }); - - lists= nonZeros2.groupBy(0).reduce(new ReduceFunction>>() { - List ret= new java.util.LinkedList<>(); - @Override - public Tuple2> reduce(Tuple2> t1, - Tuple2> t2) throws Exception { - ret=ListUtils.union(t1.f1,t2.f1); - return new Tuple2>(t1.f0, ret); - } - }).collect(); - - switch(strategy){ - case MEAN: - DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); - meansA= new double[lists.size()]; - List> means= infos.map(new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple3 t) throws Exception { - double mean= (double) t.f0/(double) t.f1; - return new Tuple2(t.f2, mean); - } - }).collect(); - for(Tuple2 t: means){ - 
meansA[t.f0]=t.f1; - } - break; - case MEDIAN: - double median; - int size; - //lists contains a list for every dimension in which all the values of the data set are written. - // we will later sort every of those lists in order to determine the median. - - medians= new double[lists.size()]; - List l; - for(Tuple2> t: lists){ - l= t.f1; - Collections.sort(l); - System.out.println("list " + t.f0 + " ist "+ l.toString()); - size=l.size(); - if(size%2==0){ - median=(l.get(size/2) + l.get(size/2-1))/2d; - }else{ - median=l.get(size/2); - } - medians[t.f0]=median; - } - for(int i = 0; i frequencies= new HashMap<>(); - mostValues= new double[lists.size()]; - for(Tuple2> t: lists){ - int max=0; - double maxKey=0; - // calculate frequencie hashmap for each dimension - List list=t.f1; - frequencies.clear(); - for(int j=0; jmax){ - max=frequencies.get(k); - maxKey=k; - } - } - mostValues[t.f0]=maxKey; - - } - break; - } - } - - -} From 8e67f01ba1fb707b808473f4961902542aaca369 Mon Sep 17 00:00:00 2001 From: p4nna Date: Fri, 31 Mar 2017 17:56:21 +0200 Subject: [PATCH 12/14] [FLINK-5785] Add an Imputer for preparing data was in wrong place --- Strategy.java | 21 --------------------- 1 file changed, 21 deletions(-) delete mode 100644 Strategy.java diff --git a/Strategy.java b/Strategy.java deleted file mode 100644 index 6a0930213beac..0000000000000 --- a/Strategy.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.ml.preprocessing; -public enum Strategy { -MEAN, MEDIAN, MOST_FREQUENT; -} From c3fdc87e0e9fc07785b4b4b8dc2b1fde4c756d35 Mon Sep 17 00:00:00 2001 From: p4nna Date: Fri, 31 Mar 2017 17:56:59 +0200 Subject: [PATCH 13/14] [FLINK-5785] Add an Imputer for preparing data should work now --- .../flink-ml/src/main/java/Imputer.java | 115 +++++++----------- .../flink-ml/src/main/java/Strategy.java | 5 +- 2 files changed, 46 insertions(+), 74 deletions(-) diff --git a/flink-libraries/flink-ml/src/main/java/Imputer.java b/flink-libraries/flink-ml/src/main/java/Imputer.java index 6d68704ff3e43..60c00782e778b 100644 --- a/flink-libraries/flink-ml/src/main/java/Imputer.java +++ b/flink-libraries/flink-ml/src/main/java/Imputer.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,52 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package Imputer; +package org.apache.flink.ml.preprocessing; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.collections.ListUtils; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.AggregateOperator; -import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.ml.math.DenseVector; -import org.apache.flink.shaded.com.google.common.collect.Lists; import org.apache.flink.util.Collector; -import org.apache.hadoop.mapreduce.Reducer; - -import scala.collection.mutable.LinkedList; -import scala.reflect.internal.Trees.New; +import com.google.common.collect.Lists; +import breeze.linalg.DenseVector; public class Imputer { - - static DenseVector testvec1= new DenseVector(new double[]{Double.NaN,3.0,1.0, 3.0}); - static DenseVector testvec2= new DenseVector(new double[]{1.0,7.0,Double.NaN, 1.0}); - static DenseVector testvec3= new DenseVector(new double[]{0.0,5.0,Double.NaN, 2.0}); - static DenseVector testvec4= new DenseVector(new double[]{6.5,Double.NaN,0.5, 0.5}); - static DenseVector testvec5= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); - - static ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); - - static DataSet ds = env.fromElements(testvec1, testvec2, testvec3, testvec4, testvec5); private static double[] meansA; private static double[] medians; private static double[] mostValues; 
- /** * * @param sparseData @@ -69,17 +46,16 @@ public class Imputer { * @return dataset without zeroes / missing values * @throws Exception */ - public static DataSet impute(DataSet sparseData, Strategy strategy, int axis) throws Exception{ - double val; - DataSet ret = sparseData; + public static DataSet> impute(DataSet> sparseData, Strategy strategy, int axis) throws Exception{ + DataSet> ret = sparseData; if(axis==0){ //columnwise switch (strategy){ case MEAN: - ret=sparseData.map(new MapFunction() { + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) throws Exception { + public DenseVector map(DenseVector vec) throws Exception { for(int i = 0; i() { + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) throws Exception { + public DenseVector map(DenseVector vec) throws Exception { for(int i = 0; i() { + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) throws Exception { + public DenseVector map(DenseVector vec) throws Exception { for(int i = 0; i() { - double v; + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) { + public DenseVector map(DenseVector vec) { for(int i = 0; i() { + getValue(sparseData, Strategy.MEDIAN); + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) throws Exception { + public DenseVector map(DenseVector vec) throws Exception { for(int i = 0; i() { + getValue(sparseData, Strategy.MOST_FREQUENT); + ret=sparseData.map(new MapFunction, DenseVector>() { @Override - public DenseVector map(DenseVector vec) throws Exception { + public DenseVector map(DenseVector vec) throws Exception { for(int i = 0; i vec){ int num=0; double sum=0; double v; for(int i = 0; i vec){ double ret=0; List numArray = new ArrayList<>(); double val; for(int i =0; i< vec.size(); i++){ - 
val=vec.apply(i); + val=(double)vec.apply(i); if(Double.compare(Double.NaN, val)!=0){ numArray.add(val); } @@ -197,19 +172,19 @@ public static double getValueMEDIAN(DenseVector vec){ Collections.sort(numArray); int middle = numArray.size() / 2; if(numArray.size() % 2 == 0){ - double medianA = numArray.get(middle); - double medianB = numArray.get(middle-1); - ret = (medianA + medianB) / 2d; - } else{ - ret = numArray.get(middle); + double medianA = numArray.get(middle); + double medianB = numArray.get(middle-1); + ret = (medianA + medianB) / 2d; + }else{ + ret = numArray.get(middle); } return ret; } - public static double getValueMOST(DenseVector vec){ + public static double getValueMOST(DenseVector vec){ double ret=0; HashMap frequencies= new HashMap<>(); for(int i =0; i vec){ int zeros=0; for(int i=0; i ds, Strategy strategy) throws Exception{ + public static void getValue(DataSet> ds, Strategy strategy) throws Exception{ //(entry, 1, dimension) - DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction>() { + DataSet> nonZeroTuples= ds.flatMap(new FlatMapFunction,Tuple3>() { @Override - public void flatMap(DenseVector vec, Collector> col) throws Exception { - double[] entries= vec.data(); + public void flatMap(DenseVector vec, Collector> col) throws Exception { + double[] entries= (double[]) vec.data(); double entry; for(int i = 0; i> reduce(Tuple2> t1, case MEAN: DataSet> infos= nonZeroTuples.groupBy(2).sum(0).andSum(1); meansA= new double[lists.size()]; - final String s = "hello"; List> means= infos.map(new MapFunction, Tuple2>() { @Override public Tuple2 map(Tuple3 t) throws Exception { @@ -318,7 +292,6 @@ public Tuple2 map(Tuple3 t) throws Ex break; case MOST_FREQUENT: - double ret=0; double key; HashMap frequencies= new HashMap<>(); mostValues= new double[lists.size()]; diff --git a/flink-libraries/flink-ml/src/main/java/Strategy.java b/flink-libraries/flink-ml/src/main/java/Strategy.java index 5e86d3fe81aba..6a0930213beac 100644 --- 
a/flink-libraries/flink-ml/src/main/java/Strategy.java +++ b/flink-libraries/flink-ml/src/main/java/Strategy.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,8 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package Imputer; - +package org.apache.flink.ml.preprocessing; public enum Strategy { MEAN, MEDIAN, MOST_FREQUENT; } From 07507b5ca0f1cfebc38f96bb8db32c10f2186bbf Mon Sep 17 00:00:00 2001 From: p4nna Date: Fri, 31 Mar 2017 17:57:37 +0200 Subject: [PATCH 14/14] [FLINK-5785] Add an Imputer for preparing data tests should work now --- .../src/test/Imputer/columnwiseTest.java | 26 +------------------ .../src/test/Imputer/rowwiseTest.java | 2 +- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java index a13b4f3d3cf20..6a4c1f851035c 100644 --- a/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java +++ b/flink-libraries/flink-ml/src/test/Imputer/columnwiseTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,7 +18,6 @@ package Imputer; import static org.junit.Assert.*; - import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.ml.math.DenseVector; @@ -39,21 +38,14 @@ public class columnwiseTest { @Test public void testMEAN() throws Exception { DataSet dsMean = ds; - -// DenseVector testvec1e= new DenseVector(new double[]{4.0,3.0,1.0, 3.0}); -// DenseVector testvec2e= new DenseVector(new double[]{1.0,7.0,6.0, 1.0}); DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,(5.0+2.0)/3, 2.0}); DenseVector testvec4e= new DenseVector(new double[]{6.5,2.5,0.5, 0.5}); -// DenseVector testvec5e= new DenseVector(new double[]{6.5,1.0,0.5, 0.5}); - - DataSet dsExpected = env.fromElements( testvec3e, testvec4e); try { dsMean = Imputer.impute(ds, Strategy.MEAN, 0); } catch (Exception e) { fail("MEAN could not be calculated"); - } boolean fails=false; for(DenseVector v: dsExpected.collect()){ @@ -74,18 +66,13 @@ public void testMEAN() throws Exception { @Test public void testMEDIAN() throws Exception { DataSet dsMedian = ds; - DenseVector testvec3e= new DenseVector(new double[]{0.0,5.0,2.0, 2.0}); DenseVector testvec4e= new DenseVector(new double[]{6.5,0.5,0.5, 0.5}); - - DataSet dsExpected = env.fromElements( testvec3e, testvec4e); - try { dsMedian = Imputer.impute(ds, Strategy.MEDIAN, 0); } catch (Exception e) { fail("MEDIAN could not be calculated"); - } boolean fails=false; for(DenseVector v: dsExpected.collect()){ @@ -93,31 +80,23 @@ public void testMEDIAN() throws Exception { fails=true; }; } - if(fails){ fail("MEDIAN could not be calculated columnwise"); System.out.println("dsmedian: "); dsMedian.print(); System.out.println("dsexpected: " );dsExpected.print(); } } - - @Test public void testMOSTFREQUENT() throws Exception { DataSet dsMost = env.fromElements( testvec2, testvec1, testvec4);; - DenseVector testvec2e= new 
DenseVector(new double[]{1.0,7.0,1.0, 1.0}); DenseVector testvec1e= new DenseVector(new double[]{3.0,3.0,1.0, 3.0}); DenseVector testvec4e= new DenseVector(new double[]{6.5,0.5,0.5, 0.5}); - - DataSet dsExpected = env.fromElements( testvec2e, testvec1e, testvec4e); - try { dsMost = Imputer.impute(dsMost, Strategy.MOST_FREQUENT, 0); } catch (Exception e) { fail("MOSTFREQUENT could not be calculated"); - } boolean fails=false; for(DenseVector v: dsExpected.collect()){ @@ -125,13 +104,10 @@ public void testMOSTFREQUENT() throws Exception { fails=true; }; } - if(fails){ System.out.println("dsmostfrequent: "); dsMost.print(); System.out.println("dsexpected: " );dsExpected.print(); fail("MOSTFREQUENT could not be calculated columnwise"); } } - - } diff --git a/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java index 490b59c91d501..72ee981d95031 100644 --- a/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java +++ b/flink-libraries/flink-ml/src/test/Imputer/rowwiseTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information