Skip to content

Commit

Permalink
WIP: connect & logging
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed May 28, 2019
1 parent 201272d commit 7e8106f
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 8 deletions.
53 changes: 53 additions & 0 deletions COMMANDS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@


#### Check key

```
POST /api HTTP/1.1
Host: localhost:3292
content-type: application/json
{
"command": "upload",
"id": null,
"hash": "9854003e54f053e2d911b67bcbf38260",
"timeout": null
}
```

```
400 Bad request
{
"error":"NotFoundError: Key not found in database [6af750676216564288a098837ad99e965c9e4e8c7df610e1e2095d597c95d3b0]"
}
```

#### Check key OK

```
POST /api HTTP/1.1
Host: localhost:3292
content-type: application/json
{
"command": "upload",
"id": null,
"hash": "612dd6a00e0e5cd784bdae7de99c78de9bebd7fabd8dfab70adfe2b607c8eccc",
"timeout": 234
}
```

```
HTTP/1.1 200 OK
Content-Type: application/json
{
"hash":"612dd6a00e0e5cd784bdae7de99c78de9bebd7fabd8dfab70adfe2b607c8eccc"
}
```

18 changes: 17 additions & 1 deletion src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::filemap::FileMap;
use actix::Message;
use bytes::{Buf, BufMut, ByteOrder, BytesMut, LittleEndian};
use rand::ErrorKind;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -140,20 +141,35 @@ pub struct Ask {
pub hash: u128,
}

impl Ask {
#[inline]
pub fn new(hash: u128) -> Self {
Self { hash }
}
}

impl Message for Ask {
type Result = Result<AskReply, crate::error::Error>;
}

#[derive(Default, Serialize, Deserialize)]
pub struct AskReply {
pub hash: u128,
// None if unknown hash
pub files: Option<Vec<FileMap>>,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct GetBlock {
pub hash: u128,
pub file_nr: u32,
pub block_nr: u32,
}

impl Message for GetBlock {
type Result = Result<Block, crate::error::Error>;
}

#[derive(Default, Serialize, Deserialize, Clone)]
pub struct Block {
pub hash: u128,
Expand Down
23 changes: 22 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::codec::Op::Hello;
use crate::codec::{GetBlock, Op, StCodec, StCommand};
use crate::codec::{AskReply, Block, GetBlock, Op, StCodec, StCommand};
use crate::command::Command;
use crate::database;
use crate::database::{DatabaseManager, FileDesc};
Expand All @@ -8,7 +8,10 @@ use crate::filemap::{FileMap, BLOCK_SIZE};
use actix::io::WriteHandler;
use actix::prelude::*;
use actix::{Actor, Addr, Context};
use futures::task::AtomicTask;
use futures::unsync::oneshot;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{ErrorKind, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
Expand All @@ -24,6 +27,8 @@ pub struct Connection {
framed: actix::io::FramedWrite<WriteHalf<TcpStream>, StCodec>,
peer_id: Option<u128>,
current_file: Option<database::FileDesc>,
block_requests: HashMap<GetBlock, oneshot::Sender<Block>>,
ask_requests: HashMap<u128, oneshot::Sender<AskReply>>,
}

impl Actor for Connection {
Expand Down Expand Up @@ -54,6 +59,8 @@ impl Connection {
peer_addr,
peer_id: None,
current_file: None,
block_requests: HashMap::new(),
ask_requests: HashMap::new(),
}
})
}
Expand Down Expand Up @@ -227,3 +234,17 @@ impl StreamHandler<StCommand, io::Error> for Connection {
}

impl WriteHandler<io::Error> for Connection {}

impl Handler<crate::codec::Ask> for Connection {
type Result = ActorResponse<Self, AskReply, Error>;

fn handle(&mut self, msg: crate::codec::Ask, ctx: &mut Self::Context) -> Self::Result {
let (rx, tx) = oneshot::channel();
if let Some(prev) = self.ask_requests.insert(msg.hash, rx) {
log::error!("duplicate sned");
} else {
self.framed.write(StCommand::Ask(msg.hash))
}
ActorResponse::r#async(tx.from_err().into_actor(self))
}
}
2 changes: 1 addition & 1 deletion src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Actor for DatabaseManager {
}
Ok(()) => (),
}
log::info!("db started id={}", self.id.as_ref().unwrap())
log::info!("db started id=0x{:032x}", self.id.as_ref().unwrap())
}
}

Expand Down
43 changes: 43 additions & 0 deletions src/download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::codec::{Ask, AskReply};
use crate::connection::Connection;
use crate::database::DatabaseManager;
use crate::error::Error;
use crate::filemap::FileMap;
use crate::flatten::*;
use actix::prelude::*;
use futures::prelude::*;
use std::{io, net};
use tokio_codec::FramedRead;
use tokio_io::io::WriteHalf;
use tokio_io::AsyncRead;
use tokio_tcp::TcpStream;

pub fn connect(
db: Addr<DatabaseManager>,
addr: net::SocketAddr,
) -> impl Future<Item = Addr<Connection>, Error = Error> {
TcpStream::connect(&addr)
.from_err()
.and_then(move |c| Ok(Connection::new(db, c, addr)))
}

pub fn find_peer(
hash: u128,
db: Addr<DatabaseManager>,
addr: Vec<net::SocketAddr>,
) -> impl Future<Item = (Addr<Connection>, Vec<FileMap>), Error = Error> {
let connections = addr.into_iter().map(move |addr| {
let hash = hash;
connect(db.clone(), addr).and_then(move |connection| {
connection
.send(Ask::new(hash))
.flatten_fut()
.and_then(move |reply: AskReply| match reply.files {
Some(files) => Ok((connection, files)),
None => Err(Error::ResourceNotFound(reply.hash)),
})
})
});

futures::select_ok(connections).and_then(|(v, _)| Ok(v))
}
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub enum Error {
Mailbox(actix::MailboxError),
#[fail(display = "{}", _0)]
ProtoError(String),
#[fail(display = "request canceled {}", _0)]
RequestCanceled(#[cause] futures::Canceled),
#[fail(display = "resource {:032x} not found", _0)]
ResourceNotFound(u128),
}

macro_rules! convert {
Expand All @@ -37,5 +41,6 @@ convert! {
io::Error => IO,
bincode::Error => InvalidBinFormat,
serde_json::Error => InvalidJsonFormat,
actix::MailboxError => Mailbox
actix::MailboxError => Mailbox,
futures::Canceled => RequestCanceled
}
56 changes: 56 additions & 0 deletions src/flatten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use futures::{future, Async};

pub trait FlattenResult<T, E> {
fn flatten_result(self) -> Result<T, E>;
}

impl<T, E, TE> FlattenResult<T, E> for Result<Result<T, E>, TE>
where
TE: Into<E>,
{
fn flatten_result(self) -> Result<T, E> {
match self {
Err(e) => Err(e.into()),
Ok(r) => r,
}
}
}

pub trait FlattenFuture<T, E> {
type Future: future::Future<Item = T, Error = E>;

fn flatten_fut(self) -> Self::Future;
}

pub struct FlatFut<F: future::Future> {
inner: F,
}

impl<T, TE, E, F: future::Future<Item = Result<T, E>, Error = TE>> future::Future for FlatFut<F>
where
TE: Into<E>,
{
type Item = T;
type Error = E;

fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
match self.inner.poll() {
Err(e) => Err(e.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(Err(e))) => Err(e),
Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)),
}
}
}

impl<T, TE, E, F> FlattenFuture<T, E> for F
where
TE: Into<E>,
F: future::Future<Item = Result<T, E>, Error = TE>,
{
type Future = FlatFut<F>;

fn flatten_fut(self) -> Self::Future {
FlatFut { inner: self }
}
}
14 changes: 10 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use std::sync::Arc;
use structopt::StructOpt;
use tokio_reactor::Handle;
use tokio_tcp::TcpListener;
use actix_web::middleware::Logger;

mod codec;
mod command;
mod connection;
pub(crate) mod database;
mod download;
pub(crate) mod error;
pub(crate) mod filemap;
mod flatten;
mod server;

/// Simple resource transfer server for Golem Brass Network.
Expand Down Expand Up @@ -91,7 +94,7 @@ impl State {
) -> impl Future<Item = HttpResponse, Error = actix_web::error::Error> {
let hashed: Result<Vec<(filemap::FileMap, PathBuf)>, _> = files
.into_iter()
.map(|(path, file_name)| (filemap::hash_file(&path, file_name), path))
.map(|(path, file_name)| Ok((filemap::hash_file(&path, file_name)?, path)))
.collect();

let db = self.db.clone();
Expand All @@ -100,7 +103,7 @@ impl State {
db.send(RegisterHash(file_maps)).then(|r| match r {
Err(e) => Err(actix_web::error::ErrorInternalServerError("database lost")),
Ok(Err(e)) => Err(actix_web::error::ErrorInternalServerError(e)),
Ok(Ok(hash)) => future::ok(HttpResponse::Ok().json(UploadResult {
Ok(Ok(hash)) => Ok(HttpResponse::Ok().json(UploadResult {
hash: hash_to_hex(hash),
})),
})
Expand All @@ -124,7 +127,7 @@ fn api(

fn main() -> std::io::Result<()> {
let args = ServerOpts::from_args();
flexi_logger::Logger::with_env_or_str("hyperg=debug")
flexi_logger::Logger::with_env_or_str("hyperg=debug,actix_web::middleware::logger=info")
.start()
.unwrap();

Expand All @@ -136,19 +139,22 @@ fn main() -> std::io::Result<()> {
let addr = net::SocketAddr::from((opts.host, opts.port));
let listener = Arc::new(net::TcpListener::bind(&addr)?);

let server_opts = opts.clone();

let _server = HttpServer::new(move || {
let listener =
TcpListener::from_std(listener.try_clone().unwrap(), &Handle::default()).unwrap();
server::Server::new(db.clone(), listener);

App::new()
.wrap(Logger::default())
.data(State {
db: db.clone(),
opts: opts.clone(),
})
.service(api)
})
.bind("127.0.0.1:3292")?
.bind((server_opts.rpc_addr, server_opts.rpc_port))?
.start();

sys.run()
Expand Down
1 change: 1 addition & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ impl Handler<TcpConnect> for Server {

fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) {
log::info!("Connection from: {}", msg.1);
let _conn = crate::connection::Connection::new(self.db.clone(), msg.0, msg.1);
}
}

0 comments on commit 7e8106f

Please sign in to comment.