-
Notifications
You must be signed in to change notification settings - Fork 424
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
integrate gaia to persistent store (#440)
* add gaia port and configurations to graphscope-store charts * Add extra configs by set key-value pairs * add interface (#448) * add interface * [graphscope] add mapping of partition id to server id in GraphPartitionManager (#431) * [graphscope] add function of mapping partition id to server id in GraphPartitionManager Co-authored-by: siyuan0322 <siyuan0322@gmail.com> Co-authored-by: BingqingLyu <lv_bingqing@163.com>
- Loading branch information
1 parent
6eecccd
commit 809842a
Showing
42 changed files
with
741 additions
and
210 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
48 changes: 48 additions & 0 deletions
48
interactive_engine/src/executor/jna/src/executor/gaia/engine_ports_response.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
use std::os::raw::c_char; | ||
use std::ffi::CString; | ||
|
||
#[repr(C)] | ||
#[allow(non_snake_case)] | ||
pub struct EnginePortsResponse { | ||
success: bool, | ||
errMsg: *const c_char, | ||
enginePort: i32, | ||
rpcPort: i32, | ||
} | ||
|
||
impl EnginePortsResponse { | ||
pub fn new(engine_port: i32, rpc_port: i32) -> Box<EnginePortsResponse> { | ||
Box::new(EnginePortsResponse { | ||
success: true, | ||
errMsg: std::ptr::null(), | ||
enginePort: engine_port, | ||
rpcPort: rpc_port, | ||
}) | ||
} | ||
|
||
pub fn new_with_error(err_msg: &str) -> Box<EnginePortsResponse> { | ||
let msg = CString::new(err_msg).unwrap(); | ||
let response = EnginePortsResponse { | ||
success: false, | ||
errMsg: msg.as_ptr(), | ||
enginePort: 0, | ||
rpcPort: 0, | ||
}; | ||
::std::mem::forget(msg); | ||
Box::new(response) | ||
} | ||
} | ||
|
||
impl Drop for EnginePortsResponse { | ||
fn drop(&mut self) { | ||
unsafe { | ||
if !self.errMsg.is_null() { | ||
CString::from_raw(self.errMsg as *mut c_char); | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[no_mangle] | ||
#[allow(non_snake_case)] | ||
pub extern fn dropEnginePortsResponse(_: Box<EnginePortsResponse>) {} |
85 changes: 85 additions & 0 deletions
85
interactive_engine/src/executor/jna/src/executor/gaia/gaia_library.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
use std::os::raw::{c_void, c_char}; | ||
use maxgraph_store::db::proto::common::ConfigPb; | ||
use maxgraph_store::db::common::bytes::util::parse_pb; | ||
use maxgraph_store::db::api::GraphConfigBuilder; | ||
use std::sync::Arc; | ||
use maxgraph_store::db::common::unsafe_util::to_mut; | ||
use maxgraph_store::db::graph::store::GraphStore; | ||
use std::ffi::CStr; | ||
use std::net::SocketAddr; | ||
use crate::executor::gaia::gaia_server::GaiaServer; | ||
use crate::executor::gaia::engine_ports_response::EnginePortsResponse; | ||
|
||
pub type EngineHandle = *const c_void; | ||
pub type GraphHandle = *const c_void; | ||
|
||
#[no_mangle] | ||
pub extern fn initialize(config_bytes: *const u8, len: usize) -> EngineHandle { | ||
let config_buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) }; | ||
let config_pb = parse_pb::<ConfigPb>(config_buf).expect("parse config pb failed"); | ||
let mut config_builder = GraphConfigBuilder::new(); | ||
config_builder.set_storage_options(config_pb.get_configs().clone()); | ||
let config = Arc::new(config_builder.build()); | ||
let handle = Box::new(GaiaServer::new(config)); | ||
Box::into_raw(handle) as EngineHandle | ||
} | ||
|
||
#[no_mangle] | ||
pub extern fn addPartition(engine_handle: EngineHandle, partition_id: i32, graph_handle: GraphHandle) { | ||
let engine_ptr = unsafe { | ||
to_mut(&*(engine_handle as *const GaiaServer)) | ||
}; | ||
let graph_ptr = unsafe { | ||
Arc::from_raw(&*(graph_handle as *const GraphStore)) | ||
}; | ||
engine_ptr.add_partition(partition_id as u32, graph_ptr); | ||
} | ||
|
||
#[no_mangle] | ||
pub extern fn updatePartitionRouting(engine_handle: EngineHandle, partition_id: i32, server_id: i32) { | ||
let engine_ptr = unsafe { | ||
to_mut(&*(engine_handle as *const GaiaServer)) | ||
}; | ||
engine_ptr.update_partition_routing(partition_id as u32, server_id as u32); | ||
} | ||
|
||
#[no_mangle] | ||
pub extern fn startEngine(engine_handle: EngineHandle) -> Box<EnginePortsResponse> { | ||
let engine_ptr = unsafe { | ||
to_mut(&*(engine_handle as *const GaiaServer)) | ||
}; | ||
match engine_ptr.start() { | ||
Ok((engine_port, server_port)) => { | ||
EnginePortsResponse::new(engine_port as i32, server_port as i32) | ||
}, | ||
Err(e) => { | ||
let msg = format!("{:?}", e); | ||
EnginePortsResponse::new_with_error(&msg) | ||
}, | ||
} | ||
} | ||
|
||
#[no_mangle] | ||
pub extern fn stopEngine(engine_handle: EngineHandle) { | ||
let engine_ptr = unsafe { | ||
to_mut(&*(engine_handle as *const GaiaServer)) | ||
}; | ||
engine_ptr.stop(); | ||
} | ||
|
||
#[no_mangle] | ||
pub extern fn updatePeerView(engine_handle: EngineHandle, peer_view_string_raw: *const c_char) { | ||
let slice = unsafe { CStr::from_ptr(peer_view_string_raw) }.to_bytes(); | ||
let peer_view_string = std::str::from_utf8(slice).unwrap(); | ||
let peer_view = peer_view_string.split(",").map(|item| { | ||
let mut fields = item.split("#"); | ||
let id = fields.next().unwrap().parse::<u64>().unwrap(); | ||
let host = fields.next().unwrap().parse().unwrap(); | ||
let port = fields.next().unwrap().parse().unwrap(); | ||
(id, SocketAddr::new(host, port)) | ||
}).collect::<Vec<(u64, SocketAddr)>>(); | ||
let engine_ptr = unsafe { | ||
to_mut(&*(engine_handle as *const GaiaServer)) | ||
}; | ||
engine_ptr.update_peer_view(peer_view); | ||
} |
Oops, something went wrong.