Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Enhancement pieces for groot #3217

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ javaOpts: ""

graphName: "graphscope"

rpcMaxBytesMb: 4
rpcMaxBytesMb: 20

engineType: "gaia"

Expand Down
2 changes: 1 addition & 1 deletion docs/storage_engine/groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ skip.header=true
load.after.build=true
# This is not required when load.after.build=true
# hadoop.endpoint=127.0.0.1:9000
# ```
```

Details of the parameters are listed below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public static void main(String[] args) throws IOException {
client.clearIngest(uniquePath);
throw ex;
}
try {
client.clearIngest(uniquePath);
} catch (Exception ex) {
logger.error("Clear ingest failed, ignored.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ where
}

let conf = parse_conf_req(conf.unwrap());
info!("job conf {:?}", conf);
pegasus::wait_servers_ready(conf.servers());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_sink = RpcSink::new(conf.job_id, tx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,25 @@ impl VineyardMultiPartition {

impl PartitionInfo for VineyardMultiPartition {
fn get_partition_id<D: PartitionedData>(&self, data: &D) -> GraphProxyResult<PartitionId> {
let data = data.get_partition_key_id();
trace!(
"get partition id for data: {:?}, result {:?}",
data,
self.graph_partition_manager
.get_partition_id(data as VertexId) as PartitionId
);
Ok(self
.graph_partition_manager
.get_partition_id(data.get_partition_key_id() as VertexId) as PartitionId)
.get_partition_id(data as VertexId) as PartitionId)
}
fn get_server_id(&self, partition_id: PartitionId) -> GraphProxyResult<ServerId> {
trace!(
"get server id for partition id: {:?}, result {:?}",
partition_id,
self.partition_server_index_mapping
.get(&partition_id)
.cloned()
);
self.partition_server_index_mapping
.get(&partition_id)
.cloned()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ where
&self, params: &QueryParams,
) -> GraphProxyResult<Box<dyn Iterator<Item = Vertex> + Send>> {
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
debug!("scan_vertex worker_partitions: {:?}", worker_partitions);
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
Expand Down Expand Up @@ -174,10 +175,12 @@ where
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};
debug!("index_scan_vertex store_indexed_values {:?}", store_indexed_values);
if let Some(vid) = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
debug!("index_scan_vertex vid {:?}", vid);
Ok(Some(Vertex::new(vid as ID, Some(label_id.clone()), DynDetails::default())))
} else {
Ok(None)
Expand Down Expand Up @@ -470,6 +473,7 @@ where
let store = self.store.clone();
let outer_id = store.translate_vertex_id(*id as VertexId);
let pk_val = Object::from(outer_id);
trace!("get_primary_key: id: {}, outer_id {:?}, pk_val: {:?}", id, outer_id, pk_val);
Ok(Some((GS_STORE_PK.into(), pk_val).into()))
}
}
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/ir/runtime/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ impl<P: PartitionInfo, C: ClusterInfo> Router for DefaultRouter<P, C> {
type C = C;
fn route(&self, data: PartitionKeyId) -> GraphProxyResult<WorkerId> {
let partition_id = self.partition_info.get_partition_id(&data)?;
debug!("route partition id: {:?}", partition_id);
let server_id = self
.partition_info
.get_server_id(partition_id)?;
debug!("route server id: {:?}", partition_id);
let servers_num = self.cluster_info.get_server_num()?;
let magic_num = (data as u32) / servers_num;
let workers_num = self.cluster_info.get_local_worker_num()?;
Expand Down
32 changes: 32 additions & 0 deletions interactive_engine/executor/store/groot/src/db/graph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::V>> {
trace!("get_vertex");
if let Some(label_id) = label_id {
self.get_vertex_from_label(snapshot_id, vertex_id, label_id, property_ids)
} else {
Expand All @@ -94,6 +95,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: Option<&EdgeKind>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_edge");
if let Some(relation) = edge_relation {
self.get_edge_from_relation(snapshot_id, edge_id, relation, property_ids)
} else {
Expand Down Expand Up @@ -122,6 +124,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::V>> {
trace!("scan_vertex");
let mut iter = match label_id {
Some(label_id) => {
match self
Expand Down Expand Up @@ -175,13 +178,15 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("scan_edge");
self.query_edges(snapshot_id, None, EdgeDirection::Both, label_id, condition, property_ids)
}

fn get_out_edges(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("get_out_edges");
self.query_edges(
snapshot_id,
Some(vertex_id),
Expand All @@ -196,12 +201,14 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::E>> {
trace!("get_in_edges");
self.query_edges(snapshot_id, Some(vertex_id), EdgeDirection::In, label_id, condition, property_ids)
}

fn get_out_degree(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
) -> GraphResult<usize> {
trace!("get_out_degree");
let edges_iter =
self.get_out_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?;
Ok(edges_iter.count())
Expand All @@ -210,6 +217,7 @@ impl MultiVersionGraph for GraphStore {
fn get_in_degree(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
) -> GraphResult<usize> {
trace!("get_in_degree");
let edges_iter =
self.get_in_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?;
Ok(edges_iter.count())
Expand All @@ -219,6 +227,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_kth_out_edge");
let mut edges_iter = self.get_out_edges(
snapshot_id,
vertex_id,
Expand All @@ -233,6 +242,7 @@ impl MultiVersionGraph for GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>> {
trace!("get_kth_in_edge");
let mut edges_iter = self.get_in_edges(
snapshot_id,
vertex_id,
Expand All @@ -246,6 +256,7 @@ impl MultiVersionGraph for GraphStore {
fn create_vertex_type(
&self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64,
) -> GraphResult<bool> {
trace!("create_vertex_type");
let _guard = res_unwrap!(self.lock.lock(), create_vertex_type)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -270,6 +281,7 @@ impl MultiVersionGraph for GraphStore {
fn create_edge_type(
&self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef,
) -> GraphResult<bool> {
trace!("create_edge_type");
let _guard = res_unwrap!(self.lock.lock(), create_edge_type)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -293,6 +305,7 @@ impl MultiVersionGraph for GraphStore {
fn add_edge_kind(
&self, si: i64, schema_version: i64, edge_kind: &EdgeKind, table_id: i64,
) -> GraphResult<bool> {
trace!("add_edge_kind");
let _guard = res_unwrap!(self.lock.lock(), add_edge_kind)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -318,6 +331,7 @@ impl MultiVersionGraph for GraphStore {
}

fn drop_vertex_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult<bool> {
trace!("drop_vertex_type");
let _guard = res_unwrap!(self.lock.lock(), drop_vertex_type, si, label_id)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -331,6 +345,7 @@ impl MultiVersionGraph for GraphStore {
}

fn drop_edge_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult<bool> {
trace!("drop_edge_type");
let _guard = res_unwrap!(self.lock.lock(), drop_edge_type, si, label_id)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -344,6 +359,7 @@ impl MultiVersionGraph for GraphStore {
}

fn remove_edge_kind(&self, si: i64, schema_version: i64, edge_kind: &EdgeKind) -> GraphResult<bool> {
trace!("remove_edge_kind");
let _guard = res_unwrap!(self.lock.lock(), remove_edge_kind, si, edge_kind)?;
self.check_si_guard(si)?;
if let Err(_) = self.meta.check_version(schema_version) {
Expand All @@ -362,6 +378,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_overwrite_vertex(
&self, si: SnapshotId, id: VertexId, label: LabelId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_overwrite_vertex");
self.check_si_guard(si)?;
let res = self
.vertex_manager
Expand All @@ -374,6 +391,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_update_vertex(
&self, si: i64, id: i64, label: LabelId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_update_vertex");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
match res_unwrap!(self.get_vertex_data(si, id, &info), insert_update_vertex, si, id, label)? {
Expand All @@ -399,6 +417,7 @@ impl MultiVersionGraph for GraphStore {
fn clear_vertex_properties(
&self, si: i64, id: i64, label: LabelId, prop_ids: &[PropertyId],
) -> GraphResult<()> {
trace!("clear_vertex_properties");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
if let Some(data) = self.get_vertex_data(si, id, &info)? {
Expand All @@ -415,6 +434,7 @@ impl MultiVersionGraph for GraphStore {
}

fn delete_vertex(&self, si: i64, id: i64, label: LabelId) -> GraphResult<()> {
trace!("delete_vertex");
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
if let Some(table) = info.get_table(si) {
Expand All @@ -430,6 +450,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_overwrite_edge(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_overwrite_edge");
self.check_si_guard(si)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };
let res = self
Expand All @@ -443,6 +464,7 @@ impl MultiVersionGraph for GraphStore {
fn insert_update_edge(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("insert_update_edge");
self.check_si_guard(si)?;
let info = res_unwrap!(
self.edge_manager.get_edge_kind(si, edge_kind),
Expand Down Expand Up @@ -476,6 +498,7 @@ impl MultiVersionGraph for GraphStore {
fn clear_edge_properties(
&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, prop_ids: &[PropertyId],
) -> GraphResult<()> {
trace!("clear_edge_properties");
self.check_si_guard(si)?;
self.check_si_guard(si)?;
let info = res_unwrap!(
Expand All @@ -500,6 +523,7 @@ impl MultiVersionGraph for GraphStore {
}

fn delete_edge(&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool) -> GraphResult<()> {
trace!("delete_edge");
self.check_si_guard(si)?;
let info = res_unwrap!(self.edge_manager.get_edge_kind(si, edge_kind), si, id, edge_kind)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };
Expand Down Expand Up @@ -592,6 +616,7 @@ impl MultiVersionGraph for GraphStore {

impl GraphStore {
pub fn open(config: &GraphConfig, path: &str) -> GraphResult<Self> {
info!("open graph store at {}, with config: {:?}", path, config);
match config.get_storage_engine() {
"rocksdb" => {
let res = RocksDB::open(config.get_storage_options(), path).and_then(|db| {
Expand Down Expand Up @@ -627,6 +652,7 @@ impl GraphStore {
fn get_vertex_data(
&self, si: SnapshotId, id: VertexId, info: &VertexTypeInfoRef,
) -> GraphResult<Option<&[u8]>> {
trace!("get_vertex_data");
if let Some(table) = info.get_table(si) {
let key = vertex_key(table.id, id, si - table.start_si);
let mut iter = self.storage.scan_from(&key)?;
Expand All @@ -643,6 +669,7 @@ impl GraphStore {
fn get_edge_data(
&self, si: SnapshotId, id: EdgeId, info: &EdgeKindInfoRef, direction: EdgeDirection,
) -> GraphResult<Option<&[u8]>> {
trace!("get_edge_data");
if let Some(table) = info.get_table(si) {
let ts = si - table.start_si;
let key = edge_key(table.id, id, direction, ts);
Expand All @@ -660,6 +687,7 @@ impl GraphStore {
fn do_insert_vertex_data(
&self, si: SnapshotId, info: VertexTypeInfoRef, id: VertexId, properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("do_insert_vertex_data");
if let Some(table) = info.get_table(si) {
let encoder = res_unwrap!(info.get_encoder(si), do_insert_vertex_data)?;
let mut buf = Vec::new();
Expand All @@ -680,6 +708,7 @@ impl GraphStore {
&self, si: SnapshotId, edge_id: EdgeId, info: EdgeKindInfoRef, direction: EdgeDirection,
properties: &dyn PropertyMap,
) -> GraphResult<()> {
trace!("do_insert_edge_data");
if let Some(table) = info.get_table(si) {
let encoder = res_unwrap!(info.get_encoder(si), do_insert_edge_data)?;
let mut buf = Vec::new();
Expand Down Expand Up @@ -725,6 +754,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: LabelId,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<RocksVertexImpl>> {
trace!("get_vertex_from_label");
let snapshot_id = snapshot_id as i64;
let vertex_type_info = self
.vertex_manager
Expand Down Expand Up @@ -755,6 +785,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: &EdgeKind,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<RocksEdgeImpl>> {
trace!("get_edge_from_relation");
let snapshot_id = snapshot_id as i64;
let info = self
.edge_manager
Expand Down Expand Up @@ -785,6 +816,7 @@ impl GraphStore {
&self, snapshot_id: SnapshotId, vertex_id: Option<VertexId>, direction: EdgeDirection,
label_id: Option<LabelId>, condition: Option<&Condition>, property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<RocksEdgeImpl>> {
trace!("query_edges");
let mut iter = match label_id {
Some(label_id) => {
match self
Expand Down
1 change: 0 additions & 1 deletion interactive_engine/groot-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@
<argument>com.alibaba.graphscope.groot.sdk.example.RealtimeWrite</argument>
</arguments>
<mainClass>com.alibaba.graphscope.groot.sdk.example.RealtimeWrite</mainClass>
<complianceLevel>1.11</complianceLevel>
<killAfter>-1</killAfter>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ private void recoverInternal() throws IOException, ExecutionException, Interrupt
this.graphDefRef.set(graphDef);
this.ready = true;
logger.info("SchemaManager recovered. version [" + graphDef.getVersion() + "]");
logger.info(graphDef.toProto().toString());
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,11 @@ public void clearIngest(
int storeCount = this.metaService.getStoreCount();
AtomicInteger counter = new AtomicInteger(storeCount);
AtomicBoolean finished = new AtomicBoolean(false);
String dataPath = request.getDataPath();
for (int i = 0; i < storeCount; i++) {
this.storeIngestor.clearIngest(
i,
dataPath,
new CompletionCallback<Void>() {
@Override
public void onCompleted(Void res) {
Expand Down
Loading
Loading