Simplified and refactored the logic of AbstractFeatureSelector and AbstractCategoricalFeatureSelector.
datumbox committed Dec 30, 2016
1 parent 3813e44 commit ef1933b
Showing 7 changed files with 99 additions and 181 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -68,6 +68,7 @@ Version 0.8.0-SNAPSHOT - Build 20161230
- Removed unnecessary dimension variable from the MultinomialDPMM.Cluster class.
- Flattening the featureselection package.
- Removed unnecessary Score-Based and Numerical abstract feature selectors.
- Simplified and refactored the logic of AbstractFeatureSelector and AbstractCategoricalFeatureSelector.

Version 0.7.0 - Build 20160319
------------------------------
2 changes: 1 addition & 1 deletion TODO.txt
@@ -3,7 +3,7 @@ CODE IMPROVEMENTS

- Rewrite the FeatureSelection package:
- Improve the API of Feature Selection and how we handle different data types.
- Refactor AbstractCategoricalFeatureSelector and simplify the method calls.
- Convert AbstractCategoricalFeatureSelector to Boolean.
- In AbstractCategoricalFeatureSelector and TFIDF we should keep track of the kept columns, not the removedColumns (see the sketch after this list).
- Consider dropping all the common.dataobjects and use their internalData directly instead.
- Refactor the statistics package and replace all the static methods with proper inheritance.
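
For the kept-columns item above, a minimal self-contained sketch (hypothetical names, plain Java collections standing in for the framework's Dataframe) of filtering against a whitelist of kept columns rather than a removedColumns set:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KeptColumnsSketch {

    // Keeping a whitelist means columns that only appear in new data are
    // dropped automatically; a removed-columns list built at fit time
    // cannot know about them.
    static <C, V> void filterToKeptColumns(Map<C, V> row, Set<C> keptColumns) {
        row.keySet().retainAll(keptColumns);
    }

    public static void main(String[] args) {
        Map<String, Double> row = new HashMap<>();
        row.put("age", 34.0);
        row.put("height", 1.78);
        row.put("noise", 0.5); // a column never seen during fit

        filterToKeptColumns(row, Set.of("age", "height"));
        System.out.println(row); // only age and height survive
    }
}
```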
AbstractCategoricalFeatureSelector.java
@@ -25,8 +25,6 @@
import com.datumbox.framework.common.storageengines.interfaces.StorageEngine.MapType;
import com.datumbox.framework.common.storageengines.interfaces.StorageEngine.StorageHint;
import com.datumbox.framework.core.machinelearning.common.abstracts.AbstractTrainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

@@ -145,62 +143,16 @@ protected AbstractCategoricalFeatureSelector(String storageName, Configuration c
@Override
protected void _fit(Dataframe trainingData) {
StorageEngine storageEngine = knowledgeBase.getStorageEngine();
TP trainingParameters = knowledgeBase.getTrainingParameters();
MP modelParameters = knowledgeBase.getModelParameters();

Map<Object, Integer> tmp_classCounts = new HashMap<>(); //map which stores the counts of the classes
Map<List<Object>, Integer> tmp_featureClassCounts = storageEngine.getBigMap("tmp_featureClassCounts", (Class<List<Object>>)(Class<?>)List.class, Integer.class, MapType.HASHMAP, StorageHint.IN_MEMORY, false, true); //map which stores the counts of feature-class combinations.
Map<Object, Double> tmp_featureCounts = storageEngine.getBigMap("tmp_featureCounts", Object.class, Double.class, MapType.HASHMAP, StorageHint.IN_MEMORY, false, true); //map which stores the counts of the features


//build the maps with the feature statistics and counts
buildFeatureStatistics(trainingData, tmp_classCounts, tmp_featureClassCounts, tmp_featureCounts);


//call the overridden method to get the scores of the features.
//WARNING: do not use feature scores for any weighting. Sometimes the features are selected based on a minimum and others on a maximum criterion.
estimateFeatureScores(trainingData.size(), tmp_classCounts, tmp_featureClassCounts, tmp_featureCounts);


//drop the unnecessary statistics tables
storageEngine.dropBigMap("tmp_featureClassCounts", tmp_featureClassCounts);
storageEngine.dropBigMap("tmp_featureCounts", tmp_featureCounts);
}

/** {@inheritDoc} */
@Override
protected void _transform(Dataframe newdata) {
//now filter the data by removing all the features that are not selected
filterData(newdata, knowledgeBase.getModelParameters().getFeatureScores());
}

private void filterData(Dataframe data, Map<Object, Double> featureScores) {
logger.debug("filterData()");
StorageEngine storageEngine = knowledgeBase.getStorageEngine();
Map<Object, Boolean> tmp_removedColumns = storageEngine.getBigMap("tmp_removedColumns", Object.class, Boolean.class, MapType.HASHMAP, StorageHint.IN_MEMORY, false, true);

for(Map.Entry<Object, DataType> entry: data.getXDataTypes().entrySet()) {
Object feature = entry.getKey();

if(!featureScores.containsKey(feature)) {
tmp_removedColumns.put(feature, true);
}
}

logger.debug("Removing Columns");
data.dropXColumns(tmp_removedColumns.keySet());

//Drop the temporary Collection
storageEngine.dropBigMap("tmp_removedColumns", tmp_removedColumns);
}

private void buildFeatureStatistics(Dataframe data, Map<Object, Integer> classCounts, Map<List<Object>, Integer> featureClassCounts, Map<Object, Double> featureCounts) {
logger.debug("buildFeatureStatistics()");

TP trainingParameters = knowledgeBase.getTrainingParameters();
Integer rareFeatureThreshold = trainingParameters.getRareFeatureThreshold();

//find the featureCounts
logger.debug("Estimating featureCounts");
for(Record r : data) {
for(Record r : trainingData) {
for(Map.Entry<Object, Object> entry : r.getX().entrySet()) {
Object feature = entry.getKey();

@@ -210,67 +162,100 @@ private void buildFeatureStatistics(Dataframe data, Map<Object, Integer> classCo
}

//feature counts
double featureCounter = featureCounts.getOrDefault(feature, 0.0);
featureCounts.put(feature, ++featureCounter);
double featureCounter = tmp_featureCounts.getOrDefault(feature, 0.0);
tmp_featureCounts.put(feature, ++featureCounter);

}
}

//remove rare features
Integer rareFeatureThreshold = trainingParameters.getRareFeatureThreshold();
if(rareFeatureThreshold != null && rareFeatureThreshold>0) {
logger.debug("Removing rare features");
//remove features from the featureCounts list
Iterator<Map.Entry<Object, Double>> it = featureCounts.entrySet().iterator();
Iterator<Map.Entry<Object, Double>> it = tmp_featureCounts.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<Object, Double> entry = it.next();
if(entry.getValue()<=rareFeatureThreshold) {
it.remove();
}
}

//then remove the features in the dataset that do not appear in the list
filterData(data, featureCounts);
}

//now find the classCounts and the featureClassCounts
logger.debug("Estimating classCounts and featureClassCounts");
for(Record r : data) {
for(Record r : trainingData) {
Object theClass = r.getY();

//class counts
int classCounter = classCounts.getOrDefault(theClass, 0);
classCounts.put(theClass, ++classCounter);
int classCounter = tmp_classCounts.getOrDefault(theClass, 0);
tmp_classCounts.put(theClass, ++classCounter);


for(Map.Entry<Object, Object> entry : r.getX().entrySet()) {
Object feature = entry.getKey();

Double value = TypeInference.toDouble(entry.getValue());
if(value==null || value==0.0) {
continue;
}



//featureClass counts
List<Object> featureClassTuple = Arrays.asList(feature, theClass);
Integer featureClassCounter = featureClassCounts.getOrDefault(featureClassTuple, 0);
featureClassCounts.put(featureClassTuple, ++featureClassCounter);
Integer featureClassCounter = tmp_featureClassCounts.getOrDefault(featureClassTuple, 0);
tmp_featureClassCounts.put(featureClassTuple, ++featureClassCounter);
}
}


}
//call the overridden method to get the scores of the features.
final Map<Object, Double> featureScores = modelParameters.getFeatureScores();
estimateFeatureScores(featureScores, trainingData.size(), tmp_classCounts, tmp_featureClassCounts, tmp_featureCounts);


//drop the unnecessary statistics tables
tmp_classCounts.clear();
storageEngine.dropBigMap("tmp_featureClassCounts", tmp_featureClassCounts);
storageEngine.dropBigMap("tmp_featureCounts", tmp_featureCounts);


//keep only the top features
Integer maxFeatures = trainingParameters.getMaxFeatures();
if(maxFeatures!=null && maxFeatures<featureScores.size()) {
selectTopFeatures(featureScores, maxFeatures);
}
}

/** {@inheritDoc} */
@Override
protected void _transform(Dataframe newdata) {
StorageEngine storageEngine = knowledgeBase.getStorageEngine();
Map<Object, Double> featureScores = knowledgeBase.getModelParameters().getFeatureScores();
Map<Object, Boolean> tmp_removedColumns = storageEngine.getBigMap("tmp_removedColumns", Object.class, Boolean.class, MapType.HASHMAP, StorageHint.IN_MEMORY, false, true);

for(Map.Entry<Object, DataType> entry: newdata.getXDataTypes().entrySet()) {
Object feature = entry.getKey();

if(!featureScores.containsKey(feature)) {
tmp_removedColumns.put(feature, true);
}
}

logger.debug("Removing Columns");
newdata.dropXColumns(tmp_removedColumns.keySet());

//Drop the temporary Collection
storageEngine.dropBigMap("tmp_removedColumns", tmp_removedColumns);
}

/**
* Abstract method which is responsible for estimating the score of each
* Feature.
* Abstract method which is responsible for estimating the score of each Feature.
*
* @param featureScores
* @param N
* @param classCounts
* @param featureClassCounts
* @param featureCounts
*/
protected abstract void estimateFeatureScores(int N, Map<Object, Integer> classCounts, Map<List<Object>, Integer> featureClassCounts, Map<Object, Double> featureCounts);
protected abstract void estimateFeatureScores(Map<Object, Double> featureScores, int N, Map<Object, Integer> classCounts, Map<List<Object>, Integer> featureClassCounts, Map<Object, Double> featureCounts);
}
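
For intuition, a minimal self-contained sketch (toy data, plain HashMaps standing in for the storage-engine big maps) of the three statistics that _fit accumulates before scoring:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FeatureCountsSketch {
    public static void main(String[] args) {
        // toy dataset: each record maps feature -> value and carries a class label
        List<Map.Entry<Map<String, Double>, String>> records = List.of(
                Map.entry(Map.of("hello", 1.0, "world", 1.0), "spam"),
                Map.entry(Map.of("hello", 1.0), "ham"),
                Map.entry(Map.of("world", 2.0), "spam"));

        Map<String, Integer> classCounts = new HashMap<>();              // class -> #records
        Map<List<String>, Integer> featureClassCounts = new HashMap<>(); // [feature, class] -> #records
        Map<String, Double> featureCounts = new HashMap<>();             // feature -> #records with a non-zero value

        for (Map.Entry<Map<String, Double>, String> r : records) {
            String theClass = r.getValue();
            classCounts.merge(theClass, 1, Integer::sum);

            for (Map.Entry<String, Double> e : r.getKey().entrySet()) {
                if (e.getValue() == 0.0) {
                    continue; // as in the code above, only non-zero values are counted
                }
                featureCounts.merge(e.getKey(), 1.0, Double::sum);
                featureClassCounts.merge(Arrays.asList(e.getKey(), theClass), 1, Integer::sum);
            }
        }

        System.out.println(classCounts);        // spam=2, ham=1
        System.out.println(featureCounts);      // hello=2.0, world=2.0
        System.out.println(featureClassCounts); // [hello, spam]=1, [hello, ham]=1, [world, spam]=2
    }
}
```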
AbstractFeatureSelector.java
@@ -16,9 +16,11 @@
package com.datumbox.framework.core.machinelearning.common.abstracts.featureselectors;

import com.datumbox.framework.common.Configuration;
import com.datumbox.framework.common.concurrency.ForkJoinStream;
import com.datumbox.framework.common.dataobjects.Dataframe;
import com.datumbox.framework.common.utilities.SelectKth;
import com.datumbox.framework.core.machinelearning.common.abstracts.AbstractTrainer;
import com.datumbox.framework.core.machinelearning.common.interfaces.Parallelizable;

import java.util.Iterator;
import java.util.Map;
@@ -30,7 +32,7 @@
* @param <MP>
* @param <TP>
*/
public abstract class AbstractFeatureSelector<MP extends AbstractFeatureSelector.AbstractModelParameters, TP extends AbstractFeatureSelector.AbstractTrainingParameters> extends AbstractTrainer<MP, TP> {
public abstract class AbstractFeatureSelector<MP extends AbstractFeatureSelector.AbstractModelParameters, TP extends AbstractFeatureSelector.AbstractTrainingParameters> extends AbstractTrainer<MP, TP> implements Parallelizable {

/**
* @param trainingParameters
@@ -39,6 +41,7 @@ public abstract class AbstractFeatureSelector<MP extends AbstractFeatureSelector
*/
protected AbstractFeatureSelector(TP trainingParameters, Configuration configuration) {
super(trainingParameters, configuration);
streamExecutor = new ForkJoinStream(knowledgeBase.getConfiguration().getConcurrencyConfiguration());
}

/**
@@ -48,8 +51,28 @@ protected AbstractFeatureSelector(TP trainingParameters, Configuration configura
*/
protected AbstractFeatureSelector(String storageName, Configuration configuration) {
super(storageName, configuration);
streamExecutor = new ForkJoinStream(knowledgeBase.getConfiguration().getConcurrencyConfiguration());
}

private boolean parallelized = true;

/**
* This executor is used for the parallel processing of streams with custom
* Thread pool.
*/
protected final ForkJoinStream streamExecutor;

/** {@inheritDoc} */
@Override
public boolean isParallelized() {
return parallelized;
}

/** {@inheritDoc} */
@Override
public void setParallelized(boolean parallelized) {
this.parallelized = parallelized;
}

/**
* Fits and transforms the data of the provided dataset.
@@ -87,8 +110,8 @@ public void transform(Dataframe newData) {
* @param featureScores
* @param maxFeatures
*/
protected void selectHighScoreFeatures(Map<Object, Double> featureScores, Integer maxFeatures) {
logger.debug("selectHighScoreFeatures()");
protected void selectTopFeatures(Map<Object, Double> featureScores, Integer maxFeatures) {
logger.debug("selectTopFeatures()");

logger.debug("Estimating the minPermittedScore");
Double minPermittedScore = SelectKth.largest(featureScores.values().iterator(), maxFeatures);
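
The rest of selectTopFeatures is collapsed above; the visible part computes minPermittedScore as the maxFeatures-th largest score via SelectKth. A standalone sketch of that top-k pattern (not the library's actual code) might look like:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class TopKSketch {

    // Keeps only the maxFeatures highest-scoring entries; ties at the
    // threshold may keep a few extra features.
    static void selectTopFeatures(Map<String, Double> featureScores, int maxFeatures) {
        double minPermittedScore = featureScores.values().stream()
                .sorted(Comparator.reverseOrder())
                .skip(maxFeatures - 1)          // the k-th largest score...
                .findFirst()
                .orElse(Double.NEGATIVE_INFINITY);
        // ...becomes the minimum permitted score; everything below it is dropped
        featureScores.values().removeIf(score -> score < minPermittedScore);
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>(Map.of("a", 5.0, "b", 3.0, "c", 1.0, "d", 4.0));
        selectTopFeatures(scores, 2);
        System.out.println(scores); // a=5.0, d=4.0
    }
}
```

The real SelectKth.largest presumably finds the k-th element without a full sort; the sorted stream here just keeps the sketch short.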
ChisquareSelect.java
@@ -16,14 +16,12 @@
package com.datumbox.framework.core.machinelearning.featureselection;

import com.datumbox.framework.common.Configuration;
import com.datumbox.framework.common.concurrency.ForkJoinStream;
import com.datumbox.framework.common.concurrency.StreamMethods;
import com.datumbox.framework.common.dataobjects.AssociativeArray;
import com.datumbox.framework.common.dataobjects.DataTable2D;
import com.datumbox.framework.common.storageengines.interfaces.StorageEngine;
import com.datumbox.framework.core.machinelearning.common.abstracts.AbstractTrainer;
import com.datumbox.framework.core.machinelearning.common.abstracts.featureselectors.AbstractCategoricalFeatureSelector;
import com.datumbox.framework.core.machinelearning.common.interfaces.Parallelizable;
import com.datumbox.framework.core.statistics.distributions.ContinuousDistributions;
import com.datumbox.framework.core.statistics.nonparametrics.independentsamples.Chisquare;

@@ -40,7 +38,7 @@
*
* @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/
public class ChisquareSelect extends AbstractCategoricalFeatureSelector<ChisquareSelect.ModelParameters, ChisquareSelect.TrainingParameters> implements Parallelizable {
public class ChisquareSelect extends AbstractCategoricalFeatureSelector<ChisquareSelect.ModelParameters, ChisquareSelect.TrainingParameters> {

/** {@inheritDoc} */
public static class ModelParameters extends AbstractCategoricalFeatureSelector.AbstractModelParameters {
@@ -95,7 +93,6 @@ public void setALevel(double aLevel) {
*/
protected ChisquareSelect(TrainingParameters trainingParameters, Configuration configuration) {
super(trainingParameters, configuration);
streamExecutor = new ForkJoinStream(knowledgeBase.getConfiguration().getConcurrencyConfiguration());
}

/**
@@ -105,39 +102,14 @@ protected ChisquareSelect(TrainingParameters trainingParameters, Configuration c
*/
protected ChisquareSelect(String storageName, Configuration configuration) {
super(storageName, configuration);
streamExecutor = new ForkJoinStream(knowledgeBase.getConfiguration().getConcurrencyConfiguration());
}

private boolean parallelized = true;

/**
* This executor is used for the parallel processing of streams with custom
* Thread pool.
*/
protected final ForkJoinStream streamExecutor;

/** {@inheritDoc} */
@Override
public boolean isParallelized() {
return parallelized;
}

/** {@inheritDoc} */
@Override
public void setParallelized(boolean parallelized) {
this.parallelized = parallelized;
}

/** {@inheritDoc} */
@Override
protected void estimateFeatureScores(int N, Map<Object, Integer> classCounts, Map<List<Object>, Integer> featureClassCounts, Map<Object, Double> featureCounts) {
protected void estimateFeatureScores(Map<Object, Double> featureScores, int N, Map<Object, Integer> classCounts, Map<List<Object>, Integer> featureClassCounts, Map<Object, Double> featureCounts) {
logger.debug("estimateFeatureScores()");
ModelParameters modelParameters = knowledgeBase.getModelParameters();
TrainingParameters trainingParameters = knowledgeBase.getTrainingParameters();

Map<Object, Double> featureScores = modelParameters.getFeatureScores();

double criticalValue = ContinuousDistributions.chisquareInverseCdf(trainingParameters.getALevel(), 1); //one degree of freedom because the tables below are 2x2

double criticalValue = ContinuousDistributions.chisquareInverseCdf(knowledgeBase.getTrainingParameters().getALevel(), 1); //one degree of freedom because the tables below are 2x2

streamExecutor.forEach(StreamMethods.stream(featureCounts.entrySet().stream(), isParallelized()), featureCount -> {
Object feature = featureCount.getKey();
@@ -176,12 +148,7 @@ protected void estimateFeatureScores(int N, Map<Object, Integer> classCounts, Ma
if (bestScore>=criticalValue) { //if the score is larger than the critical value, then select the feature
featureScores.put(feature, bestScore); //This Map is concurrent and there are no overlapping keys between threads
}
});

Integer maxFeatures = trainingParameters.getMaxFeatures();
if(maxFeatures!=null && maxFeatures<featureScores.size()) {
selectHighScoreFeatures(featureScores, maxFeatures);
}
});
}

}
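
The collapsed block above builds a 2x2 contingency table per feature/class pair and selects the feature when its best score clears the critical value. As a rough illustration (a standalone sketch using the standard 2x2 shortcut formula, not the framework's Chisquare class), assuming the table is derived from the four counts passed into estimateFeatureScores:

```java
public class ChisquareSketch {

    /**
     * Chi-square score of the 2x2 contingency table for one feature/class pair:
     *
     *                   class   not class
     *   feature          n11       n12
     *   feature absent   n21       n22
     */
    static double chisquareScore(int n, int classCount, double featureCount, int featureClassCount) {
        double n11 = featureClassCount;          // feature present, same class
        double n12 = featureCount - n11;         // feature present, other classes
        double n21 = classCount - n11;           // feature absent, same class
        double n22 = n - n11 - n12 - n21;        // feature absent, other classes

        // shortcut formula for 2x2 tables: X^2 = N*(n11*n22 - n12*n21)^2 / product of the four marginals
        double numerator = n * Math.pow(n11 * n22 - n12 * n21, 2);
        double denominator = (n11 + n12) * (n21 + n22) * (n11 + n21) * (n12 + n22);
        return denominator == 0.0 ? 0.0 : numerator / denominator;
    }

    public static void main(String[] args) {
        // 100 records, 40 of class "spam"; the feature appears 30 times, 25 of them in "spam"
        System.out.println(chisquareScore(100, 40, 30.0, 25)); // ~33.5
    }
}
```

With one degree of freedom, as in the code above, a score of ~33.5 comfortably exceeds the chisquareInverseCdf critical value for any common aLevel, so such a feature would be selected.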
