Skip to content

Commit

Permalink
Fixed the limitation on the clustering algorithms which forced us to …
Browse files Browse the repository at this point in the history
…store the clusters in memory. Forced the headerDataTypes parameter of Dataframe.Builder.parseCSVFile() to be a LinkedHashMap.
  • Loading branch information
datumbox committed Jan 13, 2016
1 parent c1bd7c0 commit c3f1361
Show file tree
Hide file tree
Showing 17 changed files with 438 additions and 399 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -68,6 +68,8 @@ Version 0.7.0-SNAPSHOT - Build 20160113
- Removed the DOUBLE_ACCURACY_LOW property from the tests since it is no longer used and increased the accuracy of DOUBLE_ACCURACY_MEDIUM. - Removed the DOUBLE_ACCURACY_LOW property from the tests since it is no longer used and increased the accuracy of DOUBLE_ACCURACY_MEDIUM.
- Added a skip/limit option on the Dataframe.Builder.parseCSVFile(). - Added a skip/limit option on the Dataframe.Builder.parseCSVFile().
- Added multithreading support on the training for many ML models. - Added multithreading support on the training for many ML models.
- Fixed the limitation on the clustering algorithms which forced us to store the clusters in memory.
- Forced the headerDataTypes parameter of Dataframe.Builder.parseCSVFile() to be a LinkedHashMap.


Version 0.6.1 - Build 20160102 Version 0.6.1 - Build 20160102
------------------------------ ------------------------------
Expand Down
1 change: 1 addition & 0 deletions TODO.txt
Expand Up @@ -23,6 +23,7 @@ NEW ALGORITHMS
============== ==============


- Rewrite PCA to avoid using RealMatrix. - Rewrite PCA to avoid using RealMatrix.
- Speed up LDA: http://www.cs.ucsb.edu/~mingjia/cs240/doc/273811.pdf
- Add regularization in the currently supported algorithms. - Add regularization in the currently supported algorithms.
- Write a Mixture of Gaussians clustering method. - Write a Mixture of Gaussians clustering method.
- Develop the FunkSVD and PLSI as probabilistic version of SVD. - Develop the FunkSVD and PLSI as probabilistic version of SVD.
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/datumbox/common/dataobjects/Dataframe.java
Expand Up @@ -37,6 +37,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -151,7 +152,7 @@ public static Dataframe parseTextFiles(Map<Object, URI> textFilesMap, AbstractTe
* @param dbConf * @param dbConf
* @return * @return
*/ */
public static Dataframe parseCSVFile(Reader reader, String yVariable, Map<String, TypeInference.DataType> headerDataTypes, public static Dataframe parseCSVFile(Reader reader, String yVariable, LinkedHashMap<String, TypeInference.DataType> headerDataTypes,
char delimiter, char quote, String recordSeparator, Long skip, Long limit, DatabaseConfiguration dbConf) { char delimiter, char quote, String recordSeparator, Long skip, Long limit, DatabaseConfiguration dbConf) {
Logger logger = LoggerFactory.getLogger(Dataframe.Builder.class); Logger logger = LoggerFactory.getLogger(Dataframe.Builder.class);


Expand Down
Expand Up @@ -73,12 +73,12 @@ public static class Cluster extends AbstractDPMM.AbstractCluster {
private int meanDf; private int meanDf;


//internal vars for calculation //internal vars for calculation
private transient RealVector xi_sum; private RealVector xi_sum;
private transient RealMatrix xi_square_sum; private RealMatrix xi_square_sum;


//Cache //Cache
private transient volatile Double cache_covariance_determinant; //Cached value of Covariance determinant used only for speed optimization private volatile Double cache_covariance_determinant;
private transient volatile RealMatrix cache_covariance_inverse; //Cached value of Inverse Covariance used only for speed optimization private volatile RealMatrix cache_covariance_inverse;


/** /**
* @param clusterId * @param clusterId
Expand Down Expand Up @@ -117,6 +117,29 @@ protected Cluster(Integer clusterId, int dimensions, int kappa0, int nu0, RealVe
this.psi0 = psi0; this.psi0 = psi0;
this.dimensions = dimensions; this.dimensions = dimensions;
} }

/**
 * Copy constructor: builds a new cluster under the provided id whose state
 * mirrors the given cluster. NOTE(review): this copies field references
 * directly (RealVector/RealMatrix instances are shared, not cloned) — a
 * shallow copy; confirm callers treat these as effectively immutable.
 *
 * @param clusterId the id assigned to the new cluster
 * @param copy the cluster whose state is duplicated
 * @see com.datumbox.framework.machinelearning.common.abstracts.modelers.AbstractClusterer.AbstractCluster
 */
protected Cluster(Integer clusterId, Cluster copy) {
    super(clusterId, copy);

    //prior hyperparameters
    this.dimensions = copy.dimensions;
    this.kappa0 = copy.kappa0;
    this.nu0 = copy.nu0;
    this.mu0 = copy.mu0;
    this.psi0 = copy.psi0;

    //estimated posterior parameters
    this.mean = copy.mean;
    this.covariance = copy.covariance;
    this.meanError = copy.meanError;
    this.meanDf = copy.meanDf;

    //running sums used by the incremental add/remove updates
    this.xi_sum = copy.xi_sum;
    this.xi_square_sum = copy.xi_square_sum;

    //cached covariance determinant/inverse (speed optimization)
    this.cache_covariance_determinant = copy.cache_covariance_determinant;
    this.cache_covariance_inverse = copy.cache_covariance_inverse;
}


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
Expand Down Expand Up @@ -155,7 +178,6 @@ protected double posteriorLogPdf(Record r) {


x_mu = x_mu.subtract(mean); x_mu = x_mu.subtract(mean);



if(cache_covariance_determinant==null || cache_covariance_inverse==null) { if(cache_covariance_determinant==null || cache_covariance_inverse==null) {
synchronized(this) { synchronized(this) {
if(cache_covariance_determinant==null || cache_covariance_inverse==null) { if(cache_covariance_determinant==null || cache_covariance_inverse==null) {
Expand All @@ -166,28 +188,16 @@ protected double posteriorLogPdf(Record r) {
} }
} }


Double determinant=cache_covariance_determinant; double x_muInvSx_muT = (cache_covariance_inverse.preMultiply(x_mu)).dotProduct(x_mu);
RealMatrix invCovariance=cache_covariance_inverse; double normConst = 1.0/( Math.pow(2*Math.PI, dimensions/2.0) * Math.pow(cache_covariance_determinant, 0.5) );

double x_muInvSx_muT = (invCovariance.preMultiply(x_mu)).dotProduct(x_mu);

double normConst = 1.0/( Math.pow(2*Math.PI, dimensions/2.0) * Math.pow(determinant, 0.5) );


//double pdf = Math.exp(-0.5 * x_muInvSx_muT)*normConst; //double pdf = Math.exp(-0.5 * x_muInvSx_muT)*normConst;
double logPdf = -0.5 * x_muInvSx_muT + Math.log(normConst); double logPdf = -0.5 * x_muInvSx_muT + Math.log(normConst);
return logPdf; return logPdf;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
protected boolean add(Integer rId, Record r) { protected void add(Record r) {
int size= recordIdSet.size();

if(recordIdSet.add(rId)==false) {
return false;
}

RealVector rv = MatrixDataframe.parseRecord(r, featureIds); RealVector rv = MatrixDataframe.parseRecord(r, featureIds);


//update cluster clusterParameters //update cluster clusterParameters
Expand All @@ -200,20 +210,15 @@ protected boolean add(Integer rId, Record r) {
xi_square_sum=xi_square_sum.add(rv.outerProduct(rv)); xi_square_sum=xi_square_sum.add(rv.outerProduct(rv));
} }


updateClusterParameters(); size++;


return true; updateClusterParameters();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
protected boolean remove(Integer rId, Record r) { protected void remove(Record r) {
if(xi_sum==null || xi_square_sum==null) { size--;
return false; //The cluster is empty or uninitialized
}
if(recordIdSet.remove(rId)==false) {
return false;
}


RealVector rv = MatrixDataframe.parseRecord(r, featureIds); RealVector rv = MatrixDataframe.parseRecord(r, featureIds);


Expand All @@ -222,8 +227,6 @@ protected boolean remove(Integer rId, Record r) {
xi_square_sum=xi_square_sum.subtract(rv.outerProduct(rv)); xi_square_sum=xi_square_sum.subtract(rv.outerProduct(rv));


updateClusterParameters(); updateClusterParameters();

return true;
} }


private RealMatrix calculateMeanError(RealMatrix Psi, int kappa, int nu) { private RealMatrix calculateMeanError(RealMatrix Psi, int kappa, int nu) {
Expand All @@ -234,7 +237,6 @@ private RealMatrix calculateMeanError(RealMatrix Psi, int kappa, int nu) {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
protected void clear() { protected void clear() {
super.clear();
xi_sum = null; xi_sum = null;
xi_square_sum = null; xi_square_sum = null;
cache_covariance_determinant = null; cache_covariance_determinant = null;
Expand All @@ -244,38 +246,36 @@ protected void clear() {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
protected void updateClusterParameters() { protected void updateClusterParameters() {
if(xi_sum==null || xi_square_sum==null || psi0==null || mu0==null) {
return; //The cluster is empty or uninitialized
}
int n = recordIdSet.size();

//fetch hyperparameters //fetch hyperparameters


int kappa_n = kappa0 + n; int kappa_n = kappa0 + size;
int nu = nu0 + n; int nu = nu0 + size;


RealVector mu = xi_sum.mapDivide(n); RealVector mu = xi_sum.mapDivide(size);
RealVector mu_mu0 = mu.subtract(mu0); RealVector mu_mu0 = mu.subtract(mu0);


RealMatrix C = xi_square_sum.subtract( ( mu.outerProduct(mu) ).scalarMultiply(n) ); RealMatrix C = xi_square_sum.subtract( ( mu.outerProduct(mu) ).scalarMultiply(size) );


RealMatrix psi = psi0.add( C.add( ( mu_mu0.outerProduct(mu_mu0) ).scalarMultiply(kappa0*n/(double)kappa_n) )); RealMatrix psi = psi0.add( C.add( ( mu_mu0.outerProduct(mu_mu0) ).scalarMultiply(kappa0*size/(double)kappa_n) ));
//C = null; //C = null;
//mu_mu0 = null; //mu_mu0 = null;


mean = ( mu0.mapMultiply(kappa0) ).add( mu.mapMultiply(n) ).mapDivide(kappa_n); mean = ( mu0.mapMultiply(kappa0) ).add( mu.mapMultiply(size) ).mapDivide(kappa_n);


synchronized(this) { covariance = psi.scalarMultiply( (kappa_n+1.0)/(kappa_n*(nu - dimensions + 1.0)) );
covariance = psi.scalarMultiply( (kappa_n+1.0)/(kappa_n*(nu - dimensions + 1.0)) );
LUDecomposition lud = new LUDecomposition(covariance);
cache_covariance_determinant = lud.getDeterminant();
cache_covariance_inverse = lud.getSolver().getInverse();
}


meanError = calculateMeanError(psi, kappa_n, nu); meanError = calculateMeanError(psi, kappa_n, nu);
meanDf = nu-dimensions+1; meanDf = nu-dimensions+1;

cache_covariance_determinant = null;
cache_covariance_inverse = null;
} }


/** {@inheritDoc} */
@Override
protected Cluster copy2new(Integer newClusterId) {
    //delegates to the copy constructor; produces a copy of this cluster under the new id
    return new Cluster(newClusterId, this);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down

0 comments on commit c3f1361

Please sign in to comment.