diff --git a/analytical_engine/core/java/graphx_client.h b/analytical_engine/core/java/graphx_client.h new file mode 100644 index 000000000000..a66516867d7e --- /dev/null +++ b/analytical_engine/core/java/graphx_client.h @@ -0,0 +1,90 @@ +/** Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_CLIENT_H_ +#define ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_CLIENT_H_ + +#include "graphx_loader.h" +#include "rdd_transfer_client.h" + +namespace gs { + +template +class GraphXClient + : public vineyard::BasicEVFragmentLoader> { + public: + GraphXClient(int listen_port, int part_cnt, vineyard::Client& client, + const grape::CommSpec& comm_spec, + const GraphXPartitioner& partitioner, + bool directed = true, bool retain_oid = false, + bool generate_eid = false) + : vineyard::BasicEVFragmentLoader>( + client, comm_spec, partitioner, directed, retain_oid, + generate_eid) { + listen_port_ = listen_port; + part_cnt_ = part_cnt; + } + + boost::leaf::result LoadFragment() { + int request_port = listen_port_; + std::string target_str = "localhost:" + std::to_string(request_port); + RDDReaderClient node_client( + grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials())); + + node_client.RequestPartitionInfo(); + node_client.RequestArrItem(); + node_client.SendClose(); + + std::shared_ptr vertex_table = node_client.get_vertex_table(); + LOG(INFO) << "Finish built vertex table"; + sleep(10); + + request_port += part_cnt_; + target_str = "localhost:" + std::to_string(request_port); + RDDReaderClient edge_client( + grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials())); + + edge_client.GetEdgeData(); + + edge_client.RequestPartitionInfo(); + edge_client.RequestArrItem(); + edge_client.SendClose(); + std::shared_ptr edge_table = edge_client.get_edge_table(); + LOG(INFO) << "Finish built edge table"; + + BOOST_LEAF_CHECK(this->AddVertexTable("v0", vertex_table)); + LOG(INFO) << "Fnish adding vertices"; + + BOOST_LEAF_CHECK(this->ConstructVertices()); + LOG(INFO) << "Fnish Construct vertices"; + + BOOST_LEAF_CHECK(this->AddEdgeTable("v0", "v0", "e0", edge_table)); + LOG(INFO) << "Fnish adding edges"; + + BOOST_LEAF_CHECK(this->ConstructEdges()); + LOG(INFO) << "Fnish Construct edges"; + + return this->ConstructFragment(); + } + + private: + int part_cnt_; + int listen_port_; +}; + +} // namespace gs +#endif \ No newline at end of file diff --git a/analytical_engine/core/java/rdd_transfer_client.h b/analytical_engine/core/java/rdd_transfer_client.h new file mode 100644 index 000000000000..791f1e09d1d7 --- /dev/null +++ b/analytical_engine/core/java/rdd_transfer_client.h @@ -0,0 +1,218 @@ +/** Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_RDD_CLIENT_H_ +#define ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_RDD_CLIENT_H_ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/table.h" +#include "arrow/type.h" + +#include "rdd.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::ClientReaderWriter; +using grpc::ClientWriter; +using grpc::Status; + +using RDDReaderTransfer::array_type; +using RDDReaderTransfer::basic_type; +using RDDReaderTransfer::CloseRequest; +using RDDReaderTransfer::CloseResponse; +using RDDReaderTransfer::essential_type; +using RDDReaderTransfer::GetArray; +using RDDReaderTransfer::ItemRequest; +using RDDReaderTransfer::PartInfoRequest; +using RDDReaderTransfer::PartitionInfo; +using RDDReaderTransfer::PartitionItem; + +namespace gs { + +class RDDReaderClient { + public: + RDDReaderClient(std::shared_ptr channel) + : stub_(GetArray::NewStub(channel)), part_id_(0) { + vertex_schema_vector_ = {arrow::field("ID", arrow::int64()), + arrow::field("VALUE", arrow::utf8())}; + + edge_schema_vector_ = {arrow::field("SRC", arrow::int64()), + arrow::field("DST", arrow::int64()), + arrow::field("VALUE", arrow::utf8())}; + } + + void RequestPartitionInfo() { + PartInfoRequest info_req; + info_req.set_req(true); + + PartitionInfo part_info; + ClientContext context; + Status status = stub_->GetPartitionInfo(&context, info_req, &part_info); + if (status.ok()) { + part_id_ = part_info.partitionid(); + std::string rdd_data_type = part_info.datatype(); + part_data_type_ = str_split(rdd_data_type, ":"); + } + } + + bool RequestArrItem() { + ItemRequest item_req; + item_req.set_req(true); + + PartitionItem item_reply; + ClientContext context; + Status status; + std::unique_ptr> reader( + stub_->GetPartitionItem(&context, item_req)); + + int item_cnt = 0; + while (reader->Read(&item_reply)) { + resolve_item_data(item_reply); + item_cnt++; + } + + status = reader->Finish(); + if (status.ok()) { + arrow::Int64Builder id_builder; + id_builder.AppendValues(oid_vec1_); + + auto id_maybe_array = id_builder.Finish(); + std::shared_ptr id_array = *id_maybe_array; + + arrow::StringBuilder str_builder; + str_builder.AppendValues(data_vec_); + auto str_maybe_array = str_builder.Finish(); + std::shared_ptr data_array = *str_maybe_array; + if (get_node_data_) { + auto vertex_schema = + std::make_shared(vertex_schema_vector_); + vertex_table_ = + arrow::Table::Make(vertex_schema, {id_array, data_array}); + } else { + auto edge_schema = std::make_shared(edge_schema_vector_); + arrow::Int64Builder dst_builder; + dst_builder.AppendValues(oid_vec2_); + auto dst_maybe_array = dst_builder.Finish(); + std::shared_ptr dst_array = *dst_maybe_array; + + edge_table_ = + arrow::Table::Make(edge_schema, {id_array, dst_array, data_array}); + } + return true; + } else { + return false; + } + } + + bool SendClose() { + ClientContext context; + CloseRequest close_req; + close_req.set_req(true); + + CloseResponse response; + Status status = stub_->RpcClose(&context, close_req, &response); + if (status.ok()) { + return true; + } else { + return false; + } + } + + int GetPartId() { return part_id_; } + + void GetEdgeData() { get_node_data_ = false; } + + std::shared_ptr get_vertex_table() { return vertex_table_; } + + std::shared_ptr get_edge_table() { return edge_table_; } + + private: + std::vector str_split(std::string str, std::string sep) { + std::vector ret; + int posi = str.find_first_of(sep); + while (posi != std::string::npos) { + std::string tmp = str.substr(0, posi); + ret.push_back(tmp); + str = str.substr(posi + 1); + posi = str.find_first_of(sep); + } + if (str != "") { + ret.push_back(str); + } + return ret; + } + + void resolve_item_data(const PartitionItem& data) { + for (int i = 1; i < part_data_type_.size(); i++) { + if (part_data_type_[i].substr(0, 5) == "Array") { + array_type array_data = data.basic_data(i - 1).array(); + int item_cnt = array_data.item_size(); + + std::string attr = ""; + for (int j = 0; j < item_cnt; j++) { + std::string str = array_data.item(j).string_data(); + attr = attr + "," + str; + } + data_vec_.push_back(attr); + } else { + essential_type essen_data = data.basic_data(i - 1).essen(); + if (part_data_type_[i] == "long") { + int64_t vid = essen_data.long_data(); + if (i == 1) { + oid_vec1_.push_back(vid); + } else { + oid_vec2_.push_back(vid); + } + } else { + std::cout << "type error, id type should be long" << std::endl; + } + } + } + } + + private: + std::unique_ptr stub_; + int part_id_; + bool get_node_data_ = true; + std::vector part_data_type_; + std::vector> vertex_schema_vector_; + std::vector> edge_schema_vector_; + + std::vector oid_vec1_; + std::vector oid_vec2_; + std::vector data_vec_; + + std::shared_ptr vertex_table_; + std::shared_ptr edge_table_; +}; + +} // namespace gs \ No newline at end of file diff --git a/analytical_engine/java/grape-rdd-reader/pom.xml b/analytical_engine/java/grape-rdd-reader/pom.xml new file mode 100644 index 000000000000..b972eb9f6801 --- /dev/null +++ b/analytical_engine/java/grape-rdd-reader/pom.xml @@ -0,0 +1,182 @@ + + + + 4.0.0 + + com.alibaba.graphscope + grape-jdk-parent + ${revision} + ../pom.xml + + + grape-rdd-reader + jar + GraphX rdd in Grape + + + 8 + 8 + + + + + org.scala-lang + scala-library + + + + io.grpc + grpc-bom + ${grpc.version} + pom + import + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + runtime + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + com.google.protobuf + protobuf-java-util + ${protobuf.version} + + + com.google.code.gson + gson + 2.9.0 + + + io.grpc + grpc-testing + ${grpc.version} + test + + + com.google.guava + guava + 31.0.1-android + + + + + + + src/main/resources + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.5.5 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + + + + **/*.scala + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + 1.4.1 + + + enforce + + enforce + + + + + + + + + + + + diff --git a/analytical_engine/java/grape-rdd-reader/src/main/java/com/alibaba/RDDReaderTransfer/RDDReadServer.java b/analytical_engine/java/grape-rdd-reader/src/main/java/com/alibaba/RDDReaderTransfer/RDDReadServer.java new file mode 100644 index 000000000000..a68f686bfc0d --- /dev/null +++ b/analytical_engine/java/grape-rdd-reader/src/main/java/com/alibaba/RDDReaderTransfer/RDDReadServer.java @@ -0,0 +1,249 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.RDDReaderTransfer; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +import scala.Tuple2; +import scala.Tuple3; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.*; +import java.util.Enumeration; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class RDDReadServer { + public static String getLocalHostLANAddress() throws UnknownHostException { + try { + InetAddress candidateAddress = null; + for (Enumeration ifaces = NetworkInterface.getNetworkInterfaces(); + ifaces.hasMoreElements(); ) { + NetworkInterface iface = (NetworkInterface) ifaces.nextElement(); + for (Enumeration inetAddrs = iface.getInetAddresses(); + inetAddrs.hasMoreElements(); ) { + InetAddress inetAddr = (InetAddress) inetAddrs.nextElement(); + if (!inetAddr.isLoopbackAddress()) { + if (inetAddr.isSiteLocalAddress()) { + return inetAddr.toString(); + } else if (candidateAddress == null) { + candidateAddress = inetAddr; + } + } + } + } + if (candidateAddress != null) { + return candidateAddress.toString(); + } + InetAddress jdkSuppliedAddress = InetAddress.getLocalHost(); + if (jdkSuppliedAddress == null) { + throw new UnknownHostException( + "The JDK InetAddress.getLocalHost() method unexpectedly returned null."); + } + return jdkSuppliedAddress.toString(); + } catch (Exception e) { + UnknownHostException unknownHostException = + new UnknownHostException("Failed to determine LAN address: " + e); + unknownHostException.initCause(e); + throw unknownHostException; + } + } + + private static final Logger logger = Logger.getLogger(RDDReadServer.class.getName()); + + private Server server; + + private int listen_port_base_ = 50000, listenPort_; + + private int partitionId_; + + private int partitionCnt_; + + private Iterator partitionIter_; + + private final int wait_time = 30; + + private String data_type_; + private ArrayList essential_names_ = + new ArrayList<>(Arrays.asList("int", "double", "float", "long", "bool", "string")); + + public RDDReadServer( + int port_shift, int partition_id, Iterator iter, String data_type, int part_cnt) { + listenPort_ = listen_port_base_ + port_shift; + partitionId_ = partition_id; + partitionIter_ = iter; + data_type_ = data_type; + partitionCnt_ = part_cnt; + } + + public void start() throws IOException { + /* The port on which the server should run */ + server = ServerBuilder.forPort(listenPort_).addService(new RDDService()).build().start(); + logger.info("Server started, listening on " + listenPort_); + Runtime.getRuntime() + .addShutdownHook( + new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM + // shutdown hook. + logger.info( + "*** shutting down gRPC server since JVM is shutting down"); + try { + RDDReadServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.info("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(wait_time, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + public void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + class RDDService extends GetArrayGrpc.GetArrayImplBase { + RDDService() {} + + private essential_type buildEssen(EssenType data_val, String type) { + essential_type essen_data; + if (type.startsWith("int")) { + Integer int_val = (Integer) data_val; + essen_data = essential_type.newBuilder().setIntData(int_val).build(); + } else if (type.startsWith("float")) { + Float float_val = (Float) data_val; + essen_data = essential_type.newBuilder().setFloatData(float_val).build(); + } else if (type.startsWith("double")) { + Double double_val = (Double) data_val; + essen_data = essential_type.newBuilder().setDoubleData(double_val).build(); + } else if (type.startsWith("string")) { + String str_val = (String) data_val; + essen_data = essential_type.newBuilder().setStringData(str_val).build(); + } else if (type.startsWith("long")) { // long + Long long_val = (Long) data_val; + essen_data = essential_type.newBuilder().setLongData(long_val).build(); + } else { // null + essen_data = null; + } + return essen_data; + } + + private array_type buildArray(ArrayType array_data, String array_info) { + array_type.Builder arr = array_type.newBuilder(); + String[] array_type = array_info.split(","); + String[] arr_item = (String[]) (array_data); + for (Integer i = 0; i < arr_item.length; i++) { + essential_type essen_data = buildEssen(arr_item[i], array_type[1]); + arr.addItem(essen_data); + } + return arr.build(); + } + + private basic_type buildBasic(BasicType data_val, String type) { + basic_type.Builder basic_data = basic_type.newBuilder(); + if (type.startsWith("Array")) { + array_type arr_tmp = buildArray(data_val, type); + basic_data.setArray(arr_tmp); + } else { + essential_type ess_tmp = buildEssen(data_val, type); + basic_data.setEssen(ess_tmp); + } + return basic_data.build(); + } + + private PartitionItem buildPartitionItem() { + PartitionItem.Builder new_item = PartitionItem.newBuilder(); + if (essential_names_.contains(data_type_) || data_type_.startsWith("Array")) { + basic_type basic_data = buildBasic(partitionIter_.next(), data_type_); + new_item.addBasicData(basic_data); + } else { // tuple + int idx = 1; + String[] tuple_type = data_type_.split(":"); + if (tuple_type.length == 3) { + Tuple2 tup2 = (Tuple2) partitionIter_.next(); + while (idx < tuple_type.length) { + basic_type new_basic = + buildBasic(tup2.productElement(idx - 1), tuple_type[idx]); + new_item.addBasicData(new_basic); + idx++; + } + } else if (tuple_type.length == 4) { + Tuple3 tup3 = (Tuple3) partitionIter_.next(); + while (idx < tuple_type.length) { + basic_type new_basic = + buildBasic(tup3.productElement(idx - 1), tuple_type[idx]); + new_item.addBasicData(new_basic); + idx++; + } + } else { + logger.info("type error, currently tuple2 and tuple3 only"); + } + } + return new_item.build(); + } + + public void getPartitionInfo( + PartInfoRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + PartitionInfo.newBuilder() + .setPartitionId(partitionId_) + .setPartitionCnt(partitionCnt_) + .setDataType(data_type_) + .build()); + responseObserver.onCompleted(); + } + + public void getPartitionItem( + ItemRequest request, StreamObserver responseObserver) { + while (partitionIter_.hasNext()) { + PartitionItem rdd_item = buildPartitionItem(); + responseObserver.onNext(rdd_item); + } + responseObserver.onCompleted(); + } + + public void rpcClose(CloseRequest request, StreamObserver responseObserver) { + responseObserver.onNext(CloseResponse.newBuilder().setClose(true).build()); + responseObserver.onCompleted(); + final int sleep_time = 100; + try { + Thread.sleep(sleep_time); + } catch (InterruptedException e) { + server.shutdown(); + e.printStackTrace(); + } + server.shutdown(); + } + } +} diff --git a/analytical_engine/java/grape-rdd-reader/src/main/proto/rdd.proto b/analytical_engine/java/grape-rdd-reader/src/main/proto/rdd.proto new file mode 100644 index 000000000000..6c306af923a0 --- /dev/null +++ b/analytical_engine/java/grape-rdd-reader/src/main/proto/rdd.proto @@ -0,0 +1,84 @@ +/* +* Copyright 2022 Alibaba Group Holding Limited. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +syntax = "proto3"; + +//import "google/protobuf/empty.proto"; +//import "google/protobuf/struct.proto"; + +option java_multiple_files = true; +option java_package = "com.alibaba.RDDReaderTransfer"; +option java_outer_classname = "RDDReaderTransferProto"; +option objc_class_prefix = "HLW"; + +//For cpp namespace +package RDDReaderTransfer; + +service GetArray{ + rpc GetPartitionInfo(PartInfoRequest) returns (PartitionInfo) {} + rpc GetPartitionItem(ItemRequest) returns (stream PartitionItem) {} + rpc RpcClose(CloseRequest) returns (CloseResponse) {} +} + +message PartInfoRequest { + bool req = 1; +} + +message ItemRequest { + bool req = 1; +} + +message CloseRequest { + bool req = 1; +} + +message CloseResponse { + bool close = 1; +} + + +//没有scala中的char,short,Byte +message essential_type { + oneof essential { + int32 int_data = 1; + int64 long_data = 2; + float float_data = 3; + double double_data = 4; + bool bool_data = 5; + string string_data = 6; + } +} + +message array_type { + repeated essential_type item = 1; +} + +message basic_type { + oneof basic { + essential_type essen = 1; + array_type array = 2; + } +} + +message PartitionInfo { + int32 partitionId = 1; + int32 partitionCnt = 2; + string dataType = 3; +} + +message PartitionItem { + repeated basic_type basic_data = 1; +} + diff --git a/analytical_engine/java/grape-rdd-reader/src/test/scala/com/alibaba/RDDTransfer.scala b/analytical_engine/java/grape-rdd-reader/src/test/scala/com/alibaba/RDDTransfer.scala new file mode 100644 index 000000000000..a07ad86631a1 --- /dev/null +++ b/analytical_engine/java/grape-rdd-reader/src/test/scala/com/alibaba/RDDTransfer.scala @@ -0,0 +1,76 @@ +/* +* Copyright 2022 Alibaba Group Holding Limited. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.alibaba.RDDReaderTransfer + +import org.apache.spark.{SparkConf, SparkContext, TaskContext} + +import scala.collection.mutable.{ArrayBuffer, Map} +import scala.jdk.CollectionConverters._ +import Array._ + +object RDDReader { + val node_executors: Map[String, Int] = Map() + def getExecutorId(hostName: String): Int = { + this.synchronized { + if (node_executors.contains(hostName)) { + node_executors(hostName) = node_executors(hostName) + 1 + } else { + node_executors += (hostName -> 0) + } + return node_executors(hostName) + } + } + + def main(args: Array[String]): Unit = { + val conf = new SparkConf().setAppName("RDDReader") + val sc = new SparkContext(conf) + + val node_file_name = sc.getConf.get("vertex_file") + val edge_file_name = sc.getConf.get("edge_file") + + val nodes = sc.textFile(node_file_name).map(line => line.split(",")) + .map(parts =>(parts.head.toLong,parts.drop(1))) + + val node_type = "tuple:long:Array,string" + println(node_type) + + nodes.foreachPartition(iter => { + val cur_host = RDDReadServer.getLocalHostLANAddress() + val executor_id = getExecutorId(cur_host) + val server = new RDDReadServer(executor_id, TaskContext.get.partitionId, iter.asJava, node_type, nodes.getNumPartitions) + server.start() + server.blockUntilShutdown() + }) + println("node transfer over") + + val edges = sc.textFile(edge_file_name).map(line => line.split(",")) + .map(parts => (parts(0).toLong, parts(1).toLong, parts.drop(2))) + + val edge_type = "tuple:long:long:Array,string" + println(edge_type) + + edges.foreachPartition(iter => { + val cur_host = RDDReadServer.getLocalHostLANAddress() + val executor_id = getExecutorId(cur_host) + val server = new RDDReadServer(executor_id, TaskContext.get.partitionId, iter.asJava, edge_type, edges.getNumPartitions) + server.start() + server.blockUntilShutdown() + }) + println("edge transfer over") + println("graph transfer all over") + } +} diff --git a/analytical_engine/java/pom.xml b/analytical_engine/java/pom.xml index db07e00bdee1..ed365be39fd8 100644 --- a/analytical_engine/java/pom.xml +++ b/analytical_engine/java/pom.xml @@ -60,6 +60,7 @@ grape-giraph grape-demo grape-runtime + grape-rdd-reader @@ -96,6 +97,10 @@ 3.3.1 grape-jdk-javadoc grape-jdk-javadoc + 1.47.0 + 3.19.2 + 3.19.2 + 2.12.15