-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathNodeDRListener.h
104 lines (76 loc) · 2.91 KB
/
NodeDRListener.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#ifndef OPENDDS_NODEDRLISTENER_H
#define OPENDDS_NODEDRLISTENER_H
#include <nan.h>
#include "NodeValueWriter.h"
#include <dds/DdsDcpsSubscriptionC.h>
#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/DataReaderImpl.h>
#include <mutex>
namespace NodeOpenDDS {
class NodeDRListener
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
, private OpenDDS::DCPS::AbstractSamples {
public:
NodeDRListener(DDS::DomainParticipant* dp,
const v8::Local<v8::Function>& callback);
~NodeDRListener();
void set_javascript_datareader(const v8::Local<v8::Object>& js_dr);
/**
* If receiving samples, ignore any more samples and unsubscribe
* afterwards, else unsubscribe now.
*/
void unsubscribe();
/// Unsubscribe immediately
void unsubscribe_now();
private:
static void async_cb(uv_async_t* async_uv);
static void close_cb(uv_handle_t* handle_uv);
void unsubscribe_now_i(std::unique_lock<std::mutex>& lock);
typedef DDS::RequestedDeadlineMissedStatus RDMStatus;
void on_requested_deadline_missed(DDS::DataReader*, const RDMStatus&) {}
typedef DDS::RequestedIncompatibleQosStatus RIQStatus;
void on_requested_incompatible_qos(DDS::DataReader*, const RIQStatus&) {}
void on_sample_rejected(DDS::DataReader*,
const DDS::SampleRejectedStatus&) {}
void on_liveliness_changed(DDS::DataReader*,
const DDS::LivelinessChangedStatus&) {}
void on_subscription_matched(DDS::DataReader*,
const DDS::SubscriptionMatchedStatus&) {}
void on_sample_lost(DDS::DataReader*, const DDS::SampleLostStatus&) {}
void on_data_available(DDS::DataReader*);
void async(); // called from libuv event loop
DDS::DomainParticipant* dp_;
Nan::Persistent<v8::Function> callback_;
Nan::Persistent<v8::Object> js_dr_;
const OpenDDS::DCPS::ValueDispatcher* vd_;
NodeValueWriter nvw_;
struct AsyncUv : uv_async_t {
explicit AsyncUv(NodeDRListener* outer) : outer_(outer) {}
NodeDRListener* outer_;
} async_uv_;
NodeDRListener(const NodeDRListener&);
NodeDRListener& operator=(const NodeDRListener&);
void reserve(CORBA::ULong);
void push_back(const DDS::SampleInfo& src, const void* sample);
/// True if Datareader Listener is going to unsubscribe itself soon.
bool unsubscribing_;
bool unsubscribed_;
/// True if Datareader Listener taking samples.
bool receiving_samples_;
mutable std::mutex mutex_;
};
/// Wrapper object to call unsubscribe at a better time using Node Event
/// Loop.
class UnsubscribeWorker : public Nan::AsyncWorker {
public:
UnsubscribeWorker(NodeDRListener* ndrl);
~UnsubscribeWorker();
void Execute();
void Destroy();
protected:
NodeDRListener* ndrl_;
void HandleOKCallback();
void HandleErrorCallback();
};
}
#endif