Skip to content

Commit

Permalink
#372: X-Means parallel implementation that can be controlled by user.
Browse files Browse the repository at this point in the history
  • Loading branch information
annoviko committed Oct 30, 2017
1 parent a55b940 commit 02668f7
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 127 deletions.
157 changes: 100 additions & 57 deletions ccore/src/cluster/xmeans.cpp
Expand Up @@ -31,17 +31,23 @@
#include "utils.hpp"


#define XMEANS_PARALLEL_OPTIMIZATION


namespace cluster_analysis {


/* Default thread amount. NOTE(review): not referenced anywhere in the visible
   code of this commit — possibly reserved for future use; confirm before relying on it. */
const std::size_t xmeans::DEFAULT_AMOUNT_THREADS = 10;

/* Offset applied to each coordinate of a parent center to seed the two child
   centers when a cluster split is attempted (see improve_region_structure). */
const double xmeans::DEFAULT_SPLIT_DIFFERENCE = 0.001;

/* Default dataset size at which process() switches to parallel processing. */
const std::size_t xmeans::DEFAULT_DATA_SIZE_PARALLEL_PROCESSING = 100000;


/**
 * @brief Creates X-Means clustering algorithm instance.
 *
 * @param[in] p_centers: initial centers, one per initial cluster.
 * @param[in] p_kmax: maximum amount of clusters that can be allocated.
 * @param[in] p_tolerance: stop condition on center movement; stored squared so
 *             comparisons can avoid taking a square root.
 * @param[in] p_criterion: splitting criterion used to decide cluster splits.
 *
 * BUG FIX: the pasted diff contained both the old `m_criterion(p_criterion)`
 * line and the new `m_criterion(p_criterion),` line, yielding a duplicate
 * initializer; reconstructed the post-commit initializer list.  Members are
 * also initialized in declaration order (m_parallel_trigger before
 * m_parallel_processing, per the header) to avoid -Wreorder.
 */
xmeans::xmeans(const dataset & p_centers, const std::size_t p_kmax, const double p_tolerance, const splitting_type p_criterion) :
    m_centers(p_centers),
    m_maximum_clusters(p_kmax),
    m_tolerance(p_tolerance * p_tolerance),
    m_criterion(p_criterion),
    m_parallel_trigger(DEFAULT_DATA_SIZE_PARALLEL_PROCESSING),
    m_parallel_processing(false)
{ }


Expand All @@ -51,6 +57,10 @@ xmeans::~xmeans(void) { }
void xmeans::process(const dataset & data, cluster_data & output_result) {
m_ptr_data = &data;

if (m_ptr_data->size() >= m_parallel_trigger) {
m_parallel_processing = true;
}

output_result = xmeans_data();
m_ptr_result = (xmeans_data *)&output_result;

Expand All @@ -72,6 +82,11 @@ void xmeans::process(const dataset & data, cluster_data & output_result) {
}


/**
 * @brief Sets custom data-size trigger for parallel processing.
 *
 * @param[in] p_data_size: when the dataset passed to process() contains at
 *             least this many points, parallel processing is enabled
 *             (process() compares with `>=`).
 */
void xmeans::set_parallel_processing_trigger(const std::size_t p_data_size) {
    m_parallel_trigger = p_data_size;
}


void xmeans::improve_parameters(cluster_sequence & improved_clusters, dataset & improved_centers, const index_sequence & available_indexes) {
double current_change = std::numeric_limits<double>::max();

Expand All @@ -83,55 +98,30 @@ void xmeans::improve_parameters(cluster_sequence & improved_clusters, dataset &


void xmeans::improve_structure() {
const double difference = 0.001;
dataset allocated_centers;

for (std::size_t index = 0; index < m_ptr_result->clusters()->size(); index++) {
cluster & current_cluster = (*(m_ptr_result->clusters()))[index];
if (m_parallel_processing) {
std::vector< std::future<void> > pool_improve_futures;
for (std::size_t index = 0; index < m_ptr_result->clusters()->size(); index++) {
auto improve_functor = std::bind(&xmeans::improve_region_structure, this,
std::cref((*(m_ptr_result->clusters()))[index]),
std::cref(m_centers[index]),
std::ref(allocated_centers));

dataset parent_child_centers;
parent_child_centers.push_back( m_centers[index] ); /* the first child */
parent_child_centers.push_back( m_centers[index] ); /* the second child */

/* change location of each child (total number of children is two) */
for (std::size_t dimension = 0; dimension < parent_child_centers[0].size(); dimension++) {
parent_child_centers[0][dimension] -= difference;
parent_child_centers[1][dimension] += difference;
pool_improve_futures.emplace_back(std::async(std::launch::async, improve_functor));
}

/* solve k-means problem for children where data of parent are used */
cluster_sequence parent_child_clusters(2, cluster());

improve_parameters(parent_child_clusters, parent_child_centers, current_cluster);

/* splitting criterion */
cluster_sequence parent_cluster(1, current_cluster);
dataset parent_center(1, m_centers[index]);

double parent_scores = splitting_criterion(parent_cluster, parent_center);
double child_scores = splitting_criterion(parent_child_clusters, parent_child_centers);

if (m_criterion == splitting_type::BAYESIAN_INFORMATION_CRITERION) {
/* take the best representation of the considered data */
if (parent_scores > child_scores) {
allocated_centers.push_back(m_centers[index]);
}
else {
allocated_centers.push_back(parent_child_centers[0]);
allocated_centers.push_back(parent_child_centers[1]);
}
for (auto & improve_future : pool_improve_futures) {
improve_future.get();
}
else if (m_criterion == splitting_type::MINIMUM_NOISELESS_DESCRIPTION_LENGTH) {
if (parent_scores < child_scores) {
allocated_centers.push_back(m_centers[index]);
}
else {
allocated_centers.push_back(parent_child_centers[0]);
allocated_centers.push_back(parent_child_centers[1]);
}
}
else {
for (std::size_t index = 0; index < m_ptr_result->clusters()->size(); index++) {
improve_region_structure((*(m_ptr_result->clusters()))[index], m_centers[index], allocated_centers);
}
}


/* update current centers */
m_centers.clear();
for (std::size_t index = 0; index < allocated_centers.size(); index++) {
Expand All @@ -140,6 +130,53 @@ void xmeans::improve_structure() {
}


/**
 * @brief Tries to split one cluster into two children and appends the winning
 *        center set (parent, or both children) to the shared output collection.
 *
 * @param[in]  p_cluster: indexes of the points that belong to the considered cluster.
 * @param[in]  p_center: current center of the considered cluster.
 * @param[out] p_allocated_centers: shared collection of resulting centers; when
 *              parallel processing is enabled this is mutated concurrently by
 *              several async tasks and is therefore guarded by m_mutex.
 */
void xmeans::improve_region_structure(const cluster & p_cluster, const point & p_center, dataset & p_allocated_centers) {
    /* seed two children from the parent center */
    dataset parent_child_centers;
    parent_child_centers.push_back( p_center );   /* the first child */
    parent_child_centers.push_back( p_center );   /* the second child */

    /* change location of each child (total number of children is two) */
    for (std::size_t dimension = 0; dimension < parent_child_centers[0].size(); dimension++) {
        parent_child_centers[0][dimension] -= DEFAULT_SPLIT_DIFFERENCE;
        parent_child_centers[1][dimension] += DEFAULT_SPLIT_DIFFERENCE;
    }

    /* solve k-means problem for children where data of parent are used */
    cluster_sequence parent_child_clusters(2, cluster());
    improve_parameters(parent_child_clusters, parent_child_centers, p_cluster);

    /* splitting criterion: score parent representation against the two-child one */
    cluster_sequence parent_cluster(1, p_cluster);
    dataset parent_center(1, p_center);

    double parent_scores = splitting_criterion(parent_cluster, parent_center);
    double child_scores = splitting_criterion(parent_child_clusters, parent_child_centers);

    bool divide_decision = false;
    if (m_criterion == splitting_type::BAYESIAN_INFORMATION_CRITERION) {
        divide_decision = (parent_scores <= child_scores);
    }
    else if (m_criterion == splitting_type::MINIMUM_NOISELESS_DESCRIPTION_LENGTH) {
        divide_decision = (parent_scores >= child_scores);
    }

    /* BUG FIX: the original created a std::lock_guard inside an empty
       `if (m_parallel_processing) { ... }` block, so the mutex was released
       immediately and the push_back calls below ran unsynchronized while
       several async tasks share p_allocated_centers.  The lock must be held
       across the mutation; std::unique_lock with defer_lock lets us take it
       only in parallel mode. */
    std::unique_lock<std::mutex> locker(m_mutex, std::defer_lock);
    if (m_parallel_processing) {
        locker.lock();
    }

    if (divide_decision) {
        p_allocated_centers.push_back(parent_child_centers[0]);
        p_allocated_centers.push_back(parent_child_centers[1]);
    }
    else {
        p_allocated_centers.push_back(p_center);
    }
}


double xmeans::splitting_criterion(const cluster_sequence & analysed_clusters, const dataset & analysed_centers) const {
switch(m_criterion) {
case splitting_type::BAYESIAN_INFORMATION_CRITERION:
Expand All @@ -166,9 +203,9 @@ void xmeans::update_clusters(cluster_sequence & analysed_clusters, const dataset
}
}
else {
for (index_sequence::const_iterator index_object = available_indexes.begin(); index_object != available_indexes.end(); index_object++) {
std::size_t index_cluster = find_proper_cluster(analysed_centers, (*m_ptr_data)[*index_object]);
analysed_clusters[index_cluster].push_back(*index_object);
for (auto & index_object : available_indexes) {
std::size_t index_cluster = find_proper_cluster(analysed_centers, (*m_ptr_data)[index_object]);
analysed_clusters[index_cluster].push_back(index_object);
}
}
}
Expand All @@ -190,25 +227,31 @@ std::size_t xmeans::find_proper_cluster(const dataset & analysed_centers, const
return index_optimum;
}

/* NOTE(review): dead stub — always returns 0.0 and never uses its argument;
   it is not called anywhere in the visible code and looks like leftover
   debug/placeholder code from this commit.  Consider removing it together
   with its declaration in xmeans.hpp. */
double xmeans::foo(cluster & p) {
return 0.0;
}

double xmeans::update_centers(const cluster_sequence & analysed_clusters, dataset & analysed_centers) {
double maximum_change = 0;

/* for each cluster */
std::vector<std::future<double>> pool_update_futures;
if (m_parallel_processing) {
std::vector< std::future<double> > pool_update_futures;
for (std::size_t index_cluster = 0; index_cluster < analysed_clusters.size(); index_cluster++) {
auto update_functor = std::bind(&xmeans::update_center, this, std::cref(analysed_clusters[index_cluster]), std::ref(analysed_centers[index_cluster]));
pool_update_futures.emplace_back(std::async(std::launch::async, update_functor));
}

for (std::size_t index_cluster = 0; index_cluster < analysed_clusters.size(); index_cluster++) {
auto update_functor = std::bind(&xmeans::update_center, this, std::cref(analysed_clusters[index_cluster]), std::ref(analysed_centers[index_cluster]));
pool_update_futures.emplace_back(std::async(std::launch::async, update_functor));
for (auto & update_future : pool_update_futures) {
double distance = update_future.get();
if (distance > maximum_change) {
maximum_change = distance;
}
}
}
else {
for (std::size_t index_cluster = 0; index_cluster < analysed_clusters.size(); index_cluster++) {
double distance = update_center(analysed_clusters[index_cluster], analysed_centers[index_cluster]);

for (auto & update_future : pool_update_futures) {
double distance = update_future.get();
if (distance > maximum_change) {
maximum_change = distance;
if (distance > maximum_change) {
maximum_change = distance;
}
}
}

Expand Down
24 changes: 20 additions & 4 deletions ccore/src/cluster/xmeans.hpp
Expand Up @@ -29,6 +29,7 @@
#include "cluster/xmeans_data.hpp"



namespace cluster_analysis {


Expand All @@ -39,8 +40,11 @@ enum class splitting_type {


class xmeans : public cluster_algorithm {
public:
const static std::size_t DEFAULT_DATA_SIZE_PARALLEL_PROCESSING;

private:
const static std::size_t DEFAULT_AMOUNT_THREADS;
const static double DEFAULT_SPLIT_DIFFERENCE;

private:
dataset m_centers;
Expand All @@ -55,6 +59,10 @@ class xmeans : public cluster_algorithm {

splitting_type m_criterion;

std::size_t m_parallel_trigger = DEFAULT_DATA_SIZE_PARALLEL_PROCESSING;

bool m_parallel_processing = false;

std::mutex m_mutex;

public:
Expand Down Expand Up @@ -90,18 +98,26 @@ class xmeans : public cluster_algorithm {
*/
virtual void process(const dataset & data, cluster_data & output_result) override;

/**
*
* @brief Set custom trigger (that is defined by data size) for parallel processing,
* by default this value is defined by static constant DEFAULT_DATA_SIZE_PARALLEL_PROCESSING.
*
* @param[in] p_data_size: data size that triggers parallel processing.
*
*/
void set_parallel_processing_trigger(const std::size_t p_data_size);

private:
void update_clusters(cluster_sequence & clusters, const dataset & centers, const index_sequence & available_indexes);

double update_centers(const cluster_sequence & clusters, dataset & centers);

double update_center(const cluster & p_cluster, point & p_center);

double foo(cluster & p);

void improve_structure(void);

void improve_region_structure(void);
void improve_region_structure(const cluster & p_cluster, const point & p_center, dataset & p_allocated_centers);

void improve_parameters(cluster_sequence & clusters, dataset & centers, const index_sequence & available_indexes);

Expand Down
24 changes: 23 additions & 1 deletion ccore/tst/samples.cpp
Expand Up @@ -18,11 +18,13 @@
*
*/


#include "samples.hpp"

#include <fstream>
#include <sstream>
#include <iostream>
#include <random>
#include <sstream>


#if defined _WIN32 || defined __CYGWIN__
Expand Down Expand Up @@ -152,6 +154,26 @@ std::shared_ptr<dataset> simple_sample_factory::create_sample(const SAMPLE_SIMPL
}


/**
 * @brief Creates random (uniform distribution) sample for cluster analysis.
 *
 * @param[in] p_cluster_size: size of each cluster in the dataset.
 * @param[in] p_clusters: amount of clusters in the dataset.
 *
 * @return Smart pointer to the created two-dimensional dataset.
 */
std::shared_ptr<dataset> simple_sample_factory::create_random_sample(const std::size_t p_cluster_size, const std::size_t p_clusters) {
    std::shared_ptr<dataset> sample_data(new dataset);

    std::random_device device;
    std::mt19937 generator(device());

    for (std::size_t index_cluster = 0; index_cluster < p_clusters; index_cluster++) {
        for (std::size_t index_point = 0; index_point < p_cluster_size; index_point++) {
            /* BUG FIX: offset coordinates by the CLUSTER index (was the point
               index) so each cluster forms a distinct blob near
               (5 * index_cluster, 5 * index_cluster).  The original offset by
               index_point spread every cluster along the same diagonal,
               producing identical, fully-overlapping "clusters" with no
               separable structure for the tests to find. */
            sample_data->push_back({
                std::generate_canonical<double, 32>(generator) + index_cluster * 5,
                std::generate_canonical<double, 32>(generator) + index_cluster * 5
            });
        }
    }

    return sample_data;
}



std::shared_ptr<dataset> fcps_sample_factory::create_sample(const FCPS_SAMPLE sample) {
const std::string path_sample = m_sample_table.at(sample);
return generic_sample_factory::create_sample(path_sample);
Expand Down
13 changes: 12 additions & 1 deletion ccore/tst/samples.hpp
Expand Up @@ -103,6 +103,18 @@ class simple_sample_factory {
*
*/
static std::shared_ptr<dataset> create_sample(const SAMPLE_SIMPLE sample);

/**
*
* @brief Creates random (uniform distribution) sample for cluster analysis.
*
* @param[in] p_cluster_size: size of each cluster in dataset.
* @param[in] p_clusters: amount of clusters in dataset.
*
* @return Smart pointer to created dataset.
*
*/
static std::shared_ptr<dataset> create_random_sample(const std::size_t p_cluster_size, const std::size_t p_clusters);
};


Expand Down Expand Up @@ -162,4 +174,3 @@ class fcps_sample_factory {
*/
static std::shared_ptr<dataset> create_sample(const FCPS_SAMPLE sample);
};

2 changes: 1 addition & 1 deletion ccore/tst/utest-som.cpp
Expand Up @@ -255,7 +255,7 @@ static void template_simulate_check_winners(const som_conn_type conn_type, const
som_map.train(*sample_simple_01.get(), 100, autostop);

std::vector<std::size_t> expected_winners = { 0, 1 };
for (int i = 0; i < sample_simple_01->size(); i++) {
for (std::size_t i = 0; i < sample_simple_01->size(); i++) {
std::size_t index_winner = som_map.simulate(sample_simple_01->at(i));
if ( (i == 0) && (index_winner != 0) ) {
expected_winners = { 1, 0 };
Expand Down

0 comments on commit 02668f7

Please sign in to comment.