diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ccebf97a88e3..8e04883670f1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -396,6 +396,32 @@ jobs:
           --cov-report=term --exitfirst -s -vvv --log-cli-level=INFO \
           ./tests/kubernetes
 
+    - name: HDFS test
+      env:
+        CHANGE_MINIKUBE_NONE_USER: true
+        GS_IMAGE: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }}
+      run: |
+        cd /tmp
+        wget -q https://mirror.cogentco.com/pub/apache/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz
+        tar -zxf hadoop-2.10.1.tar.gz
+        export JAVA_HOME=/usr/lib/jvm/default-java/
+
+        cd ${GITHUB_WORKSPACE}/.github/workflows/hadoop_scripts
+        ./prepare_hadoop.sh /tmp/hadoop-2.10.1
+        export PATH=${PATH}:/tmp/hadoop-2.10.1/bin
+        export GS_TEST_DIR=${GITHUB_WORKSPACE}/gstest
+        hadoop fs -mkdir /ldbc_sample || true
+        hadoop fs -chmod 777 /ldbc_sample
+        hadoop fs -put ${GS_TEST_DIR}/ldbc_sample/person_0_0.csv /ldbc_sample/person_0_0.csv
+        hadoop fs -put ${GS_TEST_DIR}/ldbc_sample/person_knows_person_0_0.csv /ldbc_sample/person_knows_person_0_0.csv
+        export HDFS_TEST_DIR=hdfs:///ldbc_sample
+        export HDFS_HOST=$(hostname -I | awk '{print $1}')
+
+        cd ${GITHUB_WORKSPACE}/python
+        python3 -m pytest ./tests/kubernetes/test_demo_script.py -k test_demo_on_hdfs
+        # FIXME: context.output file not found
+        hdfs dfs -test -e /ldbc_sample/res.csv_0 && hdfs dfs -test -e /ldbc_sample/res.csv_1
+
     - name: Helm test
       run: |
         curl https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash
@@ -416,6 +442,9 @@ jobs:
     - name: Clean
       if: always()
       run: |
+        export JAVA_HOME=/usr/lib/jvm/default-java/
+        HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
+        rm -rf /tmp/hadoop* || true
         sudo docker rmi -f registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} || true
         helm uninstall graphscope || true
         kubectl delete pod graphscope-test-rpc-service --wait=false || true
diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py
index 45120c2c6cdb..e6bfba709086 100644
--- a/coordinator/gscoordinator/coordinator.py
+++ b/coordinator/gscoordinator/coordinator.py
@@ -39,10 +39,11 @@
 import grpc
 from packaging import version
 
-from gscoordinator.io_utils import StdoutWrapper
+from gscoordinator.io_utils import StdStreamWrapper
 
 # capture system stdout
-sys.stdout = StdoutWrapper(sys.stdout)
+sys.stdout = StdStreamWrapper(sys.stdout)
+sys.stderr = StdStreamWrapper(sys.stderr)
 
 from graphscope.framework import utils
 from graphscope.framework.dag_utils import create_graph
diff --git a/coordinator/gscoordinator/io_utils.py b/coordinator/gscoordinator/io_utils.py
index e069a119c9d3..1f4fb3958901 100644
--- a/coordinator/gscoordinator/io_utils.py
+++ b/coordinator/gscoordinator/io_utils.py
@@ -20,9 +20,9 @@
 from queue import Queue
 
 
-class StdoutWrapper(object):
-    def __init__(self, stdout, queue=None, drop=True):
-        self._stdout_backup = stdout
+class StdStreamWrapper(object):
+    def __init__(self, std_stream, queue=None, drop=True):
+        self._stream_backup = std_stream
         if queue is None:
             self._lines = Queue()
         else:
@@ -31,19 +31,23 @@ def __init__(self, stdout, queue=None, drop=True):
 
     @property
     def stdout(self):
-        return self._stdout_backup
+        return self._stream_backup
+
+    @property
+    def stderr(self):
+        return self._stream_backup
 
     def drop(self, drop=True):
         self._drop = drop
 
     def write(self, line):
         line = line.encode("ascii", "ignore").decode("ascii")
-        self._stdout_backup.write(line)
+        self._stream_backup.write(line)
         if not self._drop:
             self._lines.put(line)
 
     def flush(self):
-        self._stdout_backup.flush()
+        self._stream_backup.flush()
 
     def poll(self, block=True, timeout=None):
         return self._lines.get(block=block, timeout=timeout)
diff --git a/coordinator/gscoordinator/launcher.py b/coordinator/gscoordinator/launcher.py
index c530e1ca0579..57d87a2ae48d 100644
--- a/coordinator/gscoordinator/launcher.py
+++ b/coordinator/gscoordinator/launcher.py
@@ -281,7 +281,7 @@ def _launch_zetcd(self):
             encoding="utf-8",
             stdin=subprocess.DEVNULL,
             stdout=subprocess.DEVNULL,
-            stderr=sys.stderr,
+            stderr=subprocess.STDOUT,
             bufsize=1,
         )
 
diff --git a/python/tests/kubernetes/test_demo_script.py b/python/tests/kubernetes/test_demo_script.py
index ca10861514fb..673c255c9167 100644
--- a/python/tests/kubernetes/test_demo_script.py
+++ b/python/tests/kubernetes/test_demo_script.py
@@ -136,6 +136,62 @@ def test_demo(gs_session, data_dir):
     # GNN engine
 
 
+@pytest.mark.skipif("HDFS_TEST_DIR" not in os.environ, reason="the test case need HDFS")
+def test_demo_on_hdfs(gs_session_distributed):
+    graph = gs_session_distributed.g()
+    graph = graph.add_vertices(
+        Loader(
+            os.environ["HDFS_TEST_DIR"] + "/person_0_0.csv",
+            host=os.environ["HDFS_HOST"],
+            port=9000,
+            delimiter="|",
+        ),
+        "person",
+        [
+            "firstName",
+            "lastName",
+            "gender",
+            "birthday",
+            "creationDate",
+            "locationIP",
+            "browserUsed",
+        ],
+        "id",
+    )
+    graph = graph.add_edges(
+        Loader(
+            os.environ["HDFS_TEST_DIR"] + "/person_knows_person_0_0.csv",
+            host=os.environ["HDFS_HOST"],
+            port=9000,
+            delimiter="|",
+        ),
+        "knows",
+        ["creationDate"],
+        src_label="person",
+        dst_label="person",
+    )
+
+    # Interactive engine
+    interactive = gs_session_distributed.gremlin(graph)
+    sub_graph = interactive.subgraph(  # noqa: F841
+        'g.V().hasLabel("person").outE("knows")'
+    )
+
+    # Analytical engine
+    # project the projected graph to simple graph.
+    simple_g = sub_graph.project(vertices={"person": []}, edges={"knows": []})
+
+    pr_result = graphscope.pagerank(simple_g, delta=0.8)
+
+    # output to hdfs
+    pr_result.output(
+        os.environ["HDFS_TEST_DIR"] + "/res.csv",
+        selector={"id": "v.id", "rank": "r"},
+        host=os.environ["HDFS_HOST"],
+        port=9000,
+    )
+
+
 def test_demo_distribute(gs_session_distributed, data_dir, modern_graph_data_dir):
     graph = load_ldbc(gs_session_distributed, data_dir)