Skip to content

Commit

Permalink
Better async job status handling
Browse files Browse the repository at this point in the history
Signed-off-by: Salim Alam <salam@chef.io>
  • Loading branch information
chefsalim committed Aug 3, 2017
1 parent f7bd6c8 commit 1194cc1
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 142 deletions.
52 changes: 21 additions & 31 deletions components/builder-jobsrv/src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use postgres;
use protobuf;
use protocol::net::{NetOk, NetError, ErrCode};
use protocol::{originsrv, jobsrv, scheduler};
use std::str::FromStr;
use protobuf::ProtobufEnum;

/// DataStore inherints being Send + Sync by virtue of having only one member, the pool itself.
Expand Down Expand Up @@ -474,34 +473,32 @@ impl DataStore {
pub fn update_job(&self, job: &jobsrv::Job) -> Result<()> {
let conn = self.pool.get_shard(0)?;
let job_id = job.get_id() as i64;
let job_state = match job.get_state() {
jobsrv::JobState::Dispatched => "Dispatched",
jobsrv::JobState::Pending => "Pending",
jobsrv::JobState::Processing => "Processing",
jobsrv::JobState::Complete => "Complete",
jobsrv::JobState::Rejected => "Rejected",
jobsrv::JobState::Failed => "Failed",
};
let job_state = job.get_state().to_string();

// Note: the following fields may all be NULL. As currently
// coded, if they are NULL, then the corresponding fields in
// the database will also be updated to be NULL. This should
// be OK, though, because they shouldn't be changing anyway.
let build_started_at = match job.has_build_started_at() {
true => Some(
DateTime::<UTC>::from_str(job.get_build_started_at()).unwrap(),
),
false => None,
let build_started_at = if job.has_build_started_at() {
Some(job.get_build_started_at().parse::<DateTime<UTC>>().unwrap())
} else {
None
};
let build_finished_at = match job.has_build_finished_at() {
true => Some(
DateTime::<UTC>::from_str(job.get_build_finished_at()).unwrap(),
),
false => None,

let build_finished_at = if job.has_build_finished_at() {
Some(
job.get_build_finished_at()
.parse::<DateTime<UTC>>()
.unwrap(),
)
} else {
None
};
let ident = match job.has_package_ident() {
true => Some(job.get_package_ident().to_string()),
false => None,

let ident = if job.has_package_ident() {
Some(job.get_package_ident().to_string())
} else {
None
};

let (err_code, err_msg) = if job.has_error() {
Expand Down Expand Up @@ -554,16 +551,9 @@ fn row_to_job(row: &postgres::rows::Row) -> Result<jobsrv::Job> {
job.set_id(id as u64);
let owner_id: i64 = row.get("owner_id");
job.set_owner_id(owner_id as u64);

let js: String = row.get("job_state");
let job_state = match &js[..] {
"Dispatched" => jobsrv::JobState::Dispatched,
"Pending" => jobsrv::JobState::Pending,
"Processing" => jobsrv::JobState::Processing,
"Complete" => jobsrv::JobState::Complete,
"Rejected" => jobsrv::JobState::Rejected,
"Failed" => jobsrv::JobState::Failed,
_ => return Err(Error::UnknownJobState),
};
let job_state: jobsrv::JobState = js.parse().map_err(Error::UnknownJobState)?;
job.set_state(job_state);

let created_at = row.get::<&str, DateTime<UTC>>("created_at");
Expand Down
7 changes: 4 additions & 3 deletions components/builder-jobsrv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use db;
use extern_url;
use hab_core;
use hab_net;
use protocol;
use postgres;
use protobuf;
use r2d2;
Expand Down Expand Up @@ -55,7 +56,7 @@ pub enum Error {
ProjectJobsGet(postgres::error::Error),
Protobuf(protobuf::ProtobufError),
UnknownVCS,
UnknownJobState,
UnknownJobState(protocol::jobsrv::Error),
Zmq(zmq::Error),
}

Expand Down Expand Up @@ -113,7 +114,7 @@ impl fmt::Display for Error {
format!("Database error getting jobs for project, {}", e)
}
Error::UnknownVCS => format!("Unknown VCS"),
Error::UnknownJobState => format!("Unknown Job State"),
Error::UnknownJobState(ref e) => format!("{}", e),
Error::Zmq(ref e) => format!("{}", e),
};
write!(f, "{}", msg)
Expand Down Expand Up @@ -148,7 +149,7 @@ impl error::Error for Error {
Error::NetError(ref err) => err.description(),
Error::ProjectJobsGet(ref err) => err.description(),
Error::Protobuf(ref err) => err.description(),
Error::UnknownJobState => "Unknown Job State",
Error::UnknownJobState(ref err) => err.description(),
Error::UnknownVCS => "Unknown VCS",
Error::Zmq(ref err) => err.description(),
}
Expand Down
53 changes: 43 additions & 10 deletions components/builder-protocol/src/jobsrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

use message::{Persistable, Routable};
use protobuf::{ProtobufEnum, RepeatedField};
use protobuf::RepeatedField;
use regex::Regex;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use sharding::InstaId;
use std::result;
use std::str::FromStr;
use std::fmt;
use std::error;

pub use message::jobsrv::*;

Expand All @@ -28,6 +30,23 @@ pub enum Error {
BadJobState,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = match *self {
Error::BadJobState => "Bad Job State",
};
write!(f, "{}", msg)
}
}

impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::BadJobState => "Job state cannot be parsed",
}
}
}

impl Into<Job> for JobSpec {
fn into(mut self) -> Job {
let mut job = Job::new();
Expand Down Expand Up @@ -221,19 +240,33 @@ impl FromStr for JobState {
type Err = Error;

fn from_str(value: &str) -> result::Result<Self, Self::Err> {
match value.parse() {
Ok(id) => {
if let Some(state) = JobState::from_i32(id) {
Ok(state)
} else {
Err(Error::BadJobState)
}
}
Err(_) => Err(Error::BadJobState),

match value.to_lowercase().as_ref() {
"pending" => Ok(JobState::Pending),
"processing" => Ok(JobState::Processing),
"complete" => Ok(JobState::Complete),
"rejected" => Ok(JobState::Rejected),
"failed" => Ok(JobState::Failed),
"dispatched" => Ok(JobState::Dispatched),
_ => Err(Error::BadJobState),
}
}
}

impl fmt::Display for JobState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let value = match *self {
JobState::Dispatched => "Dispatched",
JobState::Pending => "Pending",
JobState::Processing => "Processing",
JobState::Complete => "Complete",
JobState::Rejected => "Rejected",
JobState::Failed => "Failed",
};
write!(f, "{}", value)
}
}

impl Persistable for Job {
type Key = u64;

Expand Down
84 changes: 83 additions & 1 deletion components/builder-scheduler/src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use config::Config;
use error::{Result, Error};
use chrono::{DateTime, UTC};

use protocol;
use protocol::jobsrv::{Job, JobState};
use protocol::scheduler::*;
use protobuf::RepeatedField;
use protobuf::{parse_from_bytes, Message};

// DataStore inherits Send + Sync by virtue of having only one member, the pool itself.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -293,6 +295,50 @@ impl DataStore {
)
SELECT * FROM my_group;
$$"#)?;

// A queue to hold messages to be processed. This is currently only for JobStatus
// messages to allow the scheduler to deal with them asynchronously and reliably.
// The message queue is meant to provide 'at-least-once' delivery reliability.
// Eventually this functionality could be useful more broadly and be moved out to
// a separate module.
migrator.migrate(
"scheduler",
r#"CREATE TABLE IF NOT EXISTS message_queue (
id bigserial PRIMARY KEY,
message bytea
)"#,
)?;

// Insert a message to the message queue.
migrator.migrate(
"scheduler",
r#"CREATE OR REPLACE FUNCTION insert_message_v1 (
message bytea
) RETURNS SETOF message_queue AS $$
INSERT INTO message_queue (message)
VALUES (message)
RETURNING *;
$$ LANGUAGE SQL VOLATILE"#,
)?;

// Retrieve oldest message(s) from the front of the message queue.
// The bigserial id is used to determine the message order.
migrator.migrate("scheduler",
r#"CREATE OR REPLACE FUNCTION get_front_message_v1(max_rows int) RETURNS SETOF message_queue AS $$
SELECT * FROM message_queue
ORDER BY id ASC
LIMIT max_rows;
$$ LANGUAGE SQL VOLATILE"#)?;

// Delete a message from the message queue
migrator.migrate(
"scheduler",
r#"CREATE OR REPLACE FUNCTION delete_message_v1 (msg_id bigint) RETURNS void AS $$
DELETE FROM message_queue
WHERE id = msg_id;
$$ LANGUAGE SQL VOLATILE"#,
)?;

migrator.finish()?;

Ok(())
Expand Down Expand Up @@ -556,7 +602,7 @@ impl DataStore {
// No rows means this job might not be one we care about
if rows.is_empty() {
warn!("No project found for job id: {}", job.get_id());
return Err(Error::UnknownJobState);
return Err(Error::UnknownProjectState);
}

assert!(rows.len() == 1); // should never have more than one
Expand Down Expand Up @@ -608,4 +654,40 @@ impl DataStore {

Ok(groups)
}

pub fn enqueue_message(&self, msg: &protocol::net::Msg) -> Result<()> {
let conn = self.pool.get_shard(0)?;

let body = msg.write_to_bytes().map_err(Error::Protobuf)?;

conn.execute("SELECT FROM insert_message_v1($1)", &[&body])
.map_err(Error::MessageInsert)?;

Ok(())
}

pub fn peek_message(&self, count: i32) -> Result<Vec<(i64, protocol::net::Msg)>> {
let mut results = Vec::new();

let conn = self.pool.get_shard(0)?;
let rows = &conn.query("SELECT * FROM get_front_message_v1($1)", &[&count])
.map_err(Error::MessageGet)?;
for row in rows {
let id: i64 = row.get("id");
let body: Vec<u8> = row.get("message");
let msg: protocol::net::Msg = parse_from_bytes(&body).map_err(Error::Protobuf)?;
results.push((id, msg));
}

Ok(results)
}

pub fn delete_message(&self, id: i64) -> Result<()> {
let conn = self.pool.get_shard(0)?;

conn.execute("SELECT FROM delete_message_v1($1)", &[&id])
.map_err(Error::MessageDelete)?;

Ok(())
}
}
30 changes: 27 additions & 3 deletions components/builder-scheduler/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ pub enum Error {
GroupPending(postgres::error::Error),
GroupSetState(postgres::error::Error),
ProjectSetState(postgres::error::Error),
MessageInsert(postgres::error::Error),
MessageGet(postgres::error::Error),
MessageDelete(postgres::error::Error),
NetError(hab_net::Error),
ProtoNetError(protocol::net::NetError),
Protobuf(protobuf::ProtobufError),
UnknownGroup,
UnknownGroupState,
UnknownProjectState,
UnknownJobState,
UnknownJobState(protocol::jobsrv::Error),
UnknownPackage,
Zmq(zmq::Error),
ChannelCreate(depot_client::Error),
Expand Down Expand Up @@ -85,13 +88,31 @@ impl fmt::Display for Error {
Error::GroupPending(ref e) => format!("Database error getting pending group, {}", e),
Error::GroupSetState(ref e) => format!("Database error setting group state, {}", e),
Error::ProjectSetState(ref e) => format!("Database error setting project state, {}", e),
Error::MessageInsert(ref e) => {
format!(
"Database error inserting a message to the message queue, {}",
e
)
}
Error::MessageGet(ref e) => {
format!(
"Database error retrieving a message from the message queue, {}",
e
)
}
Error::MessageDelete(ref e) => {
format!(
"Database error deleting a message from the message queue, {}",
e
)
}
Error::NetError(ref e) => format!("{}", e),
Error::ProtoNetError(ref e) => format!("{}", e),
Error::Protobuf(ref e) => format!("{}", e),
Error::UnknownGroup => format!("Unknown Group"),
Error::UnknownGroupState => format!("Unknown Group State"),
Error::UnknownProjectState => format!("Unknown Project State"),
Error::UnknownJobState => format!("Unknown Job State"),
Error::UnknownJobState(ref e) => format!("{}", e),
Error::UnknownPackage => format!("Unknown Package"),
Error::Zmq(ref e) => format!("{}", e),
Error::ChannelCreate(ref e) => format!("{}", e),
Expand Down Expand Up @@ -120,13 +141,16 @@ impl error::Error for Error {
Error::GroupPending(ref err) => err.description(),
Error::GroupSetState(ref err) => err.description(),
Error::ProjectSetState(ref err) => err.description(),
Error::MessageInsert(ref err) => err.description(),
Error::MessageGet(ref err) => err.description(),
Error::MessageDelete(ref err) => err.description(),
Error::NetError(ref err) => err.description(),
Error::ProtoNetError(ref err) => err.description(),
Error::Protobuf(ref err) => err.description(),
Error::UnknownGroup => "Unknown Group",
Error::UnknownGroupState => "Unknown Group State",
Error::UnknownProjectState => "Unknown Project State",
Error::UnknownJobState => "Unknown Job State",
Error::UnknownJobState(ref err) => err.description(),
Error::UnknownPackage => "Unknown Package",
Error::Zmq(ref err) => err.description(),
Error::ChannelCreate(ref err) => err.description(),
Expand Down

0 comments on commit 1194cc1

Please sign in to comment.