Skip to content

Commit

Permalink
feat: [WIP] HTTP-based transport and connector
Browse files Browse the repository at this point in the history
this sends ILP packets as the body of an HTTP POST request (instead of using BTP)
the connector is built around an HTTP server instead of the plugin architecture
  • Loading branch information
emschwartz committed Jan 18, 2019
1 parent c58c830 commit af795bc
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ failure_derive = "0.1.3"
futures = "0.1.25"
hashbrown = "0.1.7"
hex = "0.3.2"
http = "0.1.14"
lazy_static = "1.2.0"
log = "0.4.6"
num-bigint = "0.2.1"
Expand Down
84 changes: 84 additions & 0 deletions examples/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
extern crate env_logger;
extern crate futures;
extern crate hashbrown;
extern crate interledger;
extern crate reqwest;
extern crate tokio;
extern crate tower_web;

use futures::Future;
use hashbrown::HashMap;
use interledger::ilp_http::{spsp_pay, Account, HttpConnector, HttpSpspServer, HttpStreamServer};
use interledger::util::random_secret;
use reqwest::Url;
use std::thread::{sleep, spawn};
use std::time::Duration;
use tokio::runtime::current_thread::Runtime;
use tower_web::ServiceBuilder;

pub fn main() {
env_logger::init();

let mut exchange_rates = HashMap::new();
exchange_rates.insert("USD".to_string(), 1.0);
exchange_rates.insert("EUR".to_string(), 1.15);

spawn(|| {
let connector = ServiceBuilder::new().resource(HttpConnector::new(
vec![
Account {
ilp_address: "example.alice".to_string(),
asset_code: "USD".to_string(),
asset_scale: 9,
ilp_endpoint: Url::parse("http://localhost:3003/ilp").unwrap(), // doesn't exist
incoming_auth_token: "AliceAuthToken".to_string(),
outgoing_auth_token: "ConnectorAuthToken".to_string(),
},
Account {
ilp_address: "example.bob".to_string(),
asset_code: "EUR".to_string(),
asset_scale: 9,
ilp_endpoint: Url::parse("http://localhost:3001/ilp").unwrap(),
incoming_auth_token: "BobAuthToken".to_string(),
outgoing_auth_token: "ConnectorAuthToken".to_string(),
},
],
exchange_rates,
));
let addr = "127.0.0.1:3000".parse().expect("Invalid address");
connector.run(&addr).unwrap()
});

spawn(|| {
let server_secret = random_secret();
let spsp_server = ServiceBuilder::new()
.resource(HttpSpspServer::new("example.bob", server_secret.clone()))
.resource(HttpStreamServer::new(
server_secret,
"example.bob".to_string(),
"EUR".to_string(),
9,
));
let addr = "127.0.0.1:3001".parse().expect("Invalid address");
spsp_server.run(&addr).unwrap();
});

sleep(Duration::from_millis(1000));

let send_payment = spsp_pay(
"http://localhost:3000/ilp",
"AliceAuthToken",
"http://localhost:3001/spsp/bob",
100,
)
.and_then(|amount_delivered| {
println!("Delivered {}", amount_delivered);
Ok(())
})
.map_err(|err| {
println!("Error sending payment {}", err);
});
// TODO make spsp_pay Send so that we can use the normal runtime
let mut runtime = Runtime::new().unwrap();
runtime.block_on(send_payment).unwrap();
}
162 changes: 162 additions & 0 deletions src/ilp_http/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use bytes::{buf::FromBuf, Bytes};
use futures::{
future::{ok, Either},
Future, Stream,
};
use hashbrown::HashMap;
use http::Response as HttpResponse;
use ildcp::IldcpResponse;
use ilp::*;
use reqwest::{async::Client, Url};

pub struct Account {
pub ilp_endpoint: Url,
pub asset_scale: u8,
pub asset_code: String,
pub incoming_auth_token: String,
pub outgoing_auth_token: String,
pub ilp_address: String,
}

type AccountId = usize;

pub struct HttpConnector {
// TODO get these from Redis or a DB
accounts: Vec<Account>,
auth_tokens: HashMap<String, AccountId>,
exchange_rates: HashMap<String, f64>,
}

impl_web! {
impl HttpConnector {
pub fn new(accounts: Vec<Account>, exchange_rates: HashMap<String, f64>) -> Self {
let mut auth_tokens = HashMap::new();
for i in 0..accounts.len() {
auth_tokens.insert(accounts[i].incoming_auth_token.clone(), i);
}
HttpConnector {
accounts,
auth_tokens,
exchange_rates,
}
}

#[post("/ilp")]
#[content_type("application/octet-stream")]
pub fn post_ilp (&self, authorization: String, body: Vec<u8>) -> impl Future<Item = HttpResponse<Bytes>, Error = ()> {
// Authenticate request
let authorization = authorization.replace("Bearer ", "");
let incoming_account_id: AccountId = *self.auth_tokens.get(&authorization).expect("Unauthorized");
let incoming_account = &self.accounts[incoming_account_id];

// Parse ILP Packet
// TODO proper error handling
// TODO in-place packet modification
let prepare = IlpPrepare::from_bytes(&body).expect("Body is not an ILP Prepare packet");
debug!("Connector got packet from {} destined for account: {}", incoming_account.ilp_address, prepare.destination);

// Adjust incoming_account balance
// TODO

if prepare.destination.starts_with("peer") {
Either::A(self.handle_ildcp_request(incoming_account, prepare))
} else {
Either::B(self.forward_prepare(incoming_account, prepare))
}
}

fn handle_ildcp_request(&self, incoming_account: &Account, prepare: IlpPrepare) -> impl Future<Item = HttpResponse<Bytes>, Error = ()> {
let response_packet = if prepare.destination.starts_with("peer.config") {
IlpPacket::Fulfill(IldcpResponse::new(&incoming_account.ilp_address, incoming_account.asset_scale, &incoming_account.asset_code).to_fulfill().unwrap())
} else {
IlpPacket::Reject(IlpReject::new(
"F06",
"Unrecognized peer protocol",
"",
Bytes::new(),
))
};
ok(HttpResponse::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(Bytes::from(response_packet.to_bytes()))
.unwrap())
}

fn forward_prepare(&self, incoming_account: &Account, prepare: IlpPrepare) -> impl Future<Item = HttpResponse<Bytes>, Error = ()> {
// Determine next hop
let outgoing_account = self.determine_next_hop(&prepare.destination).expect("No next hop");

// Apply exchange rate and scale
// TODO make sure numbers don't overflow
let scale_adjustment = outgoing_account.asset_scale - incoming_account.asset_scale;
let scaled_amount = prepare.amount as f64 * 10.0f64.powf(scale_adjustment as f64);
let outgoing_rate = self.exchange_rates[&outgoing_account.asset_code];
let incoming_rate = self.exchange_rates[&incoming_account.asset_code];
let outgoing_amount = (scaled_amount * outgoing_rate / incoming_rate) as u64;
let mut prepare = prepare;
prepare.amount = outgoing_amount;

debug!("Connector forwarding prepare of {} {} to {}", outgoing_amount, outgoing_account.asset_code, outgoing_account.ilp_address);

// Adjust outgoing_account balance
// TODO

// Modify expiry
// TODO

// Send outgoing request
// TODO timeout if request takes too long
Box::new(Client::new()
.post(outgoing_account.ilp_endpoint.clone())
.body(prepare.to_bytes())
.header("Authorization", format!("Bearer {}", outgoing_account.outgoing_auth_token))
// TODO should we have an ILP-specific content-type?
.header("Content-Type", "application/octet-stream")
.send()
.map_err(|_err| panic!("Error sending outgoing request"))
// TODO handle HTTP error
.and_then(|response| response.into_body().concat2())
.map_err(|_err| panic!("Error getting response body"))
.and_then(|body| {
match IlpPacket::from_bytes(&body) {
Ok(IlpPacket::Fulfill(_fulfill)) => {
debug!("Connector packet was fulfilled");
// TODO validate fulfillment
// TODO validate expiry
},
Ok(IlpPacket::Reject(_reject)) => {
debug!("Connector packet was rejected");
// TODO undo balance changes
},
_ => {
warn!("Connector got unexpected response");
return Ok(HttpResponse::builder()
.status(502)
.body(Bytes::new())
.unwrap());
}
}

Ok(HttpResponse::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(Bytes::from_buf(body))
.unwrap())
}))
}

fn determine_next_hop(&self, destination_address: &str) -> Option<&Account> {
// TODO more efficient lookup
let mut matching_length = 0;
let mut next_hop= None;
self.accounts.iter().for_each(|account| {
if destination_address.starts_with(&account.ilp_address) && account.ilp_address.len() > matching_length {
matching_length = account.ilp_address.len();
next_hop = Some(account);
}
});
next_hop
}
}
}
8 changes: 8 additions & 0 deletions src/ilp_http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod connector;
mod spsp_client;
mod stream_server;

pub use self::connector::{Account, HttpConnector};
pub use self::spsp_client::spsp_pay;
pub use self::stream_server::HttpStreamServer;
pub use spsp::SpspResponder as HttpSpspServer;
100 changes: 100 additions & 0 deletions src/ilp_http/spsp_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use futures::{
stream::futures_unordered::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend,
Stream,
};
use ilp::*;
use parking_lot::Mutex;
use plugin::{IlpRequest, Plugin};
use reqwest::{async::Client, Url};
use spsp::{pay, Error as SpspError};
use std::sync::Arc;

pub fn spsp_pay(
ilp_endpoint: &str,
outgoing_auth_token: &str,
receiver: &str,
amount: u64,
) -> impl Future<Item = u64, Error = SpspError> {
let plugin = HttpPlugin::new(
Url::parse(ilp_endpoint).unwrap(),
String::from(outgoing_auth_token),
);
pay(plugin, receiver, amount)
}

type IlpRequestFuture = Box<Future<Item = IlpRequest, Error = ()>>;

struct HttpPlugin {
ilp_endpoint: Url,
outgoing_auth_token: String,
pending_requests: Arc<Mutex<FuturesUnordered<IlpRequestFuture>>>,
}

impl HttpPlugin {
pub fn new(ilp_endpoint: Url, outgoing_auth_token: String) -> Self {
HttpPlugin {
ilp_endpoint,
outgoing_auth_token,
pending_requests: Arc::new(Mutex::new(FuturesUnordered::new())),
}
}

fn send_request(&self, request: IlpRequest) -> impl Future<Item = IlpRequest, Error = ()> {
debug!("Http plugin sending outgoing request {:?}", request);
let (request_id, packet) = request;
Client::new()
.post(self.ilp_endpoint.clone())
.header(
"Authorization",
format!("Bearer {}", self.outgoing_auth_token),
)
.header("Content-Type", "application/octet-stream")
.body(packet.to_bytes())
.send()
.map_err(|_err| panic!("Error sending outgoing request"))
// TODO handle HTTP error
.and_then(|response| response.into_body().concat2())
.map_err(|_err| panic!("Error getting response body"))
.and_then(move |body| {
let response_packet = IlpPacket::from_bytes(&body).expect("Not an ILP packet");
Ok((request_id, response_packet))
})
}
}

impl Stream for HttpPlugin {
type Item = IlpRequest;
type Error = ();

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = self.pending_requests.lock().poll();
if let Ok(Async::Ready(None)) = result {
// This plugin isn't connection-oriented so the incoming stream only finishes if we get errors
Ok(Async::NotReady)
} else {
result
}
}
}

impl Sink for HttpPlugin {
type SinkItem = IlpRequest;
type SinkError = ();

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.pending_requests
.lock()
.push(Box::new(self.send_request(item)));
Ok(AsyncSink::Ready)
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
if self.pending_requests.lock().is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

impl Plugin for HttpPlugin {}
Loading

0 comments on commit af795bc

Please sign in to comment.