Skip to content

Commit

Permalink
make get_session_by_id async
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Mar 9, 2022
1 parent e81f7d3 commit 20450a9
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 5 deletions.
2 changes: 1 addition & 1 deletion query/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::CancelAction(action) => {
// We only destroy when session is exist
let session_id = action.query_id.clone();
if let Some(session) = self.sessions.get_session_by_id(&session_id) {
if let Some(session) = self.sessions.get_session_by_id(&session_id).await {
// TODO: remove streams
session.force_kill_session();
}
Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
match self.ctx.get_session_by_id(id) {
match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
Expand Down
3 changes: 2 additions & 1 deletion query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ impl QueryContext {
}

// Get one session by session id.
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
self.shared
.session
.get_session_manager()
.get_session_by_id(id)
.await
}

// Get all the processes list info.
Expand Down
4 changes: 2 additions & 2 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ impl SessionManager {
}

#[allow(clippy::ptr_arg)]
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
let sessions = futures::executor::block_on(self.active_sessions.read());
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
let sessions = self.active_sessions.read().await;
sessions
.get(id)
.map(|session| SessionRef::create(session.clone()))
Expand Down

0 comments on commit 20450a9

Please sign in to comment.