Skip to content

Commit

Permalink
Merge branch 'master' into cancel-inf
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 10, 2018
2 parents 5154a1f + 9881ce5 commit 8bc3ab4
Show file tree
Hide file tree
Showing 23 changed files with 412 additions and 90 deletions.
21 changes: 21 additions & 0 deletions CHANGES.md
@@ -1,5 +1,26 @@
# CHANGES

## [0.x.x](2018-1x-xx)

### Added

- Introduce the `clock` module to allow overriding and mocking the system clock
based on `tokio_timer`.

- System now has `System::builder()` which allows overriding the system clock
with a custom instance. `Arbiter::builder()` can now also override the system
clock. The default is to inherit from the system.

- New utility classes `TimerFunc` and `IntervalFunc` in the `utils` module.

- Implement `failure::Fail` for `SendError`.

- Implement `Debug` for multiple public types: `AddressSender`, `Addr`, `Arbiter`, `Context`, `ContextParts`, `ContextFut`, `Response`, `ActorResponse`, `Mailbox`, `SystemRegistry`, `Supervisor`, `System`, `SystemRunner`, `SystemArbiter`. #135

### Changed

- No longer perform unnecessary clone of `Addr` in `SystemRegistry::set`.

## [0.7.4] (2018-08-27)

### Added
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -58,7 +58,7 @@ the [`Actor`](https://actix.github.io/actix/actix/trait.Actor.html) trait.

```rust
extern crate actix;
use actix::{msgs, Actor, Addr, Arbiter, Context, Syn, System};
use actix::{msgs, Actor, Addr, Arbiter, Context, System};

struct MyActor;

Expand Down
5 changes: 3 additions & 2 deletions src/actors/resolver.rs
Expand Up @@ -43,7 +43,7 @@ extern crate trust_dns_resolver;
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use std::time::Duration;

use self::trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use self::trust_dns_resolver::lookup_ip::LookupIpFuture;
Expand All @@ -52,6 +52,7 @@ use futures::{Async, Future, Poll};
use tokio_tcp::{ConnectFuture, TcpStream};
use tokio_timer::Delay;

use clock;
use prelude::*;

#[deprecated(since = "0.7.0", note = "please use `Resolver` instead")]
Expand Down Expand Up @@ -380,7 +381,7 @@ impl TcpConnector {
TcpConnector {
addrs,
stream: None,
timeout: Delay::new(Instant::now() + timeout),
timeout: Delay::new(clock::now() + timeout),
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/address/channel.rs
Expand Up @@ -3,6 +3,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::fmt;
use std::{thread, usize};

use futures::sync::oneshot::{channel as sync_channel, Receiver};
Expand Down Expand Up @@ -51,6 +52,15 @@ pub struct AddressSender<A: Actor> {
maybe_parked: Arc<AtomicBool>,
}

impl<A: Actor> fmt::Debug for AddressSender<A> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AddressSender")
.field("sender_task", &self.sender_task)
.field("maybe_parked", &self.maybe_parked)
.finish()
}
}

trait AssertKinds: Send + Sync + Clone {}

/// The receiving end of a channel which implements the `Stream` trait.
Expand Down Expand Up @@ -262,10 +272,8 @@ impl<A: Actor> AddressSender<A> {
None => return Err(SendError::Closed(msg)),
};

if park_self {
if park {
self.park(true);
}
if park_self && park {
self.park(true);
}
let env = <A::Context as ToEnvelope<A, M>>::pack(msg, None);
self.queue_push_and_signal(env);
Expand Down
7 changes: 4 additions & 3 deletions src/address/message.rs
@@ -1,10 +1,11 @@
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use std::time::Duration;

use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use tokio_timer::Delay;

use clock;
use handler::{Handler, Message};

use super::channel::{AddressSender, Sender};
Expand Down Expand Up @@ -49,7 +50,7 @@ where

/// Set message delivery timeout
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = Some(Delay::new(Instant::now() + dur));
self.timeout = Some(Delay::new(clock::now() + dur));
self
}

Expand Down Expand Up @@ -130,7 +131,7 @@ where

/// Set message delivery timeout
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = Some(Delay::new(Instant::now() + dur));
self.timeout = Some(Delay::new(clock::now() + dur));
self
}

Expand Down
6 changes: 6 additions & 0 deletions src/address/mod.rs
@@ -1,5 +1,6 @@
use std::fmt;
use std::hash::{Hash, Hasher};
use failure;

pub(crate) mod channel;
mod envelope;
Expand Down Expand Up @@ -55,13 +56,18 @@ impl<T> fmt::Display for SendError<T> {
}
}

impl<T> failure::Fail for SendError<T>
where T: Send + Sync + 'static {
}

impl fmt::Debug for MailboxError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "MailboxError({})", self)
}
}

/// Address of the actor
#[derive(Debug)]
pub struct Addr<A: Actor> {
tx: AddressSender<A>,
}
Expand Down
51 changes: 33 additions & 18 deletions src/arbiter.rs
Expand Up @@ -5,11 +5,12 @@ use std::thread;
use futures::sync::oneshot::{channel, Sender};
use futures::{future, Future, IntoFuture};
use tokio::executor::current_thread::spawn;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::current_thread::Builder as RuntimeBuilder;
use uuid::Uuid;

use actor::Actor;
use address::{channel, Addr, AddressReceiver};
use clock::Clock;
use context::Context;
use handler::Handler;
use mailbox::DEFAULT_CAPACITY;
Expand All @@ -35,6 +36,7 @@ thread_local!(
/// unless the panic is in the System actor. Users of Arbiter can opt into
/// shutting down the system on panic by using `Arbiter::builder()` and enabling
/// `stop_system_on_panic`.
#[derive(Debug)]
pub struct Arbiter {
stop: Option<Sender<i32>>,
stop_system_on_panic: bool,
Expand Down Expand Up @@ -70,18 +72,18 @@ impl Arbiter {

/// Spawn new thread and run event loop in spawned thread.
/// Returns address of newly created arbiter.
fn new_with_builder(builder: Builder) -> Addr<Arbiter> {
fn new_with_builder(mut builder: Builder) -> Addr<Arbiter> {
let (tx, rx) = std::sync::mpsc::channel();
let id = Uuid::new_v4();
let name = format!(
"arbiter:{}:{}",
id.to_hyphenated_ref().to_string(),
builder.name.as_ref().unwrap_or(&"actor".into())
builder.name.take().unwrap_or_else(|| "actor".into())
);
let sys = System::current();

let _ = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Runtime::new().unwrap();
let mut rt = builder.runtime.build().unwrap();

let (stop, stop_rx) = channel();
NAME.with(|cell| *cell.borrow_mut() = Some(name));
Expand All @@ -91,8 +93,8 @@ impl Arbiter {
System::set_current(sys);

// start arbiter
let addr =
rt.block_on(future::lazy(move || {
let addr = rt
.block_on(future::lazy(move || {
let addr = Actor::start(Arbiter {
stop: Some(stop),
stop_system_on_panic: builder.stop_system_on_panic,
Expand All @@ -102,9 +104,10 @@ impl Arbiter {
ADDR.with(|cell| *cell.borrow_mut() = Some(addr.clone()));

// register arbiter
System::current()
.sys()
.do_send(RegisterArbiter(id.to_simple_ref().to_string(), addr.clone()));
System::current().sys().do_send(RegisterArbiter(
id.to_simple_ref().to_string(),
addr.clone(),
));

if tx.send(addr).is_err() {
error!("Can not start Arbiter, remote side is dead");
Expand Down Expand Up @@ -205,17 +208,17 @@ impl Arbiter {
/// Start new arbiter and then start actor in created arbiter.
/// Returns `Addr<Syn, A>` of created actor.
pub fn start<A, F>(f: F) -> Addr<A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
Arbiter::builder().start(f)
}

fn start_with_builder<A, F>(builder: Builder, f: F) -> Addr<A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
let (stx, srx) = channel::channel(DEFAULT_CAPACITY);

Expand Down Expand Up @@ -271,13 +274,17 @@ pub struct Builder {

/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_system_on_panic: bool,

/// Tokio runtime builder.
runtime: RuntimeBuilder,
}

impl Builder {
fn new() -> Self {
Builder {
name: None,
stop_system_on_panic: false,
runtime: RuntimeBuilder::new(),
}
}

Expand All @@ -296,6 +303,14 @@ impl Builder {
self
}

/// Set the Clock instance that will be used by this Arbiter.
///
/// Defaults to the clock used by the actix `System`, which defaults to the system clock.
pub fn clock(mut self, clock: Clock) -> Self {
self.runtime.clock(clock);
self
}

/// Spawn new thread and run event loop in spawned thread.
/// Returns address of newly created arbiter.
pub fn build(self) -> Addr<Arbiter> {
Expand All @@ -305,9 +320,9 @@ impl Builder {
/// Start new arbiter and then start actor in created arbiter.
/// Returns `Addr<Syn, A>` of created actor.
pub fn start<A, F>(self, f: F) -> Addr<A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
Arbiter::start_with_builder(self, f)
}
Expand Down
11 changes: 11 additions & 0 deletions src/clock.rs
@@ -0,0 +1,11 @@
//! A configurable source of time.
//!
//! This module provides an API to get the current instant in such a way that
//! the source of time may be configured. This allows mocking out the source of
//! time in tests.
//!
//! See [Module `tokio_timer::clock`] for full documentation.
//!
//! [Module `tokio_timer::clock`]: https://docs.rs/tokio-timer/latest/tokio_timer/clock/index.html

pub use tokio_timer::clock::{now, Clock, Now};
10 changes: 10 additions & 0 deletions src/context.rs
Expand Up @@ -2,6 +2,7 @@ use actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle};
use address::{Addr, AddressReceiver};
use arbiter::Arbiter;
use fut::ActorFuture;
use std::fmt;

use contextimpl::{AsyncContextParts, ContextFut, ContextParts};
use mailbox::Mailbox;
Expand All @@ -15,6 +16,15 @@ where
mb: Option<Mailbox<A>>,
}

impl<A: Actor<Context = Context<A>>> fmt::Debug for Context<A> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Context")
.field("parts", &self.parts)
.field("mb", &self.mb)
.finish()
}
}

impl<A> ActorContext for Context<A>
where
A: Actor<Context = Self>,
Expand Down
23 changes: 23 additions & 0 deletions src/contextimpl.rs
@@ -1,5 +1,6 @@
use futures::{Async, Future, Poll};
use smallvec::SmallVec;
use std::fmt;

use actor::{
Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle, Supervised,
Expand Down Expand Up @@ -44,6 +45,18 @@ where
handles: SmallVec<[SpawnHandle; 2]>,
}

impl<A> fmt::Debug for ContextParts<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ContextParts")
.field("flags", &self.flags)
.finish()
}
}

impl<A> ContextParts<A>
where
A: Actor,
Expand Down Expand Up @@ -185,6 +198,16 @@ where
items: SmallVec<[Item<A>; 3]>,
}

impl<A, C> fmt::Debug for ContextFut<A, C>
where
C: AsyncContextParts<A>,
A: Actor<Context = C>,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ContextFut {{ /* omitted */ }}")
}
}

impl<A, C> ContextFut<A, C>
where
C: AsyncContextParts<A>,
Expand Down
5 changes: 3 additions & 2 deletions src/contextitems.rs
@@ -1,9 +1,10 @@
use futures::{Async, Future, Poll, Stream};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio_timer::Delay;

use actor::{Actor, ActorContext, AsyncContext};
use clock;
use fut::ActorFuture;
use handler::{Handler, Message, MessageResponse};

Expand Down Expand Up @@ -57,7 +58,7 @@ where
pub fn new(msg: M, timeout: Duration) -> Self {
ActorDelayedMessageItem {
msg: Some(msg),
timeout: Delay::new(Instant::now() + timeout),
timeout: Delay::new(clock::now() + timeout),
act: PhantomData,
m: PhantomData,
}
Expand Down

0 comments on commit 8bc3ab4

Please sign in to comment.