Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Activity Event API #850

Merged
merged 1 commit into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,8 @@ ya-sb-router = { path = "service-bus/router" }
ya-sb-util = { path = "service-bus/util" }

## CLIENT
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "e09070085f9f6b9f681a4fbc0ce039abe5211365"}
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "e09070085f9f6b9f681a4fbc0ce039abe5211365"}

#ya-client = { path = "../ya-client" }
#ya-client-model = { path = "../ya-client/model" }
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "8f325e83a9ad88265456dcd5e97148f8b17b8c7a"}
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "8f325e83a9ad88265456dcd5e97148f8b17b8c7a"}

## OTHERS
gftp = { path = "core/gftp" }
Expand Down
131 changes: 62 additions & 69 deletions agent/provider/src/execution/task_runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix::prelude::*;
use anyhow::{anyhow, bail, Error, Result};
use chrono::{DateTime, Utc};
use derive_more::Display;
use futures::future::join_all;
use futures::{FutureExt, TryFutureExt};
Expand All @@ -15,6 +16,7 @@ use structopt::StructOpt;

use ya_agreement_utils::{AgreementView, OfferTemplate};
use ya_client::activity::ActivityProviderApi;
use ya_client_model::activity::provider_event::ProviderEventType;
use ya_client_model::activity::{ActivityState, ProviderEvent, State};
use ya_core_model::activity;
use ya_utils_actix::actix_handler::ResultTypeGetter;
Expand Down Expand Up @@ -122,6 +124,8 @@ pub struct TaskRunnerConfig {
pub process_termination_timeout: Duration,
#[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "10s")]
pub exeunit_state_retry_interval: Duration,
#[structopt(skip = "you-forgot-to-set-session-id")]
pub session_id: String,
}

// =========================================== //
Expand All @@ -144,6 +148,7 @@ pub struct TaskRunner {

config: Arc<TaskRunnerConfig>,

event_ts: DateTime<Utc>,
tasks_dir: PathBuf,
cache_dir: PathBuf,
}
Expand Down Expand Up @@ -192,6 +197,7 @@ impl TaskRunner {
activity_created: SignalSlot::<ActivityCreated>::new(),
activity_destroyed: SignalSlot::<ActivityDestroyed>::new(),
config: Arc::new(config),
event_ts: Utc::now(),
tasks_dir,
cache_dir,
})
Expand All @@ -205,25 +211,11 @@ impl TaskRunner {
self.registry.find_exeunit(&msg.name)
}

pub async fn collect_events(
client: Arc<ActivityProviderApi>,
addr: Addr<TaskRunner>,
) -> Result<()> {
match TaskRunner::query_events(client).await {
Err(error) => log::error!("Can't query activity events. Error: {:?}", error),
Ok(activity_events) => {
TaskRunner::dispatch_events(&activity_events, addr).await;
}
}

Ok(())
}

// =========================================== //
// TaskRunner internals - events dispatching
// =========================================== //

async fn dispatch_events(events: &Vec<ProviderEvent>, myself: Addr<TaskRunner>) {
async fn dispatch_events(events: Vec<ProviderEvent>, myself: &Addr<TaskRunner>) {
if events.len() == 0 {
return;
};
Expand All @@ -235,26 +227,22 @@ impl TaskRunner {
.into_iter()
.zip(iter::repeat(myself))
.map(|(event, myself)| async move {
let _ = match event {
ProviderEvent::CreateActivity {
activity_id,
agreement_id,
requestor_pub_key,
} => {
let _ = match event.event_type {
ProviderEventType::CreateActivity { requestor_pub_key } => {
myself
.send(CreateActivity::new(
activity_id,
agreement_id,
requestor_pub_key.as_ref(),
))
.send(CreateActivity {
activity_id: event.activity_id,
agreement_id: event.agreement_id,
requestor_pub_key,
})
.await?
}
ProviderEvent::DestroyActivity {
activity_id,
agreement_id,
} => {
ProviderEventType::DestroyActivity {} => {
myself
.send(DestroyActivity::new(activity_id, agreement_id))
.send(DestroyActivity {
activity_id: event.activity_id,
agreement_id: event.agreement_id,
})
.await?
}
}
Expand All @@ -266,10 +254,6 @@ impl TaskRunner {
let _ = join_all(futures).await;
}

async fn query_events(client: Arc<ActivityProviderApi>) -> Result<Vec<ProviderEvent>> {
Ok(client.get_activity_events(Some(3.), None).await?)
}

// =========================================== //
// TaskRunner internals - activity reactions
// =========================================== //
Expand Down Expand Up @@ -552,7 +536,10 @@ async fn remove_remaining_tasks(
agreement_id,
);
myself
.send(DestroyActivity::new(&activity_id, &agreement_id))
.send(DestroyActivity {
activity_id: activity_id.clone(),
agreement_id,
})
.await
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -609,13 +596,43 @@ forward_actix_handler!(
impl Handler<UpdateActivity> for TaskRunner {
type Result = ActorResponse<Self, (), Error>;

fn handle(&mut self, _msg: UpdateActivity, ctx: &mut Context<Self>) -> Self::Result {
let client = self.api.clone();
fn handle(&mut self, _: UpdateActivity, ctx: &mut Context<Self>) -> Self::Result {
let addr = ctx.address();
let client = self.api.clone();

let mut event_ts = self.event_ts.clone();
let app_session_id = self.config.session_id.clone();
let poll_timeout = Duration::from_secs(3);

let fut = async move {
let result = client
.get_activity_events(
Some(event_ts.clone()),
Some(app_session_id),
Some(poll_timeout),
None,
)
.await;

match result {
Ok(events) => {
events
.iter()
.max_by_key(|e| e.event_date)
.map(|e| event_ts = event_ts.max(e.event_date));
Self::dispatch_events(events, &addr).await;
}
Err(error) => log::error!("Can't query activity events: {:?}", error),
};
event_ts
}
.into_actor(self)
.map(|event_ts, actor, _| {
actor.event_ts = actor.event_ts.max(event_ts);
Ok(())
});

ActorResponse::r#async(
async move { TaskRunner::collect_events(client, addr).await }.into_actor(self),
)
ActorResponse::r#async(fut)
}
}

Expand Down Expand Up @@ -741,7 +758,10 @@ impl Handler<Shutdown> for TaskRunner {
let fut = async move {
for (activity_id, agreement_id) in ids {
if let Err(e) = addr
.send(DestroyActivity::new(&activity_id, &agreement_id))
.send(DestroyActivity {
activity_id,
agreement_id,
})
.await?
{
log::error!("Unable to destroy activity: {}", e);
Expand All @@ -753,30 +773,3 @@ impl Handler<Shutdown> for TaskRunner {
ActorResponse::r#async(fut.into_actor(self))
}
}

// =========================================== //
// Messages creation
// =========================================== //

impl CreateActivity {
pub fn new<S: AsRef<str>>(
activity_id: S,
agreement_id: S,
requestor_pub_key: Option<S>,
) -> CreateActivity {
CreateActivity {
activity_id: activity_id.as_ref().to_string(),
agreement_id: agreement_id.as_ref().to_string(),
requestor_pub_key: requestor_pub_key.map(|s| s.as_ref().to_string()),
}
}
}

impl DestroyActivity {
pub fn new(activity_id: &str, agreement_id: &str) -> DestroyActivity {
DestroyActivity {
activity_id: activity_id.to_string(),
agreement_id: agreement_id.to_string(),
}
}
}
6 changes: 2 additions & 4 deletions agent/provider/src/market/provider_market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ impl ProviderMarket {
) -> Result<()> {
log::info!("Got approved agreement [{}].", msg.agreement.agreement_id,);
// At this moment we only forward agreement to outside world.
self.agreement_signed_signal.send_signal(AgreementApproved {
agreement: msg.agreement,
})
self.agreement_signed_signal.send_signal(msg)
}

// =========================================== //
Expand Down Expand Up @@ -351,7 +349,7 @@ async fn process_proposal(
ProposalResponse::IgnoreProposal => log::info!("Ignoring proposal {:?}", proposal_id),
ProposalResponse::RejectProposal { reason } => {
ctx.api
.reject_proposal_with_reason(&subscription.id, proposal_id, &reason)
.reject_proposal(&subscription.id, proposal_id, &reason)
.await?;
}
},
Expand Down
4 changes: 3 additions & 1 deletion agent/provider/src/provider_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ impl ProviderAgent {
.node_name
.clone()
.unwrap_or("provider".to_string());
args.market.session_id = format!("{}-[{}]", name, std::process::id());
let session_id = format!("{}-[{}]", name, std::process::id());
args.market.session_id = session_id.clone();
args.runner.session_id = session_id;

let mut globals = GlobalsManager::try_new(&config.globals_file, args.node)?;
globals.spawn_monitor(&config.globals_file)?;
Expand Down
17 changes: 17 additions & 0 deletions core/activity/migrations/2020-12-03-092313_events/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
CREATE TABLE activity_event_migrate (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
activity_id INTEGER NOT NULL,
identity_id VARCHAR(50) NOT NULL,
event_date DATETIME NOT NULL,
event_type_id INTEGER NOT NULL,
requestor_pub_key BLOB,
FOREIGN KEY(activity_id) REFERENCES activity (id),
FOREIGN KEY(event_type_id) REFERENCES activity_event_type (id)
);

INSERT INTO activity_event_migrate(activity_id, identity_id, event_date, event_type_id, requestor_pub_key)
SELECT activity_id, identity_id, event_date, event_type_id, requestor_pub_key
FROM activity_event;

DROP TABLE activity_event;
ALTER TABLE activity_event_migrate RENAME TO activity_event;
4 changes: 4 additions & 0 deletions core/activity/migrations/2020-12-03-092313_events/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE activity_event ADD COLUMN app_session_id VARCHAR(100);

CREATE INDEX idx_app_session_id
ON activity_event(app_session_id);
2 changes: 1 addition & 1 deletion core/activity/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn web_scope(db: &DbExecutor) -> Scope {
mod common {
use actix_web::{web, Responder};

use ya_core_model::{activity, market::Role};
use ya_core_model::{activity, Role};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;
use ya_service_bus::{timeout::IntoTimeoutFuture, RpcEndpoint};
Expand Down
Loading