@@ -87,7 +87,7 @@ bool RpcBase::call(const std::string& action,
87
87
// Wait for response
88
88
std::stringstream expected_id;
89
89
expected_id << m_transaction_id;
90
- RpcMessage* rpc_message = nullptr ;
90
+ std::shared_ptr< RpcMessage> rpc_message;
91
91
auto wait_time = std::chrono::steady_clock ().now () + timeout;
92
92
do
93
93
{
@@ -104,12 +104,15 @@ bool RpcBase::call(const std::string& action,
104
104
if (rpc_message->unique_id != expected_id.str ())
105
105
{
106
106
// Wrong message
107
- delete rpc_message;
108
- rpc_message = nullptr ;
107
+ rpc_message.reset ();
109
108
}
110
109
}
111
110
}
112
- } while (ret && (rpc_message == nullptr ));
111
+ else
112
+ {
113
+ ret = false ;
114
+ }
115
+ } while (ret && !rpc_message);
113
116
114
117
// Extract response
115
118
if (rpc_message)
@@ -126,7 +129,7 @@ bool RpcBase::call(const std::string& action,
126
129
{
127
130
message = rpc_message->message .GetString ();
128
131
}
129
- delete rpc_message;
132
+ rpc_message. reset () ;
130
133
}
131
134
else
132
135
{
@@ -164,12 +167,8 @@ void RpcBase::start()
164
167
// Initialize transaction id sequence
165
168
m_transaction_id = std::rand ();
166
169
167
- // Flush queues
168
- m_requests_queue.clear ();
169
- m_requests_queue.setEnable (true );
170
- m_results_queue.clear ();
171
-
172
170
// Start reception thread
171
+ m_requests_queue.setEnable (true );
173
172
m_rx_thread = new std::thread (std::bind (&RpcBase::rxThread, this ));
174
173
}
175
174
}
@@ -185,6 +184,10 @@ void RpcBase::stop()
185
184
m_rx_thread->join ();
186
185
delete m_rx_thread;
187
186
m_rx_thread = nullptr ;
187
+
188
+ // Flush queues
189
+ m_requests_queue.clear ();
190
+ m_results_queue.clear ();
188
191
}
189
192
}
190
193
@@ -309,7 +312,7 @@ bool RpcBase::decodeCall(const std::string& unique_id,
309
312
if (action.IsString () && payload.IsObject ())
310
313
{
311
314
// Add request to the queue
312
- RpcMessage* msg = new RpcMessage (unique_id, action.GetString (), rpc_frame, payload);
315
+ auto msg = std::make_shared< RpcMessage> (unique_id, action.GetString (), rpc_frame, payload);
313
316
m_requests_queue.push (msg);
314
317
315
318
ret = true ;
@@ -327,7 +330,7 @@ bool RpcBase::decodeCallResult(const std::string& unique_id, rapidjson::Document
327
330
if (payload.IsObject ())
328
331
{
329
332
// Add result to the queue
330
- RpcMessage* msg = new RpcMessage (unique_id, rpc_frame, payload);
333
+ auto msg = std::make_shared< RpcMessage> (unique_id, rpc_frame, payload);
331
334
m_results_queue.push (msg);
332
335
333
336
ret = true ;
@@ -349,7 +352,7 @@ bool RpcBase::decodeCallError(const std::string& unique_id,
349
352
if (error.IsString () && message.IsString () && payload.IsObject ())
350
353
{
351
354
// Add error to the queue
352
- RpcMessage* msg = new RpcMessage (unique_id, rpc_frame, payload, &error, &message);
355
+ auto msg = std::make_shared< RpcMessage> (unique_id, rpc_frame, payload, &error, &message);
353
356
m_results_queue.push (msg);
354
357
355
358
ret = true ;
@@ -380,7 +383,7 @@ void RpcBase::sendCallError(const std::string& unique_id, const char* error, con
380
383
void RpcBase::rxThread ()
381
384
{
382
385
// Thread loop
383
- RpcMessage* rpc_message = nullptr ;
386
+ std::shared_ptr< RpcMessage> rpc_message;
384
387
while (m_requests_queue.pop (rpc_message))
385
388
{
386
389
// Notify call
@@ -417,7 +420,7 @@ void RpcBase::rxThread()
417
420
}
418
421
419
422
// Free resources
420
- delete rpc_message;
423
+ rpc_message. reset () ;
421
424
}
422
425
}
423
426
0 commit comments