Skip to content

Commit

Permalink
Add GAIA on vineyard test (#708)
Browse files Browse the repository at this point in the history
This PR add a test suite for GAIA on vineyard, and includes some bugfixes.

Co-authored-by: BingqingLyu <lv_bingqing@163.com>
Co-authored-by: siyuan0322 <siyuan0322@gmail.com>
  • Loading branch information
3 people committed Aug 19, 2021
1 parent e092774 commit 9405412
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 22 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ jobs:
if: false
uses: mxschmitt/action-tmate@v2

- name: Run Gaia on Vineyard Store Test
if: matrix.os == 'ubuntu-20.04'
run: |
source ~/.graphscope_env
export GRAPHSCOPE_HOME=/usr/local
export GS_TEST_DIR=${GITHUB_WORKSPACE}/gstest
# run test
cd interactive_engine/src/gaia-adaptor && ./gaia_on_vineyard_test.sh
- name: Run Gaia on Maxgraph Store Test
if: matrix.os == 'ubuntu-20.04'
run: |
Expand Down
27 changes: 27 additions & 0 deletions interactive_engine/src/gaia-adaptor/gaia_on_vineyard_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash
base_dir=$(cd `dirname $0`; pwd)
_port=8277
rm -rf /tmp/test_modern.log
cd ${base_dir}/../../deploy/testing && python3 maxgraph_test_server.py ${_port} &
sleep 5s
# load data and start instance
curl -XPOST http://localhost:${_port} -d 'import graphscope'
curl -XPOST http://localhost:${_port} -d 'graphscope.set_option(show_log=True)'
curl -XPOST http://localhost:${_port} -d 'from graphscope.framework.loader import Loader'
curl -XPOST http://localhost:${_port} -d 'from graphscope.dataset.modern_graph import load_modern_graph'
curl -XPOST http://localhost:${_port} -d 'session=graphscope.session(cluster_type="hosts", num_workers=2, enable_gaia=True)'
curl_sess="curl -XPOST http://localhost:${_port} -d 'graph = load_modern_graph(session, \"${GITHUB_WORKSPACE}/interactive_engine/tests/src/main/resources/modern_graph\")'"
sh -c "$curl_sess"
curl -XPOST http://localhost:${_port} -d 'interactive = session.gremlin(graph)' --output /tmp/test_modern.log
curl -XPOST http://localhost:${_port} -d 'interactive._graph_url[1]' --output /tmp/test_modern.log
GAIA_PORT=`cat /tmp/test_modern.log | awk -F'\/|:' '{print $5}'`
echo "localhost:$GAIA_PORT" > ${base_dir}/src/test/resources/graph.endpoint
cd ${base_dir}/../.. && mvn clean install -DskipTests -Pjava-release
cd ${base_dir} && mvn test
exit_code=$?
curl -XPOST http://localhost:${_port} -d 'session.close()'
ps -ef | grep "python3 maxgraph_test_server.py ${_port}" | grep -v grep | awk '{print $2}' | xargs kill -9
if [ $exit_code -ne 0 ]; then
echo "gaia_on_vineyard_store gremlin test fail"
exit 1
fi
1 change: 1 addition & 0 deletions interactive_engine/src/gaia-adaptor/gremlin_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ LOG_NAME=maxnode ${GRAPHSCOPE_HOME}/bin/giectl max_node_gaia
sleep 20
# load data
cd ${base_dir}/../v2 && mvn -Dtest=com.alibaba.maxgraph.tests.sdk.DataLoadingTest test
echo "localhost:12312" > ${base_dir}/src/test/resources/graph.endpoint
cd ${base_dir} && mvn test
exit_code=$?
ps -ef | grep "com.alibaba.graphscope.gaia.MaxNode" | grep -v grep | awk '{print $2}' | xargs kill -9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class VineyardGraphStore extends GraphStoreService {
private static final Logger logger = LoggerFactory.getLogger(VineyardGraphStore.class);
public static final String VINEYARD_MODERN_PROPERTY_RESOURCE = "maxgraph.modern.properties.json";
public static final String VINEYARD_MODERN_PROPERTY_RESOURCE = "vineyard.modern.properties.json";
private SchemaFetcher schemaFetcher;

public VineyardGraphStore(SchemaFetcher schemaFetcher) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package com.alibaba.graphscope.gaia;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.IOUtils;
import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.structure.Graph;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class GaiaAdaptorTestGraphProvider extends AbstractGraphProvider {
public static String MODERN_GRAPH_ENDPOINT = "localhost:12312";

@Override
public Map<String, Object> getBaseConfiguration(String graphName, Class<?> test, String testMethodName, LoadGraphWith.GraphData loadGraphWith) {
Map config = new HashMap();
config.put(Graph.GRAPH, RemoteTestGraph.class.getName());
config.put(RemoteTestGraph.GRAPH_NAME, MODERN_GRAPH_ENDPOINT);
config.put(RemoteTestGraph.GRAPH_NAME, getModernGraphEndpoint());
return config;
}

Expand All @@ -34,4 +36,15 @@ public Set<Class> getImplementations() {
public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) {
// do nothing
}

public String getModernGraphEndpoint() {
try {
InputStream inputStream = GaiaAdaptorTestGraphProvider.class.getClassLoader().getResourceAsStream("graph.endpoint");
String endpoint = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
// remove invalid char
return endpoint.replaceAll("[^A-Za-z0-9:]", "");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
localhost:12312
1 change: 1 addition & 0 deletions python/graphscope/dataset/ldbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def load_ldbc(sess, prefix, directed=True):
Returns:
:class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty
"""
prefix = os.path.expandvars(prefix)
vertices = {
"comment": (
Loader(
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/dataset/modern_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def load_modern_graph(sess, prefix, directed=True):
Returns:
:class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty
"""
prefix = os.path.expandvars(prefix)
graph = sess.g(directed=directed)
graph = (
graph.add_vertices(
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/dataset/ogbn_mag.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def load_ogbn_mag(sess, prefix):
Returns:
:class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty
"""
prefix = os.path.expandvars(prefix)
graph = sess.g()
graph = (
graph.add_vertices(os.path.join(prefix, "paper.csv"), "paper")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
import org.apache.tinkerpop.gremlin.structure.T;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CachePropGaiaGraphStep extends GaiaGraphStep implements PropertiesCacheStep {
private ToFetchProperties toFetchProperties;

public CachePropGaiaGraphStep(GaiaGraphStep originalGraphStep) {
super(originalGraphStep);
originalGraphStep.getGraphLabels().forEach(k -> this.addGraphLabels((String) k));
originalGraphStep.getHasContainers().forEach(k -> this.addHasContainer((HasContainer) k));
this.setTraverserRequirement(originalGraphStep.getTraverserRequirement());
this.toFetchProperties = new ToFetchProperties(false, Collections.EMPTY_LIST);
}

@Override
Expand All @@ -27,11 +31,12 @@ public Gremlin.PropKeys cacheProperties() {
keys.add(container.getKey());
}
}
return PlanUtils.convertFrom(new ToFetchProperties(false, keys));
keys.addAll(toFetchProperties.getProperties());
return PlanUtils.convertFrom(new ToFetchProperties(toFetchProperties.isAll(), keys));
}

@Override
public void addPropertiesToCache(ToFetchProperties properties) {
throw new UnsupportedOperationException();
this.toFetchProperties = properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"vertex_properties": {
"-9223372036854775808": {
"name": "marko",
"age": 29
},
"1": {
"name": "vadas",
"age": 27
},
"-9151314442816847872": {
"name": "lop",
"lang": "java"
},
"0": {
"name": "josh",
"age": 32
},
"-9151314442816847871": {
"name": "ripple",
"lang": "java"
},
"2": {
"name": "peter",
"age": 35
}
},
"edge_properties": {
"171470411456254147604591110776164450304": {
"weight": 0.4
},
"171470411456254147623037854849874001920": {
"weight": 1.0
},
"171470411456254147604591110776164450306": {
"weight": 0.2
},
"9223372036854775808": {
"weight": 1.0
},
"27670116110564327424": {
"weight": 0.5
},
"171470411456254147613814482813019226112": {
"weight": 0.4
}
}
}
2 changes: 2 additions & 0 deletions research/gaia/gremlin/gs_gremlin/src/graph_proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ where
self.graph_partitioner.clone(),
self.partition_worker_mapping.clone(),
self.worker_partition_list_mapping.clone(),
self.num_servers,
self.server_index,
);
GremlinJobCompiler::new(partition, self.num_servers, self.server_index)
}
Expand Down
69 changes: 53 additions & 16 deletions research/gaia/gremlin/gs_gremlin/src/graph_proxy/partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ impl Partitioner for MaxGraphMultiPartition {
worker_partition_list.push(pid as u64)
}
}
info!("job_workers {:?}, worker id: {:?}, worker_partition_list {:?}", job_workers, worker_id, worker_partition_list);
info!(
"job_workers {:?}, worker id: {:?}, worker_partition_list {:?}",
job_workers, worker_id, worker_partition_list
);
Ok(Some(worker_partition_list))
}
}
Expand All @@ -87,48 +90,75 @@ pub struct VineyardMultiPartition {
partition_worker_mapping: Arc<RwLock<Option<HashMap<u32, u32>>>>,
// mapping of worker id -> partition list
worker_partition_list_mapping: Arc<RwLock<Option<HashMap<u32, Vec<u32>>>>>,
num_servers: usize,
server_index: u64,
}

impl VineyardMultiPartition {
pub fn new(
graph_partition_manager: Arc<dyn GraphPartitionManager>,
partition_worker_mapping: Arc<RwLock<Option<HashMap<u32, u32>>>>,
worker_partition_list_mapping: Arc<RwLock<Option<HashMap<u32, Vec<u32>>>>>,
num_servers: usize,
server_index: u64,
) -> VineyardMultiPartition {
VineyardMultiPartition {
graph_partition_manager,
partition_worker_mapping,
worker_partition_list_mapping,
num_servers,
server_index,
}
}
}

impl Partitioner for VineyardMultiPartition {
fn get_partition(&self, id: &ID, _worker_num_per_server: usize) -> DynResult<u64> {
fn get_partition(&self, id: &ID, worker_num_per_server: usize) -> DynResult<u64> {
// The partitioning logics is as follows:
// 1. `partition_id = self.graph_partition_manager.get_partition_id(*id as VertexId)` routes a given id
// to the partition that holds its data.
// 2. get worker_id by the prebuild partition_worker_map, which specifies partition_id -> worker_id
let vid = (*id & (ID_MASK)) as VertexId;
let partition_id = self.graph_partition_manager.get_partition_id(vid) as PartitionId;
if let Ok(partition_worker_mapping) = self.partition_worker_mapping.read() {
if let Some(partition_worker_mapping) = partition_worker_mapping.as_ref() {
if let Some(worker_id) = partition_worker_mapping.get(&partition_id) {
Ok(*worker_id as u64)

// Firstly, we check if the job parallelism is identical to the pre-allocated parallelism,
// while one exception is that GAIA will optimize when the plan only have a source step (which may access the storage);
// Then we just follow the above routing rule.
let parallelism = self
.worker_partition_list_mapping
.read()
.unwrap()
.as_ref()
.map_or(0, |map| map.len());
if self.num_servers * worker_num_per_server != parallelism {
// GAIA will optimize to directly query the storage if it only has a source step
if worker_num_per_server == 1 {
Ok(self.server_index)
} else {
Err(str_to_dyn_error(
"Job parallelism is not identical to the pre-allocated parallelism",
))
}
} else {
let vid = (*id & (ID_MASK)) as VertexId;
let partition_id = self.graph_partition_manager.get_partition_id(vid) as PartitionId;
if let Ok(partition_worker_mapping) = self.partition_worker_mapping.read() {
if let Some(partition_worker_mapping) = partition_worker_mapping.as_ref() {
if let Some(worker_id) = partition_worker_mapping.get(&partition_id) {
Ok(*worker_id as u64)
} else {
Err(str_to_dyn_error(
"get worker id failed in VineyardMultiPartition",
))
}
} else {
Err(str_to_dyn_error(
"get worker id failed in VineyardMultiPartition",
"partition_worker_mapping is not initialized in VineyardMultiPartition",
))
}
} else {
Err(str_to_dyn_error(
"partition_worker_mapping is not initialized in VineyardMultiPartition",
"read partition_worker_mapping in VineyardMultiPartition failed",
))
}
} else {
Err(str_to_dyn_error(
"read partition_worker_mapping in VineyardMultiPartition failed",
))
}
}

Expand All @@ -139,10 +169,17 @@ impl Partitioner for VineyardMultiPartition {
) -> DynResult<Option<Vec<u64>>> {
// If only one worker each server, it will process all partitions
if job_workers == 1 {
Ok(Some(self.graph_partition_manager.get_process_partition_list().into_iter().map(|pid| pid as u64).collect()))
Ok(Some(
self.graph_partition_manager
.get_process_partition_list()
.into_iter()
.map(|pid| pid as u64)
.collect(),
))
}
// Vineyard will pre-allocate the worker_partition_list mapping
else if let Ok(worker_partition_list_mapping) = self.worker_partition_list_mapping.read() {
else if let Ok(worker_partition_list_mapping) = self.worker_partition_list_mapping.read()
{
if let Some(worker_partition_list_mapping) = worker_partition_list_mapping.as_ref() {
if let Some(partition_list) = worker_partition_list_mapping.get(&worker_id) {
Ok(Some(partition_list.iter().map(|pid| *pid as u64).collect()))
Expand Down

0 comments on commit 9405412

Please sign in to comment.