Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ edition = "2018"
reqwest = { version = "0.11.0", features = ["blocking"] }
serde_json = "1.0.59"
threadpool = "1.8.1"
clap = "2.33.3"
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ cd trin
TRIN_INFURA_PROJECT_ID="YoUr-Id-HeRe" cargo run
```

### Connect over IPC
In a python shell:
```py
>>> from web3 import Web3
Expand All @@ -41,6 +42,17 @@ In a python shell:
11870768
```

### Connect over HTTP
In a python shell:
```py
>>> from web3 import Web3
>>> w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:8545"))
>>> w3.clientVersion
'trin 0.0.1-alpha'
>>> w3.eth.blockNumber
11870768
```

The client version responds immediately, from the trin client. The block number is retrieved more slowly, by proxying to Infura.

To interact with trin at the lowest possible level, try netcat:
Expand All @@ -52,6 +64,25 @@ nc -U /tmp/trin-jsonrpc.ipc
{"jsonrpc":"2.0","id":"85","result":"trin 0.0.1-alpha"}^C
```

## CLI Options
```sh
trin 0.0.1
carver
super lightweight eth portal

USAGE:
trin [OPTIONS]

FLAGS:
--help Prints help information
-V, --version Prints version information

OPTIONS:
-h, --http-port <http_port> port to accept http connections [default: 8545]
-s, --pool-size <pool_size> max size of threadpool [default: 2]
-p, --protocol <protocol> select transport protocol [default: http] [possible values: http, ipc]
```

## Gotchas

- There is a limit on concurrent connections given by the threadpool. At last
Expand Down
268 changes: 256 additions & 12 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,103 @@
//use web3::{Web3, transports};
use clap::{value_t, App, Arg};
use reqwest::blocking as reqwest;
use serde_json;
use std::env;
use std::ffi::OsString;
use std::fs;
use std::io::{self, Read, Write};
use std::net::TcpListener;
use std::net::TcpStream;
use std::os::unix;
use threadpool::ThreadPool;

// TODO: things to configure:
// - infura project id (not just env var?)
// - rpc endpoint port
// - rpc endpoint type (tcp, ws, ipc)
// - max concurrent requests (ie~ threadpool size)
#[derive(Debug, PartialEq)]
pub struct TrinConfig {
pub protocol: String,
pub http_port: u32,
pub pool_size: u32,
}

pub fn launch_trin(infura_project_id: String) {
println!("Launching with infura key: '{}'", infura_project_id);
impl TrinConfig {
pub fn new() -> Self {
Self::new_from(std::env::args_os().into_iter()).expect("Could not parse trin arguments")
}

let pool = ThreadPool::new(2);
fn new_from<I, T>(args: I) -> Result<Self, clap::Error>
where
I: Iterator<Item = T>,
T: Into<OsString> + Clone,
{
let matches = App::new("trin")
.version("0.0.1")
.author("carver")
.about("super lightweight eth portal")
.arg(
Arg::with_name("protocol")
.short("p")
.long("protocol")
.help("select transport protocol")
.possible_values(&["http", "ipc"])
.takes_value(true)
.default_value("http"),
)
.arg(
Arg::with_name("http_port")
.short("h")
.long("http-port")
.help("port to accept http connections")
.takes_value(true)
.default_value("8545"),
)
.arg(
Arg::with_name("pool_size")
.short("s")
.long("pool-size")
.help("max size of threadpool")
.takes_value(true)
.default_value("2"),
)
.get_matches_from(args);

println!("Launching trin...");
let pool_size = value_t!(matches.value_of("pool_size"), u32)
.expect("Invalid type for pool-size argument.");
let http_port = value_t!(matches.value_of("http_port"), u32)
.expect("Invalid type for http-port argument.");
let protocol = value_t!(matches.value_of("protocol"), String)
.expect("Invalid type for protocol argument.");

match protocol.as_str() {
"http" => {
println!("Protocol: {}\nHTTP port: {}", protocol, http_port)
}
"ipc" => match http_port {
8545 => println!("Protocol: {}", protocol),
_ => panic!("Must not supply an http port when using ipc protocol"),
},
val => panic!("Unsupported protocol: {}", val),
}

println!("Pool Size: {}", pool_size);

Ok(TrinConfig {
http_port: http_port,
pool_size: pool_size,
protocol: protocol.to_string(),
})
}
}

pub fn launch_trin(trin_config: TrinConfig, infura_project_id: String) {
let pool = ThreadPool::new(trin_config.pool_size as usize);

match trin_config.protocol.as_str() {
"ipc" => launch_ipc_client(pool, infura_project_id),
"http" => launch_http_client(pool, infura_project_id, trin_config),
val => panic!("Unsupported protocol: {}", val),
}
}

fn launch_ipc_client(pool: ThreadPool, infura_project_id: String) {
let path = "/tmp/trin-jsonrpc.ipc";
let listener_result = unix::net::UnixListener::bind(path);
let listener = match listener_result {
Expand All @@ -38,15 +119,15 @@ pub fn launch_trin(infura_project_id: String) {
let stream = stream.unwrap();
let infura_project_id = infura_project_id.clone();
pool.execute(move || {
let infura_url = format!("https://mainnet.infura.io:443/v3/{}", infura_project_id);
let infura_url = get_infura_url(&infura_project_id);
let mut rx = stream.try_clone().unwrap();
let mut tx = stream;
serve_client(&mut rx, &mut tx, &infura_url);
serve_ipc_client(&mut rx, &mut tx, &infura_url);
});
}
}

fn serve_client(rx: &mut impl Read, tx: &mut impl Write, infura_url: &String) {
fn serve_ipc_client(rx: &mut impl Read, tx: &mut impl Write, infura_url: &String) {
println!("Welcoming...");
let deser = serde_json::Deserializer::from_reader(rx);
for obj in deser.into_iter::<serde_json::Value>() {
Expand Down Expand Up @@ -81,6 +162,91 @@ fn serve_client(rx: &mut impl Read, tx: &mut impl Write, infura_url: &String) {
println!("Clean exit");
}

fn launch_http_client(pool: ThreadPool, infura_project_id: String, trin_config: TrinConfig) {
let uri = format!("127.0.0.1:{}", trin_config.http_port);
let listener = TcpListener::bind(uri).unwrap();
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let infura_project_id = infura_project_id.clone();
pool.execute(move || {
let infura_url = get_infura_url(&infura_project_id);
serve_http_client(stream, &infura_url);
});
}
Err(e) => {
panic!("HTTP connection failed: {}", e)
}
}
}
}

fn serve_http_client(mut stream: TcpStream, infura_url: &String) {
let mut buffer = [0; 1024];

stream.read(&mut buffer).unwrap();

let request = String::from_utf8_lossy(&buffer[..]);
let json_request = match request.lines().last() {
None => panic!("Invalid json request."),
Some(last_line) => last_line.split('\u{0}').nth(0).unwrap(),
};

let deser = serde_json::Deserializer::from_str(&json_request);
for obj in deser.into_iter::<serde_json::Value>() {
let obj = obj.unwrap();
assert!(obj.is_object());
assert_eq!(obj["jsonrpc"], "2.0");
let request_id = obj.get("id").unwrap();
let method = obj.get("method").unwrap();

let response = match method.as_str().unwrap() {
Comment on lines +197 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the error handling was already this way in the IPC code, but it's bad news. Sending bad input data to trin should reply with an error, instead of crashing trin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I've added this to be tackled in the refactor issue #6

"web3_clientVersion" => {
let contents = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"trin 0.0.1-alpha"}}"#,
request_id
);
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
contents.len(),
contents,
)
.into_bytes()
}
_ => {
//Re-encode json to proxy to Infura
let request = obj.to_string();
match proxy_to_url(request, infura_url) {
Ok(result_body) => {
let contents = String::from_utf8_lossy(&result_body);
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
contents.len(),
contents,
)
.into_bytes()
}
Err(err) => {
let contents = format!(
r#"{{"jsonrpc":"2.0","id":"{}","error":"Infura failure: {}"}}"#,
request_id,
err.to_string(),
);
format!(
"HTTP/1.1 502 BAD GATEWAY\r\nContent-Length: {}\r\n\r\n{}",
contents.len(),
contents,
)
.into_bytes()
}
}
}
};
stream.write(&response).unwrap();
stream.flush().unwrap();
}
}

fn proxy_to_url(request: String, url: &String) -> io::Result<Vec<u8>> {
let client = reqwest::Client::new();
match client.post(url).body(request).send() {
Expand Down Expand Up @@ -108,3 +274,81 @@ fn proxy_to_url(request: String, url: &String) -> io::Result<Vec<u8>> {
)),
}
}

fn get_infura_url(infura_project_id: &String) -> String {
return format!("https://mainnet.infura.io:443/v3/{}", infura_project_id);
}

#[cfg(test)]
mod test {
use super::*;

fn env_is_set() -> bool {
match env::var("TRIN_INFURA_PROJECT_ID") {
Ok(_) => true,
_ => false,
}
}

#[test]
fn test_default_args() {
assert!(env_is_set());
let expected_config = TrinConfig {
protocol: "http".to_string(),
http_port: 8545,
pool_size: 2,
};
let actual_config = TrinConfig::new_from(["trin"].iter()).unwrap();
assert_eq!(actual_config.protocol, expected_config.protocol);
assert_eq!(actual_config.http_port, expected_config.http_port);
assert_eq!(actual_config.pool_size, expected_config.pool_size);
}

#[test]
fn test_custom_http_args() {
assert!(env_is_set());
let expected_config = TrinConfig {
protocol: "http".to_string(),
http_port: 8080,
pool_size: 3,
};
let actual_config = TrinConfig::new_from(
[
"trin",
"--protocol",
"http",
"--http-port",
"8080",
"--pool-size",
"3",
]
.iter(),
)
.unwrap();
assert_eq!(actual_config.protocol, expected_config.protocol);
assert_eq!(actual_config.http_port, expected_config.http_port);
assert_eq!(actual_config.pool_size, expected_config.pool_size);
}

#[test]
fn test_ipc_protocol() {
assert!(env_is_set());
let actual_config = TrinConfig::new_from(["trin", "--protocol", "ipc"].iter()).unwrap();
let expected_config = TrinConfig {
protocol: "ipc".to_string(),
http_port: 8545,
pool_size: 2,
};
assert_eq!(actual_config.protocol, expected_config.protocol);
assert_eq!(actual_config.http_port, expected_config.http_port);
assert_eq!(actual_config.pool_size, expected_config.pool_size);
}

#[test]
#[should_panic(expected = "Must not supply an http port when using ipc")]
fn test_ipc_protocol_rejects_custom_http_port() {
assert!(env_is_set());
TrinConfig::new_from(["trin", "--protocol", "ipc", "--http-port", "7879"].iter())
.unwrap_err();
}
}
14 changes: 9 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::env;

mod cli;
use cli::TrinConfig;

fn main() {
match env::var("TRIN_INFURA_PROJECT_ID") {
Ok(val) => cli::launch_trin(val),
Err(_) => println!(
let trin_config = TrinConfig::new();

let infura_project_id = match env::var("TRIN_INFURA_PROJECT_ID") {
Ok(val) => val,
Err(_) => panic!(
"Must supply Infura key as environment variable, like:\n\
TRIN_INFURA_PROJECT_ID=\"your-key-here\" trin"
),
}
};

cli::launch_trin(trin_config, infura_project_id);
}