Added threads on the fit and transform of FeatureSelection methods.
datumbox committed Jan 10, 2016
1 parent 6caf73d commit d0243ca
Showing 8 changed files with 114 additions and 83 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -59,7 +59,7 @@ Version 0.7.0-SNAPSHOT - Build 20160110
- On Dataframe we now can set records on keys that do not already exist.
- The NgramsExtractor was rewritten to remove its internal state.
- Added a concurrency package on common with useful helper classes.
-- Added threads on the Builder of Dataframes and on the DataTransformation algorithms (both fit and transform).
+- Added threads on the Builder of Dataframes, on the DataTransformation and FeatureSelection algorithms (both fit and transform).
- Converted the big test Datasets to files stored as resources in the project. This includes all the files that we downloaded from the web.

Version 0.6.1 - Build 20160102
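Every parallel loop this commit introduces goes through the StreamMethods helper from the concurrency package added in the previous build. As a rough sketch of what such a helper presumably does (inferred from the call sites later in this diff, e.g. StreamMethods.stream(entries(), true); not the verbatim datumbox source):

```java
// Assumed shape of the helper: wrap an Iterable into a sequential or parallel Stream.
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class StreamMethodsSketch {

    private StreamMethodsSketch() {} // static utility, no instances

    /** Wraps an Iterable into a Stream; the flag selects parallel execution. */
    public static <T> Stream<T> stream(Iterable<T> iterable, boolean parallel) {
        return StreamSupport.stream(iterable.spliterator(), parallel);
    }
}
```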
2 changes: 1 addition & 1 deletion TODO.txt
@@ -2,7 +2,7 @@ CODE IMPROVEMENTS
=================

- Profiling & benchmarking.
-- Add multithreading support on the training methods of the models and on some parts of FeatureSelection.
+- Add multithreading support on the training methods of the models.

- Update all maven plugins and dependencies to their latest versions.
- Add support for MapDB 2.0 once a stable version is released.
18 changes: 10 additions & 8 deletions src/main/java/com/datumbox/common/dataobjects/Dataframe.java
@@ -334,7 +334,6 @@ public Object[] toArray() {
return array;
}


/** {@inheritDoc} */
@Override
@SuppressWarnings("unchecked")
@@ -564,7 +563,7 @@ public FlatDataList getYColumn() {

/**
* Removes completely a list of columns from the dataset. The meta-data of
-* the Dataframe are updated.
+* the Dataframe are updated. The method internally uses threads.
*
* @param columnSet
*/
@@ -578,18 +577,21 @@ public void dropXColumns(Set<Object> columnSet) {
//remove all the columns from the Meta data
xDataTypes.keySet().removeAll(columnSet);

-for(Map.Entry<Integer, Record> e : entries()) {
+StreamMethods.stream(entries(), true).forEach(e -> {
Integer rId = e.getKey();
Record r = e.getValue();

AssociativeArray xData = r.getX().copy();
-int d = xData.size();
-xData.keySet().removeAll(columnSet);
+boolean modified = xData.keySet().removeAll(columnSet);

-if(xData.size()!=d) {
-_unsafe_set(rId, new Record(xData, r.getY(), r.getYPredicted(), r.getYPredictedProbabilities()));
+if(modified) {
+Record newR = new Record(xData, r.getY(), r.getYPredicted(), r.getYPredictedProbabilities());
+
+synchronized(this) {
+_unsafe_set(rId, newR); //safe to call in this context. we already updated the meta when we modified the xDataTypes
+}
}
-}
+});

}

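The dropXColumns change above sets the pattern this commit repeats: do the per-record work on the parallel stream, and take the lock only for the write. A minimal self-contained sketch of that idiom (the map and names are illustrative, not the Dataframe API):

```java
// Sketch of the concurrency pattern: the expensive copy/filter work runs
// concurrently, while the write is serialized, mirroring the synchronized(this)
// around _unsafe_set() above. Hypothetical names throughout.
import java.util.Map;
import java.util.function.UnaryOperator;

class ParallelUpdateSketch {

    static <K, V> void updateAll(Map<K, V> records, UnaryOperator<V> transform) {
        records.entrySet().parallelStream().forEach(e -> {
            V updated = transform.apply(e.getValue()); // heavy work, fully concurrent
            synchronized (records) {
                records.put(e.getKey(), updated); // replaces the value of an existing
            }                                     // key only, so no structural change
        });
    }
}
```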
@@ -46,7 +46,7 @@ public static class ModelParameters extends AbstractTransformer.AbstractModelPar
/**
* The reference levels of each categorical variable.
*/
-@BigMap(mapType=MapType.HASHMAP, storageHint=StorageHint.IN_MEMORY, concurrent=false)
+@BigMap(mapType=MapType.HASHMAP, storageHint=StorageHint.IN_MEMORY, concurrent=true)
private Map<Object, Object> referenceLevels;

/**
@@ -379,19 +379,15 @@ protected static void denormalizeY(Dataframe data, Map<Object, Double> minColumn
*/
protected static void fitDummy(Dataframe data, Map<Object, Object> referenceLevels) {
Map<Object, TypeInference.DataType> columnTypes = data.getXDataTypes();

//find the referenceLevels for each categorical variable
StreamMethods.stream(data, true).forEachOrdered(r -> {
for(Map.Entry<Object, Object> entry: r.getX().entrySet()) {
Object column = entry.getKey();
-if(covert2dummy(columnTypes.get(column))) { //only ordinal and categorical are converted into dummyvars
-if(referenceLevels.containsKey(column)==false) {
-synchronized(referenceLevels) {
-if(referenceLevels.containsKey(column)==false) {
-referenceLevels.put(column, entry.getValue());
-}
-}
-}
+if(covert2dummy(columnTypes.get(column))) {
+//Note: The referenceLevels Map is an implementation of ConcurrentHashMap.
+//Thus we don't need a synchronized() block.
+referenceLevels.putIfAbsent(column, entry.getValue());
}
}
});
@@ -451,7 +447,8 @@ protected static void transformDummy(Dataframe data, Map<Object, Object> referen
}

/**
-* Checks whether the variable should be converted into dummy (boolean).
+* Checks whether the variable should be converted into dummy (boolean). Only
+* categorical and ordinal values are converted.
*
* @param columnType
* @return
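The putIfAbsent change above works because the operation is atomic on a ConcurrentHashMap, which is what the @BigMap annotation with concurrent=true now provides, so the manual check-then-lock-then-recheck becomes redundant. A small standalone demonstration (column and value names invented):

```java
// Check-then-act vs. atomic putIfAbsent. On a concurrent map the first thread
// to reach putIfAbsent wins and every later call is a no-op, with no lock needed.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReferenceLevelSketch {
    public static void main(String[] args) {
        Map<Object, Object> referenceLevels = new ConcurrentHashMap<>();

        // Racy if done as two steps on a plain map:
        //   if (!referenceLevels.containsKey(column)) referenceLevels.put(column, value);

        referenceLevels.putIfAbsent("city", "London");
        referenceLevels.putIfAbsent("city", "Paris"); // ignored, key already set

        System.out.println(referenceLevels.get("city")); // prints "London"
    }
}
```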
@@ -15,6 +15,7 @@
*/
package com.datumbox.framework.machinelearning.featureselection.categorical;

+import com.datumbox.common.concurrency.StreamMethods;
import com.datumbox.framework.machinelearning.common.abstracts.featureselectors.AbstractCategoricalFeatureSelector;
import com.datumbox.common.dataobjects.AssociativeArray;
import com.datumbox.common.dataobjects.DataTable2D;
@@ -103,20 +104,22 @@ protected void estimateFeatureScores(Map<Object, Integer> classCounts, Map<List<

Map<Object, Double> featureScores = modelParameters.getFeatureScores();

-DataTable2D contingencyTable = new DataTable2D();
-contingencyTable.put(0, new AssociativeArray());
-contingencyTable.put(1, new AssociativeArray());
-
double criticalValue = ContinuousDistributions.chisquareInverseCdf(trainingParameters.getALevel(), 1); //one degree of freedom because the tables below are 2x2

-
double N = modelParameters.getN();
-for(Map.Entry<Object, Double> featureCount : featureCounts.entrySet()) {
+
+StreamMethods.stream(featureCounts.entrySet(), true).forEach(featureCount -> {
Object feature = featureCount.getKey();
double N1_ = featureCount.getValue(); //calculate the N1. (number of records that has the feature)
double N0_ = N - N1_; //also the N0. (number of records that DONT have the feature)

-for(Map.Entry<Object, Integer> classCount : classCounts.entrySet()) {
+double bestScore = Double.NEGATIVE_INFINITY; //REMEMBER! larger scores means more important feature.
+
+DataTable2D contingencyTable = new DataTable2D();
+contingencyTable.put(0, new AssociativeArray());
+contingencyTable.put(1, new AssociativeArray());
+
+for (Map.Entry<Object, Integer> classCount : classCounts.entrySet()) {
Object theClass = classCount.getKey();

Integer featureClassC = featureClassCounts.get(Arrays.<Object>asList(feature, theClass));
@@ -132,15 +135,21 @@ protected void estimateFeatureScores(Map<Object, Integer> classCounts, Map<List<
contingencyTable.get(1).put(1, N11);

double scorevalue = Chisquare.getScoreValue(contingencyTable);
-if(scorevalue>=criticalValue) { //if the score is larger than the critical value, then select the feature
-Double previousCriticalValue = featureScores.get(feature);
-if(previousCriticalValue==null || previousCriticalValue<scorevalue) { //add or update score
-featureScores.put(feature, scorevalue);
-}
-//contingencyTable = null;

+if(scorevalue>bestScore) {
+bestScore = scorevalue;
}
}
-}
+//contingencyTable = null;
+
+//REMEMBER! larger scores means more important keywords.
+if (bestScore>=criticalValue) {
+//if the score is larger than the critical value, then select the feature
+synchronized(featureScores) {
+featureScores.put(feature, bestScore);
+}
+}
+});

Integer maxFeatures = trainingParameters.getMaxFeatures();
if(maxFeatures!=null && maxFeatures<featureScores.size()) {
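For reference, the score that Chisquare.getScoreValue() computes for each 2x2 contingency table above is presumably the standard chi-square statistic; with one degree of freedom and aLevel = 0.05, the critical value returned by chisquareInverseCdf is about 3.841. A hedged standalone sketch of the formula (the datumbox implementation may differ in details such as continuity corrections):

```java
// Chi-square statistic of a 2x2 table [[n00, n01], [n10, n11]]:
//   chi2 = N * (n00*n11 - n01*n10)^2 / ((n00+n01)(n10+n11)(n00+n10)(n01+n11))
// Larger values mean stronger feature/class dependence, hence the bestScore logic.
class ChisquareSketch {

    static double chisquare2x2(double n00, double n01, double n10, double n11) {
        double n = n00 + n01 + n10 + n11;
        double diff = n00 * n11 - n01 * n10;
        double den = (n00 + n01) * (n10 + n11) * (n00 + n10) * (n01 + n11);
        return den == 0.0 ? 0.0 : n * diff * diff / den;
    }

    public static void main(String[] args) {
        // feature present/absent (rows) vs. class membership (columns)
        System.out.println(chisquare2x2(40, 10, 20, 30)); // ~16.67, well above 3.841
    }
}
```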
@@ -15,10 +15,10 @@
*/
package com.datumbox.framework.machinelearning.featureselection.categorical;

+import com.datumbox.common.concurrency.StreamMethods;
import com.datumbox.common.persistentstorage.interfaces.DatabaseConfiguration;
import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector;
import com.datumbox.framework.machinelearning.common.abstracts.featureselectors.AbstractCategoricalFeatureSelector;
-import com.datumbox.common.utilities.PHPMethods;
import com.datumbox.framework.machinelearning.common.abstracts.featureselectors.AbstractScoreBasedFeatureSelector;

import java.util.Arrays;
@@ -76,11 +76,16 @@ protected void estimateFeatureScores(Map<Object, Integer> classCounts, Map<List<
Map<Object, Double> featureScores = modelParameters.getFeatureScores();

double N = modelParameters.getN();
-for(Map.Entry<Object, Double> featureCount : featureCounts.entrySet()) {
+
+final double log2 = Math.log(2.0);
+
+StreamMethods.stream(featureCounts.entrySet(), true).forEach(featureCount -> {
Object feature = featureCount.getKey();
double N1_ = featureCount.getValue(); //calculate the N1. (number of records that has the feature)
double N0_ = N - N1_; //also the N0. (number of records that DONT have the feature)

+double bestScore = Double.NEGATIVE_INFINITY; //REMEMBER! larger scores means more important feature.

for(Map.Entry<Object, Integer> classCount : classCounts.entrySet()) {
Object theClass = classCount.getKey();

@@ -96,29 +101,29 @@ protected void estimateFeatureScores(Map<Object, Integer> classCounts, Map<List<

//calculate Mutual Information
//Note we calculate it partially because if one of the N.. is zero the log will not be defined and it will return NAN.
-double MI=0.0;
+double scorevalue=0.0;
if(N11>0.0) {
-MI+=(N11/N)*PHPMethods.log((N/N1_)*(N11/N_1),2.0);
+scorevalue+=(N11/N)*Math.log((N/N1_)*(N11/N_1))/log2;
}
if(N01>0.0) {
-MI+=(N01/N)*PHPMethods.log((N/N0_)*(N01/N_1),2.0);
+scorevalue+=(N01/N)*Math.log((N/N0_)*(N01/N_1))/log2;
}
if(N10>0.0) {
-MI+=(N10/N)*PHPMethods.log((N/N1_)*(N10/N_0),2.0);
+scorevalue+=(N10/N)*Math.log((N/N1_)*(N10/N_0))/log2;
}
if(N00>0.0) {
-MI+=(N00/N)*PHPMethods.log((N/N0_)*(N00/N_0),2.0);
+scorevalue+=(N00/N)*Math.log((N/N0_)*(N00/N_0))/log2;
}


-//REMEMBER! larger scores means more important keywords.
-Double previousMI = featureScores.get(feature);
-if(previousMI==null || previousMI<MI) { //add or update score
-featureScores.put(feature, MI);
+if(scorevalue>bestScore) {
+bestScore = scorevalue;
}

}
-}

+synchronized(featureScores) {
+featureScores.put(feature, bestScore);
+}
+});

Integer maxFeatures = trainingParameters.getMaxFeatures();
if(maxFeatures!=null && maxFeatures<featureScores.size()) {
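The four guarded additions above are the four terms of the mutual information between feature presence and class membership, computed piecewise so that empty cells (whose limiting contribution is zero) never feed a log of zero. In standard notation, with N11, N10, N01, N00 the cell counts and N1_, N0_, N_1, N_0 the marginals used in the code:

```latex
% Mutual information between feature indicator F and class indicator C,
% as accumulated term-by-term in the scorevalue variable above.
I(F;C) = \sum_{f \in \{0,1\}} \sum_{c \in \{0,1\}}
         \frac{N_{fc}}{N} \log_2 \frac{N \, N_{fc}}{N_{f\cdot} \, N_{\cdot c}}
```

The switch from PHPMethods.log(x, 2.0) to Math.log(x)/log2 simply hoists the constant Math.log(2.0) out of the loop.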
@@ -15,6 +15,7 @@
*/
package com.datumbox.framework.machinelearning.featureselection.continuous;

+import com.datumbox.common.concurrency.StreamMethods;
import com.datumbox.common.dataobjects.AssociativeArray;
import com.datumbox.framework.machinelearning.common.abstracts.featureselectors.AbstractContinuousFeatureSelector;
import com.datumbox.common.dataobjects.Dataframe;
@@ -364,15 +365,14 @@ protected void filterFeatures(Dataframe newData) {

Map<Integer, Integer> recordIdsReference = new HashMap<>();
MatrixDataframe matrixDataset = MatrixDataframe.parseDataset(newData, recordIdsReference, featureIds);
-RealMatrix X = matrixDataset.getX();

RealMatrix components = new BlockRealMatrix(modelParameters.getComponents());


//multiplying the data with components
-X = X.multiply(components);
+final RealMatrix X = matrixDataset.getX().multiply(components);

-for(Map.Entry<Integer, Record> e : newData.entries()) {
+StreamMethods.stream(newData.entries(), true).forEach(e -> {
Integer rId = e.getKey();
Record r = e.getValue();
int rowId = recordIdsReference.get(rId);
@@ -383,14 +383,18 @@ protected void filterFeatures(Dataframe newData) {
xData.put(componentId, value);
++componentId;
}

-newData._unsafe_set(rId, new Record(xData, r.getY(), r.getYPredicted(), r.getYPredictedProbabilities()));
-}

+Record newR = new Record(xData, r.getY(), r.getYPredicted(), r.getYPredictedProbabilities());
+
+synchronized(newData) {
+newData._unsafe_set(rId, newR); //we call below the recalculateMeta()
+}
+});

//recordIdsReference = null;
//matrixDataset = null;

-newData.recalculateMeta(); //call the recalculate because we used _unsafe_set()
+newData.recalculateMeta();
}

}
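The filterFeatures() rewrite above folds the two matrix operations into a single statement and then writes the projected rows back in parallel. A self-contained sketch of just the projection step, using the same Apache Commons Math types as the diff (the data values and the surrounding datumbox plumbing are invented/omitted):

```java
// Projecting records onto retained PCA components: an n-by-d data matrix times a
// d-by-k loadings matrix yields k component values per record, which the loop
// above writes back into each Record's X. Example numbers are made up.
import org.apache.commons.math3.linear.BlockRealMatrix;
import org.apache.commons.math3.linear.RealMatrix;

class PcaProjectionSketch {
    public static void main(String[] args) {
        RealMatrix data = new BlockRealMatrix(new double[][] {
            {1.0, 2.0, 3.0},
            {4.0, 5.0, 6.0}
        }); // 2 records, 3 original features

        RealMatrix components = new BlockRealMatrix(new double[][] {
            {0.5, 0.1},
            {0.5, 0.2},
            {0.5, 0.3}
        }); // 3 features projected down to 2 components

        RealMatrix projected = data.multiply(components); // 2x2 result
        System.out.println(projected.getEntry(0, 0)); // first record, first component: 3.0
    }
}
```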
@@ -15,6 +15,7 @@
*/
package com.datumbox.framework.machinelearning.featureselection.scorebased;

+import com.datumbox.common.concurrency.StreamMethods;
import com.datumbox.common.dataobjects.Dataframe;
import com.datumbox.common.dataobjects.Record;
import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector;
@@ -26,6 +27,7 @@

import com.datumbox.framework.machinelearning.common.abstracts.featureselectors.AbstractScoreBasedFeatureSelector;
import java.util.Map;
+import java.util.function.BiFunction;


/**
@@ -176,46 +178,58 @@ protected void _fit(Dataframe trainingData) {
}


-Map<Object, Double> maxTFIDFfeatureScores = modelParameters.getMaxTFIDFfeatureScores();
+final Map<Object, Double> maxFeatureScores = modelParameters.getMaxTFIDFfeatureScores();

+//this lambda checks if the new score is larger than the current max score of the feature
+BiFunction<Object, Double, Boolean> isGreaterThanMax = (feature, newScore) -> {
+Double maxScore = maxFeatureScores.get(feature);
+return maxScore==null || maxScore<newScore;
+};
+
//calculate the maximum tfidf scores
-for(Record r : trainingData) {
+StreamMethods.stream(trainingData, true).forEach(r -> {
//calculate the tfidf scores
for(Map.Entry<Object, Object> entry : r.getX().entrySet()) {
Object keyword = entry.getKey();
Double counts = TypeInference.toDouble(entry.getValue());

-if(counts==null || counts == 0.0) {
-continue;
-}
-
-if(binarized) {
-counts = 1.0;
-}
-
-//double tf = counts/documentLength;
-double tf = counts;
-double idf = tmp_idfMap.get(keyword);
-
-double tfidf = tf*idf;
-
-if(tfidf==0.0) {
-continue; //ignore 0 scored features
-}
-
-//store the maximum value of the tfidf
-Double maxTfidf = maxTFIDFfeatureScores.get(keyword);
-if(maxTfidf==null || maxTfidf<tfidf) {
-maxTFIDFfeatureScores.put(keyword, tfidf);
+if(counts != null && counts > 0.0) {
+
+if(binarized) {
+counts = 1.0;
+}
+
+//double tf = counts/documentLength;
+double tf = counts;
+double idf = tmp_idfMap.get(keyword);
+
+double tfidf = tf*idf;
+
+if(tfidf > 0.0) { //ignore 0 scored features
+
+//Threads will help here under the assumption that only
+//a small number of records will have features that exceed
+//the maximum score. Thus they will stop in this if statement
+//and they will not go into the synced block.
+if(isGreaterThanMax.apply(keyword, tfidf)) {
+synchronized(maxFeatureScores) {
+if(isGreaterThanMax.apply(keyword, tfidf)) {
+maxFeatureScores.put(keyword, tfidf);
+}
+}
+}
+
+}
}
}
-}
+});

//Drop the temporary Collection
dbc.dropBigMap("tmp_idf", tmp_idfMap);

Integer maxFeatures = trainingParameters.getMaxFeatures();
-if(maxFeatures!=null && maxFeatures<maxTFIDFfeatureScores.size()) {
-AbstractScoreBasedFeatureSelector.selectHighScoreFeatures(maxTFIDFfeatureScores, maxFeatures);
+if(maxFeatures!=null && maxFeatures<maxFeatureScores.size()) {
+AbstractScoreBasedFeatureSelector.selectHighScoreFeatures(maxFeatureScores, maxFeatures);
}
}

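The isGreaterThanMax double-check above deserves a standalone illustration: the unsynchronized first test filters out the overwhelmingly common case, so threads rarely contend for the lock, and the second test inside the lock makes the read-compare-put sequence race-free. A minimal sketch with a ConcurrentHashMap standing in for the BigMap-backed score store (class and method names are illustrative; only the idiom matches the diff):

```java
// Double-checked maximum update, as in the TF-IDF selector above.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MaxScoreSketch {

    private final Map<Object, Double> maxScores = new ConcurrentHashMap<>();

    void offer(Object feature, double score) {
        if (isGreaterThanMax(feature, score)) {         // cheap, lock-free pre-check
            synchronized (maxScores) {
                if (isGreaterThanMax(feature, score)) { // authoritative re-check
                    maxScores.put(feature, score);
                }
            }
        }
    }

    private boolean isGreaterThanMax(Object feature, double score) {
        Double max = maxScores.get(feature);
        return max == null || max < score;
    }
}
```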
