Commit 13b6b4f

Support download dataset from origin URL in standalone mode (#828)

lidongze0629 committed Sep 22, 2021
1 parent 4bd128b commit 13b6b4f
Showing 10 changed files with 428 additions and 25 deletions.
9 changes: 7 additions & 2 deletions coordinator/gscoordinator/cluster.py
@@ -59,6 +59,7 @@
 
 from gscoordinator.io_utils import PipeWatcher
 from gscoordinator.launcher import Launcher
+from gscoordinator.utils import GRAPHSCOPE_HOME
 from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT
 from gscoordinator.utils import WORKSPACE
 from gscoordinator.utils import ResolveMPICmdPrefix
@@ -360,6 +361,8 @@ def create_interactive_instance(self, config: dict):
             "{}:{}".format(key, value) for key, value in engine_params.items()
         ]
         enable_gaia = config[types_pb2.GIE_ENABLE_GAIA].b
+        env = os.environ.copy()
+        env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME})
         cmd = [
             INTERACTIVE_ENGINE_SCRIPT,
             "create_gremlin_instance_on_k8s",
@@ -377,7 +380,7 @@
             cmd,
             start_new_session=True,
             cwd=os.getcwd(),
-            env=os.environ.copy(),
+            env=env,
             universal_newlines=True,
             encoding="utf-8",
             stdin=subprocess.DEVNULL,
@@ -388,6 +391,8 @@ def close_interactive_instance(self, object_id):
         return process
 
     def close_interactive_instance(self, object_id):
+        env = os.environ.copy()
+        env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME})
         cmd = [
             INTERACTIVE_ENGINE_SCRIPT,
             "close_gremlin_instance_on_k8s",
@@ -401,7 +406,7 @@ def close_interactive_instance(self, object_id):
             cmd,
             start_new_session=True,
             cwd=os.getcwd(),
-            env=os.environ.copy(),
+            env=env,
             universal_newlines=True,
             encoding="utf-8",
             stdin=subprocess.DEVNULL,
9 changes: 7 additions & 2 deletions coordinator/gscoordinator/launcher.py
@@ -32,6 +32,7 @@
 
 from gscoordinator.io_utils import PipeWatcher
 from gscoordinator.utils import ANALYTICAL_ENGINE_PATH
+from gscoordinator.utils import GRAPHSCOPE_HOME
 from gscoordinator.utils import INTERACTIVE_ENGINE_SCRIPT
 from gscoordinator.utils import WORKSPACE
 from gscoordinator.utils import ResolveMPICmdPrefix
@@ -209,6 +210,8 @@ def create_interactive_instance(self, config: dict):
             "{}:{}".format(key, value) for key, value in engine_params.items()
         ]
         enable_gaia = config[types_pb2.GIE_ENABLE_GAIA].b
+        env = os.environ.copy()
+        env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME})
         cmd = [
             INTERACTIVE_ENGINE_SCRIPT,
             "create_gremlin_instance_on_local",
@@ -226,7 +229,7 @@ def create_interactive_instance(self, config: dict):
             cmd,
             start_new_session=True,
             cwd=os.getcwd(),
-            env=os.environ.copy(),
+            env=env,
             universal_newlines=True,
             encoding="utf-8",
             stdin=subprocess.DEVNULL,
@@ -237,6 +240,8 @@ def close_interactive_instance(self, object_id):
         return process
 
     def close_interactive_instance(self, object_id):
+        env = os.environ.copy()
+        env.update({"GRAPHSCOPE_HOME": GRAPHSCOPE_HOME})
         cmd = [
             INTERACTIVE_ENGINE_SCRIPT,
             "close_gremlin_instance_on_local",
@@ -248,7 +253,7 @@ def close_interactive_instance(self, object_id):
             cmd,
             start_new_session=True,
             cwd=os.getcwd(),
-            env=os.environ.copy(),
+            env=env,
             universal_newlines=True,
             encoding="utf-8",
             stdin=subprocess.DEVNULL,
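Both cluster.py and launcher.py now follow the same pattern: copy the parent environment, inject GRAPHSCOPE_HOME, and hand the result to subprocess.Popen so the child script can locate the GraphScope installation. A minimal sketch of that pattern, with hypothetical values standing in for the real GRAPHSCOPE_HOME and INTERACTIVE_ENGINE_SCRIPT from gscoordinator.utils:

import os
import subprocess

# Assumed values for illustration only; the real ones are resolved
# by gscoordinator.utils in this commit.
GRAPHSCOPE_HOME = "/opt/graphscope"
INTERACTIVE_ENGINE_SCRIPT = "/opt/graphscope/bin/interactive_engine.sh"

env = os.environ.copy()                    # inherit the parent environment
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME   # expose the install root to the child

process = subprocess.Popen(
    [INTERACTIVE_ENGINE_SCRIPT, "create_gremlin_instance_on_local"],
    env=env,                               # augmented copy, not bare os.environ.copy()
    universal_newlines=True,
    encoding="utf-8",
    stdin=subprocess.DEVNULL,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)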
17 changes: 14 additions & 3 deletions coordinator/gscoordinator/utils.py
@@ -85,9 +85,20 @@
 # GRAPHSCOPE_HOME
 # 1) get from environment variable `GRAPHSCOPE_HOME`, if not exist,
 # 2) infer from COORDINATOR_HOME
-try:
-    GRAPHSCOPE_HOME = os.environ["GRAPHSCOPE_HOME"]
-except KeyError:
+GRAPHSCOPE_HOME = os.environ.get("GRAPHSCOPE_HOME", None)
+
+# resolve from pip installed package
+if GRAPHSCOPE_HOME is None:
+    if os.path.isdir(os.path.join(COORDINATOR_HOME, "graphscope.runtime")):
+        GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "graphscope.runtime")
+
+# resolve from egg installed package
+if GRAPHSCOPE_HOME is None:
+    if os.path.isdir(os.path.join(COORDINATOR_HOME, "..", "graphscope.runtime")):
+        GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..", "graphscope.runtime")
+
+# resolve from develop source tree
+if GRAPHSCOPE_HOME is None:
     GRAPHSCOPE_HOME = os.path.join(COORDINATOR_HOME, "..")
 
 # ANALYTICAL_ENGINE_HOME
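In effect, the hard dependency on the environment variable (which previously raised KeyError when unset) becomes a fallback chain. A condensed, behavior-equivalent sketch of the lookup order, assuming the same COORDINATOR_HOME as in the diff above:

import os

def resolve_graphscope_home(coordinator_home):
    # 1) an explicit environment variable always wins
    home = os.environ.get("GRAPHSCOPE_HOME")
    if home is not None:
        return home
    # 2) pip-installed layout, then 3) egg-installed layout
    for candidate in (
        os.path.join(coordinator_home, "graphscope.runtime"),
        os.path.join(coordinator_home, "..", "graphscope.runtime"),
    ):
        if os.path.isdir(candidate):
            return candidate
    # 4) develop source tree: fall back to one level above the coordinator
    return os.path.join(coordinator_home, "..")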
2 changes: 2 additions & 0 deletions python/graphscope/dataset/__init__.py
@@ -15,3 +15,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+DATA_SITE = "https://graphscope.oss-cn-beijing.aliyuncs.com/dataset"
268 changes: 268 additions & 0 deletions python/graphscope/dataset/io_utils.py
@@ -0,0 +1,268 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2021 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import hashlib
import logging
import os
import shutil
import sys
import tarfile
import urllib
import zipfile
from urllib.request import urlretrieve

from tqdm import tqdm

logger = logging.getLogger("graphscope")


if sys.version_info >= (3, 6):

    def _path_to_string(path):
        if isinstance(path, os.PathLike):
            return os.fspath(path)
        return path


elif sys.version_info >= (3, 4):

    def _path_to_string(path):
        import pathlib

        if isinstance(path, pathlib.Path):
            return str(path)
        return path


else:

    def _path_to_string(path):
        return path


def _resolve_hasher(algorithm, file_hash=None):
    """Returns hash algorithm as hashlib function."""
    if algorithm == "sha256":
        return hashlib.sha256()
    if algorithm == "auto" and file_hash is not None and len(file_hash) == 64:
        return hashlib.sha256()
    return hashlib.md5()


def _hash_file(fpath, algorithm="sha256", chunk_size=65535):
    """Calculates a file's sha256 or md5 hash.

    Examples:

    .. code:: python

        >>> _hash_file("/path/to/file")
        "ccd128ab673e5d7dd1cceeaa4ba5d65b67a18212c4a27b0cd090359bd7042b10"
    """
    if isinstance(algorithm, str):
        hasher = _resolve_hasher(algorithm)
    else:
        hasher = algorithm
    with open(fpath, "rb") as file:
        for chunk in iter(lambda: file.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def _extract_archive(fpath, path=".", archive_format="auto"):
    """Extracts an archive if it matches tar.gz, tar.bz, tar, or zip formats.

    Args:
        fpath: `PathLike` object of the archive file.
        path: Path to extract the archive file into.
        archive_format (str): Archive format to try for extracting the file.
            Options are "tar", "zip", "auto" or None.
            "tar" includes tar, tar.gz, and tar.bz files.
            The default "auto" is ["tar", "zip"].
            None or an empty list will return no matches found.

    Returns:
        True if a match was found and an archive extraction was completed,
        False otherwise.
    """
    if archive_format is None:
        return False
    if archive_format == "auto":
        archive_format = ["tar", "zip"]
    if isinstance(archive_format, str):
        archive_format = [archive_format]

    fpath = _path_to_string(fpath)
    path = _path_to_string(path)

    for archive_type in archive_format:
        if archive_type == "tar":
            open_fn = tarfile.open
            is_match_fn = tarfile.is_tarfile
        if archive_type == "zip":
            open_fn = zipfile.ZipFile
            is_match_fn = zipfile.is_zipfile

        if is_match_fn(fpath):
            with open_fn(fpath) as archive:
                try:
                    archive.extractall(path)
                except (tarfile.TarError, RuntimeError, KeyboardInterrupt):
                    if os.path.exists(path):
                        if os.path.isfile(path):
                            os.remove(path)
                        else:
                            shutil.rmtree(path)
                    raise
            return True
    return False


def validate_file(fpath, file_hash, algorithm="auto", chunk_size=65535):
    """Validates a file against a sha256 or md5 hash.

    Args:
        fpath: `PathLike` object of the file being validated.
        file_hash (str): The expected hash string of the file.
            The sha256 and md5 hash algorithms are both supported.
        algorithm (str): Hash algorithm, one of "auto", "sha256", or "md5".
            The default "auto" detects the hash algorithm in use.
        chunk_size (int): Bytes to read at a time, important for large files.

    Returns (bool): Whether the file is valid.
    """
    hasher = _resolve_hasher(algorithm, file_hash)
    if str(_hash_file(fpath, hasher, chunk_size)) == str(file_hash):
        return True
    return False


def download_file(
    fname,
    origin,
    file_hash=None,
    hash_algorithm="auto",
    extract=False,
    archive_format="auto",
    cache_dir=None,
    cache_subdir="datasets",
):
    """Downloads a file from a URL if it is not already in the cache.

    By default the file at the url `origin` is downloaded to the cache_dir
    `~/.graphscope`, placed in the cache_subdir `datasets`, and given the
    filename `fname`. The final location of a file `example.txt` would
    therefore be `~/.graphscope/datasets/example.txt`.

    Files in tar, tar.gz, tar.bz, and zip formats can also be extracted.
    Passing a hash will verify the file after download. The command line
    programs `shasum` and `sha256sum` can compute the hash.

    Args:
        fname: `PathLike` object of the file. If an absolute path
            `/path/to/file` is specified, the file will be saved at that
            location.
        origin (str): Original URL of the file.
        file_hash (str): The expected hash string of the file after download.
            The sha256 and md5 hash algorithms are both supported.
        hash_algorithm (str): Select the hash algorithm to verify the file.
            Options are `"md5"`, `"sha256"`, and `"auto"`.
            The default "auto" detects the hash algorithm in use.
        extract (bool): True tries extracting the file as an archive, like zip.
        archive_format (str): Archive format to try for extracting the file.
            Options are `"auto"`, `"tar"`, `"zip"`, and `None`.
            `"tar"` includes "tar", "tar.gz", and "tar.bz" files.
            The default `"auto"` corresponds to `["tar", "zip"]`.
            None or an empty list will return no matches found.
        cache_dir: Location of `PathLike` object to store cached files; when
            None, it defaults to the directory `~/.graphscope`.
        cache_subdir: Subdirectory under the cache dir where the file is saved.

    Returns:
        Path to the downloaded file.
    """
    if cache_dir is None:
        cache_dir = os.path.join(os.path.expanduser("~"), ".graphscope")
    cache_dir = os.path.expanduser(cache_dir)
    if os.path.exists(cache_dir) and not os.access(cache_dir, os.W_OK):
        cache_dir = os.path.join("/tmp", ".graphscope")
    datadir = os.path.join(cache_dir, cache_subdir)
    os.makedirs(datadir, exist_ok=True)

    fname = _path_to_string(fname)
    fpath = os.path.join(datadir, fname)

    download = False
    if os.path.exists(fpath):
        # file found, verify if a hash was provided
        if file_hash is not None:
            if not validate_file(fpath, file_hash, algorithm=hash_algorithm):
                logger.warning(
                    "A local file was found, but it seems to be incomplete "
                    "or outdated because the %s file hash does not match the "
                    "original value of %s, so we will re-download the data.",
                    hash_algorithm,
                    file_hash,
                )
                download = True
    else:
        download = True

    if download:
        logger.info("Downloading data from %s", origin)

        class ProgressTracker(object):
            # Maintain progbar for the lifetime of download
            progbar = None
            record_downloaded = None

        def show_progress(block_num, block_size, total_size):
            if ProgressTracker.progbar is None:
                ProgressTracker.progbar = tqdm(
                    total=total_size, unit="iB", unit_scale=True
                )
            downloaded = min(block_num * block_size, total_size)
            if ProgressTracker.record_downloaded is None:
                ProgressTracker.record_downloaded = downloaded
                update_downloaded = downloaded
            else:
                update_downloaded = downloaded - ProgressTracker.record_downloaded
                ProgressTracker.record_downloaded = downloaded
            ProgressTracker.progbar.update(update_downloaded)
            if downloaded >= total_size:
                ProgressTracker.progbar.close()
                ProgressTracker.progbar = None
                ProgressTracker.record_downloaded = None

        error_msg = "URL fetch failure on {}:{} -- {}"
        try:
            try:
                urlretrieve(origin, fpath, show_progress)
            except urllib.error.HTTPError as e:
                raise Exception(error_msg.format(origin, e.code, e.msg))
            except urllib.error.URLError as e:
                # `URLError` has been made a subclass of OSError since version 3.3
                # https://docs.python.org/3/library/urllib.error.html
                raise Exception(error_msg.format(origin, e.errno, e.reason))
        except (Exception, KeyboardInterrupt):
            if os.path.exists(fpath):
                os.remove(fpath)
            raise

    if extract:
        _extract_archive(fpath, datadir, archive_format)

    return fpath
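Putting the pieces together, a caller could fetch and unpack a dataset hosted under DATA_SITE roughly as follows. The archive name and the commented-out hash are hypothetical; only DATA_SITE and download_file come from this commit:

from graphscope.dataset import DATA_SITE
from graphscope.dataset.io_utils import download_file

# "ldbc_sample.tar.gz" is an illustrative name, not a dataset this commit
# guarantees to exist under DATA_SITE.
fpath = download_file(
    "ldbc_sample.tar.gz",
    origin="{}/ldbc_sample.tar.gz".format(DATA_SITE),
    extract=True,              # unpack into the cache's datasets directory
    # file_hash="<sha256>",    # optionally verify integrity after download
)
print(fpath)                   # ~/.graphscope/datasets/ldbc_sample.tar.gz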
