diff --git a/coordinator/gscoordinator/cluster.py b/coordinator/gscoordinator/cluster.py index d6fb502a90fc..c1a63233063e 100644 --- a/coordinator/gscoordinator/cluster.py +++ b/coordinator/gscoordinator/cluster.py @@ -59,6 +59,7 @@ from gscoordinator.io_utils import PipeWatcher from gscoordinator.launcher import Launcher +from gscoordinator.utils import GRAPHSCOPE_HOME from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT from gscoordinator.utils import WORKSPACE from gscoordinator.utils import ResolveMPICmdPrefix @@ -360,6 +361,8 @@ def create_interactive_instance(self, config: dict): "{}:{}".format(key, value) for key, value in engine_params.items() ] enable_gaia = config[types_pb2.GIE_ENABLE_GAIA].b + env = os.environ.copy() + env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME}) cmd = [ INTERACTIVE_ENGINE_SCRIPT, "create_gremlin_instance_on_k8s", @@ -377,7 +380,7 @@ def create_interactive_instance(self, config: dict): cmd, start_new_session=True, cwd=os.getcwd(), - env=os.environ.copy(), + env=env, universal_newlines=True, encoding="utf-8", stdin=subprocess.DEVNULL, @@ -388,6 +391,8 @@ def create_interactive_instance(self, config: dict): return process def close_interactive_instance(self, object_id): + env = os.environ.copy() + env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME}) cmd = [ INTERACTIVE_ENGINE_SCRIPT, "close_gremlin_instance_on_k8s", @@ -401,7 +406,7 @@ def close_interactive_instance(self, object_id): cmd, start_new_session=True, cwd=os.getcwd(), - env=os.environ.copy(), + env=env, universal_newlines=True, encoding="utf-8", stdin=subprocess.DEVNULL, diff --git a/coordinator/gscoordinator/launcher.py b/coordinator/gscoordinator/launcher.py index 3fbefa96fc17..8d6a80e0a0d5 100644 --- a/coordinator/gscoordinator/launcher.py +++ b/coordinator/gscoordinator/launcher.py @@ -32,6 +32,7 @@ from gscoordinator.io_utils import PipeWatcher from gscoordinator.utils import ANALYTICAL_ENGINE_PATH +from gscoordinator.utils import GRAPHSCOPE_HOME from 
gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT from gscoordinator.utils import WORKSPACE from gscoordinator.utils import ResolveMPICmdPrefix @@ -209,6 +210,8 @@ def create_interactive_instance(self, config: dict): "{}:{}".format(key, value) for key, value in engine_params.items() ] enable_gaia = config[types_pb2.GIE_ENABLE_GAIA].b + env = os.environ.copy() + env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME}) cmd = [ INTERACTIVE_ENGINE_SCRIPT, "create_gremlin_instance_on_local", @@ -226,7 +229,7 @@ def create_interactive_instance(self, config: dict): cmd, start_new_session=True, cwd=os.getcwd(), - env=os.environ.copy(), + env=env, universal_newlines=True, encoding="utf-8", stdin=subprocess.DEVNULL, @@ -237,6 +240,8 @@ def create_interactive_instance(self, config: dict): return process def close_interactive_instance(self, object_id): + env = os.environ.copy() + env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME}) cmd = [ INTERACTIVE_ENGINE_SCRIPT, "close_gremlin_instance_on_local", @@ -248,7 +253,7 @@ def close_interactive_instance(self, object_id): cmd, start_new_session=True, cwd=os.getcwd(), - env=os.environ.copy(), + env=env, universal_newlines=True, encoding="utf-8", stdin=subprocess.DEVNULL, diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index 11b3441085db..97606f1fceb5 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -85,9 +85,20 @@ # GRAPHSCOPE_HOME # 1) get from environment variable `GRAPHSCOPE_HOME`, if not exist, # 2) infer from COORDINATOR_HOME -try: - GRAPHSCOPE_HOME = os.environ["GRAPHSCOPE_HOME"] -except KeyError: +GRAPHSCOPE_HOME = os.environ.get("GRAPHSCOPE_HOME", None) + +# resolve from pip installed package +if GRAPHSCOPE_HOME is None: + if os.path.isdir(os.path.join(COORDINATOR_HOME, "graphscope.runtime")): + GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "graphscope.runtime") + +# resolve from egg installed package +if GRAPHSCOPE_HOME is None: + if 
os.path.isdir(os.path.join(COORDINATOR_HOME, "..", "graphscope.runtime")): + GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..", "graphscope.runtime") + +# resolve from develop source tree +if GRAPHSCOPE_HOME is None: GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..") # ANALYTICAL_ENGINE_HOME diff --git a/python/graphscope/dataset/__init__.py b/python/graphscope/dataset/__init__.py index 618dda69a7be..d820da2bdd60 100644 --- a/python/graphscope/dataset/__init__.py +++ b/python/graphscope/dataset/__init__.py @@ -15,3 +15,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +DATA_SITE = "https://graphscope.oss-cn-beijing.aliyuncs.com/dataset" diff --git a/python/graphscope/dataset/io_utils.py b/python/graphscope/dataset/io_utils.py new file mode 100644 index 000000000000..9b82fdd4fca0 --- /dev/null +++ b/python/graphscope/dataset/io_utils.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2021 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import hashlib +import logging +import os +import shutil +import sys +import tarfile +import urllib +import zipfile +from urllib.request import urlretrieve + +from tqdm import tqdm + +logger = logging.getLogger("graphscope") + + +if sys.version_info >= (3, 6): + + def _path_to_string(path): + if isinstance(path, os.PathLike): + return os.fspath(path) + return path + + +elif sys.version_info >= (3, 4): + + def _path_to_string(path): + import pathlib + + if isinstance(path, pathlib.Path): + return str(path) + return path + + +else: + + def _path_to_string(path): + return path + + +def _resolve_hasher(algorithm, file_hash=None): + """Returns hash algorithm as hashlib function.""" + if algorithm == "sha256": + return hashlib.sha256() + if algorithm == "auto" and file_hash is not None and len(file_hash) == 64: + return hashlib.sha256() + return hashlib.md5() + + +def _hash_file(fpath, algorithm="sha256", chunk_size=65535): + """Calculates a file sha256 or md5 hash. + + Examples: + ..code:: python + + >>> _hash_file("/path/to/file") + "ccd128ab673e5d7dd1cceeaa4ba5d65b67a18212c4a27b0cd090359bd7042b10" + """ + if isinstance(algorithm, str): + hasher = _resolve_hasher(algorithm) + else: + hasher = algorithm + with open(fpath, "rb") as file: + for chunk in iter(lambda: file.read(chunk_size), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def _extract_archive(fpath, path=".", archive_format="auto"): + """Extracts an archive if it matches tar.gz, tar.bz, tar, or zip formats. + + Args: + fpath: `PathLike` object of the archive file. + path: path to extract the archive file. + archive_format (str): Archive format to try for extracting the file. + Options are "tar", "zip", "auto" or None. + "tar" includes tar, tar.gz, tar.bz files. + The default "auto" is ["tar", "zip"]. + None or an empty list will return no matches found. + + Returns: + True if a match was found and an archive extraction was completed, + False otherwise. 
+ """ + if archive_format is None: + return False + if archive_format == "auto": + archive_format = ["tar", "zip"] + if isinstance(archive_format, str): + archive_format = [archive_format] + + fpath = _path_to_string(fpath) + path = _path_to_string(path) + + for archive_type in archive_format: + if archive_type == "tar": + open_fn = tarfile.open + is_match_fn = tarfile.is_tarfile + if archive_type == "zip": + open_fn = zipfile.ZipFile + is_match_fn = zipfile.is_zipfile + + if is_match_fn(fpath): + with open_fn(fpath) as archive: + try: + archive.extractall(path) + except (tarfile.TarError, RuntimeError, KeyboardInterrupt): + if os.path.exists(path): + if os.path.isfile(path): + os.remove(path) + else: + shutil.rmtree(path) + raise + return True + return False + + +def validate_file(fpath, file_hash, algorithm="auto", chunk_size=65535): + """Validates a file against a sha256 or md5 hash. + + Args: + fpath: `PathLike` object of the file being validated + file_hash (str): The expected hash string of the file. + The sha256 and md5 hash algorithms are both supported. + algorithm (str): Hash algorithm, one of "auto", "sha256", or "md5". + The default "auto" detects the hash algorithm in use. + chunk_size (int): Bytes to read at a time, important for large files. + + Returns (bool): Whether the file is valid. + """ + hasher = _resolve_hasher(algorithm, file_hash) + if str(_hash_file(fpath, hasher, chunk_size)) == str(file_hash): + return True + return False + + +def download_file( + fname, + origin, + file_hash=None, + hash_algorithm="auto", + extract=False, + archive_format="auto", + cache_dir=None, + cache_subdir="datasets", +): + """Downloads a file from a URL if it not already in the cache. + + By default the file at the url `origin` is downloaded to the cache_dir + `~/.graphscope, placed in the cache_subdir `datasets`, and given the + filename `fname`. 
The final location of a file `example.txt` would + therefore be `~/.graphscope/datasets/example.txt` + + Files in tar, tar.gz, tar.bz, and zip formats can also be extracted. + Passing a hash will verify the file after download. The command line + programs `shasum` and `sha256sum` can compute the hash. + + Args: + fname: `PathLike` object of the file. If an absolute path `/path/to/file` + is specified the file will be saved at that location. + origin (str): Original URL of the file. + file_hash (str): The expected hash string of the file after download. + The sha256 and md5 hash algorithms are both supported. + hash_algorithm (str): Select the hash algorithm to verify the file. + Options are `"md5"`, `"sha256"`, and `"auto"`. + The default "auto" detects the hash algorithm in use. + extract (bool): True tries extracting the file as an Archive, like zip. + archive_format (str): Archive format to try for extracting the file. + Options are `"auto"` `"tar"` `"zip"` and `None`. + `"tar"` includes "tar", "tar.gz", and "tar.bz" files. + The default `"auto"` corresponds to `["tar", "zip"]`. + None or an empty list will return no matches found. + cache_dir: Location of `PathLike` object to store cached files, when None, + it defaults to the default directory `~/.graphscope` + cache_subdir: Subdirectory under the cache dir where the file is saved. + + Returns: + Path to the downloaded file.
+ """ + if cache_dir is None: + cache_dir = os.path.join(os.path.expanduser("~"), ".graphscope") + cache_dir = os.path.expanduser(cache_dir) + if os.path.exists(cache_dir) and not os.access(cache_dir, os.W_OK): + cache_dir = os.path.join("/tmp", ".graphscope") + datadir = os.path.join(cache_dir, cache_subdir) + os.makedirs(datadir, exist_ok=True) + + fname = _path_to_string(fname) + fpath = os.path.join(datadir, fname) + + download = False + if os.path.exists(fpath): + # file found, verify if a hash was provided + if file_hash is not None: + if not validate_file(fpath, file_hash, algorithm=hash_algorithm): + logger.warning( + "A local file was found, but it seems to be incomplete " + "or outdated because the %s file hash does not match the " + "original value of %s, so we will re-download the data.", + hash_algorithm, + file_hash, + ) + download = True + else: + download = True + + if download: + logger.info("Downloading data from %s", origin) + + class ProgressTracker(object): + # Maintain progbar for the lifetime of download + progbar = None + record_downloaded = None + + def show_progress(block_num, block_size, total_size): + if ProgressTracker.progbar is None: + ProgressTracker.progbar = tqdm( + total=total_size, unit="iB", unit_scale=True + ) + downloaded = min(block_num * block_size, total_size) + if ProgressTracker.record_downloaded is None: + ProgressTracker.record_downloaded = downloaded + update_downloaded = downloaded + else: + update_downloaded = downloaded - ProgressTracker.record_downloaded + ProgressTracker.record_downloaded = downloaded + ProgressTracker.progbar.update(update_downloaded) + if downloaded >= total_size: + ProgressTracker.progbar.close() + ProgressTracker.progbar = None + ProgressTracker.record_downloaded = None + + error_msg = "URL fetch failure on {}:{} -- {}" + try: + try: + urlretrieve(origin, fpath, show_progress) + except urllib.error.HTTPError as e: + raise Exception(error_msg.format(origin, e.code, e.msg)) + except 
urllib.error.URLError as e: + # `URLError` has been made a subclass of OSError since version 3.3 + # https://docs.python.org/3/library/urllib.error.html + raise Exception(error_msg.format(origin, e.errno, e.reason)) + except (Exception, KeyboardInterrupt): + if os.path.exists(fpath): + os.remove(fpath) + raise + + if extract: + _extract_archive(fpath, datadir, archive_format) + + return fpath diff --git a/python/graphscope/dataset/ldbc.py b/python/graphscope/dataset/ldbc.py index 98ac57027166..18d796076542 100644 --- a/python/graphscope/dataset/ldbc.py +++ b/python/graphscope/dataset/ldbc.py @@ -18,6 +18,8 @@ import os +from graphscope.dataset import DATA_SITE +from graphscope.dataset.io_utils import download_file from graphscope.framework.graph import Graph from graphscope.framework.loader import Loader @@ -270,17 +272,51 @@ def load_ldbc(sess, prefix, directed=True): """ -def load_ldbc(sess, prefix, directed=True): +def load_ldbc(sess, prefix=None, directed=True): """Load ldbc dataset as a ArrowProperty Graph. + Args: sess (:class:`graphscope.Session`): Load graph within the session. - prefix (str): Data directory. + prefix: `PathLike` object that represents a path. + With standalone mode, set prefix None will try to download from + source URL. Defaults to None. directed (bool, optional): Determine to load a directed or undirected graph. Defaults to True. Returns: - :class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty + :class:`graphscope.framework.graph.GraphDAGNode`: + A Graph node which graph type is ArrowProperty, evaluated in eager mode. + + Examples: + .. 
code:: python + + >>> # lazy mode + >>> import graphscope + >>> from graphscope.dataset.ldbc import load_ldbc + >>> sess = graphscope.session(mode="lazy") + >>> g = load_ldbc(sess, "/path/to/dataset", True) + >>> g1 = sess.run(g) + + >>> # eager mode + >>> import graphscope + >>> from graphscope.dataset.ldbc import load_ldbc + >>> sess = graphscope.session(mode="eager") + >>> g = load_ldbc(sess, "/path/to/dataset", True) + """ - prefix = os.path.expandvars(prefix) + if prefix is not None: + prefix = os.path.expandvars(prefix) + else: + fname = "ldbc_sample.tar.gz" + origin = f"{DATA_SITE}/ldbc_sample.tar.gz" + fpath = download_file( + fname, + origin=origin, + extract=True, + file_hash="1a3d3c36fbf416c2a02ca4163734192eed602649220d7ceef2735fc11173fc6c", + ) + # assumed dirname is ldbc_sample after extracting from ldbc_sample.tar.gz + prefix = fpath[0:-7] + vertices = { "comment": ( Loader( diff --git a/python/graphscope/dataset/modern_graph.py b/python/graphscope/dataset/modern_graph.py index 68ace62c124e..daddb5a05b63 100644 --- a/python/graphscope/dataset/modern_graph.py +++ b/python/graphscope/dataset/modern_graph.py @@ -18,25 +18,56 @@ import os +from graphscope.dataset import DATA_SITE +from graphscope.dataset.io_utils import download_file from graphscope.framework.graph import Graph from graphscope.framework.loader import Loader -def load_modern_graph(sess, prefix, directed=True): +def load_modern_graph(sess, prefix=None, directed=True): """Load modern graph. Modern graph consist 6 vertices and 6 edges, useful to test the basic functionalities. Args: sess (:class:`graphscope.Session`): Load graph within the session. - prefix (str): Data directory. + prefix (str): `PathLike` object that represents a path. + With standalone mode, set prefix None will try to download from + source URL. Defaults to None. directed (bool, optional): Determine to load a directed or undirected graph. Defaults to True. 
Returns: - :class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty + :class:`graphscope.framework.graph.GraphDAGNode`: + A Graph node which graph type is ArrowProperty, evaluated in eager mode. + + >>> # lazy mode + >>> import graphscope + >>> from graphscope.dataset.modern_graph import load_modern_graph + >>> sess = graphscope.session(mode="lazy") + >>> g = load_modern_graph(sess, "/path/to/dataset", True) + >>> g1 = sess.run(g) + + >>> # eager mode + >>> import graphscope + >>> from graphscope.dataset.modern_graph import load_modern_graph + >>> sess = graphscope.session(mode="eager") + >>> g = load_modern_graph(sess, "/path/to/dataset", True) """ - prefix = os.path.expandvars(prefix) + if prefix is not None: + prefix = os.path.expandvars(prefix) + else: + fname = "modern_graph.tar.gz" + origin = f"{DATA_SITE}/modern_graph.tar.gz" + fpath = download_file( + fname, + origin=origin, + extract=True, + file_hash="a67c02191ea9dfa618a83d94087349a25937b92973f42206a28fdf6fa5299dec", + ) + # assumed dirname is modern_graph after extracting from modern_graph.tar.gz + prefix = fpath[0:-7] + graph = sess.g(directed=directed) graph = ( graph.add_vertices( diff --git a/python/graphscope/dataset/ogbn_mag.py b/python/graphscope/dataset/ogbn_mag.py index b6e49df484ac..27e3a410e04a 100644 --- a/python/graphscope/dataset/ogbn_mag.py +++ b/python/graphscope/dataset/ogbn_mag.py @@ -18,25 +18,58 @@ import os +from graphscope.dataset import DATA_SITE +from graphscope.dataset.io_utils import download_file from graphscope.framework.graph import Graph -def load_ogbn_mag(sess, prefix): +def load_ogbn_mag(sess, prefix=None): """Load ogbn_mag graph. - The ogbn-mag dataset is a heterogeneous network composed of a subset of the Microsoft Academic Graph (MAG). - See more details here: - https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag + The ogbn-mag dataset is a heterogeneous network composed of a subset + of the Microsoft Academic Graph (MAG).
See more details here: + + https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag Args: sess (:class:`graphscope.Session`): Load graph within the session. - prefix (str): Data directory. - directed (bool, optional): Determine to load a directed or undirected graph. - Defaults to True. + prefix: `PathLike` object that represents a path. + With standalone mode, set prefix None will try to download from + source URL. Defaults to None. Returns: - :class:`graphscope.Graph`: A Graph object which graph type is ArrowProperty + :class:`graphscope.framework.graph.GraphDAGNode`: + A Graph node which graph type is ArrowProperty, evaluated in eager mode. + + Examples: + .. code:: python + + >>> # lazy mode + >>> import graphscope + >>> from graphscope.dataset.ogbn_mag import load_ogbn_mag + >>> sess = graphscope.session(mode="lazy") + >>> g = load_ogbn_mag(sess, "/path/to/dataset") + >>> g1 = sess.run(g) + + >>> # eager mode + >>> import graphscope + >>> from graphscope.dataset.ogbn_mag import load_ogbn_mag + >>> sess = graphscope.session(mode="eager") + >>> g = load_ogbn_mag(sess, "/path/to/dataset") """ - prefix = os.path.expandvars(prefix) + if prefix is not None: + prefix = os.path.expandvars(prefix) + else: + fname = "ogbn_mag_small.tar.gz" + origin = f"{DATA_SITE}/ogbn_mag_small.tar.gz" + fpath = download_file( + fname, + origin=origin, + extract=True, + file_hash="ccd128ab673e5d7dd1cceeaa4ba5d65b67a18212c4a27b0cd090359bd7042b10", + ) + # assumed dirname is ogbn_mag_small after extracting from ogbn_mag_small.tar.gz + prefix = fpath[0:-7] + graph = sess.g() graph = ( graph.add_vertices(os.path.join(prefix, "paper.csv"), "paper") diff --git a/python/requirements.txt b/python/requirements.txt index 0d0bb09a2ce7..dc8d7fe49d5e 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -6,8 +6,9 @@ kubernetes nest_asyncio networkx numpy +packaging pandas protobuf>=3.12.0 PyYAML scipy -packaging +tqdm diff --git a/python/tests/unittest/test_graph.py 
b/python/tests/unittest/test_graph.py index a50786dd1b37..f6813eb7776f 100644 --- a/python/tests/unittest/test_graph.py +++ b/python/tests/unittest/test_graph.py @@ -27,6 +27,8 @@ from graphscope import property_sssp from graphscope import sssp from graphscope.dataset.ldbc import load_ldbc +from graphscope.dataset.modern_graph import load_modern_graph +from graphscope.dataset.ogbn_mag import load_ogbn_mag from graphscope.framework.errors import AnalyticalEngineInternalError from graphscope.framework.errors import GRPCError from graphscope.framework.errors import InvalidArgumentError @@ -622,3 +624,12 @@ def test_add_column(ldbc_graph, arrow_modern_graph): # g6 = sub_graph_5.add_column(ret, selector={"cc": "r"}) # with pytest.raises(AnalyticalEngineInternalError): # print(g6.schema) + + +def test_download_dataset(graphscope_session): + g1 = load_modern_graph(graphscope_session) + g1.unload() + g2 = load_ldbc(graphscope_session) + g2.unload() + g3 = load_ogbn_mag(graphscope_session) + g3.unload()