Add a ci to test loading graph from HDFS (#750)
acezen committed Sep 3, 2021
1 parent 1e602fc commit 96fba3d
Showing 5 changed files with 99 additions and 9 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
@@ -396,6 +396,32 @@ jobs:
--cov-report=term --exitfirst -s -vvv --log-cli-level=INFO \
./tests/kubernetes
- name: HDFS test
env:
CHANGE_MINIKUBE_NONE_USER: true
GS_IMAGE: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }}
run: |
cd /tmp
wget -q https://mirror.cogentco.com/pub/apache/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz
tar -zxf hadoop-2.10.1.tar.gz
export JAVA_HOME=/usr/lib/jvm/default-java/
cd ${GITHUB_WORKSPACE}/.github/workflows/hadoop_scripts
./prepare_hadoop.sh /tmp/hadoop-2.10.1
export PATH=${PATH}:/tmp/hadoop-2.10.1/bin
export GS_TEST_DIR=${GITHUB_WORKSPACE}/gstest
hadoop fs -mkdir /ldbc_sample || true
hadoop fs -chmod 777 /ldbc_sample
hadoop fs -put ${GS_TEST_DIR}/ldbc_sample/person_0_0.csv /ldbc_sample/person_0_0.csv
hadoop fs -put ${GS_TEST_DIR}/ldbc_sample/person_knows_person_0_0.csv /ldbc_sample/person_knows_person_0_0.csv
export HDFS_TEST_DIR=hdfs:///ldbc_sample
export HDFS_HOST=$(hostname -I | awk '{print $1}')
cd ${GITHUB_WORKSPACE}/python
python3 -m pytest ./tests/kubernetes/test_demo_script.py -k test_demo_on_hdfs
# FIXME: context.output file not found
hdfs dfs -test -e /ldbc_sample/res.csv_0 && hdfs dfs -test -e /ldbc_sample/res.csv_1
- name: Helm test
run: |
curl https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash
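The last line of the HDFS test step relies on "hdfs dfs -test -e" exiting non-zero when a path is missing, so the step fails if the PageRank results were never written back to HDFS. Below is a minimal sketch of the same check from Python, for illustration only: it assumes the Hadoop binaries prepared above are on PATH and reuses the output paths from this workflow.

# Illustrative sketch, not part of the commit: verify both PageRank output
# partitions exist on HDFS, mirroring the workflow's "hdfs dfs -test -e" check.
import subprocess

def hdfs_path_exists(path: str) -> bool:
    # "hdfs dfs -test -e" returns exit code 0 iff the path exists.
    return subprocess.run(["hdfs", "dfs", "-test", "-e", path]).returncode == 0

for part in ("/ldbc_sample/res.csv_0", "/ldbc_sample/res.csv_1"):
    assert hdfs_path_exists(part), f"missing PageRank output partition: {part}"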
@@ -416,6 +442,9 @@ jobs:
- name: Clean
if: always()
run: |
export JAVA_HOME=/usr/lib/jvm/default-java/
HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
rm -rf /tmp/hadoop* || true
sudo docker rmi -f registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} || true
helm uninstall graphscope || true
kubectl delete pod graphscope-test-rpc-service --wait=false || true
5 changes: 3 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -39,10 +39,11 @@
import grpc
from packaging import version

from gscoordinator.io_utils import StdoutWrapper
from gscoordinator.io_utils import StdStreamWrapper

# capture system stdout
sys.stdout = StdoutWrapper(sys.stdout)
sys.stdout = StdStreamWrapper(sys.stdout)
sys.stderr = StdStreamWrapper(sys.stderr)

from graphscope.framework import utils
from graphscope.framework.dag_utils import create_graph
16 changes: 10 additions & 6 deletions coordinator/gscoordinator/io_utils.py
@@ -20,9 +20,9 @@
from queue import Queue


class StdoutWrapper(object):
def __init__(self, stdout, queue=None, drop=True):
self._stdout_backup = stdout
class StdStreamWrapper(object):
def __init__(self, std_stream, queue=None, drop=True):
self._stream_backup = std_stream
if queue is None:
self._lines = Queue()
else:
@@ -31,19 +31,23 @@ def __init__(self, stdout, queue=None, drop=True):

@property
def stdout(self):
return self._stdout_backup
return self._stream_backup

@property
def stderr(self):
return self._stream_backup

def drop(self, drop=True):
self._drop = drop

def write(self, line):
line = line.encode("ascii", "ignore").decode("ascii")
self._stdout_backup.write(line)
self._stream_backup.write(line)
if not self._drop:
self._lines.put(line)

def flush(self):
self._stdout_backup.flush()
self._stream_backup.flush()

def poll(self, block=True, timeout=None):
return self._lines.get(block=block, timeout=timeout)
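The renamed StdStreamWrapper behaves like the old StdoutWrapper but no longer assumes the wrapped stream is stdout, which is what lets coordinator.py wrap stderr as well. A short usage sketch, based only on the methods shown above and not part of the commit; dropping is disabled so a written line can be read back with poll.

# Illustrative sketch: wrap a stream and read a captured line back.
import sys
from gscoordinator.io_utils import StdStreamWrapper

wrapper = StdStreamWrapper(sys.stdout)
wrapper.drop(False)                        # keep written lines in the internal queue
wrapper.write("hello from the wrapper\n")  # echoed to the real stdout as well
wrapper.flush()
print(wrapper.poll(timeout=1))             # -> "hello from the wrapper\n"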
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/launcher.py
@@ -281,7 +281,7 @@ def _launch_zetcd(self):
encoding="utf-8",
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=sys.stderr,
stderr=subprocess.STDOUT,
bufsize=1,
)

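This one-line change most likely follows from the coordinator change above: sys.stderr is now a StdStreamWrapper rather than a real file object, so it is safer not to hand it to subprocess; redirecting the child's stderr into its stdout removes the dependency on sys.stderr entirely. A sketch of the pattern is below; the command and arguments are placeholders, not the actual zetcd invocation assembled in launcher.py.

# Illustrative sketch of the redirection pattern used in _launch_zetcd.
import subprocess

proc = subprocess.Popen(
    ["zetcd", "-zkaddr", "0.0.0.0:2181"],  # placeholder command line
    stdin=subprocess.DEVNULL,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.STDOUT,  # merge stderr into stdout instead of passing sys.stderr
    encoding="utf-8",
    bufsize=1,
)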
56 changes: 56 additions & 0 deletions python/tests/kubernetes/test_demo_script.py
@@ -136,6 +136,62 @@ def test_demo(gs_session, data_dir):
# GNN engine


@pytest.mark.skipif("HDFS_TEST_DIR" not in os.environ, reason="the test case needs HDFS")
def test_demo_on_hdfs(gs_session_distributed):
graph = gs_session_distributed.g()
graph = graph.add_vertices(
Loader(
os.environ["HDFS_TEST_DIR"] + "/person_0_0.csv",
host=os.environ["HDFS_HOST"],
port=9000,
delimiter="|",
),
"person",
[
"firstName",
"lastName",
"gender",
"birthday",
"creationDate",
"locationIP",
"browserUsed",
],
"id",
)
graph = graph.add_edges(
Loader(
os.environ["HDFS_TEST_DIR"] + "/person_knows_person_0_0.csv",
host=os.environ["HDFS_HOST"],
port=9000,
delimiter="|",
),
"knows",
["creationDate"],
src_label="person",
dst_label="person",
)

# Interactive engine
interactive = gs_session_distributed.gremlin(graph)
sub_graph = interactive.subgraph( # noqa: F841
'g.V().hasLabel("person").outE("knows")'
)

# Analytical engine
# project the subgraph to a simple graph.
simple_g = sub_graph.project(vertices={"person": []}, edges={"knows": []})

pr_result = graphscope.pagerank(simple_g, delta=0.8)

# output to hdfs
pr_result.output(
os.environ["HDFS_TEST_DIR"] + "/res.csv",
selector={"id": "v.id", "rank": "r"},
host=os.environ["HDFS_HOST"],
port=9000,
)


def test_demo_distribute(gs_session_distributed, data_dir, modern_graph_data_dir):
graph = load_ldbc(gs_session_distributed, data_dir)

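The new case only runs when HDFS_TEST_DIR is set, so the existing tests are unaffected outside the HDFS CI job. Below is a sketch of running just this case locally, assuming a reachable HDFS namenode on port 9000 and a Kubernetes cluster for the gs_session_distributed fixture; the host address is a placeholder.

# Illustrative sketch: select only the HDFS demo test, the way the CI step does.
import os
import pytest

os.environ["HDFS_TEST_DIR"] = "hdfs:///ldbc_sample"  # same path the CI step populates
os.environ["HDFS_HOST"] = "192.168.1.10"             # placeholder namenode address

pytest.main(["tests/kubernetes/test_demo_script.py", "-k", "test_demo_on_hdfs", "-s", "-vvv"])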
