From 07104f4db522bc4195918e7fc37625f9e53b1297 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 13 Sep 2023 19:18:53 +0800 Subject: [PATCH] feat(interactive): Enhancement pieces for groot (#3217) This PR includes several enhancements: 1. Tune up the message bytes default value 2. Clear temporal directory for ingest when finished or failed. 3. Set groot-client compliance level to JDK 8 4. Add maximum retry limits for batchWrite 5. Fix an error when deleting edges. 6. Add some useful debug logs --------- Co-authored-by: BingqingLyu --- charts/graphscope-store/values.yaml | 2 +- docs/storage_engine/groot.md | 2 +- .../dataload/databuild/OfflineBuildOdps.java | 5 +++ .../executor/engine/pegasus/server/src/rpc.rs | 1 + .../src/adapters/gs_store/partitioner.rs | 16 +++++++++- .../src/adapters/gs_store/read_graph.rs | 4 +++ .../executor/ir/runtime/src/router.rs | 2 ++ .../store/groot/src/db/graph/store.rs | 32 +++++++++++++++++++ interactive_engine/groot-client/pom.xml | 1 - .../groot/coordinator/SchemaManager.java | 1 + .../groot/frontend/ClientService.java | 2 ++ .../groot/frontend/StoreIngestClient.java | 4 +-- .../groot/frontend/StoreIngestClients.java | 4 +-- .../groot/frontend/StoreIngestor.java | 2 +- .../groot/frontend/write/GraphWriter.java | 5 ++- .../graphscope/groot/store/StoreService.java | 7 ++-- .../groot/servers/ir/IrServiceProducer.java | 1 + interactive_engine/pom.xml | 10 ++++++ 18 files changed, 88 insertions(+), 13 deletions(-) diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 883131b45be2..063a81baa478 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -497,7 +497,7 @@ javaOpts: "" graphName: "graphscope" -rpcMaxBytesMb: 4 +rpcMaxBytesMb: 20 engineType: "gaia" diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index ac2a5750c002..60ac625dc89c 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -430,7 +430,7 @@ skip.header=true load.after.build=true # This is not required when load.after.build=true # hadoop.endpoint=127.0.0.1:9000 -# ``` +``` Details of the parameters are listed below: diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index d8663df4e8d2..79a98ba8722a 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -224,6 +224,11 @@ public static void main(String[] args) throws IOException { client.clearIngest(uniquePath); throw ex; } + try { + client.clearIngest(uniquePath); + } catch (Exception ex) { + logger.error("Clear ingest failed, ignored."); + } } } diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index 7303b611cf41..ee8b9140fb91 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -165,6 +165,7 @@ where } let conf = parse_conf_req(conf.unwrap()); + info!("job conf {:?}", conf); pegasus::wait_servers_ready(conf.servers()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_sink = RpcSink::new(conf.job_id, tx); diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs index e9ad2d702f99..62ddb2e2d544 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs @@ -69,11 +69,25 @@ impl VineyardMultiPartition { impl PartitionInfo for VineyardMultiPartition { fn get_partition_id(&self, data: &D) -> GraphProxyResult { + let data = data.get_partition_key_id(); + trace!( + "get partition id for data: {:?}, result {:?}", + data, + self.graph_partition_manager + .get_partition_id(data as VertexId) as PartitionId + ); Ok(self .graph_partition_manager - .get_partition_id(data.get_partition_key_id() as VertexId) as PartitionId) + .get_partition_id(data as VertexId) as PartitionId) } fn get_server_id(&self, partition_id: PartitionId) -> GraphProxyResult { + trace!( + "get server id for partition id: {:?}, result {:?}", + partition_id, + self.partition_server_index_mapping + .get(&partition_id) + .cloned() + ); self.partition_server_index_mapping .get(&partition_id) .cloned() diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs index 3f747a5111b7..0cda00d00398 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs @@ -93,6 +93,7 @@ where &self, params: &QueryParams, ) -> GraphProxyResult + Send>> { let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; + debug!("scan_vertex worker_partitions: {:?}", worker_partitions); if !worker_partitions.is_empty() { let store = self.store.clone(); let si = params @@ -174,10 +175,12 @@ where .map(|(_pk, value)| encode_store_prop_val(value.clone())) .collect(), }; + debug!("index_scan_vertex store_indexed_values {:?}", store_indexed_values); if let Some(vid) = self .partition_manager .get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref()) { + debug!("index_scan_vertex vid {:?}", vid); Ok(Some(Vertex::new(vid as ID, Some(label_id.clone()), DynDetails::default()))) } else { Ok(None) @@ -470,6 +473,7 @@ where let store = self.store.clone(); let outer_id = store.translate_vertex_id(*id as VertexId); let pk_val = Object::from(outer_id); + trace!("get_primary_key: id: {}, outer_id {:?}, pk_val: {:?}", id, outer_id, pk_val); Ok(Some((GS_STORE_PK.into(), pk_val).into())) } } diff --git a/interactive_engine/executor/ir/runtime/src/router.rs b/interactive_engine/executor/ir/runtime/src/router.rs index 9f8de084d52a..c7a05cb71f93 100644 --- a/interactive_engine/executor/ir/runtime/src/router.rs +++ b/interactive_engine/executor/ir/runtime/src/router.rs @@ -57,9 +57,11 @@ impl Router for DefaultRouter { type C = C; fn route(&self, data: PartitionKeyId) -> GraphProxyResult { let partition_id = self.partition_info.get_partition_id(&data)?; + debug!("route partition id: {:?}", partition_id); let server_id = self .partition_info .get_server_id(partition_id)?; + debug!("route server id: {:?}", partition_id); let servers_num = self.cluster_info.get_server_num()?; let magic_num = (data as u32) / servers_num; let workers_num = self.cluster_info.get_local_worker_num()?; diff --git a/interactive_engine/executor/store/groot/src/db/graph/store.rs b/interactive_engine/executor/store/groot/src/db/graph/store.rs index 7325db4862af..dfeecfdecf0e 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/store.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/store.rs @@ -72,6 +72,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_vertex"); if let Some(label_id) = label_id { self.get_vertex_from_label(snapshot_id, vertex_id, label_id, property_ids) } else { @@ -94,6 +95,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: Option<&EdgeKind>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_edge"); if let Some(relation) = edge_relation { self.get_edge_from_relation(snapshot_id, edge_id, relation, property_ids) } else { @@ -122,6 +124,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("scan_vertex"); let mut iter = match label_id { Some(label_id) => { match self @@ -175,6 +178,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("scan_edge"); self.query_edges(snapshot_id, None, EdgeDirection::Both, label_id, condition, property_ids) } @@ -182,6 +186,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_out_edges"); self.query_edges( snapshot_id, Some(vertex_id), @@ -196,12 +201,14 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_in_edges"); self.query_edges(snapshot_id, Some(vertex_id), EdgeDirection::In, label_id, condition, property_ids) } fn get_out_degree( &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, ) -> GraphResult { + trace!("get_out_degree"); let edges_iter = self.get_out_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?; Ok(edges_iter.count()) @@ -210,6 +217,7 @@ impl MultiVersionGraph for GraphStore { fn get_in_degree( &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, ) -> GraphResult { + trace!("get_in_degree"); let edges_iter = self.get_in_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?; Ok(edges_iter.count()) @@ -219,6 +227,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_kth_out_edge"); let mut edges_iter = self.get_out_edges( snapshot_id, vertex_id, @@ -233,6 +242,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_kth_in_edge"); let mut edges_iter = self.get_in_edges( snapshot_id, vertex_id, @@ -246,6 +256,7 @@ impl MultiVersionGraph for GraphStore { fn create_vertex_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64, ) -> GraphResult { + trace!("create_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), create_vertex_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -270,6 +281,7 @@ impl MultiVersionGraph for GraphStore { fn create_edge_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, ) -> GraphResult { + trace!("create_edge_type"); let _guard = res_unwrap!(self.lock.lock(), create_edge_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -293,6 +305,7 @@ impl MultiVersionGraph for GraphStore { fn add_edge_kind( &self, si: i64, schema_version: i64, edge_kind: &EdgeKind, table_id: i64, ) -> GraphResult { + trace!("add_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), add_edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -318,6 +331,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_vertex_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { + trace!("drop_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), drop_vertex_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -331,6 +345,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_edge_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { + trace!("drop_edge_type"); let _guard = res_unwrap!(self.lock.lock(), drop_edge_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -344,6 +359,7 @@ impl MultiVersionGraph for GraphStore { } fn remove_edge_kind(&self, si: i64, schema_version: i64, edge_kind: &EdgeKind) -> GraphResult { + trace!("remove_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), remove_edge_kind, si, edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -362,6 +378,7 @@ impl MultiVersionGraph for GraphStore { fn insert_overwrite_vertex( &self, si: SnapshotId, id: VertexId, label: LabelId, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("insert_overwrite_vertex"); self.check_si_guard(si)?; let res = self .vertex_manager @@ -374,6 +391,7 @@ impl MultiVersionGraph for GraphStore { fn insert_update_vertex( &self, si: i64, id: i64, label: LabelId, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("insert_update_vertex"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; match res_unwrap!(self.get_vertex_data(si, id, &info), insert_update_vertex, si, id, label)? { @@ -399,6 +417,7 @@ impl MultiVersionGraph for GraphStore { fn clear_vertex_properties( &self, si: i64, id: i64, label: LabelId, prop_ids: &[PropertyId], ) -> GraphResult<()> { + trace!("clear_vertex_properties"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; if let Some(data) = self.get_vertex_data(si, id, &info)? { @@ -415,6 +434,7 @@ impl MultiVersionGraph for GraphStore { } fn delete_vertex(&self, si: i64, id: i64, label: LabelId) -> GraphResult<()> { + trace!("delete_vertex"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; if let Some(table) = info.get_table(si) { @@ -430,6 +450,7 @@ impl MultiVersionGraph for GraphStore { fn insert_overwrite_edge( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("insert_overwrite_edge"); self.check_si_guard(si)?; let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; let res = self @@ -443,6 +464,7 @@ impl MultiVersionGraph for GraphStore { fn insert_update_edge( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("insert_update_edge"); self.check_si_guard(si)?; let info = res_unwrap!( self.edge_manager.get_edge_kind(si, edge_kind), @@ -476,6 +498,7 @@ impl MultiVersionGraph for GraphStore { fn clear_edge_properties( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, prop_ids: &[PropertyId], ) -> GraphResult<()> { + trace!("clear_edge_properties"); self.check_si_guard(si)?; self.check_si_guard(si)?; let info = res_unwrap!( @@ -500,6 +523,7 @@ impl MultiVersionGraph for GraphStore { } fn delete_edge(&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool) -> GraphResult<()> { + trace!("delete_edge"); self.check_si_guard(si)?; let info = res_unwrap!(self.edge_manager.get_edge_kind(si, edge_kind), si, id, edge_kind)?; let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; @@ -592,6 +616,7 @@ impl MultiVersionGraph for GraphStore { impl GraphStore { pub fn open(config: &GraphConfig, path: &str) -> GraphResult { + info!("open graph store at {}, with config: {:?}", path, config); match config.get_storage_engine() { "rocksdb" => { let res = RocksDB::open(config.get_storage_options(), path).and_then(|db| { @@ -627,6 +652,7 @@ impl GraphStore { fn get_vertex_data( &self, si: SnapshotId, id: VertexId, info: &VertexTypeInfoRef, ) -> GraphResult> { + trace!("get_vertex_data"); if let Some(table) = info.get_table(si) { let key = vertex_key(table.id, id, si - table.start_si); let mut iter = self.storage.scan_from(&key)?; @@ -643,6 +669,7 @@ impl GraphStore { fn get_edge_data( &self, si: SnapshotId, id: EdgeId, info: &EdgeKindInfoRef, direction: EdgeDirection, ) -> GraphResult> { + trace!("get_edge_data"); if let Some(table) = info.get_table(si) { let ts = si - table.start_si; let key = edge_key(table.id, id, direction, ts); @@ -660,6 +687,7 @@ impl GraphStore { fn do_insert_vertex_data( &self, si: SnapshotId, info: VertexTypeInfoRef, id: VertexId, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("do_insert_vertex_data"); if let Some(table) = info.get_table(si) { let encoder = res_unwrap!(info.get_encoder(si), do_insert_vertex_data)?; let mut buf = Vec::new(); @@ -680,6 +708,7 @@ impl GraphStore { &self, si: SnapshotId, edge_id: EdgeId, info: EdgeKindInfoRef, direction: EdgeDirection, properties: &dyn PropertyMap, ) -> GraphResult<()> { + trace!("do_insert_edge_data"); if let Some(table) = info.get_table(si) { let encoder = res_unwrap!(info.get_encoder(si), do_insert_edge_data)?; let mut buf = Vec::new(); @@ -725,6 +754,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: LabelId, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_vertex_from_label"); let snapshot_id = snapshot_id as i64; let vertex_type_info = self .vertex_manager @@ -755,6 +785,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: &EdgeKind, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("get_edge_from_relation"); let snapshot_id = snapshot_id as i64; let info = self .edge_manager @@ -785,6 +816,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, vertex_id: Option, direction: EdgeDirection, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { + trace!("query_edges"); let mut iter = match label_id { Some(label_id) => { match self diff --git a/interactive_engine/groot-client/pom.xml b/interactive_engine/groot-client/pom.xml index 01d2b7c3f0dd..9b46320d90ed 100644 --- a/interactive_engine/groot-client/pom.xml +++ b/interactive_engine/groot-client/pom.xml @@ -181,7 +181,6 @@ com.alibaba.graphscope.groot.sdk.example.RealtimeWrite com.alibaba.graphscope.groot.sdk.example.RealtimeWrite - 1.11 -1 diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index b183ca71736b..9632d482092d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -106,6 +106,7 @@ private void recoverInternal() throws IOException, ExecutionException, Interrupt this.graphDefRef.set(graphDef); this.ready = true; logger.info("SchemaManager recovered. version [" + graphDef.getVersion() + "]"); + logger.info(graphDef.toProto().toString()); } public void stop() { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 7c20e804f23f..de816a69cff4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -380,9 +380,11 @@ public void clearIngest( int storeCount = this.metaService.getStoreCount(); AtomicInteger counter = new AtomicInteger(storeCount); AtomicBoolean finished = new AtomicBoolean(false); + String dataPath = request.getDataPath(); for (int i = 0; i < storeCount; i++) { this.storeIngestor.clearIngest( i, + dataPath, new CompletionCallback() { @Override public void onCompleted(Void res) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java index b1daa9d9f1de..9a8f3e1b70bb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java @@ -58,9 +58,9 @@ public void onCompleted() {} }); } - public void storeClearIngest(CompletionCallback callback) { + public void storeClearIngest(String path, CompletionCallback callback) { this.stub.storeClearIngest( - StoreClearIngestRequest.newBuilder().build(), + StoreClearIngestRequest.newBuilder().setDataPath(path).build(), new StreamObserver() { @Override public void onNext(StoreClearIngestResponse storeClearIngestResponse) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java index 2bb4b5d8f208..ea2d2008b291 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java @@ -48,7 +48,7 @@ public void ingest( } @Override - public void clearIngest(int storeId, CompletionCallback callback) { - this.getClient(storeId).storeClearIngest(callback); + public void clearIngest(int storeId, String path, CompletionCallback callback) { + this.getClient(storeId).storeClearIngest(path, callback); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java index 8a99e8dd6b9d..285ca06d3620 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java @@ -26,5 +26,5 @@ void ingest( Map config, CompletionCallback callback); - void clearIngest(int storeId, CompletionCallback callback); + void clearIngest(int storeId, String path, CompletionCallback callback); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 01b41c8ee221..7da92153507e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -249,6 +249,9 @@ private void addDeleteVertexOperation( int labelId = vertexDef.getLabelId(); Map pkVals = parseRawProperties(vertexDef, vertexRecordKey.getProperties()); + Map properties = dataRecord.getProperties(); + Map propertyVals = parseRawProperties(vertexDef, properties); + propertyVals.putAll(pkVals); long hashId = getPrimaryKeysHashId(labelId, pkVals, vertexDef); batchBuilder.addOperation( new DeleteVertexOperation(new VertexId(hashId), new LabelId(labelId))); @@ -257,12 +260,12 @@ private void addDeleteVertexOperation( private void addUpdateVertexOperation( OperationBatch.Builder batchBuilder, GraphSchema schema, DataRecord dataRecord) { VertexRecordKey vertexRecordKey = dataRecord.getVertexRecordKey(); - Map properties = dataRecord.getProperties(); String label = vertexRecordKey.getLabel(); GraphElement vertexDef = schema.getElement(label); int labelId = vertexDef.getLabelId(); Map pkVals = parseRawProperties(vertexDef, vertexRecordKey.getProperties()); + Map properties = dataRecord.getProperties(); Map propertyVals = parseRawProperties(vertexDef, properties); propertyVals.putAll(pkVals); long hashId = getPrimaryKeysHashId(labelId, pkVals, vertexDef); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 475a62a713cd..6e9c2d6e61ae 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -224,9 +224,11 @@ public boolean batchWrite(StoreDataBatch storeDataBatch) long snapshotId = storeDataBatch.getSnapshotId(); List> dataBatch = storeDataBatch.getDataBatch(); AtomicBoolean hasDdl = new AtomicBoolean(false); + int maxRetry = 10; for (Map partitionToBatch : dataBatch) { - while (!shouldStop && partitionToBatch.size() != 0) { + while (!shouldStop && partitionToBatch.size() != 0 && maxRetry > 0) { partitionToBatch = writeStore(snapshotId, partitionToBatch, hasDdl); + maxRetry--; } } return hasDdl.get(); @@ -364,12 +366,11 @@ private void ingestDataInternal( } public void clearIngest(String dataPath) throws IOException { - String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); if (dataPath == null || dataPath.isEmpty()) { logger.warn("Must set a sub-path for clearing."); return; } - + String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); Path downloadPath = Paths.get(dataRoot, "download", dataPath); try { logger.info("Clearing directory {}", downloadPath); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java index 4f092157c98e..c7d7ac83fc8a 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java @@ -56,6 +56,7 @@ public AbstractService makeGraphService( ChannelFetcher channelFetcher = new RpcChannelManagerFetcher(channelManager, executorCount, RoleType.GAIA_RPC); com.alibaba.graphscope.common.config.Configs irConfigs = getConfigs(); + logger.info("IR configs: {}", irConfigs); IrMetaFetcher irMetaFetcher = new GrootMetaFetcher(schemaFetcher); SnapshotUpdateCommitter updateCommitter = new SnapshotUpdateCommitter(channelManager); int frontendId = CommonConfig.NODE_IDX.get(configs); diff --git a/interactive_engine/pom.xml b/interactive_engine/pom.xml index 61733289326d..f20ecc0a98a1 100644 --- a/interactive_engine/pom.xml +++ b/interactive_engine/pom.xml @@ -114,6 +114,13 @@ + + maven-compiler-plugin + + 1.8 + 1.8 + + org.apache.maven.plugins maven-gpg-plugin @@ -216,6 +223,9 @@ 3.0.0 3.2.2 2.7 + 1.8 + 1.8 + 4.4.0