Skip to content

Commit

Permalink
add support for plotjugler udp json streamer (#631)
Browse files Browse the repository at this point in the history
* add support for plotjugler udp streamer

Signed-off-by: Andrey Parfenov <a1994ndrey@gmail.com>
  • Loading branch information
Andrey1994 committed May 29, 2023
1 parent 325c20a commit 236ff5b
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 3 deletions.
19 changes: 18 additions & 1 deletion src/board_controller/board.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "custom_cast.h"
#include "file_streamer.h"
#include "multicast_streamer.h"
#include "plotjuggler_udp_streamer.h"

#include "spdlog/sinks/null_sink.h"

Expand Down Expand Up @@ -246,7 +247,6 @@ void Board::free_packages ()

int Board::add_streamer (const char *streamer_params, int preset)
{

std::string preset_str = preset_to_string (preset);
if (board_descr.find (preset_str) == board_descr.end ())
{
Expand Down Expand Up @@ -286,6 +286,23 @@ int Board::add_streamer (const char *streamer_params, int preset)
streamer_dest.c_str (), streamer_mods.c_str ());
streamer = new MultiCastStreamer (streamer_dest.c_str (), port, num_rows);
}
if (streamer_type == "plotjuggler_udp")
{
int port = 0;
try
{
port = std::stoi (streamer_mods);
}
catch (const std::exception &e)
{
safe_logger (spdlog::level::err, e.what ());
return (int)BrainFlowExitCodes::INVALID_ARGUMENTS_ERROR;
}
safe_logger (spdlog::level::trace, "PlotJuggler UDP Streamer, ip addr: {}, port: {}",
streamer_dest.c_str (), streamer_mods.c_str ());
streamer =
new PlotJugglerUDPStreamer (streamer_dest.c_str (), port, board_descr[preset_str]);
}

if (streamer == NULL)
{
Expand Down
1 change: 1 addition & 0 deletions src/board_controller/build.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ SET (BOARD_CONTROLLER_SRC
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/openbci/galea.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/file_streamer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/multicast_streamer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/plotjuggler_udp_streamer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/gtec/unicorn_board.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/neuromd/neuromd_board.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/board_controller/neuromd/brainbit.cpp
Expand Down
35 changes: 35 additions & 0 deletions src/board_controller/inc/plotjuggler_udp_streamer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <thread>

#include "data_buffer.h"
#include "socket_client_udp.h"
#include "streamer.h"

#include "json.hpp"

using json = nlohmann::json;


class PlotJugglerUDPStreamer : public Streamer
{

public:
PlotJugglerUDPStreamer (const char *ip, int port, json preset_descr);
~PlotJugglerUDPStreamer ();

int init_streamer ();
void stream_data (double *data);

private:
char ip[128];
int port;
SocketClientUDP *socket;
DataBuffer *db;
volatile bool is_streaming;
std::thread streaming_thread;
json preset_descr;

void thread_worker ();
std::string remove_substr (std::string str, std::string substr);
};
4 changes: 2 additions & 2 deletions src/board_controller/inc/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
class Streamer
{
public:
Streamer (int len, std::string type, std::string dest, std::string mods)
Streamer (int data_len, std::string type, std::string dest, std::string mods)
{
this->len = len;
len = data_len;
streamer_type = type;
streamer_dest = dest;
streamer_mods = mods;
Expand Down
164 changes: 164 additions & 0 deletions src/board_controller/plotjuggler_udp_streamer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include <cstdlib>
#include <string.h>
#include <string>

#include "board.h"
#include "brainflow_constants.h"
#include "brainflow_env_vars.h"
#include "plotjuggler_udp_streamer.h"


PlotJugglerUDPStreamer::PlotJugglerUDPStreamer (const char *ip, int port, json preset_descr)
: Streamer ((int)preset_descr["num_rows"], "plotjuggler_udp", ip, std::to_string (port))
{
strcpy (this->ip, ip);
this->port = port;
this->preset_descr = preset_descr;
socket = NULL;
is_streaming = false;
db = NULL;
}

PlotJugglerUDPStreamer::~PlotJugglerUDPStreamer ()
{
if ((streaming_thread.joinable ()) && (is_streaming))
{
is_streaming = false;
streaming_thread.join ();
}
if (socket != NULL)
{
delete socket;
socket = NULL;
}
if (db != NULL)
{
delete db;
db = NULL;
}
}

int PlotJugglerUDPStreamer::init_streamer ()
{
if ((is_streaming) || (socket != NULL) || (db != NULL))
{
Board::board_logger->error ("plotjuggler streamer is running");
return (int)BrainFlowExitCodes::GENERAL_ERROR;
}

socket = new SocketClientUDP (ip, port);
int res = socket->connect ();
if (res != (int)SocketClientUDPReturnCodes::STATUS_OK)
{
delete socket;
socket = NULL;
Board::board_logger->error ("failed to init udp socket {}", res);
return (int)BrainFlowExitCodes::GENERAL_ERROR;
}

db = new DataBuffer (len, 1000);
if (!db->is_ready ())
{
Board::board_logger->error ("unable to prepare buffer for streaming");
delete db;
db = NULL;
delete socket;
socket = NULL;
return (int)BrainFlowExitCodes::INVALID_BUFFER_SIZE_ERROR;
}

is_streaming = true;
streaming_thread = std::thread ([this] { this->thread_worker (); });
return (int)BrainFlowExitCodes::STATUS_OK;
}

void PlotJugglerUDPStreamer::stream_data (double *data)
{
db->add_data (data);
}

void PlotJugglerUDPStreamer::thread_worker ()
{
double *transaction = new double[len];
for (int i = 0; i < len; i++)
{
transaction[i] = 0.0;
}
std::string name = preset_descr["name"];
while (is_streaming)
{
if (db->get_data_count () >= 1)
{
db->get_data (1, transaction);
json j;
j[name] = json::object ();
for (auto &el : preset_descr.items ())
{
std::string key = el.key ();
if (key.find ("_channels") != std::string::npos)
{
std::string prefix = remove_substr (key, "_channels");
j[name][prefix] = json::object ();
std::vector<int> values = el.value ();
for (int i = 0; i < (int)values.size (); i++)
{
std::string channel_name = "channel " + std::to_string (i);
if ((key == "accel_channels") && (i == 0))
channel_name = "accel X";
if ((key == "accel_channels") && (i == 1))
channel_name = "accel Y";
if ((key == "accel_channels") && (i == 2))
channel_name = "accel Z";
if (key == "eeg_channels")
{
try
{
std::vector<std::string> eeg_names = preset_descr["eeg_names"];
channel_name = eeg_names[i];
}
catch (...)
{
// pass
}
}
if ((values[i] >= 0) && (values[i] <= len))
{
j[name][prefix][channel_name] = transaction[values[i]];
}
}
}
else if (key.find ("_channel") != std::string::npos)
{
int pos = el.value ();
std::string prefix = remove_substr (key, "_channel");
if ((pos >= 0) && (pos < len))
{
j[name][prefix] = transaction[pos];
}
}
}
std::string s = j.dump ();
socket->send (s.c_str (), (int)s.size ());
}
else
{
#ifdef _WIN32
Sleep (1);
#else
usleep (10);
#endif
}
}
delete[] transaction;
}

std::string PlotJugglerUDPStreamer::remove_substr (std::string str, std::string substr)
{
std::string res = str;
size_t pos = str.find (substr);
if (pos != std::string::npos)
{
res.erase (pos, substr.length ());
}
return res;
}

0 comments on commit 236ff5b

Please sign in to comment.