Skip to content
Permalink
Browse files

Merge branch 'mm2-http-fallback' into mm2

  • Loading branch information...
ArtemGr committed Apr 24, 2019
2 parents 499668b + cc9e7d4 commit 037473cc7bc348968b014e08358a55350ff8ea19

Some generated files are not rendered by default. Learn more.

@@ -205,6 +205,7 @@ pub trait IguanaInfo {
}
}

#[allow(dead_code)]
#[derive(Deserialize)]
struct WithdrawRequest {
coin: String,
@@ -72,6 +72,7 @@ use hyper::{Body, Client, Request, Response, StatusCode, HeaderMap};
use hyper::client::HttpConnector;
use hyper::header::{ HeaderValue, CONTENT_TYPE };
use hyper::rt::Stream;
use hyper::server::conn::Http;
use hyper_rustls::HttpsConnector;
use libc::{c_char, c_void, malloc, free};
use serde_json::{self as json, Value as Json};
@@ -423,6 +424,8 @@ fn start_core_thread() -> Remote {
lazy_static! {
/// Shared asynchronous reactor.
pub static ref CORE: Remote = start_core_thread();
/// Shared HTTP server.
pub static ref HTTP: Http = Http::new();
}

/// With a shared reactor drives the future `f` to completion.
@@ -17,39 +17,39 @@
// marketmaker
//

use common::{coins_iter, lp, lp_queue_command_for_c, slurp_url, os, CJSON, MM_VERSION};
use common::log::TagParam;
use common::mm_ctx::{MmCtx, MmArc};
use futures::{Future};
use futures::stream::Stream;
use futures::sync::oneshot::Sender;
use gstuff::now_ms;
use hex;
use hyper::{Request, Body, StatusCode};
use hyper::service::Service;
use hyper::StatusCode;
use libc::{self, c_char, c_void};
use peers;
use portfolio::prices_loop;
use rand::random;
use rand::{random, thread_rng, Rng};
use serde_json::{self as json, Value as Json};
use std::borrow::Cow;
use std::fs;
use std::ffi::{CStr, CString};
use std::io::{Cursor, Read, Write};
use std::mem::{transmute, zeroed};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
use std::net::{IpAddr, Ipv4Addr, TcpListener};
use std::path::Path;
use std::ptr::null_mut;
use std::str;
use std::str::from_utf8;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, sleep};
use std::time::Duration;
use tokio_core::{net as tnet};

use crate::common::{rpc_response, HyRes, CORE};
use peers::http_fallback::new_http_fallback;
use portfolio::prices_loop;

use crate::common::{
coins_iter, lp, lp_queue_command_for_c, os, slurp_url,
CORE, CJSON, MM_VERSION};
use crate::common::log::TagParam;
use crate::common::mm_ctx::{MmCtx, MmArc};
use crate::mm2::lp_network::{lp_command_q_loop, seednode_loop, client_p2p_loop};
use crate::mm2::lp_ordermatch::{lp_trade_command, lp_trades_loop};
use crate::mm2::rpc::{self, HTTP, SINGLE_THREADED_C_LOCK};
use crate::mm2::rpc::{self, SINGLE_THREADED_C_LOCK};

/*
#include <stdio.h>
@@ -1568,45 +1568,59 @@ pub unsafe fn lp_passphrase_init (ctx: &MmArc, passphrase: Option<&str>, gui: Op
Ok(())
}

/// Temporarily binds on the given IP and port to check if they're available.
/// Returns `false` if the address did not work
/// (like when the `ip` does not belong to a connected interface or is already taken).
fn test_bind (ip: IpAddr, port: u16) -> bool {
let bindaddr = SocketAddr::new (ip, port);
let listener = match tnet::TcpListener::bind2 (&bindaddr) {Ok (tl) => tl, Err (_) => return false};

// The bind just always works on certain operating systems.
/// Temporarily binds on the given IP to check if it's available.
///
/// Returns an error if the address did not work
/// (like when the `ip` does not belong to a connected interface or is already taken).
///
/// If the IP has passed the communication check then a shutdown Sender is returned.
/// Dropping or using that Sender will stop the HTTP fallback server.
///
/// Also the port of the HTTP fallback server is returned.
fn test_bind (ip: IpAddr, seed: bool) -> Result<(Sender<()>, u16), String> {
// NB: The `bind` just always works on certain operating systems.
// To actually check the address we should try communicating on it.
// Reusing a likeness of `rpc::spawn_rpc` HTTP server for that.

struct RpcService;
impl Service for RpcService {
type ReqBody = Body; type ResBody = Body; type Error = String; type Future = HyRes;
fn call (&mut self, _request: Request<Body>) -> HyRes {
rpc_response (200, "k")
// Reusing the HTTP fallback server for that.

let (server, port) = if seed {
// NB: We might need special permissions to bind on 80.
// HTTP fallback should work on that port though
// in order to function with those providers which only allow the usual traffic.
let port = 80;
(try_s! (new_http_fallback (ip, port)), port)
} else {
// Try a few random ports.
let mut attempts_left = 9;
let mut rng = thread_rng();
loop {
if attempts_left < 1 {return ERR! ("Out of attempts")}
attempts_left -= 1;
// TODO: Avoid `mypubport`.
let port = rng.gen_range (1111, 65535);
log! ("test_bind] Trying to listen on " (ip) ':' (port));
match new_http_fallback (ip, port) {
Ok (s) => break (s, port),
Err (err) => {
if attempts_left == 0 {return ERR! ("{}", err)}
continue
}
}
}
}
let server = listener.incoming().for_each (move |(socket, _my_sock)| {
CORE.spawn (move |_| HTTP
.serve_connection (socket, RpcService)
.map(|_| ())
.map_err (|err| log! ({"test_bind] HTTP error: {}", err})));
Ok(())
}) .map_err (|err| log! ({"test_bind] accept error: {}", err}));
};

// Finish the server `Future` when `shutdown_rx` fires.
let (shutdown_tx, shutdown_rx) = futures::sync::oneshot::channel::<()>();
let server = server.select2 (shutdown_rx) .then (|_| Ok(()));
CORE.spawn (move |_| server);

let url = fomat! ("http://" if ip.is_unspecified() {"127.0.0.1"} else {(ip)} ":" (port));
log! ("test_bind] Checking " (url));
let rc = slurp_url (&url) .wait();
let _ = shutdown_tx.send(());
if let Ok ((StatusCode::OK, _h, body)) = rc {
body == b"k"
} else {
false
}
let (status, _h, body) = try_s! (rc);
if status != StatusCode::OK {return ERR! ("Status not OK")}
if body != b"k" {return ERR! ("body not k")}
Ok ((shutdown_tx, port))
}

pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON) -> Result<(), String> {
@@ -1669,6 +1683,11 @@ pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON) -> R
Ok (match ip.parse() {Ok (ip) => ip, Err (err) => return ERR! ("Error parsing IP address '{}': {}", ip, err)})
}

let i_am_seed = ctx.conf["i_am_seed"].as_bool().unwrap_or(false);

let mut http_fallback_shutdown = None;
let mut http_fallback_port = None;

let myipaddr: IpAddr = if Path::new ("myipaddr") .exists() {
match fs::File::open ("myipaddr") {
Ok (mut f) => {
@@ -1732,26 +1751,34 @@ pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON) -> R
// If we're not behind a NAT then the bind will likely suceed.
// If the bind fails then emit a user-visible warning and fall back to 0.0.0.0.
let tags: &[&TagParam] = &[&"myipaddr"];
if test_bind (ip, mypubport) {
ctx.log.log ("🙂", tags, &fomat! (
"We've detected an external IP " (ip) " and we can bind on it (port " (mypubport) "), so probably a dedicated IP."));
break ip
match test_bind (ip, i_am_seed) {
Ok ((hf_shutdown, hf_port)) => {
ctx.log.log ("🙂", tags, &fomat! (
"We've detected an external IP " (ip) " and we can bind on it (port " (mypubport) ")"
", so probably a dedicated IP."));
if i_am_seed {
http_fallback_shutdown = Some (hf_shutdown);
http_fallback_port = Some (hf_port);
}
break ip
},
Err (err) => log! ("IP " (ip) " doesn't check: " (err))
}
let all_interfaces = Ipv4Addr::new (0, 0, 0, 0) .into();
if test_bind (all_interfaces, mypubport) {
if test_bind (all_interfaces, false) .is_ok() {
ctx.log.log ("😅", tags, &fomat! (
"We couldn't bind on the external IP " (ip) " (port " (mypubport) "), so NAT is likely to be present. We'll be okay though."));
"We couldn't bind on the external IP " (ip) ", so NAT is likely to be present. We'll be okay though."));
break all_interfaces
}
let locahost = Ipv4Addr::new (127, 0, 0, 1) .into();
if test_bind (locahost, mypubport) {
if test_bind (locahost, false) .is_ok() {
ctx.log.log ("🤫", tags, &fomat! (
"We couldn't bind on " (ip) " or 0.0.0.0 (port " (mypubport) "), is another MM2 instance running?"
"We couldn't bind on " (ip) " or 0.0.0.0, is another MM2 instance running?"
" Looks like we can bind on 127.0.0.1 as a workaround, but this is a temporary workaround so please don't tell anyone."));
break locahost
}
ctx.log.log ("🤒", tags, &fomat! (
"Couldn't bind on " (ip) ", 0.0.0.0 or 127.0.0.1 (port " (mypubport) ")."));
"Couldn't bind on " (ip) ", 0.0.0.0 or 127.0.0.1."));
break all_interfaces // Seems like a better default than 127.0.0.1, might still work for other ports.
}
};
@@ -1761,8 +1788,8 @@ pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON) -> R
unsafe {lp::G.netid = ctx.conf["netid"].as_u64().unwrap_or (0) as u16}
unsafe {lp::LP_mypubsock = -1}

let i_am_seed = ctx.conf["i_am_seed"].as_bool().unwrap_or(false);
let seednode_thread = if i_am_seed {
log! ("i_am_seed at " (myipaddr) ":" (mypubport));
let listener: TcpListener = try_s!(TcpListener::bind(&fomat!((myipaddr) ":" (mypubport))));
try_s!(listener.set_nonblocking(true));
Some(try_s!(thread::Builder::new().name ("seednode_loop".into()) .spawn ({
@@ -163,7 +163,7 @@ fn help() {
" rpc_local_only .. MM forbids some RPC requests from not loopback (localhost) IPs as additional security measure.\n"
" Defaults to `true`, set `false` to disable. `Use with caution`.\n"
" rpcport .. If > 1000 overrides the 7783 default.\n"
" i_am_seed .. Notify MM that it should run in seednode mode (acting as message relayer/broadcaster for others).\n"
" i_am_seed .. Activate the seed node mode (acting as a relay for mm2 clients).\n"
" Defaults to `false`.\n"
" seednodes .. Seednode IPs that node will use. At least 1 seed IP be set if the node is not seed itself.\n"
" userhome .. System home directory of a user ('/root' by default).\n"
@@ -21,6 +21,7 @@ fomat-macros = "0.2"
futures = "0.1"
gstuff = "0.5"
hashbrown = {version = "0.1.8", features = ["serde", "nightly"]}
hyper = "0.12"
itertools = "0.7"
lazy_static = "1.2"
libc = "0.2"
@@ -30,5 +31,6 @@ serde_derive = "1.0"
serde_json = "1.0"
serde_bencode = "0.2"
serde_bytes = "0.10"
tokio-core = "0.1"
unwrap = "1.2"
#zstd-safe = "1.4"
@@ -0,0 +1,31 @@
use futures::{self, Future};
use hyper::{Request, Body};
use hyper::rt::{Stream};
use hyper::service::Service;
use std::net::{IpAddr, SocketAddr};
use tokio_core::net::TcpListener;

use crate::common::{rpc_response, HyRes, CORE, HTTP};

/// Creates a Hyper Future that would run the HTTP fallback server.
pub fn new_http_fallback (ip: IpAddr, port: u16) -> Result<Box<Future<Item=(), Error=()>+Send>, String> {
let bindaddr = SocketAddr::new (ip, port);
let listener = try_s! (TcpListener::bind2 (&bindaddr));

struct RpcService;
impl Service for RpcService {
type ReqBody = Body; type ResBody = Body; type Error = String; type Future = HyRes;
fn call (&mut self, _request: Request<Body>) -> HyRes {
rpc_response (200, "k")
}
}
let server = listener.incoming().for_each (move |(socket, _my_sock)| {
CORE.spawn (move |_| HTTP
.serve_connection (socket, RpcService)
.map(|_| ())
.map_err (|err| log! ({"test_bind] HTTP error: {}", err})));
Ok(())
}) .map_err (|err| log! ({"test_bind] accept error: {}", err}));

Ok (Box::new (server))
}
@@ -13,11 +13,14 @@
// 01 13:30:16, peers:617] peers_send_compat] Compression from 32084 to 32094
// but we're going to refactor these payloads in the future,
// and there might be different other payloads as we go through the port.
// TODO: See if compression works for CRDT type(s).
//extern crate zstd_safe; // https://github.com/facebook/zstd/blob/dev/lib/zstd.h

#[doc(hidden)]
pub mod peers_tests;

pub mod http_fallback;

use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
use common::{bits256, is_a_test_drill, slice_to_malloc, RaiiRm};
use common::log::TagParam;
@@ -138,3 +138,70 @@ pub fn test_peers_direct_send() {
destruction_check (alice);
destruction_check (bob);
}

pub fn test_http_fallback() {
/*
let f = start_http_fallback_server();
let alice = peer ({seed: server_ip, lt dht disabled, lt direct disabled});
let bob = peer ({seed: server_ip, lt dht disabled, lt direct disabled});
//With libtorrent DHT disabled the peers will hit the timeout that activates the HTTP fallback.
println! ("Sending {} bytes …", message.len());
let _sending_f = super::send (&alice, unwrap! (super::key (&bob)), b"test_dht", message.clone());
let receiving_f = super::recv (&bob, b"test_dht", Box::new ({
let message = message.clone();
move |payload| payload == &message[..]
}));
let received = unwrap! (receiving_f.wait());
assert_eq! (received, message);
destruction_check (alice);
destruction_check (bob);
drop (f); // Stops the HTTP server by dropping the Future.
*/
}

// Check the primitives used to communicate with the HTTP fallback server.
// These are useful in implementing NAT traversal in situations
// where a truly distributed no-single-point-of failure operation is not necessary,
// like when we're using the fallback server to drive a tested mm2 instance.
pub fn test_http_fallback_kv() {
/*
let f = start_http_fallback_server();
crdt = add b"value1"
let s = super::http_store (addr, "key", crdt);
test s.wait() == [b"value1"]
crdt = add b"value2"
let s = super::http_store (addr, "key", crdt);
test s.wait() == [b"value1", b"value2"]
crdt = remove b"value1"
let s = super::http_store (addr, "key", crdt);
test s.wait() == [b"value2"]
let g = super::http_fetch (addr, "key");
test g.wait() == [b"value2"]
// Server-side conversion of CRDT into a normal value
// in order to provide a simple cURL-compatible interface.
// (Should maybe test it with cURL?)
let g = super::http_get (addr, "key");
test g.wait() == b"value"
// Server-side conversion of a set operation into a CRDT modification
// in order to provide a simple cURL-compatible interface.
// (Should maybe test it with cURL?)
let s = super::http_set (addr, "key", b"value");
test s.wait()
let g = super::http_fetch (addr, "key");
test g.wait() == [b"value2", b"value"]
drop (f); // Stops the HTTP server by dropping the Future.
*/
}

0 comments on commit 037473c

Please sign in to comment.
You can’t perform that action at this time.