Skip to content

Commit

Permalink
parallelise vertex centrality; closes #90
Browse files Browse the repository at this point in the history
  • Loading branch information
mpadge committed Oct 16, 2019
1 parent 96cd156 commit ef3f4f6
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 4 deletions.
7 changes: 7 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ rcpp_centrality <- function(graph, vert_map_in, heap_type, edges) {
.Call(`_dodgr_rcpp_centrality`, graph, vert_map_in, heap_type, edges)
}

#' rcpp_centrality_vertex - parallel function
#'
#' @noRd
rcpp_centrality_vertex <- function(graph, vert_map_in, heap_type, dirtxt) {
invisible(.Call(`_dodgr_rcpp_centrality_vertex`, graph, vert_map_in, heap_type, dirtxt))
}

#' rcpp_centrality_edge - parallel function
#'
#' @noRd
Expand Down
21 changes: 17 additions & 4 deletions R/centrality.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,26 @@ dodgr_centrality <- function (graph, contract = TRUE, edges = TRUE,

graph2 <- convert_graph (graph, gr_cols)

if (parallel & edges)
if (parallel)
{
dirtxt <- get_random_prefix ("centrality")
rcpp_centrality_edge (graph2, vert_map, heap, dirtxt)
if (edges)
{
dirtxt <- get_random_prefix ("centrality_edge")
rcpp_centrality_edge (graph2, vert_map, heap, dirtxt)
} else
{
dirtxt <- get_random_prefix ("centrality_vert")
rcpp_centrality_vertex (graph2, vert_map, heap, dirtxt)
}
f <- list.files (tempdir (), full.names = TRUE)
files <- f [grep (dirtxt, f)]
centrality <- rcpp_aggregate_files (files, nrow (graph))
if (edges)
centrality <- rcpp_aggregate_files (files, nrow (graph))
else
{
v <- dodgr_vertices (graph)
centrality <- rcpp_aggregate_files (files, nrow (v))
}
junk <- file.remove (files) # nolint
} else
centrality <- rcpp_centrality (graph2, vert_map, heap, edges)
Expand Down
14 changes: 14 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// rcpp_centrality_vertex
void rcpp_centrality_vertex(const Rcpp::DataFrame graph, const Rcpp::DataFrame vert_map_in, const std::string& heap_type, const std::string dirtxt);
RcppExport SEXP _dodgr_rcpp_centrality_vertex(SEXP graphSEXP, SEXP vert_map_inSEXP, SEXP heap_typeSEXP, SEXP dirtxtSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const Rcpp::DataFrame >::type graph(graphSEXP);
Rcpp::traits::input_parameter< const Rcpp::DataFrame >::type vert_map_in(vert_map_inSEXP);
Rcpp::traits::input_parameter< const std::string& >::type heap_type(heap_typeSEXP);
Rcpp::traits::input_parameter< const std::string >::type dirtxt(dirtxtSEXP);
rcpp_centrality_vertex(graph, vert_map_in, heap_type, dirtxt);
return R_NilValue;
END_RCPP
}
// rcpp_centrality_edge
void rcpp_centrality_edge(const Rcpp::DataFrame graph, const Rcpp::DataFrame vert_map_in, const std::string& heap_type, const std::string dirtxt);
RcppExport SEXP _dodgr_rcpp_centrality_edge(SEXP graphSEXP, SEXP vert_map_inSEXP, SEXP heap_typeSEXP, SEXP dirtxtSEXP) {
Expand Down Expand Up @@ -262,6 +275,7 @@ END_RCPP

static const R_CallMethodDef CallEntries[] = {
{"_dodgr_rcpp_centrality", (DL_FUNC) &_dodgr_rcpp_centrality, 4},
{"_dodgr_rcpp_centrality_vertex", (DL_FUNC) &_dodgr_rcpp_centrality_vertex, 4},
{"_dodgr_rcpp_centrality_edge", (DL_FUNC) &_dodgr_rcpp_centrality_edge, 4},
{"_dodgr_rcpp_aggregate_to_sf", (DL_FUNC) &_dodgr_rcpp_aggregate_to_sf, 3},
{"_dodgr_rcpp_aggregate_files", (DL_FUNC) &_dodgr_rcpp_aggregate_files, 2},
Expand Down
92 changes: 92 additions & 0 deletions src/centrality.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,66 @@ void inst_graph (std::shared_ptr<DGraph> g, unsigned int nedges,
}
// # nocov end

struct OneCentralityVert : public RcppParallel::Worker
{
size_t nverts; // can't be const because of reinterpret case
const std::string dirtxt;
const std::string heap_type;

std::shared_ptr <DGraph> g;

// constructor
OneCentralityVert (
const size_t nverts_in,
const std::string dirtxt_in,
const std::string heap_type_in,
const std::shared_ptr <DGraph> g_in) :
nverts (nverts_in), dirtxt (dirtxt_in),
heap_type (heap_type_in), g (g_in)
{
}

// Function to generate random file names
std::string random_name(size_t len) {
auto randchar = []() -> char
{
const char charset[] = \
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
//return charset [ rand() % max_index ];
size_t i = static_cast <size_t> (floor (unif_rand () * max_index));
return charset [i];
}; // # nocov
std::string str (len, 0);
std::generate_n (str.begin(), len, randchar);
return str;
}

// Parallel function operator
void operator() (size_t begin, size_t end)
{
std::shared_ptr<PF::PathFinder> pathfinder =
std::make_shared <PF::PathFinder> (nverts,
*run_sp::getHeapImpl (heap_type), g);

std::vector <double> cent (nverts, 0.0);

for (size_t v = begin; v < end; v++)
{
pathfinder->Centrality_vertex (cent, v);
}
// dump flowvec to a file; chance of re-generating same file name is
// 61^10, so there's no check for re-use of same
std::string file_name = dirtxt + "_" + random_name (10) + ".dat";
std::ofstream out_file;
out_file.open (file_name, std::ios::binary | std::ios::out);
out_file.write (reinterpret_cast <char *>(&nverts), sizeof (size_t));
out_file.write (reinterpret_cast <char *>(&cent [0]),
static_cast <std::streamsize> (nverts * sizeof (double)));
out_file.close ();
}
};

struct OneCentralityEdge : public RcppParallel::Worker
{
size_t nverts; // can't be const because of reinterpret case
Expand Down Expand Up @@ -333,6 +393,38 @@ Rcpp::NumericVector rcpp_centrality (const Rcpp::DataFrame graph,
return (dout);
}

//' rcpp_centrality_vertex - parallel function
//'
//' @noRd
// [[Rcpp::export]]
void rcpp_centrality_vertex (const Rcpp::DataFrame graph,
const Rcpp::DataFrame vert_map_in,
const std::string& heap_type,
const std::string dirtxt)
{
std::vector <std::string> from = graph ["from"];
std::vector <std::string> to = graph ["to"];
std::vector <double> dist = graph ["d"];
std::vector <double> wt = graph ["d_weighted"];

const unsigned int nedges = static_cast <unsigned int> (graph.nrow ());
std::map <std::string, unsigned int> vert_map;
std::vector <std::string> vert_map_id = vert_map_in ["vert"];
std::vector <unsigned int> vert_map_n = vert_map_in ["id"];
size_t nverts = run_sp::make_vert_map (vert_map_in, vert_map_id,
vert_map_n, vert_map);

std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
inst_graph (g, nedges, vert_map, from, to, dist, wt);

// Create parallel worker
OneCentralityVert one_centrality (nverts, dirtxt, heap_type, g);

GetRNGstate (); // Initialise R random seed
RcppParallel::parallelFor (0, nverts, one_centrality);
PutRNGstate ();
}

//' rcpp_centrality_edge - parallel function
//'
//' @noRd
Expand Down
5 changes: 5 additions & 0 deletions src/run_sp.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ Rcpp::NumericVector rcpp_centrality (const Rcpp::DataFrame graph,
const std::string& heap_type,
bool edges); // FALSE for vertex centrality

void rcpp_centrality_vertex (const Rcpp::DataFrame graph,
const Rcpp::DataFrame vert_map_in,
const std::string& heap_type,
const std::string dirtxt);

void rcpp_centrality_edge (const Rcpp::DataFrame graph,
const Rcpp::DataFrame vert_map_in,
const std::string& heap_type,
Expand Down

0 comments on commit ef3f4f6

Please sign in to comment.