Skip to content

Commit

Permalink
[GAIA] Make input data as generic data types in JobAssembly interfa…
Browse files Browse the repository at this point in the history
…ce (#1859)

* [GAIA Engine] make input/output data as generic data types in JobAssembly

* [GAIA Engine] revert output as Vec<u8> in JobAssembly
  • Loading branch information
BingqingLyu committed Jul 20, 2022
1 parent 6616ee0 commit fd9b55f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 29 deletions.
2 changes: 1 addition & 1 deletion research/engine/pegasus/server/examples/echo_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Config {

struct EchoJobParser;

impl JobAssembly for EchoJobParser {
impl JobAssembly<Vec<u8>> for EchoJobParser {
fn assemble(&self, job: &JobDesc, worker: &mut Worker<Vec<u8>, Vec<u8>>) -> Result<(), BuildJobError> {
worker.dataflow(|input, output| {
input
Expand Down
5 changes: 3 additions & 2 deletions research/engine/pegasus/server/src/cluster/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::SocketAddr;

use crate::job::JobAssembly;
use crate::rpc::{RPCServerConfig, ServiceStartListener};
use pegasus::Data;

struct StandaloneServiceListener;

Expand All @@ -17,11 +18,11 @@ impl ServiceStartListener for StandaloneServiceListener {
}
}

pub async fn start<P>(
pub async fn start<I: Data, P>(
rpc_config: RPCServerConfig, server_config: pegasus::Configuration, assemble: P,
) -> Result<(), Box<dyn std::error::Error>>
where
P: JobAssembly,
P: JobAssembly<I>,
{
let detect = if let Some(net_conf) = server_config.network_config() {
net_conf.get_servers()?.unwrap_or(vec![])
Expand Down
8 changes: 4 additions & 4 deletions research/engine/pegasus/server/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use libloading::{Library, Symbol};
use pegasus::{BuildJobError, Worker};
use pegasus::{BuildJobError, Data, Worker};

#[derive(Default)]
pub struct JobDesc {
Expand All @@ -25,13 +25,13 @@ impl JobDesc {
}
}

pub trait JobAssembly: Send + Sync + 'static {
fn assemble(&self, job: &JobDesc, worker: &mut Worker<Vec<u8>, Vec<u8>>) -> Result<(), BuildJobError>;
pub trait JobAssembly<I: Data>: Send + Sync + 'static {
fn assemble(&self, job: &JobDesc, worker: &mut Worker<I, Vec<u8>>) -> Result<(), BuildJobError>;
}

pub struct DynLibraryAssembly;

impl JobAssembly for DynLibraryAssembly {
impl JobAssembly<Vec<u8>> for DynLibraryAssembly {
fn assemble(&self, job: &JobDesc, worker: &mut Worker<Vec<u8>, Vec<u8>>) -> Result<(), BuildJobError> {
if let Ok(resource) = String::from_utf8(job.resource.clone()) {
if let Some(lib) = pegasus::resource::get_global_resource::<Library>(&resource) {
Expand Down
18 changes: 9 additions & 9 deletions research/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use hyper::server::conn::{AddrIncoming, AddrStream};
use pegasus::api::function::FnResult;
use pegasus::api::FromStream;
use pegasus::result::{FromStreamExt, ResultSink};
use pegasus::{Configuration, JobConf, ServerConf};
use pegasus::{Configuration, Data, JobConf, ServerConf};
use pegasus_network::config::ServerAddr;
use pegasus_network::ServerDetect;
use serde::Deserialize;
Expand Down Expand Up @@ -103,15 +103,15 @@ impl Drop for RpcSink {
}

#[derive(Clone)]
pub struct JobServiceImpl<P> {
inner: Arc<P>,
pub struct JobServiceImpl<I> {
inner: Arc<dyn JobAssembly<I>>,
report: bool,
}

#[tonic::async_trait]
impl<P: JobAssembly> pb::job_service_server::JobService for JobServiceImpl<P>
impl<I> pb::job_service_server::JobService for JobServiceImpl<I>
where
P: JobAssembly,
I: Data,
{
async fn add_library(&self, request: Request<BinaryResource>) -> Result<Response<Empty>, Status> {
let BinaryResource { name, resource } = request.into_inner();
Expand Down Expand Up @@ -227,12 +227,12 @@ pub struct RPCJobServer<S: pb::job_service_server::JobService> {
}

/// start both rpc server and pegasus server
pub async fn start_all<P, D, E>(
pub async fn start_all<I: Data, P, D, E>(
rpc_config: RPCServerConfig, server_config: Configuration, assemble: P, server_detector: D,
mut listener: E,
) -> Result<(), Box<dyn std::error::Error>>
where
P: JobAssembly,
P: JobAssembly<I>,
D: ServerDetect + 'static,
E: ServiceStartListener,
{
Expand All @@ -245,11 +245,11 @@ where
}

/// startup rpc server
pub async fn start_rpc_server<P, E>(
pub async fn start_rpc_server<I: Data, P, E>(
server_id: u64, rpc_config: RPCServerConfig, assemble: P, listener: E,
) -> Result<(), Box<dyn std::error::Error>>
where
P: JobAssembly,
P: JobAssembly<I>,
E: ServiceStartListener,
{
let service = JobServiceImpl { inner: Arc::new(assemble), report: true };
Expand Down
16 changes: 3 additions & 13 deletions research/query_service/ir/runtime/src/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use pegasus::api::{
Collect, CorrelatedSubTask, Count, Dedup, EmitKind, Filter, Fold, FoldByKey, HasAny, IterCondition,
Iteration, Join, KeyBy, Limit, Map, Merge, PartitionByKey, Sink, SortBy, SortLimitBy,
};
use pegasus::codec::{Decode, Encode};
use pegasus::stream::Stream;
use pegasus::{BuildJobError, Worker};
use pegasus_server::job::{JobAssembly, JobDesc};
Expand Down Expand Up @@ -468,23 +467,14 @@ impl IRJobAssembly {
}
}

impl JobAssembly for IRJobAssembly {
fn assemble(&self, plan: &JobDesc, worker: &mut Worker<Vec<u8>, Vec<u8>>) -> Result<(), BuildJobError> {
impl JobAssembly<Record> for IRJobAssembly {
fn assemble(&self, plan: &JobDesc, worker: &mut Worker<Record, Vec<u8>>) -> Result<(), BuildJobError> {
worker.dataflow(move |input, output| {
let source = decode::<server_pb::Source>(&plan.input)?;
let source_iter = self
.udf_gen
.gen_source(self.parse(&source.resource)?)?;
let source = input
.input_from(source_iter.map(|record| {
let mut buf: Vec<u8> = vec![];
record.write_to(&mut buf).unwrap();
buf
}))?
.map(|buf| {
let record = Record::read_from(&mut buf.as_slice()).unwrap();
Ok(record)
})?;
let source = input.input_from(source_iter)?;
let task = decode::<server_pb::TaskPlan>(&plan.plan)?;
let stream = self.install(source, &task.plan)?;

Expand Down

0 comments on commit fd9b55f

Please sign in to comment.