Skip to content

Commit

Permalink
Merge branch 'main' into ir_data_type
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 committed Jun 14, 2023
2 parents a53db01 + 79cd577 commit e52644a
Show file tree
Hide file tree
Showing 19 changed files with 194 additions and 28 deletions.
2 changes: 1 addition & 1 deletion analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::projectToSimple(
return graph_def;
}
VY_OK_OR_RAISE(client_->Persist(vy_info.vineyard_id()));
// contruct fragment group
// construct fragment group
BOOST_LEAF_AUTO(frag_group_id,
vineyard::ConstructFragmentGroup(
*client_, vy_info.vineyard_id(), comm_spec_));
Expand Down
3 changes: 3 additions & 0 deletions analytical_engine/core/launcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ void VineyardServer::Start() {
" --size " + FLAGS_vineyard_shared_mem +
" --etcd_endpoint " + FLAGS_etcd_endpoint +
" --etcd_prefix vineyard.gsa." + std::to_string(ts);
if (comm_spec_.worker_num() == comm_spec_.local_num()) {
cmd += " --meta local";
}
auto env = boost::this_process::environment();
// Set verbosity level to 2 can get rid of most of vineyard server's
// debugging output
Expand Down
74 changes: 74 additions & 0 deletions analytical_engine/java/grape-graphx/performance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Performance

We test GraphScope for GraphX in end-to-end scenarios to measure the performance improvement of graph computing on Spark GraphX. This includes:
- Graph loading: loading graphs from the file system into memory in the form of a graph
- RDD Op: transforming the graph using RDD-defined operators
- Pregel computin: running graph algorithms based on GraphX Pregel, such as SSSP, PageRank, and CC

## Settings:

| dataset | num of vertices | num of edges | avg degree |
|:--------------: |:---------------: |:-------------: |:-----------: |
| datagen-9_0-fb | 12,857,672 | 1,049,527,226 | 81.6 |
| com-friendster | 65,608,366 | 1,806,067,135 | 27.5 |

The following tests are run on 4 Nodes cluster, each with 48 cores, 96 cpu.


## End-to-End time

By using ORC-format files as input, the time for graph loading and converting it to ```RDD[(Long, Long)]``` is the same for GraphScope and GraphX.

### On com-friendster
#### 256 partitions

| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2E |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
|PageRank | 108s | 106s | 152s | 1129s | 260s | 1235s | 7.4x | 4.8x |
| SSSP | 108s | 106s | 31s | 164s | 139s | 270s | 5.3 | 1.9x |
| CC | 108s | 106s | 58s | 228s | 166s | 334s | 3.9x | 2x |


#### 320 partitions

| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2d |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
| PageRank | 100s | 100s | 158s | 1089s | 268s | 1189s | 6.5x | 4.4x |
| SSSP | 100s | 100s | 31s | 156s | 131s | 256s | 5x | 2x |
| CC | 100s | 100s | 62s | 219s | 162s | 319s | 2.8x | 2x |

#### 384 partitions
| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2d |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
| PageRank | 99s | 98s | 154s | 1028s | 253s | 1126s | 6.7x | 4.5x |
| SSSP | 99s | 98s | 33s | 163s | 132s | 261s | 5x | 2x |
| CC | 99s | 98s | 60s | 223s | 159s | 321s | 2.8x | 2x |



### On Datagen-9_0-fb

#### 256 partitions

| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2d |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
| PageRank | 70s | 84s | 90s | 430s | 160s | 514s | 4.8x | 3.2x |
| SSSP | 70s | 84s | 14s | 45s | 84s | 129s | 3x | 1.5x |
| CC | 70s | 84s | 36s | 74s | 106s | 158s | 2x | 1.5x |


#### 320 partitions

| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2d |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
| PageRank | 68s | 76s | 87s | 406s | 155s | 482s | 4.7x | 3.1x |
| SSSP | 68s | 76s | 13s | 40s | 81s | 116s | 3x | 1.4x |
| CC | 68s | 76s | 30s | 53s | 98s | 129s | 1.8x | 1.3x |

#### 384 partitions

| Algorithm | GS Graph Loading | GraphX Graph Loading | GS Query Time | GraphX Query Time | GS E2E Time| GraphX E2E Time | Performance Gain Query | Performance Gain E2d |
|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------:|:--------------: |
| PageRank | 68s | 73s | 82s | 395s | 150s | 468s | 4.8x | 3x |
| SSSP | 68s | 73s | 13s | 40s | 81s | 113s | 3x | 1.4x |
| CC | 68s | 73s | 30s | 50s | 98s | 143s | 1.7x | 1.4x |
5 changes: 5 additions & 0 deletions analytical_engine/java/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,8 @@ pr_delta set to 0.85, running for 50 rounds.
| C++ time | 24.15 | 12.46 | 6.59 | 3.59 | 2.11 | 1.56 | 1.53 |
| Java time | 80.77 | 40.94 | 20.87 | 14.55 | 8.14 | 5.13 | 5.15 |
| Java(+LLVM4JNI) time | 49.80 | 24.15 | 10.54 | 6.63 | 3.83 | 2.95 | 3.42 |


## Graphscope-GraphX Integration

We also evaluate the performance of `grape-graphx`, the integration of GraphScope on Spark GraphX. See [grape-graphX performace](grape-graphx/performance.md).
5 changes: 4 additions & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,11 @@ def _match_frontend_endpoint(pattern, lines):
# create instance
object_id = request.object_id
schema_path = request.schema_path
params = request.params
try:
proc = self._launcher.create_interactive_instance(object_id, schema_path)
proc = self._launcher.create_interactive_instance(
object_id, schema_path, params
)
gie_manager = InteractiveQueryManager(object_id)
# Put it to object_manager to ensure it could be killed during coordinator cleanup
# If coordinator is shutdown by force when creating interactive instance
Expand Down
18 changes: 15 additions & 3 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,12 @@ def _allocate_interactive_engine(self, object_id):
return self.deploy_interactive_engine(object_id)

def _distribute_interactive_process(
self, hosts, object_id: int, schema_path: str, engine_selector: str
self,
hosts,
object_id: int,
schema_path: str,
params: dict,
engine_selector: str,
):
"""
Args:
Expand All @@ -617,6 +622,10 @@ def _distribute_interactive_process(
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
container = self._engine_cluster.interactive_executor_container_name

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_k8s",
Expand All @@ -630,6 +639,7 @@ def _distribute_interactive_process(
str(self._interactive_port + 2), # frontend port
self._coordinator_name,
engine_selector,
params,
]
self._interactive_port += 3
logger.info("Create GIE instance with command: %s", " ".join(cmd))
Expand All @@ -648,7 +658,9 @@ def _distribute_interactive_process(
)
return process

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
if not pod_name_list:
raise RuntimeError("Failed to allocate interactive engine")
Expand All @@ -661,7 +673,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
)

return self._distribute_interactive_process(
hosts, object_id, schema_path, engine_selector
hosts, object_id, schema_path, params, engine_selector
)

def close_interactive_instance(self, object_id):
Expand Down
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def create_analytical_instance(self):
pass

@abstractmethod
def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pass

@abstractmethod
Expand Down
28 changes: 23 additions & 5 deletions coordinator/gscoordinator/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def create_analytical_instance(self):
"Analytical engine is listening on %s", self._analytical_engine_endpoint
)

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
try:
logger.info("Java version: %s", get_java_version())
except: # noqa: E722
Expand All @@ -218,6 +220,9 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
else:
num_workers = self._num_workers

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_local",
Expand All @@ -229,6 +234,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2 * num_workers), # frontend port
self.vineyard_socket,
params,
]
logger.info("Create GIE instance with command: %s", " ".join(cmd))
self._interactive_port += 3
Expand Down Expand Up @@ -356,8 +362,12 @@ def launch_etcd(self):
else:
self._etcd_peer_port = get_free_port()

if isinstance(self._hosts, (list, tuple)):
hosts = self._hosts
else:
hosts = self._hosts.split(",")
local_hostname = "127.0.0.1"
if len(self._hosts) > 1:
if len(hosts) > 1:
try:
local_hostname = socket.gethostname()
socket.gethostbyname(
Expand Down Expand Up @@ -446,8 +456,11 @@ def launch_vineyard(self):
cmd.extend(["--socket", self.vineyard_socket])
cmd.extend(["--rpc_socket_port", str(self._vineyard_rpc_port)])
cmd.extend(["--size", self._shared_mem])
cmd.extend(["-etcd_endpoint", self._etcd_endpoint])
cmd.extend(["-etcd_prefix", f"vineyard.gsa.{ts}"])
if len(hosts) == 1:
cmd.extend(["--meta", "local"])
else:
cmd.extend(["-etcd_endpoint", self._etcd_endpoint])
cmd.extend(["-etcd_prefix", f"vineyard.gsa.{ts}"])
env = os.environ.copy()
env["GLOG_v"] = str(self._glog_level)
env.update(mpi_env)
Expand Down Expand Up @@ -544,7 +557,12 @@ def configure_etcd_endpoint(self):
def start(self):
try:
# create etcd
self.configure_etcd_endpoint()
if isinstance(self._hosts, (list, tuple)):
hosts = self._hosts
else:
hosts = self._hosts.split(",")
if len(hosts) > 1:
self.configure_etcd_endpoint()
# create vineyard
self.launch_vineyard()
except Exception: # pylint: disable=broad-except
Expand Down
9 changes: 9 additions & 0 deletions docs/interactive_engine/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ You may see something like:
The number 6 is printed, which is the number of vertices in modern graph.
### Customize Configurations for GIE instance
You could pass additional key-value pairs to customize the startup configuration of GIE, for example:
```python
# Set the timeout value to 10 min
g = gs.gremlin(graph, params={'pegasus.timeout': 600000})
```
## What's the Next
As shown in the above example, it is very easy to use GraphScope to interactively query a graph using the gremlin query language on your local machine. You may find more tutorials [here](https://tinkerpop.apache.org/docs/current/tutorials/getting-started/) for the basic Gremlin usage, in which most read-only queries can be seamlessly executed with the above `g.execute()` function.
Expand Down
25 changes: 21 additions & 4 deletions interactive_engine/assembly/src/bin/graphscope/giectl
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,23 @@ start_frontend() {
declare -r schema_path=$3
declare -r pegasus_hosts=$4
declare -r frontend_port=$5
declare -r params=$6

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

# create related directories
declare -r log_dir=${GS_LOG}/${object_id}
declare -r config_dir=${GRAPHSCOPE_RUNTIME}/config/${object_id}
declare -r pid_dir=${GRAPHSCOPE_RUNTIME}/pid/${object_id}

mkdir -p ${log_dir} ${config_dir} ${pid_dir}

# make a "current" link
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

declare java_opt="-server
-verbose:gc
-Xloggc:${log_dir}/frontend.gc.log
Expand Down Expand Up @@ -113,6 +118,8 @@ start_frontend() {
-e "s@FRONTEND_SERVICE_PORT@${frontend_port}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/frontend.vineyard.properties > ${config_dir}/frontend.vineyard.properties
echo -e "\n" >> ${config_dir}/frontend.vineyard.properties
echo -e "$decoded_params" >> ${config_dir}/frontend.vineyard.properties

# frontend service hold a handle client of coordinator
java ${java_opt} \
Expand Down Expand Up @@ -142,6 +149,8 @@ start_executor() {
declare -r server_size=$4
declare -r rpc_port=$5
declare -r network_servers=$6
declare -r params=$7

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

declare -r log_dir=${GS_LOG}/${object_id}
Expand All @@ -157,6 +166,8 @@ start_executor() {
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

# set executor config file
sed -e "s@GRAPH_NAME@${object_id}@g" \
-e "s@VINEYARD_OBJECT_ID@${object_id}@g" \
Expand All @@ -166,6 +177,8 @@ start_executor() {
-e "s@NETWORK_SERVERS@${network_servers}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/executor.vineyard.properties > ${config_dir}/executor.$server_id.vineyard.properties
echo -e "\n" >> ${config_dir}/executor.$server_id.vineyard.properties
echo "$decoded_params" >> ${config_dir}/executor.$server_id.vineyard.properties

cp ${GRAPHSCOPE_HOME}/conf/log4rs.yml ${config_dir}/log4rs.yml

Expand Down Expand Up @@ -201,6 +214,7 @@ create_gremlin_instance_on_local() {
declare -r executor_rpc_port=$6
declare -r frontend_port=$7
export VINEYARD_IPC_SOCKET=$8
declare -r params=$9

declare -r cluster_type="local"
declare -r executor_count="1" # local mode only start one executor
Expand Down Expand Up @@ -228,7 +242,7 @@ create_gremlin_instance_on_local() {
pegasus_hosts=${pegasus_hosts:1}

start_frontend ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} \
${frontend_port}
${frontend_port} ${params}

log "FRONTEND_ENDPOINT:127.0.0.1:${frontend_port}"

Expand All @@ -237,7 +251,7 @@ create_gremlin_instance_on_local() {
current_executor_port=$(($executor_port + 2 * $server_id))
current_executor_rpc_port=$(($executor_rpc_port + 2 * $server_id))
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${server_id} ${server_size} ${current_executor_rpc_port} \
${network_servers}
${network_servers} ${params}
done
}

Expand Down Expand Up @@ -271,6 +285,7 @@ create_gremlin_instance_on_k8s() {
declare -r frontend_port=$8
declare -r coordinator_name=$9 # deployment name of coordinator
declare -r engine_selector=${10}
declare -r params=${11}

instance_id=${coordinator_name#*-}

Expand All @@ -289,7 +304,7 @@ create_gremlin_instance_on_k8s() {

launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \
${GRAPHSCOPE_HOME}/bin/giectl start_frontend \
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port}"
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port} '${params}'"
kubectl cp ${schema_path} ${frontend_name}:${schema_path}

kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}"
Expand All @@ -303,7 +318,9 @@ create_gremlin_instance_on_k8s() {
_server_id=0
for pod in $(echo ${pod_hosts})
do
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} ${executor_rpc_port} ${network_servers}"
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl \
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} \
${executor_rpc_port} ${network_servers} '${params}'"
# kubectl exec ${pod} -c ${engine_container} -- sudo mkdir -p /var/log/graphscope
# kubectl exec ${pod} -c ${engine_container} -- sudo chown -R graphscope:graphscope /var/log/graphscope
kubectl exec ${pod} -c ${engine_container} -- /bin/bash -c "${launch_executor_cmd}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ frontend.service.port = FRONTEND_SERVICE_PORT

# disable the authentication if username or password is not set
#auth.username = default
#auth.password = default
#auth.password = default
Loading

0 comments on commit e52644a

Please sign in to comment.