Skip to content

Commit

Permalink
Better async job status handling
Browse files Browse the repository at this point in the history
Signed-off-by: Salim Alam <salam@chef.io>
  • Loading branch information
chefsalim committed Aug 1, 2017
1 parent 0fd19b5 commit 574ce5c
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 118 deletions.
36 changes: 17 additions & 19 deletions components/builder-jobsrv/src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,34 +474,32 @@ impl DataStore {
pub fn update_job(&self, job: &jobsrv::Job) -> Result<()> {
let conn = self.pool.get_shard(0)?;
let job_id = job.get_id() as i64;
let job_state = match job.get_state() {
jobsrv::JobState::Dispatched => "Dispatched",
jobsrv::JobState::Pending => "Pending",
jobsrv::JobState::Processing => "Processing",
jobsrv::JobState::Complete => "Complete",
jobsrv::JobState::Rejected => "Rejected",
jobsrv::JobState::Failed => "Failed",
};
let job_state = job.get_state().to_string();

// Note: the following fields may all be NULL. As currently
// coded, if they are NULL, then the corresponding fields in
// the database will also be updated to be NULL. This should
// be OK, though, because they shouldn't be changing anyway.
let build_started_at = match job.has_build_started_at() {
true => Some(
let build_started_at = if job.has_build_started_at() {
Some(
DateTime::<UTC>::from_str(job.get_build_started_at()).unwrap(),
),
false => None,
)
} else {
None
};
let build_finished_at = match job.has_build_finished_at() {
true => Some(

let build_finished_at = if job.has_build_finished_at() {
Some(
DateTime::<UTC>::from_str(job.get_build_finished_at()).unwrap(),
),
false => None,
)
} else {
None
};
let ident = match job.has_package_ident() {
true => Some(job.get_package_ident().to_string()),
false => None,

let ident = if job.has_package_ident() {
Some(job.get_package_ident().to_string())
} else {
None
};

let (err_code, err_msg) = if job.has_error() {
Expand Down
33 changes: 33 additions & 0 deletions components/builder-protocol/src/jobsrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use serde::{Serialize, Serializer};
use sharding::InstaId;
use std::result;
use std::str::FromStr;
use std::fmt;
use std::error;

pub use message::jobsrv::*;

Expand All @@ -28,6 +30,23 @@ pub enum Error {
BadJobState,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = match *self {
Error::BadJobState => "Bad Job State",
};
write!(f, "{}", msg)
}
}

/// Allows the protocol job error to be used as a standard error value.
impl error::Error for Error {
    /// Returns a static, one-line explanation of the error variant.
    fn description(&self) -> &str {
        let summary = match *self {
            Error::BadJobState => "Job state cannot be parsed",
        };
        summary
    }
}

impl Into<Job> for JobSpec {
fn into(mut self) -> Job {
let mut job = Job::new();
Expand Down Expand Up @@ -234,6 +253,20 @@ impl FromStr for JobState {
}
}

/// Textual form of a job state, one fixed label per variant.
impl fmt::Display for JobState {
    /// Writes the label of the current state to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // No catch-all arm: adding a state must fail to compile until it is
        // given a label here.
        f.write_str(match *self {
            JobState::Pending => "Pending",
            JobState::Processing => "Processing",
            JobState::Dispatched => "Dispatched",
            JobState::Complete => "Complete",
            JobState::Rejected => "Rejected",
            JobState::Failed => "Failed",
        })
    }
}

impl Persistable for Job {
type Key = u64;

Expand Down
185 changes: 183 additions & 2 deletions components/builder-scheduler/src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use rand::{Rng, thread_rng};
use chrono::{DateTime, UTC};

use protocol::jobsrv::{Job, JobState};
use protocol::originsrv::OriginProject;
use protocol::scheduler::*;
use protobuf::RepeatedField;
use protocol::net::{NetError, ErrCode};
use protobuf::{RepeatedField, ProtobufEnum};
use std::str::FromStr;

// DataStore inherits Send + Sync by virtue of having only one member, the pool itself.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -262,6 +265,65 @@ impl DataStore {
r#"CREATE OR REPLACE FUNCTION set_project_state_ident_v1 (pid bigint, jid bigint, state text, ident text) RETURNS void AS $$
UPDATE projects SET project_state=state, job_id=jid, project_ident=ident, updated_at=now() WHERE id=pid;
$$ LANGUAGE SQL VOLATILE"#)?;

// The job status table
migrator.migrate(
"scheduler",
r#"CREATE TABLE IF NOT EXISTS job_status (
id bigserial PRIMARY KEY,
job_id bigint,
owner_id bigint,
job_state text,
project_name text,
net_error_code int,
net_error_msg text,
build_started_at timestamptz,
build_finished_at timestamptz,
created_at timestamptz DEFAULT now()
)"#,
)?;

// Insert a new job status
migrator.migrate("scheduler",
r#"CREATE OR REPLACE FUNCTION insert_job_status_v1 (
job_id bigint,
owner_id bigint,
job_state text,
project_name text,
net_error_code int,
net_error_msg text,
build_started_at timestamptz,
build_finished_at timestamptz
) RETURNS SETOF job_status AS $$
BEGIN
RETURN QUERY INSERT INTO job_status (job_id, owner_id, job_state, project_name, net_error_code, net_error_msg, build_started_at, build_finished_at)
VALUES (job_id, owner_id, job_state, project_name, net_error_code, net_error_msg, build_started_at, build_finished_at)
RETURNING *;
RETURN;
END
$$ LANGUAGE plpgsql VOLATILE
"#)?;

migrator.migrate("scheduler",
r#"CREATE OR REPLACE FUNCTION get_front_job_status_v1(max_rows int) RETURNS SETOF job_status AS $$
BEGIN
RETURN QUERY SELECT * FROM job_status
ORDER BY created_at ASC
LIMIT max_rows;
RETURN;
END
$$ LANGUAGE plpgsql VOLATILE"#)?;

migrator.migrate(
"scheduler",
r#"CREATE OR REPLACE FUNCTION delete_job_status_v1 (status_id int) RETURNS void AS $$
BEGIN
DELETE FROM job_status
WHERE id = status_id;
END
$$ LANGUAGE plpgsql VOLATILE"#,
)?;

migrator.finish()?;

Ok(())
Expand Down Expand Up @@ -534,7 +596,7 @@ impl DataStore {
// No rows means this job might not be one we care about
if rows.is_empty() {
warn!("No project found for job id: {}", job.get_id());
return Err(Error::UnknownJobState);
return Err(Error::UnknownProjectState);
}

assert!(rows.len() == 1); // should never have more than one
Expand Down Expand Up @@ -586,4 +648,123 @@ impl DataStore {

Ok(groups)
}

/// Records the job carried by `msg` as a new job status entry.
///
/// The job id, owner id, state (rendered with `to_string`), project name,
/// optional error code/message and optional build timestamps are read from
/// the job and passed as the eight arguments of `insert_job_status_v1`.
/// Optional fields that are not set on the job are passed as `None`.
///
/// # Errors
///
/// Propagates a failure to obtain the shard 0 connection, and returns
/// `Error::JobStatusInsert` when the insert statement fails.
///
/// # Panics
///
/// Panics if a build timestamp is present on the job but cannot be parsed
/// as a `DateTime<UTC>`.
pub fn create_job_status(&self, msg: &JobStatus) -> Result<()> {
    let conn = self.pool.get_shard(0)?;
    let job = msg.get_job();

    // Shared parser for both build timestamps; a malformed value aborts here.
    let parse_timestamp = |raw: &str| DateTime::<UTC>::from_str(raw).unwrap();

    let id = job.get_id() as i64;
    let owner = job.get_owner_id() as i64;
    let state = job.get_state().to_string();
    let project = job.get_project().get_name();

    // The error code and message are only read when the job carries an error.
    let net_error = if job.has_error() {
        Some(job.get_error())
    } else {
        None
    };
    let code = net_error.map(|e| e.get_code() as i32);
    let message = net_error.map(|e| e.get_msg());

    let started = if job.has_build_started_at() {
        Some(parse_timestamp(job.get_build_started_at()))
    } else {
        None
    };

    let finished = if job.has_build_finished_at() {
        Some(parse_timestamp(job.get_build_finished_at()))
    } else {
        None
    };

    conn.execute(
        "SELECT FROM insert_job_status_v1($1, $2, $3, $4, $5, $6, $7, $8)",
        &[
            &id,
            &owner,
            &state,
            &project,
            &code,
            &message,
            &started,
            &finished,
        ],
    ).map_err(Error::JobStatusInsert)?;

    Ok(())
}

/// Fetches up to `count` job status entries via `get_front_job_status_v1`.
///
/// Each returned pair holds the status row's `id` column and the `Job`
/// rebuilt from that row by `row_to_job`, in the order the rows came back.
///
/// # Errors
///
/// Propagates a failure to obtain the shard 0 connection, returns
/// `Error::JobStatusGet` when the query fails, and stops at the first row
/// that `row_to_job` rejects, returning that error.
pub fn get_front_job_status(&self, count: i32) -> Result<Vec<(i64, Job)>> {
    let conn = self.pool.get_shard(0)?;
    let rows = conn.query("SELECT * FROM get_front_job_status_v1($1)", &[&count])
        .map_err(Error::JobStatusGet)?;

    // Collecting into a `Result` short-circuits on the first conversion error.
    let jobs = rows.iter()
        .map(|row| {
            let status_id: i64 = row.get("id");
            row_to_job(&row).map(|job| (status_id, job))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(jobs)
}

pub fn delete_job_status(&self, status_id: i64) -> Result<()> {
let conn = self.pool.get_shard(0)?;
conn.execute("SELECT FROM delete_job_status_v1($1)", &[&status_id])
.map_err(Error::JobStatusDelete)?;

Ok(())
}
}

// Note - this an abbreviated version of a Job - it only sets fields we care about
// for status in the scheduler. TODO (SA): refactor into a different type.
fn row_to_job(row: &postgres::rows::Row) -> Result<Job> {
let mut job = Job::new();
let id: i64 = row.get("job_id");
job.set_id(id as u64);
let owner_id: i64 = row.get("owner_id");
job.set_owner_id(owner_id as u64);

let js: String = row.get("job_state");
let job_state = JobState::from_str(&js).map_err(Error::UnknownJobState)?;
job.set_state(job_state);

if let Some(Ok(start)) = row.get_opt::<&str, DateTime<UTC>>("build_started_at") {
job.set_build_started_at(start.to_rfc3339());
}
if let Some(Ok(stop)) = row.get_opt::<&str, DateTime<UTC>>("build_finished_at") {
job.set_build_finished_at(stop.to_rfc3339());
}

let mut project = OriginProject::new();
let name: String = row.get("project_name");
let name_for_split = name.clone();
let name_split: Vec<&str> = name_for_split.split("/").collect();
project.set_origin_name(name_split[0].to_string());
project.set_package_name(name_split[1].to_string());
project.set_name(name);
job.set_project(project);

if let Some(Ok(err_msg)) = row.get_opt::<&str, String>("net_error_msg") {
let err_code: i32 = row.get("net_error_code");
let mut err = NetError::new();

if let Some(net_err_code) = ErrCode::from_i32(err_code) {
err.set_code(net_err_code);
err.set_msg(err_msg);
job.set_error(err);
}
}

Ok(job)
}
17 changes: 14 additions & 3 deletions components/builder-scheduler/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ pub enum Error {
GroupPending(postgres::error::Error),
GroupSetState(postgres::error::Error),
ProjectSetState(postgres::error::Error),
JobStatusInsert(postgres::error::Error),
JobStatusGet(postgres::error::Error),
JobStatusDelete(postgres::error::Error),
NetError(hab_net::Error),
ProtoNetError(protocol::net::NetError),
Protobuf(protobuf::ProtobufError),
UnknownGroup,
UnknownGroupState,
UnknownProjectState,
UnknownJobState,
UnknownJobState(protocol::jobsrv::Error),
UnknownPackage,
Zmq(zmq::Error),
ChannelCreate(depot_client::Error),
Expand Down Expand Up @@ -85,13 +88,18 @@ impl fmt::Display for Error {
Error::GroupPending(ref e) => format!("Database error getting pending group, {}", e),
Error::GroupSetState(ref e) => format!("Database error setting group state, {}", e),
Error::ProjectSetState(ref e) => format!("Database error setting project state, {}", e),
Error::JobStatusInsert(ref e) => {
format!("Database error inserting a new job status, {}", e)
}
Error::JobStatusGet(ref e) => format!("Database error retrieving a job status, {}", e),
Error::JobStatusDelete(ref e) => format!("Database error deleting a job status, {}", e),
Error::NetError(ref e) => format!("{}", e),
Error::ProtoNetError(ref e) => format!("{}", e),
Error::Protobuf(ref e) => format!("{}", e),
Error::UnknownGroup => format!("Unknown Group"),
Error::UnknownGroupState => format!("Unknown Group State"),
Error::UnknownProjectState => format!("Unknown Project State"),
Error::UnknownJobState => format!("Unknown Job State"),
Error::UnknownJobState(ref e) => format!("{}", e),
Error::UnknownPackage => format!("Unknown Package"),
Error::Zmq(ref e) => format!("{}", e),
Error::ChannelCreate(ref e) => format!("{}", e),
Expand Down Expand Up @@ -120,13 +128,16 @@ impl error::Error for Error {
Error::GroupPending(ref err) => err.description(),
Error::GroupSetState(ref err) => err.description(),
Error::ProjectSetState(ref err) => err.description(),
Error::JobStatusInsert(ref err) => err.description(),
Error::JobStatusGet(ref err) => err.description(),
Error::JobStatusDelete(ref err) => err.description(),
Error::NetError(ref err) => err.description(),
Error::ProtoNetError(ref err) => err.description(),
Error::Protobuf(ref err) => err.description(),
Error::UnknownGroup => "Unknown Group",
Error::UnknownGroupState => "Unknown Group State",
Error::UnknownProjectState => "Unknown Project State",
Error::UnknownJobState => "Unknown Job State",
Error::UnknownJobState(ref err) => err.description(),
Error::UnknownPackage => "Unknown Package",
Error::Zmq(ref err) => err.description(),
Error::ChannelCreate(ref err) => err.description(),
Expand Down
10 changes: 4 additions & 6 deletions components/builder-scheduler/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn group_create(
new_group
} else {
let new_group = state.datastore().create_group(&msg, projects)?;
state.schedule_cli().notify_work()?;
state.schedule_cli().notify()?;
new_group
};

Expand Down Expand Up @@ -220,11 +220,9 @@ pub fn job_status(
let msg: proto::JobStatus = req.parse_msg()?;
debug!("job_status message: {:?}", msg);

// TODO BUG: SA There is a potential race condition here where the job status can get lost
// if the process goes away (for whatever reason) before the status gets processed by
// the scheduler thread. We can fix it by persisting the status and then handing it
// asynchronously, or by making the status update handling synchronous.
state.schedule_cli().notify_status(&msg.get_job())?;
state.datastore().create_job_status(&msg)?;

state.schedule_cli().notify()?;

req.reply_complete(sock, &msg)?;
Ok(())
Expand Down

0 comments on commit 574ce5c

Please sign in to comment.