Initial support for SIMD-instructions #779

Open · wants to merge 6 commits into master
32 changes: 32 additions & 0 deletions CMakeLists.txt
@@ -115,6 +115,38 @@ if (("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") OR
endif (("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") OR
("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU"))

# macros for checking support for SSE
if (("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") OR
    ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU"))
  include(CheckCXXCompilerFlag)
  # idea taken from https://github.com/PointCloudLibrary/pcl/blob/36fb6d2b2e9a8ab65507c0fe9606c05036c37460/cmake/pcl_find_sse.cmake#L154
  include(CheckCXXSourceRuns)
  set(CMAKE_REQUIRED_FLAGS "-msse")
  check_cxx_source_runs("
    #include <xmmintrin.h>
    int main ()
    {
      __m128 a, b;
      float fa[4] = {0., 1., 2., 3.};
      float fb[4] = {-1., 2., 10., 42.};
      float fc[4] = {0.};
      a = _mm_loadu_ps(fa);
      b = _mm_loadu_ps(fb);
      b = _mm_mul_ps(a, b);
      _mm_storeu_ps(fc, b);
      return (0);
    }"
    HAVE_SSE_EXTENSIONS)
  if (HAVE_SSE_EXTENSIONS)
    # add flags for compiler, globally now, maybe locally in the future
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse -mfpmath=sse")
    # add definition for compiler, globally now, maybe locally in the future
    add_definitions("-DUSE_SSE")
  endif (HAVE_SSE_EXTENSIONS)
  set(CMAKE_REQUIRED_FLAGS)
endif (("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") OR
       ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU"))

add_subdirectory(src)
add_subdirectory(python)
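
The USE_SSE definition added above is what the C++ sources test for. As a sanity check, here is a minimal sketch (mine, not part of this PR) confirming that the definition reaches a translation unit when the build applies these flags:

// Minimal sketch (not from the PR): prints which code path a build
// with or without -DUSE_SSE will take.
#include <cstdio>

int main() {
#if defined(USE_SSE)
  std::printf("USE_SSE defined: SSE intrinsic paths enabled\n");
#else
  std::printf("USE_SSE not defined: scalar fallback paths in use\n");
#endif
  return 0;
}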

2 changes: 1 addition & 1 deletion src/artm/core/cache_manager.h
@@ -35,7 +35,7 @@ class ThetaCacheEntry : boost::noncopyable {
};

// CacheManager class is responsible for caching ThetaMatrix in between calls to different APIs.
- // This class is used when the user calls FitOffline / FitOnline / Transfor to store the resulting theta matrix.
+ // This class is used when the user calls FitOffline / FitOnline / Transform to store the resulting theta matrix.
// (at least when theta_matrix_type is set to ThetaMatrixType_Cache).
// Later user may retrieve the data from CacheManager via calls to ArtmRequestThetaMatrix.
// The cache is organized as a set of entries, each entry associated with a single batch.
195 changes: 123 additions & 72 deletions src/artm/core/processor.cc
@@ -34,6 +34,10 @@

#include "artm/utility/blas.h"

#if defined(USE_SSE)
#include <xmmintrin.h>
#endif // USE_SSE

namespace util = artm::utility;
namespace fs = boost::filesystem;
using ::util::CsrMatrix;
@@ -99,7 +103,7 @@ class NormalizeThetaAgent : public RegularizeThetaAgent {
    float sum_inv = sum > 0.0f ? (1.0f / sum) : 0.0f;
    for (int topic_index = 0; topic_index < topics_size; ++topic_index) {
      float val = sum_inv * (n_td[topic_index] + r_td[topic_index]);
-     if (val < 1e-16f) val = 0.0f;
+     if (val < kProcessorEps) val = 0.0f;

      // Hack-hack, write normalized values back to n_td
      const_cast<float*>(n_td)[topic_index] = val;
@@ -341,6 +345,55 @@ InitializeSparseNdw(const Batch& batch, const ProcessBatchesArgs& args) {
return std::make_shared<CsrMatrix<float>>(batch.token_size(), &n_dw_val, &n_dw_row_ptr, &n_dw_col_ind);
}
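
The sparse_ndw matrix built here uses the CSR layout that the processing code below indexes via row_ptr(), col_ind() and val(). A small self-contained sketch of that layout, with made-up data for illustration:

// Illustration only (hypothetical data, not from the PR): the entries of
// document d occupy positions row_ptr[d] .. row_ptr[d + 1] - 1; col_ind[i]
// is the token index and val[i] the token count n_dw.
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> row_ptr = {0, 2, 3};  // doc 0 has 2 entries, doc 1 has 1
  std::vector<int> col_ind = {0, 3, 1};
  std::vector<float> val = {2.0f, 1.0f, 5.0f};
  for (int d = 0; d + 1 < static_cast<int>(row_ptr.size()); ++d)
    for (int i = row_ptr[d]; i < row_ptr[d + 1]; ++i)
      std::printf("doc %d, token %d, n_dw = %.0f\n", d, col_ind[i], val[i]);
  return 0;
}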

// Accumulates (n_dw / p_dw) * phi into ntd, where p_dw = dot(phi, theta)
// over num_topics entries; returns early when p_dw is zero.
static inline void
UpdateNtdCounters(const float *phi_ptr,
                  const float *theta_ptr,
                  float *ntd_ptr,
                  const int num_topics,
                  const float n_dw) {
#if defined(USE_SSE)
  // Process four floats per iteration with unaligned loads; the remaining
  // num_topics % 4 entries are handled by scalar tail loops.
  const int SSE_SHIFT = 4;
  int num_cycles = num_topics / SSE_SHIFT;
  int rem = num_topics % SSE_SHIFT;
  __m128 res = _mm_setzero_ps();
  const float *tmp_phi_ptr = phi_ptr;
  for (int k = 0; k < num_cycles; ++k) {
    res = _mm_add_ps(res, _mm_mul_ps(_mm_loadu_ps(tmp_phi_ptr), _mm_loadu_ps(theta_ptr)));
    tmp_phi_ptr += SSE_SHIFT;
    theta_ptr += SSE_SHIFT;
  }
  // horizontal sum of the four lane-wise partial dot products
  float p_dw_val_arr[SSE_SHIFT] = {0.};
  _mm_storeu_ps(p_dw_val_arr, res);
  float p_dw_val = 0;
  for (int k = 0; k < SSE_SHIFT; ++k) p_dw_val += p_dw_val_arr[k];
  // add remaining values
  for (int k = 0; k < rem; ++k) {
    p_dw_val += tmp_phi_ptr[k] * theta_ptr[k];
  }
  if (p_dw_val == 0) return;
  const float alpha = n_dw / p_dw_val;
  __m128 m_alpha = _mm_load1_ps(&alpha);  // broadcast alpha to all four lanes
  for (int k = 0; k < num_cycles; ++k) {
    __m128 ntd_vals = _mm_loadu_ps(ntd_ptr);
    __m128 add_vals = _mm_mul_ps(m_alpha, _mm_loadu_ps(phi_ptr));
    _mm_storeu_ps(ntd_ptr, _mm_add_ps(ntd_vals, add_vals));
    ntd_ptr += SSE_SHIFT;
    phi_ptr += SSE_SHIFT;
  }
  for (int k = 0; k < rem; ++k) {
    ntd_ptr[k] += alpha * phi_ptr[k];
  }
#else
  // Scalar fallback used when USE_SSE is not defined.
  float p_dw_val = 0;
  for (int k = 0; k < num_topics; ++k) p_dw_val += phi_ptr[k] * theta_ptr[k];
  if (p_dw_val == 0) return;
  const float alpha = n_dw / p_dw_val;
  for (int k = 0; k < num_topics; ++k)
    ntd_ptr[k] += alpha * phi_ptr[k];
#endif  // USE_SSE
}
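
Since the SSE path reassociates the dot-product sum, the two branches can differ in the last bits. Below is a standalone cross-check, written as my own sketch rather than taken from the PR, that mirrors both branches of UpdateNtdCounters and compares them on a topic count that is not a multiple of four:

// Standalone sketch (not from the PR): re-implements both UpdateNtdCounters
// paths and checks that they agree up to float rounding.
// Build (hypothetical): g++ -O2 -msse sse_check.cc && ./a.out
#include <xmmintrin.h>
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <vector>

static void UpdateScalar(const float* phi, const float* theta, float* ntd,
                         int num_topics, float n_dw) {
  float p_dw = 0.0f;
  for (int k = 0; k < num_topics; ++k) p_dw += phi[k] * theta[k];
  if (p_dw == 0.0f) return;
  const float alpha = n_dw / p_dw;
  for (int k = 0; k < num_topics; ++k) ntd[k] += alpha * phi[k];
}

static void UpdateSse(const float* phi, const float* theta, float* ntd,
                      int num_topics, float n_dw) {
  const int kShift = 4;
  const int num_cycles = num_topics / kShift;
  const int rem = num_topics % kShift;
  __m128 res = _mm_setzero_ps();
  const float* p = phi;
  const float* t = theta;
  for (int k = 0; k < num_cycles; ++k, p += kShift, t += kShift)
    res = _mm_add_ps(res, _mm_mul_ps(_mm_loadu_ps(p), _mm_loadu_ps(t)));
  float lanes[4];
  _mm_storeu_ps(lanes, res);  // horizontal sum of the four partial products
  float p_dw = lanes[0] + lanes[1] + lanes[2] + lanes[3];
  for (int k = 0; k < rem; ++k) p_dw += p[k] * t[k];  // scalar tail
  if (p_dw == 0.0f) return;
  const float alpha = n_dw / p_dw;
  const __m128 m_alpha = _mm_load1_ps(&alpha);
  float* n = ntd;
  const float* q = phi;
  for (int k = 0; k < num_cycles; ++k, n += kShift, q += kShift)
    _mm_storeu_ps(n, _mm_add_ps(_mm_loadu_ps(n), _mm_mul_ps(m_alpha, _mm_loadu_ps(q))));
  for (int k = 0; k < rem; ++k) n[k] += alpha * q[k];  // scalar tail
}

int main() {
  const int num_topics = 10;  // deliberately not a multiple of four
  std::vector<float> phi(num_topics), theta(num_topics);
  std::vector<float> ntd_a(num_topics, 0.0f), ntd_b(num_topics, 0.0f);
  for (int k = 0; k < num_topics; ++k) {
    phi[k] = 0.1f * (k + 1);
    theta[k] = 1.0f / (k + 1);
  }
  UpdateScalar(phi.data(), theta.data(), ntd_a.data(), num_topics, 3.0f);
  UpdateSse(phi.data(), theta.data(), ntd_b.data(), num_topics, 3.0f);
  float max_diff = 0.0f;
  for (int k = 0; k < num_topics; ++k)
    max_diff = std::max(max_diff, std::fabs(ntd_a[k] - ntd_b[k]));
  std::printf("max |scalar - sse| = %g\n", max_diff);
  return max_diff < 1e-5f ? 0 : 1;  // small but possibly nonzero: SSE reassociates the sum
}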


static void
InferThetaAndUpdateNwtSparse(const ProcessBatchesArgs& args, const Batch& batch, float batch_weight,
const CsrMatrix<float>& sparse_ndw,
@@ -359,90 +412,85 @@ InferThetaAndUpdateNwtSparse(const ProcessBatchesArgs& args, const Batch& batch,
  token_id[token_index] = p_wt.token_index(Token(batch.class_id(token_index), batch.token(token_index)));

  if (args.opt_for_avx()) {
    // This version is about 40% faster than the second alternative below.
    // Both versions return 100% equal results.
    // Speedup is due to several factors:
    // 1. explicit loops instead of blas->saxpy and blas->sdot
    //    make the compiler generate AVX instructions (vectorized 128-bit floating-point operations)
    // 2. better memory usage (reduced bandwidth to DRAM and more sequential access)

    int max_local_token_size = 0;  // find the longest document from the batch
    for (int d = 0; d < docs_count; ++d) {
      const int begin_index = sparse_ndw.row_ptr()[d];
      const int end_index = sparse_ndw.row_ptr()[d + 1];
      const int local_token_size = end_index - begin_index;
      max_local_token_size = std::max(max_local_token_size, local_token_size);
    }
    LocalPhiMatrix<float> local_phi(max_local_token_size, num_topics);
    LocalThetaMatrix<float> r_td(num_topics, 1);
    std::vector<float> helper_vector(num_topics, 0.0f);

    for (int d = 0; d < docs_count; ++d) {
      float* ntd_ptr = &n_td(0, d);
      float* theta_ptr = &(*theta_matrix)(0, d);  // NOLINT

      const int begin_index = sparse_ndw.row_ptr()[d];
      const int end_index = sparse_ndw.row_ptr()[d + 1];
      local_phi.InitializeZeros();
      bool item_has_tokens = false;
      for (int i = begin_index; i < end_index; ++i) {
        int w = sparse_ndw.col_ind()[i];
        if (token_id[w] == ::artm::core::PhiMatrix::kUndefIndex) continue;
        item_has_tokens = true;
        float* local_phi_ptr = &local_phi(i - begin_index, 0);
        p_wt.get(token_id[w], &helper_vector);
        for (int k = 0; k < num_topics; ++k) local_phi_ptr[k] = helper_vector[k];
      }

      if (!item_has_tokens) continue;  // continue to the next item

      for (int inner_iter = 0; inner_iter < args.num_document_passes(); ++inner_iter) {
        for (int k = 0; k < num_topics; ++k)
          ntd_ptr[k] = 0.0f;

        for (int i = begin_index; i < end_index; ++i) {
-         const float* phi_ptr = &local_phi(i - begin_index, 0);
-
-         float p_dw_val = 0;
-         for (int k = 0; k < num_topics; ++k)
-           p_dw_val += phi_ptr[k] * theta_ptr[k];
-         if (p_dw_val == 0) continue;
-
-         const float alpha = sparse_ndw.val()[i] / p_dw_val;
-         for (int k = 0; k < num_topics; ++k)
-           ntd_ptr[k] += alpha * phi_ptr[k];
+         UpdateNtdCounters(&local_phi(i - begin_index, 0),
+                           theta_ptr,
+                           ntd_ptr,
+                           num_topics,
+                           sparse_ndw.val()[i]);
        }

        for (int k = 0; k < num_topics; ++k)
          theta_ptr[k] *= ntd_ptr[k];

        r_td.InitializeZeros();
        theta_agents.Apply(d, inner_iter, num_topics, theta_ptr, r_td.get_data());
      }
    }
  } else {
    std::shared_ptr<LocalPhiMatrix<float>> phi_matrix_ptr = InitializePhi(batch, p_wt);
    if (phi_matrix_ptr == nullptr) return;
    const LocalPhiMatrix<float>& phi_matrix = *phi_matrix_ptr;
    for (int inner_iter = 0; inner_iter < args.num_document_passes(); ++inner_iter) {
      // helper_td will represent either n_td or r_td, depending on the context - see code below
      LocalThetaMatrix<float> helper_td(theta_matrix->num_topics(), theta_matrix->num_items());
      helper_td.InitializeZeros();

      for (int d = 0; d < docs_count; ++d) {
        for (int i = sparse_ndw.row_ptr()[d]; i < sparse_ndw.row_ptr()[d + 1]; ++i) {
          int w = sparse_ndw.col_ind()[i];
          float p_dw_val = blas->sdot(num_topics, &phi_matrix(w, 0), 1, &(*theta_matrix)(0, d), 1);  // NOLINT
          if (p_dw_val == 0) continue;
          blas->saxpy(num_topics, sparse_ndw.val()[i] / p_dw_val, &phi_matrix(w, 0), 1, &helper_td(0, d), 1);
        }
      }

      AssignDenseMatrixByProduct(*theta_matrix, helper_td, theta_matrix);

      helper_td.InitializeZeros();  // from now on this represents r_td
      theta_agents.Apply(inner_iter, *theta_matrix, &helper_td);
    }
  }
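
A note on the algebra, as I read the code (the PR does not spell this out): UpdateNtdCounters deliberately accumulates only the phi factor, and the theta factor is applied afterwards by the theta_ptr[k] *= ntd_ptr[k] loop, so each document pass computes

  p_{dw} = \sum_t \phi_{wt}\,\theta_{td}, \qquad
  n_{td} = \sum_{w \in d} n_{dw}\,\frac{\phi_{wt}}{p_{dw}}, \qquad
  \theta_{td} \leftarrow \theta_{td}\, n_{td},

which equals the usual E-step update \theta_{td} \leftarrow \sum_{w \in d} n_{dw}\,\phi_{wt}\,\theta_{td} / p_{dw}, up to the normalization performed later by NormalizeThetaAgent.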

CreateThetaCacheEntry(new_cache_entry_ptr, theta_matrix, batch, p_wt, args);
@@ -584,7 +632,7 @@ CalcScores(ScoreCalculatorInterface* score_calc, const Batch& batch,

std::vector<Token> token_dict;
for (int token_index = 0; token_index < batch.token_size(); ++token_index) {
- token_dict.push_back(Token(batch.class_id(token_index), batch.token(token_index)));
+ token_dict.emplace_back(batch.class_id(token_index), batch.token(token_index));
}

std::shared_ptr<Score> score = score_calc->CreateScore();
@@ -658,6 +706,9 @@ void Processor::ThreadFunction() {
pop_retries++;
LOG_IF(INFO, pop_retries == pop_retries_max) << "No data in processing queue, waiting...";

// Warning: the sleep function has been marked as DEPRECATED since Boost 1.53.0.
// See http://www.boost.org/doc/libs/1_53_0/doc/html/thread/thread_management.html,
// section thread.thread_management.this_thread.sleep
boost::this_thread::sleep(boost::posix_time::milliseconds(kIdleLoopFrequency));
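// A possible non-deprecated replacement (a sketch assuming Boost.Chrono is
// available; not something this PR changes) would be:
//   boost::this_thread::sleep_for(boost::chrono::milliseconds(kIdleLoopFrequency));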

continue;