Skip to content

Commit

Permalink
feat(interactive): Enhancement pieces for groot (#3217)
Browse files Browse the repository at this point in the history
This PR includes several enhancements:
1. Tune the default maximum message size (e.g. `rpcMaxBytesMb` raised from 4 to 20)
2. Clear the temporary ingest directory when ingestion finishes or fails.
3. Set groot-client compliance level to JDK 8
4. Add maximum retry limits for batchWrite
5. Fix an error when deleting edges.
6. Add some useful debug logs

---------

Co-authored-by: BingqingLyu <bingqing.lbq@alibaba-inc.com>
  • Loading branch information
siyuan0322 and BingqingLyu committed Sep 13, 2023
1 parent 8f06334 commit 07104f4
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 13 deletions.
2 changes: 1 addition & 1 deletion charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ javaOpts: ""

graphName: "graphscope"

rpcMaxBytesMb: 4
rpcMaxBytesMb: 20

engineType: "gaia"

Expand Down
2 changes: 1 addition & 1 deletion docs/storage_engine/groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ skip.header=true
load.after.build=true
# This is not required when load.after.build=true
# hadoop.endpoint=127.0.0.1:9000
# ```
```

Details of the parameters are listed below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public static void main(String[] args) throws IOException {
client.clearIngest(uniquePath);
throw ex;
}
try {
client.clearIngest(uniquePath);
} catch (Exception ex) {
logger.error("Clear ingest failed, ignored.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ where
}

let conf = parse_conf_req(conf.unwrap());
info!("job conf {:?}", conf);
pegasus::wait_servers_ready(conf.servers());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_sink = RpcSink::new(conf.job_id, tx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,25 @@ impl VineyardMultiPartition {

impl PartitionInfo for VineyardMultiPartition {
fn get_partition_id<D: PartitionedData>(&self, data: &D) -> GraphProxyResult<PartitionId> {
let data = data.get_partition_key_id();
trace!(
"get partition id for data: {:?}, result {:?}",
data,
self.graph_partition_manager
.get_partition_id(data as VertexId) as PartitionId
);
Ok(self
.graph_partition_manager
.get_partition_id(data.get_partition_key_id() as VertexId) as PartitionId)
.get_partition_id(data as VertexId) as PartitionId)
}
fn get_server_id(&self, partition_id: PartitionId) -> GraphProxyResult<ServerId> {
trace!(
"get server id for partition id: {:?}, result {:?}",
partition_id,
self.partition_server_index_mapping
.get(&partition_id)
.cloned()
);
self.partition_server_index_mapping
.get(&partition_id)
.cloned()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ where
&self, params: &QueryParams,
) -> GraphProxyResult<Box<dyn Iterator<Item = Vertex> + Send>> {
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
debug!("scan_vertex worker_partitions: {:?}", worker_partitions);
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
Expand Down Expand Up @@ -174,10 +175,12 @@ where
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};
debug!("index_scan_vertex store_indexed_values {:?}", store_indexed_values);
if let Some(vid) = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
debug!("index_scan_vertex vid {:?}", vid);
Ok(Some(Vertex::new(vid as ID, Some(label_id.clone()), DynDetails::default())))
} else {
Ok(None)
Expand Down Expand Up @@ -470,6 +473,7 @@ where
let store = self.store.clone();
let outer_id = store.translate_vertex_id(*id as VertexId);
let pk_val = Object::from(outer_id);
trace!("get_primary_key: id: {}, outer_id {:?}, pk_val: {:?}", id, outer_id, pk_val);
Ok(Some((GS_STORE_PK.into(), pk_val).into()))
}
}
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/ir/runtime/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ impl<P: PartitionInfo, C: ClusterInfo> Router for DefaultRouter<P, C> {
type C = C;
fn route(&self, data: PartitionKeyId) -> GraphProxyResult<WorkerId> {
let partition_id = self.partition_info.get_partition_id(&data)?;
debug!("route partition id: {:?}", partition_id);
let server_id = self
.partition_info
.get_server_id(partition_id)?;
// Log the server id that was just resolved; the message is labelled
// "server id", so printing `partition_id` here would repeat the previous
// log line and hide the actual routing target.
debug!("route server id: {:?}", server_id);
let servers_num = self.cluster_info.get_server_num()?;
let magic_num = (data as u32) / servers_num;
let workers_num = self.cluster_info.get_local_worker_num()?;
Expand Down
32 changes: 32 additions & 0 deletions interactive_engine/executor/store/groot/src/db/graph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::V>> {
trace!("get_vertex");
if let Some(label_id) = label_id {
self.get_vertex_from_label(snapshot_id, vertex_id, label_id, property_ids)
} else {
Expand All @@ -94,6 +95,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: Option<&EdgeKind>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_edge");
if let Some(relation) = edge_relation {
self.get_edge_from_relation(snapshot_id, edge_id, relation, property_ids)
} else {
Expand Down Expand Up @@ -122,6 +124,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::V>> {
trace!("scan_vertex");
let mut iter = match label_id {
Some(label_id) => {
match self
Expand Down Expand Up @@ -175,13 +178,15 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("scan_edge");
self.query_edges(snapshot_id, None, EdgeDirection::Both, label_id, condition, property_ids)
}

fn get_out_edges(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("get_out_edges");
self.query_edges(
snapshot_id,
Some(vertex_id),
Expand All @@ -196,12 +201,14 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("get_in_edges");
self.query_edges(snapshot_id, Some(vertex_id), EdgeDirection::In, label_id, condition, property_ids)
}

fn get_out_degree(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
) -> GraphResult<usize> {
trace!("get_out_degree");
let edges_iter =
self.get_out_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?;
Ok(edges_iter.count())
Expand All @@ -210,6 +217,7 @@ impl MultiVersionGraph for GraphStore {
fn get_in_degree(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
) -> GraphResult<usize> {
trace!("get_in_degree");
let edges_iter =
self.get_in_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?;
Ok(edges_iter.count())
Expand All @@ -219,6 +227,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_kth_out_edge");
let mut edges_iter = self.get_out_edges(
snapshot_id,
vertex_id,
Expand All @@ -233,6 +242,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_kth_in_edge");
let mut edges_iter = self.get_in_edges(
snapshot_id,
vertex_id,
Expand All @@ -246,6 +256,7 @@ impl MultiVersionGraph for GraphStore {
fn create_vertex_type(
&self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64,
) -> GraphResult<bool> {
trace!("create_vertex_type");
let _guard = res_unwrap!(self.lock.lock(), create_vertex_type)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -270,6 +281,7 @@ impl MultiVersionGraph for GraphStore {
fn create_edge_type(
&self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef,
) -> GraphResult<bool> {
trace!("create_edge_type");
let _guard = res_unwrap!(self.lock.lock(), create_edge_type)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -293,6 +305,7 @@ impl MultiVersionGraph for GraphStore {
fn add_edge_kind(
&self, si: i64, schema_version: i64, edge_kind: &EdgeKind, table_id: i64,
) -> GraphResult<bool> {
trace!("add_edge_kind");
let _guard = res_unwrap!(self.lock.lock(), add_edge_kind)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -318,6 +331,7 @@ impl MultiVersionGraph for GraphStore {
}

fn drop_vertex_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult<bool> {
trace!("drop_vertex_type");
let _guard = res_unwrap!(self.lock.lock(), drop_vertex_type, si, label_id)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -331,6 +345,7 @@ impl MultiVersionGraph for GraphStore {
}

fn drop_edge_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult<bool> {
trace!("drop_edge_type");
let _guard = res_unwrap!(self.lock.lock(), drop_edge_type, si, label_id)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -344,6 +359,7 @@ impl MultiVersionGraph for GraphStore {
}

fn remove_edge_kind(&self, si: i64, schema_version: i64, edge_kind: &EdgeKind) -> GraphResult<bool> {
trace!("remove_edge_kind");
let _guard = res_unwrap!(self.lock.lock(), remove_edge_kind, si, edge_kind)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -362,6 +378,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_overwrite_vertex(
&self, si: SnapshotId, id: VertexId, label: LabelId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_overwrite_vertex");
self.check_si_guard(si)?;
let res = self
.vertex_manager
Expand All @@ -374,6 +391,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_update_vertex(
&self, si: i64, id: i64, label: LabelId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_update_vertex");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
match res_unwrap!(self.get_vertex_data(si, id, &info), insert_update_vertex, si, id, label)? {
Expand All @@ -399,6 +417,7 @@ impl MultiVersionGraph for GraphStore {
fn clear_vertex_properties(
&self, si: i64, id: i64, label: LabelId, prop_ids: &[PropertyId],
) -> GraphResult<()> {
trace!("clear_vertex_properties");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
if let Some(data) = self.get_vertex_data(si, id, &info)? {
Expand All @@ -415,6 +434,7 @@ impl MultiVersionGraph for GraphStore {
}

fn delete_vertex(&self, si: i64, id: i64, label: LabelId) -> GraphResult<()> {
trace!("delete_vertex");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
if let Some(table) = info.get_table(si) {
Expand All @@ -430,6 +450,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_overwrite_edge(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_overwrite_edge");
self.check_si_guard(si)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };
let res = self
Expand All @@ -443,6 +464,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_update_edge(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_update_edge");
self.check_si_guard(si)?;
let info = res_unwrap!(
self.edge_manager.get_edge_kind(si, edge_kind),
Expand Down Expand Up @@ -476,6 +498,7 @@ impl MultiVersionGraph for GraphStore {
fn clear_edge_properties(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, prop_ids: &[PropertyId],
) -> GraphResult<()> {
trace!("clear_edge_properties");
self.check_si_guard(si)?;
self.check_si_guard(si)?;
let info = res_unwrap!(
Expand All @@ -500,6 +523,7 @@ impl MultiVersionGraph for GraphStore {
}

fn delete_edge(&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool) -> GraphResult<()> {
trace!("delete_edge");
self.check_si_guard(si)?;
let info = res_unwrap!(self.edge_manager.get_edge_kind(si, edge_kind), si, id, edge_kind)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };
Expand Down Expand Up @@ -592,6 +616,7 @@ impl MultiVersionGraph for GraphStore {

impl GraphStore {
pub fn open(config: &GraphConfig, path: &str) -> GraphResult<Self> {
info!("open graph store at {}, with config: {:?}", path, config);
match config.get_storage_engine() {
"rocksdb" => {
let res = RocksDB::open(config.get_storage_options(), path).and_then(|db| {
Expand Down Expand Up @@ -627,6 +652,7 @@ impl GraphStore {
fn get_vertex_data(
&self, si: SnapshotId, id: VertexId, info: &VertexTypeInfoRef,
) -> GraphResult<Option<&[u8]>> {
trace!("get_vertex_data");
if let Some(table) = info.get_table(si) {
let key = vertex_key(table.id, id, si - table.start_si);
let mut iter = self.storage.scan_from(&key)?;
Expand All @@ -643,6 +669,7 @@ impl GraphStore {
fn get_edge_data(
&self, si: SnapshotId, id: EdgeId, info: &EdgeKindInfoRef, direction: EdgeDirection,
) -> GraphResult<Option<&[u8]>> {
trace!("get_edge_data");
if let Some(table) = info.get_table(si) {
let ts = si - table.start_si;
let key = edge_key(table.id, id, direction, ts);
Expand All @@ -660,6 +687,7 @@ impl GraphStore {
fn do_insert_vertex_data(
&self, si: SnapshotId, info: VertexTypeInfoRef, id: VertexId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("do_insert_vertex_data");
if let Some(table) = info.get_table(si) {
let encoder = res_unwrap!(info.get_encoder(si), do_insert_vertex_data)?;
let mut buf = Vec::new();
Expand All @@ -680,6 +708,7 @@ impl GraphStore {
&self, si: SnapshotId, edge_id: EdgeId, info: EdgeKindInfoRef, direction: EdgeDirection,
properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("do_insert_edge_data");
if let Some(table) = info.get_table(si) {
let encoder = res_unwrap!(info.get_encoder(si), do_insert_edge_data)?;
let mut buf = Vec::new();
Expand Down Expand Up @@ -725,6 +754,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: LabelId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<RocksVertexImpl>> {
trace!("get_vertex_from_label");
let snapshot_id = snapshot_id as i64;
let vertex_type_info = self
.vertex_manager
Expand Down Expand Up @@ -755,6 +785,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: &EdgeKind,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<RocksEdgeImpl>> {
trace!("get_edge_from_relation");
let snapshot_id = snapshot_id as i64;
let info = self
.edge_manager
Expand Down Expand Up @@ -785,6 +816,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: Option<VertexId>, direction: EdgeDirection,
label_id: Option<LabelId>, condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<RocksEdgeImpl>> {
trace!("query_edges");
let mut iter = match label_id {
Some(label_id) => {
match self
Expand Down
1 change: 0 additions & 1 deletion interactive_engine/groot-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@
<argument>com.alibaba.graphscope.groot.sdk.example.RealtimeWrite</argument>
</arguments>
<mainClass>com.alibaba.graphscope.groot.sdk.example.RealtimeWrite</mainClass>
<complianceLevel>1.11</complianceLevel>
<killAfter>-1</killAfter>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ private void recoverInternal() throws IOException, ExecutionException, Interrupt
this.graphDefRef.set(graphDef);
this.ready = true;
logger.info("SchemaManager recovered. version [" + graphDef.getVersion() + "]");
logger.info(graphDef.toProto().toString());
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,11 @@ public void clearIngest(
int storeCount = this.metaService.getStoreCount();
AtomicInteger counter = new AtomicInteger(storeCount);
AtomicBoolean finished = new AtomicBoolean(false);
String dataPath = request.getDataPath();
for (int i = 0; i < storeCount; i++) {
this.storeIngestor.clearIngest(
i,
dataPath,
new CompletionCallback<Void>() {
@Override
public void onCompleted(Void res) {
Expand Down

0 comments on commit 07104f4

Please sign in to comment.