Upgrade to v0.11.2.2 (#190)
wjsi committed Sep 15, 2022
1 parent 820e838 commit 61e34b2
Showing 23 changed files with 305 additions and 101 deletions.
2 changes: 1 addition & 1 deletion odps/_version.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version_info = (0, 11, 2, 1)
version_info = (0, 11, 2, 2)
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
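For reference, a minimal sketch (not part of the commit) of how the bumped tuple renders as a version string, following the logic in the hunk above:

version_info = (0, 11, 2, 2)
_num_index = max(idx if isinstance(v, int) else 0
                 for idx, v in enumerate(version_info))
# All four components are ints, so the numeric join covers the whole tuple.
print('.'.join(map(str, version_info[:_num_index + 1])))  # -> 0.11.2.2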
34 changes: 28 additions & 6 deletions odps/core.py
@@ -1776,16 +1776,38 @@ def set_security_option(self, option_name, value, project=None):
setattr(sec_options, option_name, value)
sec_options.update()

def run_security_query(self, query, project=None, token=None):
def run_security_query(self, query, project=None, token=None, output_json=True):
"""
Run a security query to grant / revoke / query privileges
Run a security query to grant / revoke / query privileges. If the query is `install package`
or `uninstall package`, returns a waitable AuthQueryInstance object; otherwise returns
the result string or JSON value.
:param query: query text
:param project: project name, if not provided, will be the default project
:return: a JSON object representing the result.
:param str query: query text
:param str project: project name, if not provided, will be the default project
:param bool output_json: whether to parse the output as JSON
:return: result string / JSON object
"""
project = self.get_project(name=project)
return project.run_security_query(query, token=token)
return project.run_security_query(query, token=token, output_json=output_json)

def execute_security_query(self, query, project=None, token=None, output_json=True):
"""
Execute a security query to grant / revoke / query privileges and return
the result string or JSON value.
:param str query: query text
:param str project: project name, if not provided, will be the default project
:param bool output_json: whether to parse the output as JSON
:return: result string / JSON object
"""
from .models import Project

instance_or_result = self.run_security_query(
query, project=project, token=token, output_json=output_json
)
if not isinstance(instance_or_result, Project.AuthQueryInstance):
return instance_or_result
return instance_or_result.wait_for_success()

@utils.deprecated('You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. Try `run_sql_interactive`.')
def attach_session(self, session_name, taskname=None, hints=None):
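The new execute_security_query wraps run_security_query; a hedged usage sketch (credentials, project and query text are illustrative, not taken from this commit):

from odps import ODPS

o = ODPS('<access_id>', '<secret_access_key>', project='my_project')  # hypothetical credentials

# Blocks until the query finishes, returning a string or parsed JSON:
result = o.execute_security_query('show grants', output_json=True)

# `install package` / `uninstall package` queries make run_security_query
# return a waitable AuthQueryInstance; wait on it explicitly when needed:
instance = o.run_security_query('install package my_project.my_package')
result = instance.wait_for_success()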
2 changes: 1 addition & 1 deletion odps/df/backends/pd/engine.py
@@ -214,7 +214,7 @@ def _open_file(*args, **kwargs):
raise ValueError(
'Unknown library type which should be one of zip(egg, wheel), tar, or tar.gz')

return CompressImporter(*readers, extract=True, supersede=options.df.supersede_libraries)
return CompressImporter(*readers, extract_binary=True, supersede=options.df.supersede_libraries)

@with_thirdparty_libs
def _do_execute(self, expr_dag, expr, ui=None, progress_proportion=1,
4 changes: 4 additions & 0 deletions odps/errors.py
@@ -308,6 +308,10 @@ class WaitTimeoutError(ODPSError, TimeoutError):
pass


class SecurityQueryError(ODPSError):
pass


class SQAError(ODPSError):
pass

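SecurityQueryError subclasses ODPSError, so existing handlers that catch ODPSError also cover failed security queries; a minimal illustrative sketch:

from odps.errors import ODPSError, SecurityQueryError

try:
    raise SecurityQueryError('security query failed')  # illustrative only
except ODPSError as exc:
    print(type(exc).__name__, exc)  # -> SecurityQueryError security query failed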
71 changes: 38 additions & 33 deletions odps/lib/importer.py
@@ -53,7 +53,7 @@
def _clean_extract():
if CompressImporter._extract_path:
import shutil
shutil.rmtree(CompressImporter._extract_path)
shutil.rmtree(CompressImporter._extract_path, ignore_errors=True)


class CompressImportError(ImportError):
@@ -80,9 +80,9 @@ def __init__(self, *compressed_files, **kwargs):
"""
self._files = []
self._prefixes = defaultdict(lambda: set(['']))
self._extract = kwargs.get('extract', False)
self._extract_binary = kwargs.get('extract_binary', kwargs.get('extract', False))
self._extract_all = kwargs.get('extract_all', False)
self._supersede = kwargs.get('supersede', True)
self._local_warned = False

for f in compressed_files:
if isinstance(f, zipfile.ZipFile):
@@ -113,35 +113,39 @@ def __init__(self, *compressed_files, **kwargs):
raise TypeError('Compressed file can only be zipfile.ZipFile or tarfile.TarFile')

if bin_package:
# binary packages need to be extracted before use
if not ALLOW_BINARY:
raise SystemError('Cannot load binary package. It is quite possible that you are using an old '
'MaxCompute service which does not support binary packages. If this is '
'not true, please set `odps.isolation.session.enable` to True or ask your '
'project owner to change project-level configuration.')
raise SystemError(
'Cannot load binary package. It is quite possible that you are using an old '
'MaxCompute service which does not support binary packages. If this is '
'not true, please set `odps.isolation.session.enable` to True or ask your '
'project owner to change project-level configuration.'
)
if need_extract:
f = self._extract_archive(f)
elif need_extract and self._extract_all:
# when extraction of all packages is forced, extract text-only packages too
f = self._extract_archive(f)

prefixes = set([''])
rendered_names = set()
dir_prefixes = set()
if isinstance(f, zipfile.ZipFile):
for name in f.namelist():
name = name if name.endswith('/') else (name.rsplit('/', 1)[0] + '/')
if name in prefixes:
if name in dir_prefixes:
continue
try:
f.getinfo(name + '__init__.py')
except KeyError:
prefixes.add(name)
dir_prefixes.add(name)
elif isinstance(f, tarfile.TarFile):
for member in f.getmembers():
name = member.name if member.isdir() else member.name.rsplit('/', 1)[0]
if name in prefixes:
if name in dir_prefixes:
continue
try:
f.getmember(name + '/__init__.py')
except KeyError:
prefixes.add(name + '/')
dir_prefixes.add(name + '/')
elif isinstance(f, (list, dict)):
# Force ArchiveResource to run under binary mode so that pure-Python
# code which manually opens paths relative to __file__ keeps working.
@@ -155,45 +159,46 @@ def __init__(self, *compressed_files, **kwargs):

for name in rendered_names:
name = name if name.endswith('/') else (name.rsplit('/', 1)[0] + '/')
if name in prefixes or '/tests/' in name or '/__pycache__/' in name:
if name in dir_prefixes or '/tests/' in name or '/__pycache__/' in name:
continue
if name + '__init__.py' not in rendered_names:
prefixes.add(name)
dir_prefixes.add(name)
else:
if '/' in name.rstrip('/'):
ppath = name.rstrip('/').rsplit('/', 1)[0]
else:
ppath = ''
prefixes.add(ppath)
dir_prefixes.add(ppath)

# make sure only root packages are included,
# otherwise relative imports might be broken
path_patch = []
for p in sorted(dir_prefixes):
parent_exist = False
for pp in path_patch:
if p[:len(pp)] == pp:
parent_exist = True
break
if parent_exist:
continue
path_patch.append(p)

if bin_package:
path_patch = []
for p in sorted(dir_prefixes):
if p in sys.path:
continue
parent_exist = False
for pp in path_patch:
if p[:len(pp)] == pp and p.rstrip('/') + '/__init__.py' in rendered_names:
parent_exist = True
break
if parent_exist:
continue
path_patch.append(p)
path_patch = [p for p in path_patch if p not in sys.path]
if self._supersede:
sys.path = path_patch + sys.path
else:
sys.path = sys.path + path_patch
else:
self._files.append(f)
if prefixes:
self._prefixes[id(f)] = sorted(prefixes)
self._prefixes[id(f)] = sorted([''] + path_patch)

def _extract_archive(self, archive):
if not self._extract:
raise SystemError('We do not allow file-type resource for binary packages. Please upload an '
'archive-typed resource instead.')
if not self._extract_binary and not self._extract_all:
raise SystemError(
'We do not allow file-type resource for binary packages. Please upload an '
'archive-typed resource instead.'
)

cls = type(self)
if not cls._extract_path:
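A hedged sketch of the renamed flag in use, mirroring the CompressImporter call updated in odps/df/backends/pd/engine.py above (the archive name is hypothetical):

import sys
import zipfile

from odps.lib.importer import CompressImporter

archive = zipfile.ZipFile('my_libs.zip')  # hypothetical archive containing .so files
# extract_binary supersedes the old `extract` flag; pass extract_all=True instead
# to also unpack pure-Python archives whose code opens paths relative to __file__.
sys.meta_path.append(CompressImporter(archive, extract_binary=True, supersede=True))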
73 changes: 53 additions & 20 deletions odps/lib/tests/test_importer.py
@@ -15,14 +15,15 @@
# limitations under the License.

from __future__ import absolute_import
import zipfile
import tarfile
import sys
import os
import shutil
import sys
import tarfile
import tempfile
import textwrap
import zipfile

from odps.compat import BytesIO as StringIO, six
from odps.compat import BytesIO, six
from odps.tests.core import TestBase
from odps.compat import unittest
from odps.lib import importer
@@ -48,27 +49,29 @@ def tearDown(self):
sys.meta_path = self.meta_path

def testImport(self):
zip_io = StringIO()
zip_io = BytesIO()
zip_f = zipfile.ZipFile(zip_io, 'w')
zip_f.writestr('testa/a.py', 'a = 1')
zip_f.close()

tar_io = StringIO()
tar_io = BytesIO()
tar_f = tarfile.TarFile(fileobj=tar_io, mode='w')
tar_f.addfile(tarfile.TarInfo(name='testb/__init__.py'), fileobj=StringIO())
tar_f.addfile(tarfile.TarInfo(name='testb/__init__.py'), fileobj=BytesIO())
info = tarfile.TarInfo(name='testb/b.py')
c = to_binary('from a import a; b = a + 1')
s = StringIO(c)
c = b'from a import a; b = a + 1'
s = BytesIO(c)
info.size = len(c)
tar_f.addfile(info, fileobj=s)
tar_f.close()

dict_io_init = dict()
dict_io_init['/opt/test_pyodps_dev/testc/__init__.py'] = StringIO()
dict_io_init['/opt/test_pyodps_dev/testc/c.py'] = StringIO(to_binary('from a import a; c = a + 2'))
dict_io_init['/opt/test_pyodps_dev/testc/__init__.py'] = BytesIO()
dict_io_init['/opt/test_pyodps_dev/testc/c.py'] = BytesIO(b'from a import a; c = a + 2')
dict_io_init['/opt/test_pyodps_dev/testc/sub/__init__.py'] = BytesIO(b'from . import mod')
dict_io_init['/opt/test_pyodps_dev/testc/sub/mod.py'] = BytesIO(b'from ..c import c')

dict_io_file = dict()
dict_io_file['/opt/test_pyodps_dev/testd/d.py'] = StringIO(to_binary('from a import a; d = a + 3'))
dict_io_file['/opt/test_pyodps_dev/testd/d.py'] = BytesIO(b'from a import a; d = a + 3')

zip_io.seek(0)
tar_io.seek(0)
@@ -86,12 +89,13 @@ def testImport(self):
from testc.c import c
self.assertEqual(c, 3)
self.assertRaises(ImportError, __import__, 'c', fromlist=[])
self.assertRaises(ImportError, __import__, 'sub', fromlist=[])
from d import d
self.assertEqual(d, 4)

def testRealImport(self):
six_path = os.path.join(os.path.dirname(os.path.abspath(six.__file__)), 'six.py')
zip_io = StringIO()
zip_io = BytesIO()
zip_f = zipfile.ZipFile(zip_io, 'w')
zip_f.write(six_path, arcname='mylib/five.py')
zip_f.close()
@@ -104,13 +108,42 @@ def testRealImport(self):
import five
self.assertEqual(list(to_binary('abc')), list(five.binary_type(to_binary('abc'))))

def testImportWithPackageResource(self):
test_src = textwrap.dedent("""
import os
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "src_file.txt"), "r") as sf:
pass
""")

zip_io = BytesIO()
zip_f = zipfile.ZipFile(zip_io, 'w')
zip_f.writestr('test_all_imp/src_file.txt', '')
zip_f.writestr('test_all_imp/__init__.py', test_src)
zip_f.close()

zip_io.seek(0)
old_meta_path = [mp for mp in sys.meta_path]

with self.assertRaises(IOError):
sys.meta_path.append(CompressImporter(zipfile.ZipFile(zip_io, 'r'), extract_binary=True))
__import__("test_all_imp")

sys.meta_path = old_meta_path
sys.modules.pop("test_all_imp", None)

try:
CompressImporter(zipfile.ZipFile(zip_io, 'r'), extract_all=True)
__import__("test_all_imp")
finally:
shutil.rmtree(CompressImporter._extract_path)
sys.meta_path = old_meta_path
sys.modules.pop("test_all_imp", None)

def testBinaryImport(self):
zip_io = StringIO()
zip_io = BytesIO()
zip_f = zipfile.ZipFile(zip_io, 'w')
zip_f.writestr('test_a/a.so', '')
zip_f.writestr('test_a/__init__.py', '')
zip_f.writestr('testdir/test_b/b.so', '')
zip_f.writestr('testdir/test_b/__init__.py', '')
zip_f.writestr('test_direct.py', '')
zip_f.close()

@@ -119,11 +152,11 @@

try:
zip_io.seek(0)
CompressImporter(zipfile.ZipFile(zip_io, 'r'), extract=True)
CompressImporter(zipfile.ZipFile(zip_io, 'r'), extract_binary=True)
self.assertTrue(os.path.exists(CompressImporter._extract_path))
import test_direct, test_a, test_b
del test_direct, test_a, test_b
del sys.modules['test_direct'], sys.modules['test_a'], sys.modules['test_b']
import test_direct, test_a
del test_direct, test_a
del sys.modules['test_direct'], sys.modules['test_a']
finally:
shutil.rmtree(CompressImporter._extract_path)
CompressImporter._extract_path = None
6 changes: 4 additions & 2 deletions odps/mars_extension/legacy/core.py
@@ -308,6 +308,7 @@ def to_mars_dataframe(
runtime_endpoint
or kw.pop("cupid_internal_endpoint", None)
or cupid_options.cupid.runtime.endpoint
or odps.endpoint
)
odps_params = dict(project=odps.project, endpoint=runtime_endpoint)

@@ -487,6 +488,7 @@ def persist_mars_dataframe(
runtime_endpoint
or kw.pop("cupid_internal_endpoint", None)
or cupid_options.cupid.runtime.endpoint
or odps.endpoint
)
odps_params = dict(project=odps.project, endpoint=runtime_endpoint)
if isinstance(odps.account, AliyunAccount):
@@ -561,7 +563,7 @@ def run_script_in_mars(odps, script, mode="exec", n_workers=1, command_argv=None
)
odps_params = dict(
project=odps.project,
endpoint=runtime_endpoint or cupid_options.cupid.runtime.endpoint,
endpoint=runtime_endpoint or cupid_options.cupid.runtime.endpoint or odps.endpoint,
)
run_script(
script,
@@ -599,7 +601,7 @@ def run_mars_job(
)
r.op.extra_params["project"] = odps.project
r.op.extra_params["endpoint"] = (
runtime_endpoint or cupid_options.cupid.runtime.endpoint
runtime_endpoint or cupid_options.cupid.runtime.endpoint or odps.endpoint
)
r.execute()
finally:
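Every hunk in this file makes the same change: odps.endpoint becomes the final fallback when resolving the runtime endpoint. A hypothetical helper condensing the pattern (names follow the hunks above):

def resolve_endpoint(runtime_endpoint, cupid_endpoint, odps_endpoint, **kw):
    # First truthy value wins; odps.endpoint is the new last resort.
    return (
        runtime_endpoint
        or kw.pop("cupid_internal_endpoint", None)
        or cupid_endpoint
        or odps_endpoint
    )

print(resolve_endpoint(None, None, 'http://service.example.com/api'))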