Skip to content

Commit

Permalink
Merge pull request #119 from bitshares/shared-ws-client-ptr
Browse files Browse the repository at this point in the history
 Use shared_ptr to WS connection in API connection
  • Loading branch information
abitmore committed May 10, 2019
2 parents 2cf1510 + 3d39a51 commit 67e5a06
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 81 deletions.
7 changes: 5 additions & 2 deletions include/fc/interprocess/signals.hpp
@@ -1,8 +1,11 @@
#pragma once
#include <functional>
#include <boost/asio/signal_set.hpp>

namespace fc
{
/// handler will be called from ASIO thread
void set_signal_handler( std::function<void(int)> handler, int signal_num );
/// Set a handler to process an IPC (inter process communication) signal.
/// Handler will be called from ASIO thread.
/// @return shared pointer to the signal_set that holds the handler
std::shared_ptr<boost::asio::signal_set> set_signal_handler( std::function<void(int)> handler, int signal_num );
}
2 changes: 2 additions & 0 deletions include/fc/rpc/cli.hpp
Expand Up @@ -25,6 +25,7 @@ namespace fc { namespace rpc {

void start();
void stop();
void cancel();
void wait();
void format_result( const string& method, std::function<string(variant,const variants&)> formatter);

Expand All @@ -40,5 +41,6 @@ namespace fc { namespace rpc {
std::string _prompt = ">>>";
std::map<string,std::function<string(variant,const variants&)> > _result_formatters;
fc::future<void> _run_complete;
fc::thread* _getline_thread = nullptr; ///< Wait for user input in this thread
};
} }
7 changes: 4 additions & 3 deletions include/fc/rpc/websocket_api.hpp
Expand Up @@ -10,7 +10,8 @@ namespace fc { namespace rpc {
class websocket_api_connection : public api_connection
{
public:
websocket_api_connection( fc::http::websocket_connection& c, uint32_t max_conversion_depth );
websocket_api_connection( const std::shared_ptr<fc::http::websocket_connection> &c,
uint32_t max_conversion_depth );
~websocket_api_connection();

virtual variant send_call(
Expand All @@ -29,8 +30,8 @@ namespace fc { namespace rpc {
const std::string& message,
bool send_message = true );

fc::http::websocket_connection& _connection;
fc::rpc::state _rpc_state;
std::shared_ptr<fc::http::websocket_connection> _connection;
fc::rpc::state _rpc_state;
};

} } // namespace fc::rpc
5 changes: 5 additions & 0 deletions include/fc/thread/thread.hpp
Expand Up @@ -129,6 +129,11 @@ namespace fc {
* @todo make quit non-blocking of the calling thread by eliminating the call to <code>boost::thread::join</code>
*/
void quit();

/**
* Send signal to underlying native thread. Only for Linux and macOS
*/
void signal(int);

/**
* @return true unless quit() has been called.
Expand Down
23 changes: 12 additions & 11 deletions src/interprocess/signals.cpp
Expand Up @@ -3,15 +3,16 @@

namespace fc
{
void set_signal_handler( std::function<void(int)> handler, int signal_num )
{
std::shared_ptr<boost::asio::signal_set> sig_set(new boost::asio::signal_set(fc::asio::default_io_service(), signal_num));
sig_set->async_wait(
[sig_set,handler]( const boost::system::error_code& err, int num )
{
handler( num );
sig_set->cancel();
// set_signal_handler( handler, signal_num );
} );
}
std::shared_ptr<boost::asio::signal_set> set_signal_handler( std::function<void(int)> handler, int signal_num )
{
std::shared_ptr<boost::asio::signal_set> sig_set( new boost::asio::signal_set( fc::asio::default_io_service(),
signal_num) );
sig_set->async_wait( [sig_set,handler]( const boost::system::error_code& err, int num )
{
if( err != boost::asio::error::operation_aborted )
handler( num );
sig_set->cancel();
} );
return sig_set;
}
}
157 changes: 115 additions & 42 deletions src/rpc/cli.cpp
Expand Up @@ -9,6 +9,7 @@

#ifdef HAVE_EDITLINE
# include "editline.h"
# include <signal.h>
# ifdef WIN32
# include <io.h>
# endif
Expand Down Expand Up @@ -53,16 +54,22 @@ void cli::send_notice( uint64_t callback_id, variants args /* = variants() */ )
FC_ASSERT(false);
}

void cli::start()
void cli::stop()
{
cli_commands() = get_method_names(0);
_run_complete = fc::async( [&](){ run(); } );
cancel();
_run_complete.wait();
}

void cli::stop()
void cli::cancel()
{
_run_complete.cancel();
_run_complete.wait();
#ifdef HAVE_EDITLINE
if( _getline_thread )
{
_getline_thread->signal(SIGINT);
_getline_thread = nullptr;
}
#endif
}

void cli::wait()
Expand Down Expand Up @@ -98,6 +105,12 @@ void cli::run()
}
catch ( const fc::eof_exception& e )
{
_getline_thread = nullptr;
break;
}
catch ( const fc::canceled_exception& e )
{
_getline_thread = nullptr;
break;
}

Expand All @@ -119,12 +132,12 @@ void cli::run()
}
catch ( const fc::exception& e )
{
std::cout << e.to_detail_string() << "\n";

if (e.code() == fc::canceled_exception_code)
{
_getline_thread = nullptr;
break;
}
std::cout << e.to_detail_string() << "\n";
}
}
}
Expand All @@ -137,36 +150,52 @@ void cli::run()
*/
static char *my_rl_complete(char *token, int *match)
{
bool have_one = false;
std::string method_name;

auto& cmd = cli_commands();
const auto& cmds = cli_commands();
const size_t partlen = strlen (token); /* Part of token */

for (const std::string& it : cmd)
std::vector<std::reference_wrapper<const std::string>> matched_cmds;
for( const std::string& it : cmds )
{
if (it.compare(0, partlen, token) == 0)
if( it.compare(0, partlen, token) == 0 )
{
if (have_one) {
// we can only have 1, but we found a second
return NULL;
}
else
{
method_name = it;
have_one = true;
}
matched_cmds.push_back( it );
}
}

if (have_one)
if( matched_cmds.size() == 0 )
return NULL;

const std::string& first_matched_cmd = matched_cmds[0];
if( matched_cmds.size() == 1 )
{
*match = 1;
method_name += " ";
return strdup (method_name.c_str() + partlen);
std::string matched_cmd = first_matched_cmd + " ";
return strdup( matched_cmd.c_str() + partlen );
}

return NULL;
size_t first_cmd_len = first_matched_cmd.size();
size_t matched_len = partlen;
for( ; matched_len < first_cmd_len; ++matched_len )
{
char next_char = first_matched_cmd[matched_len];
bool end = false;
for( const std::string& s : matched_cmds )
{
if( s.size() <= matched_len || s[matched_len] != next_char )
{
end = true;
break;
}
}
if( end )
break;
}

if( matched_len == partlen )
return NULL;

std::string matched_cmd_part = first_matched_cmd.substr( partlen, matched_len - partlen );
return strdup( matched_cmd_part.c_str() );
}

/***
Expand Down Expand Up @@ -216,6 +245,53 @@ static int cli_check_secret(const char *source)
return 0;
}

/***
* Indicates whether CLI is quitting after got a SIGINT signal.
* In order to be used by editline which is C-style, this is a global variable.
*/
static int cli_quitting = false;

/**
* Get next character from stdin, or EOF if got a SIGINT signal
*/
static int interruptible_getc(void)
{
if( cli_quitting )
return EOF;

int r;
char c;

r = read(0, &c, 1); // read from stdin, will return -1 on SIGINT

if( r == -1 && errno == EINTR )
cli_quitting = true;

return r == 1 ? c : EOF;
}

void cli::start()
{

#ifdef HAVE_EDITLINE
el_hist_size = 256;

rl_set_complete_func(my_rl_complete);
rl_set_list_possib_func(cli_completion);
//rl_set_check_secret_func(cli_check_secret);
rl_set_getc_func(interruptible_getc);

static fc::thread getline_thread("getline");
_getline_thread = &getline_thread;

cli_quitting = false;

cli_commands() = get_method_names(0);
#endif

_run_complete = fc::async( [this](){ run(); } );
}

/***
* @brief Read input from the user
* @param prompt the prompt to display
Expand All @@ -237,29 +313,26 @@ void cli::getline( const std::string& prompt, std::string& line)
if( _isatty( _fileno( stdin ) ) )
#endif
{
rl_set_complete_func(my_rl_complete);
rl_set_list_possib_func(cli_completion);
rl_set_check_secret_func(cli_check_secret);

static fc::thread getline_thread("getline");
getline_thread.async( [&](){
char* line_read = nullptr;
std::cout.flush(); //readline doesn't use cin, so we must manually flush _out
line_read = readline(prompt.c_str());
if( line_read == nullptr )
FC_THROW_EXCEPTION( fc::eof_exception, "" );
line = line_read;
// we don't need here to add line in editline's history, cause it will be doubled
free(line_read);
}).wait();
if( _getline_thread )
{
_getline_thread->async( [&prompt,&line](){
char* line_read = nullptr;
std::cout.flush(); //readline doesn't use cin, so we must manually flush _out
line_read = readline(prompt.c_str());
if( line_read == nullptr )
FC_THROW_EXCEPTION( fc::eof_exception, "" );
line = line_read;
// we don't need here to add line in editline's history, cause it will be doubled
free(line_read);
}).wait();
}
}
else
#endif
{
std::cout << prompt;
// sync_call( cin_thread, [&](){ std::getline( *input_stream, line ); }, "getline");
fc::getline( fc::cin, line );
return;
}
}

Expand Down

0 comments on commit 67e5a06

Please sign in to comment.