Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to extractors for auto handlers #3

Merged
merged 1 commit into from Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -67,10 +67,10 @@ Or with job success and failure reported for you automatically from your
function result:

```rust
use futures::future;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use zeebe::Client;
use futures::future;
use zeebe::{Client, Data};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -97,7 +97,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

// Async job handler function
async fn handle_job(client: Client, data: MyJobData) -> Result<MyJobResult, MyError> {
async fn handle_job(data: Data<MyJobData>) -> Result<MyJobResult, MyError> {
Ok(MyJobResult { result: 42 })
}

Expand All @@ -113,7 +113,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let job = client
.job_worker()
.with_job_type("my-job-type")
.with_auto_handler(|client: Client, my_job_data: MyJobData| {
.with_auto_handler(|my_job_data: Data<MyJobData>| {
future::ok::<_, MyError>(MyJobResult { result: 42 })
})
.run()
Expand Down
59 changes: 59 additions & 0 deletions examples/auto_handler.rs
@@ -0,0 +1,59 @@
use serde::{Deserialize, Serialize};
use std::cell::Cell;
use thiserror::Error;
use zeebe::{Client, Data, State};

// Any error that implements `std::error::Error`
#[derive(Error, Debug)]
enum MyError {}

// A type that can be deserialized from job variables data
#[derive(Deserialize)]
struct MyJobData {
increment: u32,
}

// A result type that can be serialized as job success variables
#[derive(Serialize)]
struct MyJobResult {
result: u32,
}

// Optional worker state that persists across jobs
struct JobState {
total: Cell<u32>,
}

// Job handler with arbitrary number of parameters that can be extracted
async fn handle_job(
job_data: Data<MyJobData>,
job_state: State<JobState>,
) -> Result<MyJobResult, MyError> {
let current_total = job_state.total.get();
let new_total = current_total + job_data.increment;

job_state.total.set(new_total);

Ok(MyJobResult { result: new_total })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();

// Initialize the worker state
let job_state = JobState {
total: Cell::new(0),
};

// Run the job
let _job = client
.job_worker()
.with_job_type("my-job-type")
.with_auto_handler(handle_job)
.with_state(job_state)
.run()
.await?;

Ok(())
}
5 changes: 4 additions & 1 deletion src/client.rs
Expand Up @@ -4,20 +4,22 @@ use crate::{
proto::gateway_client::GatewayClient,
topology::TopologyBuilder,
util::{PublishMessageBuilder, ResolveIncidentBuilder},
worker::JobWorkerBuilder,
worker::{auto_handler::Extensions, JobWorkerBuilder},
workflow::{
CancelWorkflowInstanceBuilder, CreateWorkflowInstanceBuilder,
CreateWorkflowInstanceWithResultBuilder, DeployWorkflowBuilder, SetVariablesBuilder,
},
};
use std::fmt::Debug;
use std::rc::Rc;
use tonic::transport::{Channel, ClientTlsConfig};

/// Client used to communicate with Zeebe.
#[derive(Clone, Debug)]
pub struct Client {
pub(crate) gateway_client: GatewayClient<Channel>,
pub(crate) current_job_key: Option<i64>,
pub(crate) current_job_extensions: Option<Rc<Extensions>>,
}

impl Default for Client {
Expand Down Expand Up @@ -76,6 +78,7 @@ impl Client {
Ok(Client {
gateway_client: GatewayClient::new(channel),
current_job_key: None,
current_job_extensions: None,
})
}

Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Expand Up @@ -65,4 +65,10 @@ pub enum Error {
/// Invalid method parameters
#[error("Invalid parameters: {0}")]
InvalidParameters(&'static str),
/// Job payload deserialization error
#[error("Cannot deserialize variables to expected type: {0}")]
DeserializationError(#[from] serde_json::Error),
/// Missing worker state configuration
#[error("Worker state is not configured, use `JobWorkerBuilder::with_state` while building worker for this job")]
MissingWorkerStateConfig,
}
10 changes: 5 additions & 5 deletions src/lib.rs
Expand Up @@ -60,10 +60,10 @@
//! function result:
//!
//! ```no_run
//! use futures::future;
//! use serde::{Deserialize, Serialize};
//! use thiserror::Error;
//! use zeebe::Client;
//! use futures::future;
//! use zeebe::{Client, Data};
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -90,7 +90,7 @@
//! }
//!
//! // Async job handler function
//! async fn handle_job(client: Client, data: MyJobData) -> Result<MyJobResult, MyError> {
//! async fn handle_job(data: Data<MyJobData>) -> Result<MyJobResult, MyError> {
//! Ok(MyJobResult { result: 42 })
//! }
//!
Expand All @@ -106,7 +106,7 @@
//! let job = client
//! .job_worker()
//! .with_job_type("my-job-type")
//! .with_auto_handler(|client: Client, my_job_data: MyJobData| {
//! .with_auto_handler(|my_job_data: Data<MyJobData>| {
//! future::ok::<_, MyError>(MyJobResult { result: 42 })
//! })
//! .run()
Expand Down Expand Up @@ -160,7 +160,7 @@ pub use topology::{BrokerInfo, Partition, TopologyBuilder, TopologyResponse};
pub use util::{
PublishMessageBuilder, PublishMessageResponse, ResolveIncidentBuilder, ResolveIncidentResponse,
};
pub use worker::JobWorkerBuilder;
pub use worker::{Data, JobWorkerBuilder, State};
pub use workflow::{
CancelWorkflowInstanceBuilder, CancelWorkflowInstanceResponse, CreateWorkflowInstanceBuilder,
CreateWorkflowInstanceResponse, CreateWorkflowInstanceWithResultBuilder,
Expand Down
84 changes: 84 additions & 0 deletions src/worker/auto_handler/extractor/data.rs
@@ -0,0 +1,84 @@
use std::fmt;
use std::ops;

/// Data that can be extracted from JSON encoded job variables in [`auto handlers`].
///
/// [`auto handlers`]: struct.JobWorkerBuilder.html#method.with_auto_handler
///
/// # Examples
///
/// ```no_run
/// use futures::future;
/// use serde::{Deserialize, Serialize};
/// use std::cell::Cell;
/// use thiserror::Error;
/// use zeebe::{Client, Data};
///
/// #[derive(Error, Debug)]
/// enum MyError {}
///
/// #[derive(Serialize)]
/// struct MyJobResult {
/// result: u32,
/// }
///
/// #[derive(Deserialize)]
/// struct MyJobData {
/// some_key: String,
/// }
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let client = Client::default();
///
/// let _job = client
/// .job_worker()
/// .with_job_type("my-job-type")
/// .with_auto_handler(|my_job_data: Data<MyJobData>| {
/// future::ok::<_, MyError>(MyJobResult { result: 42 })
/// })
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub struct Data<T>(pub T);

impl<T> Data<T> {
/// Deconstruct to an inner value
pub fn into_inner(self) -> T {
self.0
}
}

impl<T> ops::Deref for Data<T> {
type Target = T;

fn deref(&self) -> &T {
&self.0
}
}

impl<T> ops::DerefMut for Data<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}

impl<T> fmt::Debug for Data<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Json: {:?}", self.0)
}
}

impl<T> fmt::Display for Data<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
74 changes: 74 additions & 0 deletions src/worker/auto_handler/extractor/mod.rs
@@ -0,0 +1,74 @@
use crate::{
client::Client,
error::{Error, Result},
job::Job,
};

mod data;
mod state;

pub use data::Data;
pub use state::State;

pub trait FromJob: Sized {
fn from_job(client: &Client, job: &Job) -> std::result::Result<Self, Error>;
}

impl FromJob for Client {
fn from_job(client: &Client, _: &Job) -> std::result::Result<Self, Error> {
Ok(client.clone())
}
}

impl FromJob for Job {
fn from_job(_: &Client, job: &Job) -> std::result::Result<Self, Error> {
Ok(job.clone())
}
}

impl<T: serde::de::DeserializeOwned> FromJob for Data<T> {
fn from_job(_: &Client, job: &Job) -> Result<Self> {
Ok(Data(serde_json::from_str(job.variables_str())?))
}
}

impl<T: 'static> FromJob for State<T> {
fn from_job(client: &Client, _: &Job) -> Result<Self> {
if let Some(data) = client
.current_job_extensions
.as_ref()
.and_then(|ext| ext.get::<State<T>>())
{
Ok(data.clone())
} else {
Err(Error::MissingWorkerStateConfig)
}
}
}

macro_rules! tuple_from_job ({$(($n:tt, $T:ident)),+} => {
/// FromJob implementation for tuple
#[doc(hidden)]
impl<$($T: FromJob + 'static),+> FromJob for ($($T,)+)
{
fn from_job(client: &Client, job: &Job) -> Result<Self> {
Ok(($($T::from_job(client, job)?,)+))
}
}
});

#[rustfmt::skip]
mod m {
use super::*;

tuple_from_job!((0, A));
tuple_from_job!((0, A), (1, B));
tuple_from_job!((0, A), (1, B), (2, C));
tuple_from_job!((0, A), (1, B), (2, C), (3, D));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I));
tuple_from_job!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J));
}