Skip to content

Commit

Permalink
[GIE-IR] Upgrade Gaia-IR with the new Pegasus Client/Service APIs (#1648)
Browse files Browse the repository at this point in the history

* [IR Service] init by moving service codes into ir

* [WIP] impl JobAssembly for IRJobAssembly

* [WIP] move client codes into IR, and refine service proto in IR

* [WIP] remove server crate from IR, instead, add job_service.proto only

* [WIP] remove client codes from IR, instead, add job_builder in runtime

* [Service] refine all CI tests

* [IR Service] move job_plan.proto, service client back into Pegasus

* [IR Service] update pegasus java client

* [IR Service] add server_config for start_rpc_service

* [IR Service] update ir compiler according to changes with pegasus client

* [IR Compiler] format java codes

* [IR Runtime] minor refine in SinkOp

* [IR Runtime] start pegasus servers on Groot/Vineyard

* [CI Test] start_rpc_server in the new way

Co-authored-by: shirly121 <zxlmillie@163.com>
Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
  • Loading branch information
3 people committed Jun 2, 2022
1 parent dc256c1 commit ce91b0c
Show file tree
Hide file tree
Showing 51 changed files with 658 additions and 869 deletions.
2 changes: 1 addition & 1 deletion interactive_engine/executor/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ maxgraph-server = { path = "../server" }
pegasus = { path = "../Pegasus", package = "pegasus" }
gaia_pegasus = { path = "../../../research/engine/pegasus/pegasus", package = "pegasus" }
pegasus_network = { path = "../../../research/engine/pegasus/network" }
pegasus_server = { path = "../../../research/engine/pegasus/server-v0" }
pegasus_server = { path = "../../../research/engine/pegasus/server" }
graph_proxy = { path = "../../../research/query_service/ir/graph_proxy" }
itertools = "0.7.8"
log = "0.3"
Expand Down
114 changes: 85 additions & 29 deletions interactive_engine/executor/ffi/src/executor/gaia/gaia_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@
//! limitations under the License.
//!

use maxgraph_store::db::api::{GraphConfig, GraphResult, GraphError};
use std::sync::Arc;
use maxgraph_store::db::api::{GraphConfig, GraphResult};
use std::sync::{Arc, Mutex};
use maxgraph_store::db::graph::store::GraphStore;
use std::net::SocketAddr;
use maxgraph_runtime::store::groot::global_graph::GlobalGraph;
use gaia_pegasus::Configuration as GaiaConfig;
use maxgraph_store::db::api::GraphErrorCode::EngineError;
use tokio::runtime::Runtime;
use pegasus_server::service::Service;
use pegasus_server::rpc::{start_rpc_server, RpcService};
use pegasus_server::rpc::{RPCServerConfig, ServiceStartListener, start_all};
use pegasus_network::SimpleServerDetector;
use pegasus_network::config::{NetworkConfig, ServerAddr};
use graph_proxy::{InitializeJobCompiler, QueryMaxGraph};
use maxgraph_store::api::PartitionId;
use std::thread;
use std::time::Duration;

pub struct GaiaServer {
config: Arc<GraphConfig>,
Expand Down Expand Up @@ -56,34 +56,31 @@ impl GaiaServer {
Arc::get_mut(&mut self.graph).unwrap().update_partition_routing(partition_id, worker_id);
}

pub fn start(&self) -> GraphResult<(u16, u16)> {
let report = match self.config.get_storage_option("gaia.report") {
None => false,
Some(report_string) => report_string.parse()
.map_err(|e| GraphError::new(EngineError, format!("{:?}", e)))?,
};
let rpc_port = match self.config.get_storage_option("gaia.rpc.port") {
None => { 0 },
Some(server_port_string) => {
server_port_string.parse().map_err(|e| GraphError::new(EngineError, format!("{:?}", e)))?
},
};
let addr = format!("{}:{}", "0.0.0.0", rpc_port).parse()
.map_err(|e| GraphError::new(EngineError, format!("{:?}", e)))?;
pub fn start(&'static self) -> GraphResult<(u16, u16)> {
let gaia_config = make_gaia_config(self.config.clone());
let socket_addr = gaia_pegasus::startup_with(gaia_config, self.detector.clone())
.map_err(|e| GraphError::new(EngineError, format!("{:?}", e)))?
.ok_or(GraphError::new(EngineError, "gaia engine return None addr".to_string()))?;

let rpc_port = self.rpc_runtime.block_on(async{
let gaia_rpc_config = make_gaia_rpc_config(self.config.clone());
let (server_port, rpc_port) = self.rpc_runtime.block_on(async {
let query_maxgraph = QueryMaxGraph::new(self.graph.clone(), self.graph.clone());
let job_compiler = query_maxgraph.initialize_job_compiler();
let service = Service::new(job_compiler);
let rpc_service = RpcService::new(service, report);
let local_addr = start_rpc_server(addr, rpc_service, false).await.unwrap();
local_addr.port()
let service_listener = GaiaServiceListener::default();
let service_listener_clone = service_listener.clone();
self.rpc_runtime.spawn(async move {start_all(
gaia_rpc_config,
gaia_config,
job_compiler,
self.detector.clone(),
service_listener_clone,
).await.unwrap()});
loop {
if service_listener.get_server_port().is_some() && service_listener.get_rpc_port().is_some() {
break;
} else {
thread::sleep(Duration::from_millis(10));
}
}
(service_listener.get_server_port().unwrap(),service_listener.get_rpc_port().unwrap())
});
Ok((socket_addr.port(), rpc_port))
Ok((server_port, rpc_port))
}

pub fn update_peer_view(&self, peer_view: Vec<(u64, SocketAddr)>) {
Expand Down Expand Up @@ -139,3 +136,62 @@ fn make_gaia_config(graph_config: Arc<GraphConfig>) -> GaiaConfig {
}
}

/// Builds the RPC server configuration from the graph's storage options.
///
/// The server always binds to `0.0.0.0`. The port is read from the
/// `gaia.rpc.port` storage option and falls back to `0` when the option is
/// not set.
///
/// # Panics
///
/// Panics if `gaia.rpc.port` is set but cannot be parsed as a port number.
fn make_gaia_rpc_config(graph_config: Arc<GraphConfig>) -> RPCServerConfig {
    let rpc_port = graph_config
        .get_storage_option("gaia.rpc.port")
        // Name the option that was actually parsed so that a bad value is easy to trace.
        .map(|port_string| port_string.parse().expect("parse gaia.rpc.port failed"))
        .unwrap_or(0);
    RPCServerConfig::new(Some("0.0.0.0".to_string()), Some(rpc_port))
}


/// Records the socket addresses reported when the RPC server and the compute
/// server start.
///
/// Both slots live behind `Arc<Mutex<..>>`, so every clone observes the same
/// values: one clone can be handed to the start-up routine while another is
/// polled for the resulting ports.
#[derive(Clone, Default)]
struct GaiaServiceListener {
    /// Address of the RPC server; `None` until `on_rpc_start` stores one.
    rpc_addr: Arc<Mutex<Option<SocketAddr>>>,
    /// Address of the compute server; `None` until `on_server_start` stores one.
    server_addr: Arc<Mutex<Option<SocketAddr>>>,
}

impl GaiaServiceListener {
    /// Creates a listener with no recorded addresses (same as `Default`).
    fn new() -> Self {
        Self::default()
    }

    /// Returns the port of the recorded RPC address, or `None` if none has been recorded.
    ///
    /// Panics if the mutex guarding the address is poisoned.
    fn get_rpc_port(&self) -> Option<u16> {
        self.rpc_addr.lock().unwrap().map(|addr| addr.port())
    }

    /// Returns the port of the recorded compute-server address, or `None` if none has been
    /// recorded.
    ///
    /// Panics if the mutex guarding the address is poisoned.
    fn get_server_port(&self) -> Option<u16> {
        self.server_addr.lock().unwrap().map(|addr| addr.port())
    }
}

/// Start-up callbacks: each one logs the reported address and stores it in the
/// matching shared slot so that it can be read back through the port getters.
impl ServiceStartListener for GaiaServiceListener {
    /// Logs and records the address of the RPC server.
    ///
    /// Returns an `io::Error` of kind `Other` if the address mutex is poisoned.
    fn on_rpc_start(&mut self, server_id: u64, addr: SocketAddr) -> std::io::Result<()> {
        info!("RPC server of server[{}] start on {}", server_id, addr);
        match self.rpc_addr.lock() {
            Ok(mut slot) => {
                *slot = Some(addr);
                Ok(())
            }
            Err(poisoned) => {
                Err(std::io::Error::new(std::io::ErrorKind::Other, poisoned.to_string()))
            }
        }
    }

    /// Logs and records the address of the compute server.
    ///
    /// Returns an `io::Error` of kind `Other` if the address mutex is poisoned.
    fn on_server_start(&mut self, server_id: u64, addr: SocketAddr) -> std::io::Result<()> {
        info!("compute server[{}] start on {}", server_id, addr);
        match self.server_addr.lock() {
            Ok(mut slot) => {
                *slot = Some(addr);
                Ok(())
            }
            Err(poisoned) => {
                Err(std::io::Error::new(std::io::ErrorKind::Other, poisoned.to_string()))
            }
        }
    }
}
2 changes: 1 addition & 1 deletion interactive_engine/executor/gaia_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tokio = { version = "1.0", features = ["macros", "sync"] }
futures = { version = "0.3.0", features = ["thread-pool"] }
gaia_pegasus = { path = "../../../research/engine/pegasus/pegasus", package = "pegasus" }
pegasus_network = { path = "../../../research/engine/pegasus/network" }
pegasus_server = { path = "../../../research/engine/pegasus/server-v0" }
pegasus_server = { path = "../../../research/engine/pegasus/server" }
graph_proxy = { path = "../../../research/query_service/ir/graph_proxy" }

[dev-dependencies]
Expand Down
95 changes: 72 additions & 23 deletions interactive_engine/executor/gaia_runtime/src/bin/gaia_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ use maxgraph_server::StoreContext;
use maxgraph_store::api::graph_partition::GraphPartitionManager;
use maxgraph_store::api::prelude::*;
use maxgraph_store::config::{StoreConfig, VINEYARD_GRAPH};
use pegasus_server::rpc::{start_rpc_server, RpcService};
use pegasus_server::service::Service;
use pegasus_server::rpc::{RPCServerConfig, ServiceStartListener, start_rpc_server};
use protobuf::Message;
use std::collections::HashMap;
use std::env;
Expand All @@ -59,6 +58,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use tokio::runtime::Runtime;
use std::net::SocketAddr;

fn main() {
if let Some(_) = env::args().find(|arg| arg == "--show-build-info") {
Expand Down Expand Up @@ -129,7 +129,7 @@ fn run_main<V, VI, E, EI>(
store_config.clone(),
Box::new(recover_prepare),
)
.unwrap();
.unwrap();

let gaia_service = GaiaService::new(
store_config.clone(),
Expand All @@ -138,7 +138,7 @@ fn run_main<V, VI, E, EI>(
partition_worker_mapping,
worker_partition_list_mapping,
);
let (_, gaia_rpc_service_port) = gaia_service.start_rpc_service();
let gaia_rpc_service_port = gaia_service.start_rpc_service();
let store_context = StoreContext::new(graph, partition_manager);
start_hb_rpc_service(
runtime_info_clone,
Expand Down Expand Up @@ -234,11 +234,11 @@ fn get_init_info(config: &StoreConfig) -> (u64, Vec<PartitionId>) {
}

pub struct GaiaService<V, VI, E, EI>
where
V: Vertex + 'static,
VI: Iterator<Item = V> + Send + 'static,
E: Edge + 'static,
EI: Iterator<Item = E> + Send + 'static,
where
V: Vertex + 'static,
VI: Iterator<Item = V> + Send + 'static,
E: Edge + 'static,
EI: Iterator<Item = E> + Send + 'static,
{
store_config: Arc<StoreConfig>,
graph: Arc<dyn GlobalGraphQuery<V = V, E = E, VI = VI, EI = EI>>,
Expand All @@ -251,11 +251,11 @@ where
}

impl<V, VI, E, EI> GaiaService<V, VI, E, EI>
where
V: Vertex + 'static,
VI: Iterator<Item = V> + Send + 'static,
E: Edge + 'static,
EI: Iterator<Item = E> + Send + 'static,
where
V: Vertex + 'static,
VI: Iterator<Item = V> + Send + 'static,
E: Edge + 'static,
EI: Iterator<Item = E> + Send + 'static,
{
pub fn new(
store_config: Arc<StoreConfig>,
Expand All @@ -274,7 +274,7 @@ where
}
}

pub fn start_rpc_service(&self) -> (String, u16) {
pub fn start_rpc_service(&self) -> u16 {
let rpc_port = self.rpc_runtime.block_on(async {
let query_vineyard = QueryVineyard::new(
self.graph.clone(),
Expand All @@ -283,16 +283,65 @@ where
self.worker_partition_list_mapping.clone(),
self.store_config.worker_num as usize,
);
let job_compiler = query_vineyard.initialize_job_compiler();
let service = Service::new(job_compiler);
let addr = format!("{}:{}", "0.0.0.0", self.store_config.rpc_port);
// TODO: add report in store_config
let rpc_service = RpcService::new(service, true);
let local_addr = start_rpc_server(addr.parse().unwrap(), rpc_service, false).await.unwrap();
local_addr.port()
let job_assembly = query_vineyard.initialize_job_compiler();
let rpc_config = RPCServerConfig::new(Some("0.0.0.0".to_string()), Some(self.store_config.rpc_port as u16));
let service_listener = GaiaRpcServiceListener::default();
let service_listener_clone = service_listener.clone();
self.rpc_runtime.spawn(async move {
// pass a fake server_id, which is required by the API, but won't be used later.
start_rpc_server(0, rpc_config, job_assembly, service_listener_clone).await.unwrap();
});
loop {
if service_listener.get_rpc_port().is_some() {
break;
} else {
thread::sleep(Duration::from_millis(10));
}
}
service_listener.get_rpc_port().unwrap()
});
let ip = get_local_ip();
info!("start rpc server on {} {}", ip, rpc_port);
(ip, rpc_port)
rpc_port
}
}

/// Records the socket address reported when the RPC server starts.
///
/// The slot lives behind `Arc<Mutex<..>>`, so every clone observes the same
/// value: one clone can be handed to the start-up routine while another is
/// polled for the resulting port.
#[derive(Clone, Default)]
struct GaiaRpcServiceListener {
    /// Address of the RPC server; `None` until `on_rpc_start` stores one.
    rpc_addr: Arc<Mutex<Option<SocketAddr>>>,
}

impl GaiaRpcServiceListener {
    /// Creates a listener with no recorded address (same as `Default`).
    fn new() -> Self {
        Self::default()
    }

    /// Returns the port of the recorded RPC address, or `None` if none has been recorded.
    ///
    /// Panics if the mutex guarding the address is poisoned.
    fn get_rpc_port(&self) -> Option<u16> {
        self.rpc_addr.lock().unwrap().map(|addr| addr.port())
    }
}

/// Start-up callbacks: only the RPC address is recorded; the compute-server
/// notification is accepted and ignored.
impl ServiceStartListener for GaiaRpcServiceListener {
    /// Logs and records the address of the RPC server.
    ///
    /// Returns an `io::Error` of kind `Other` if the address mutex is poisoned.
    fn on_rpc_start(&mut self, server_id: u64, addr: SocketAddr) -> std::io::Result<()> {
        info!("RPC server of server[{}] start on {}", server_id, addr);
        match self.rpc_addr.lock() {
            Ok(mut slot) => {
                *slot = Some(addr);
                Ok(())
            }
            Err(poisoned) => {
                Err(std::io::Error::new(std::io::ErrorKind::Other, poisoned.to_string()))
            }
        }
    }

    /// No-op: this listener does not track the compute server.
    fn on_server_start(&mut self, _: u64, _: SocketAddr) -> std::io::Result<()> {
        Ok(())
    }
}
2 changes: 1 addition & 1 deletion research/engine/pegasus/clients/java/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${project.basedir}/../../../server-v0/proto/</protoSourceRoot>
<protoSourceRoot>${project.basedir}/../../../server/proto/</protoSourceRoot>
<outputDirectory>src/main/generated/</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.pegasus.builder.JobBuilder;
import com.alibaba.pegasus.intf.CloseableIterator;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.alibaba.pegasus.service.protocol.PegasusClient.JobConfig;
import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest;
import com.alibaba.pegasus.service.protocol.PegasusClient.JobResponse;
Expand All @@ -36,7 +37,7 @@ public class ClientExample {
private static final Logger logger = LoggerFactory.getLogger(ClientExample.class);

private static void process(JobResponse response) {
ByteString data = response.getData();
ByteString data = response.getResp();
ArrayList<Long> res = toLongArray(data.toByteArray(), data.size());
logger.info(
"got one response: job id {}, array size {}, job data {}",
Expand Down Expand Up @@ -119,8 +120,7 @@ public static void main(String[] args) throws Exception {
.setJobId(2)
.setJobName("ping_pong_example")
.setWorkers(2)
.addServers(0)
.addServers(1)
.setAll(PegasusClient.Empty.newBuilder().build())
.build();
// for job build
JobBuilder jobBuilder = new JobBuilder(confPb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package com.alibaba.pegasus.builder;

import com.alibaba.pegasus.service.job.protocol.JobClient.Sink;
import com.alibaba.pegasus.service.job.protocol.JobClient.TaskPlan;
import com.alibaba.pegasus.service.protocol.PegasusClient.JobConfig;
import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest;
import com.alibaba.pegasus.service.protocol.PegasusClient.Sink;
import com.alibaba.pegasus.service.protocol.PegasusClient.Source;
import com.alibaba.pegasus.service.protocol.PegasusClient.TaskPlan;
import com.google.protobuf.ByteString;

public abstract class AbstractBuilder {
Expand Down Expand Up @@ -82,14 +81,16 @@ public void setPlan(Plan plan) {

public JobRequest build() {
Sink sink = this.sink;
if (this.plan.endReduce()) {
sink = this.plan.genSink();
}

return JobRequest.newBuilder()
.setConf(this.conf)
.setSource(Source.newBuilder().setResource(this.source).build())
.setPlan(TaskPlan.newBuilder().addAllPlan(this.plan.getPlan()))
.setSink(sink)
.setSource(this.source)
.setPlan(
TaskPlan.newBuilder()
.addAllPlan(this.plan.getPlan())
.build()
.toByteString())
.setResource(sink.toByteString())
.build();
}
}
Loading

0 comments on commit ce91b0c

Please sign in to comment.