Skip to content

Commit

Permalink
Merge pull request #1 from w-klijn/risotto
Browse files Browse the repository at this point in the history
Risotto
  • Loading branch information
soehrl committed Sep 13, 2018
2 parents c689819 + c8b5066 commit 9d72947
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 31 deletions.
124 changes: 93 additions & 31 deletions example/risotto/risotto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
#include <fstream>
#include <iomanip>
#include <iostream>
#include <deque>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>


#include <nlohmann/json.hpp>

Expand All @@ -16,7 +22,7 @@
#include <arbor/mc_cell.hpp>
#include <arbor/profile/meter_manager.hpp>
#include <arbor/profile/profiler.hpp>
#include <arbor/simple_sampler.hpp>

#include <arbor/simulation.hpp>
#include <arbor/recipe.hpp>
#include <arbor/version.hpp>
Expand All @@ -25,6 +31,7 @@
#include <ancillary/json_meter.hpp>

#include "parameters.hpp"
#include "thread_forwarding_sampler.hpp"

#ifdef ARB_MPI_ENABLED
#include <mpi.h>
Expand All @@ -41,8 +48,7 @@ using arb::cell_probe_address;
using arb::mc_cell;
using arb::section_kind;

// Writes voltage trace as a json file.
void write_trace_json(const arb::trace_data<double>& trace);


// Generate a cell.
arb::mc_cell branch_cell(arb::cell_gid_type gid, const cell_parameters& params);
Expand Down Expand Up @@ -73,7 +79,7 @@ class ring_recipe: public arb::recipe {
seg->set_compartments(4);
}
}

c.add_detector({0,0}, 10);
c.add_synapse({1, 0.5}, "expsyn");

Expand Down Expand Up @@ -118,10 +124,10 @@ class ring_recipe: public arb::recipe {
arb::probe_info get_probe(cell_member_type id) const override {
// Get the appropriate kind for measuring voltage.
cell_probe_address::probe_kind kind = cell_probe_address::membrane_voltage;

// Measure at the soma.
arb::segment_location loc_soma(0, 0.0);

// Measure at the dendrite.
arb::segment_location loc_dendrite(1, 1.0);

Expand Down Expand Up @@ -182,6 +188,60 @@ struct cell_stats {
};



// publisher, to be used as a thread that consumes data generated in a different thread
//
// Waits for the wake_up signal guarded by a lock that is released on the other side
// While having the lock, the trace vector data is swapped and
// the quit flag are copied. With this done the lock is released and the other
// side notified that processing can continue.
using traces_type = std::vector<std::tuple< arb::cell_gid_type, arb::cell_lid_type,
std::vector<std::tuple<arb::time_type, double>> >>;

void publisher(
traces_type &traces,
std::mutex & queue_mutex, std::condition_variable &wake_up, bool& quit)
{
traces_type traces_local;
bool quit_local;
while (true) {
// Wait on the wake_up signal,
// TODO: WHy a unique lock here and a lock gaurd on the other side
std::unique_lock<std::mutex> lock(queue_mutex);
wake_up.wait(lock, [] {return true; });
// We now have the mutex

// Copy / swap the mutex guarded variables
traces_local.swap(traces);
quit_local = quit;

// Release our mutex and signal the other thread we are done
lock.unlock();
// TODO: Should this be all? Because only on receiver can continue
wake_up.notify_one();

// Simple plotting
for (auto& entry : traces_local) {
auto gid = std::get<0>(entry);
auto lid = std::get<1>(entry);
auto trace = std::get<2>(entry);

std::cout << gid << ", " << lid << " \n";
for (auto& value : trace) {
auto time = std::get<0>(value);
auto voltage = std::get<1>(value);

std::cout << time << ", " << voltage << "\n";
}
}
traces_local.clear();

if (quit_local) {
break;
}
}
}

int main(int argc, char** argv) {
try {
bool root = true;
Expand Down Expand Up @@ -227,14 +287,26 @@ int main(int argc, char** argv) {

// Set up the probe that will measure voltage in the cell.

// The id of the only probe on the cell: the cell_member type points to (cell 0, probe 0)
auto probe_id = cell_member_type{0, 0};
// The schedule for sampling is 10 samples every 1 ms.
auto sched = arb::regular_schedule(0.1);
// This is where the voltage samples will be stored as (time, value) pairs
arb::trace_data<double> voltage;
// Now attach the sampler at probe_id, with sampling schedule sched, writing to voltage
sim.add_sampler(arb::one_probe(probe_id), sched, arb::make_simple_sampler(voltage));


// Data object for storing traces data

traces_type traces;
// locking tools and signals for the thread communication
std::mutex queue_mutex;
std::condition_variable wake_up;
bool quit;

// Start the thread that will process the posted data
std::thread worker(publisher, std::ref(traces), std::ref(queue_mutex), std::ref(wake_up), std::ref(quit));


// Now attach the sampler at probe_id, with sampling schedule sched,
// Connect the thread_forwarding_sampler that will push all data on a mutex guarded vector
sim.add_sampler(arb::all_probes, sched,
thread_forwarding_sampler(traces, queue_mutex, wake_up));

// Set up recording of spikes to a vector on the root process.
std::vector<arb::spike> recorded_spikes;
Expand Down Expand Up @@ -275,39 +347,29 @@ int main(int argc, char** argv) {
}

// Write the samples to a json file.
if (root) write_trace_json(voltage);
{
std::lock_guard<std::mutex> guard(queue_mutex);
wake_up.notify_one();
quit = true;
}
worker.join();

auto report = arb::profile::make_meter_report(meters, context);
std::cout << report;

}
catch (std::exception& e) {
std::cerr << "exception caught in ring miniapp:\n" << e.what() << "\n";
return 1;
}

return 0;
}

void write_trace_json(const arb::trace_data<double>& trace) {
std::string path = "./voltages.json";

nlohmann::json json;
json["name"] = "ring demo";
json["units"] = "mV";
json["cell"] = "0.0";
json["probe"] = "0";

auto& jt = json["data"]["time"];
auto& jy = json["data"]["voltage"];
return 0;
}

for (const auto& sample: trace) {
jt.push_back(sample.t);
jy.push_back(sample.v);
}

std::ofstream file(path);
file << std::setw(1) << json << "\n";
}

// Helper used to interpolate in branch_cell.
template <typename T>
Expand Down
74 changes: 74 additions & 0 deletions example/risotto/thread_forwarding_sampler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#pragma once

/*
* Simple(st?) implementation of a recorder of scalar
* trace data from a cell probe, with some metadata.
*/

#include <stdexcept>
#include <type_traits>
#include <vector>
#include <iostream>
#include <mutex>
#include <atomic>
#include <condition_variable>

#include <arbor/common_types.hpp>
#include <arbor/sampling.hpp>
#include <arbor/util/any_ptr.hpp>

using arb::cell_gid_type;
using arb::cell_lid_type;
using arb::cell_size_type;
using arb::cell_member_type;
using arb::cell_kind;
using arb::time_type;
using arb::cell_probe_address;
using arb::mc_cell;
using arb::section_kind;


using traces_type = std::vector<std::tuple< arb::cell_gid_type, arb::cell_lid_type,
std::vector<std::tuple<arb::time_type, double>> >>;


// sampler that inserts trace information into a vector
// data insertion is guarded by a mutex, multiple threads might call this sampler function
// After data insertion a signal is forwarded to the receiver side that
// information is available.
class thread_forwarding_sampler {
public:
explicit thread_forwarding_sampler(traces_type &traces, std::mutex& mutex,
std::condition_variable& wake_up) : traces_(traces) , mutex_(mutex), wake_up_(wake_up)
{}

void operator()(cell_member_type probe_id, arb::probe_tag tag, std::size_t n,
const arb::sample_record* recs) {

// Local data structure for storing the trace. Filled outside of the mutex
std::vector<std::tuple<arb::time_type, double>> trace;

// For all samples n in the current batch
for (std::size_t i = 0; i < n; ++i) {
// TODO: Do we need to check this every single time?
if (auto p = arb::util::any_cast<const double*>(recs[i].data)) {
trace.push_back({ recs[i].time, *p });
}
else {
throw std::runtime_error("unexpected sample type in printing_sampler");
}
}

{ // take the lock
std::lock_guard<std::mutex> guard(mutex_);
traces_.push_back({ probe_id.gid, probe_id.index, std::move(trace) });
}
//Tell the other side to wake up outside of the lock
wake_up_.notify_one();
}

private:
traces_type & traces_;
std::mutex& mutex_;
std::condition_variable& wake_up_;
};

0 comments on commit 9d72947

Please sign in to comment.