Commit 2a9ebc5

Merge 0bfc736 into f9087f8

haowen-xu committed Jul 18, 2018
2 parents f9087f8 + 0bfc736
Showing 18 changed files with 498 additions and 154 deletions.
5 changes: 2 additions & 3 deletions mltoolkit/datafs/__init__.py
@@ -1,7 +1,7 @@
-from . import (archivefs, base, errors, localfs, mongofs, utils)
+from . import (archivefs, base, errors, localfs, mongofs)
 
 __all__ = sum(
-    [m.__all__ for m in [archivefs, base, errors, localfs, mongofs, utils]],
+    [m.__all__ for m in [archivefs, base, errors, localfs, mongofs]],
     []
 )
 
@@ -10,7 +10,6 @@
 from .errors import *
 from .localfs import *
 from .mongofs import *
-from .utils import *
 
 try:
     from . import dataflow
2 changes: 1 addition & 1 deletion mltoolkit/datafs/archivefs.py
@@ -2,9 +2,9 @@
 import tarfile
 import zipfile
 
+from mltoolkit.utils import ActiveFiles, maybe_close
 from .base import *
 from .errors import UnsupportedOperation, InvalidOpenMode, DataFileNotExist
-from .utils import ActiveFiles, maybe_close
 
 __all__ = ['TarArchiveFS', 'ZipArchiveFS']
 
3 changes: 1 addition & 2 deletions mltoolkit/datafs/base.py
@@ -3,9 +3,8 @@
 
 import six
 
-from mltoolkit.utils import DocInherit, AutoInitAndCloseable
+from mltoolkit.utils import maybe_close, DocInherit, AutoInitAndCloseable
 from .errors import UnsupportedOperation, DataFileNotExist
-from .utils import maybe_close
 
 __all__ = [
     'DataFSCapacity',
3 changes: 1 addition & 2 deletions mltoolkit/datafs/localfs.py
@@ -1,7 +1,6 @@
 import os
 
-from mltoolkit.utils import makedirs
-from .utils import ActiveFiles, iter_files
+from mltoolkit.utils import makedirs, ActiveFiles, iter_files
 from .base import DataFS, DataFSCapacity
 from .errors import InvalidOpenMode, UnsupportedOperation, DataFileNotExist
 
121 changes: 7 additions & 114 deletions mltoolkit/datafs/mongofs.py
@@ -1,19 +1,16 @@
 import six
-from gridfs import GridFS, GridFSBucket
-from pymongo import MongoClient, CursorType
-from pymongo.database import Database
-from pymongo.collection import Collection
+from pymongo import CursorType
 
+from mltoolkit.utils import MongoBinder
 from .base import DataFS, DataFSCapacity
 from .errors import DataFileNotExist, InvalidOpenMode, MetaKeyNotExist
-from .utils import ActiveFiles
 
 __all__ = ['MongoFS']
 
 META_FIELD = 'metadata'
 
 
-class MongoFS(DataFS):
+class MongoFS(DataFS, MongoBinder):
     """
     MongoDB GridFS based :class:`DataFS`.
 
@@ -33,21 +30,10 @@ def __init__(self, conn_str, db_name, coll_name, strict=False):
             strict (bool): Whether or not this :class:`DataFS` works in
                 strict mode? (default :obj:`False`)
         """
-        super(MongoFS, self).__init__(
-            capacity=DataFSCapacity.ALL,
-            strict=strict
-        )
-
-        self._conn_str = conn_str
-        self._db_name = db_name
-        self._coll_name = coll_name
-        self._fs_coll_name = '{}.files'.format(coll_name)
-
-        self._client = None  # type: MongoClient
-        self._db = None  # type: Database
-        self._gridfs = None  # type: GridFS
-        self._gridfs_bucket = None  # type: GridFSBucket
-        self._collection = None  # type: Collection
+        DataFS.__init__(
+            self, capacity=DataFSCapacity.ALL, strict=strict)
+        MongoBinder.__init__(
+            self, conn_str=conn_str, db_name=db_name, coll_name=coll_name)
 
         if self.strict:
             def get_meta_value(r, m, k):
@@ -58,7 +44,6 @@ def get_meta_value(r, m, k):
             get_meta_value = lambda r, m, k: m.get(k)
 
         self._get_meta_value_from_record = get_meta_value
-        self._active_files = ActiveFiles()
 
     def _make_query_project(self, meta_keys=None, _id=1, filename=1):
         ret = {'_id': _id, 'filename': filename}
@@ -81,98 +66,6 @@ def _make_result_meta(self, record, meta_keys):
         return tuple(self._get_meta_value_from_record(record, meta_dict, k)
                      for k in meta_keys)
 
-    @property
-    def conn_str(self):
-        """Get the MongoDB connection string."""
-        return self._conn_str
-
-    @property
-    def db_name(self):
-        """Get the MongoDB database name."""
-        return self._db_name
-
-    @property
-    def coll_name(self):
-        """Get the collection name (prefix) of the GridFS."""
-        return self._coll_name
-
-    @property
-    def client(self):
-        """
-        Get the MongoDB client. Reading this property will force
-        the internal states of :class:`MongoFS` to be initialized.
-
-        Returns:
-            MongoClient: The MongoDB client.
-        """
-        self.init()
-        return self._client
-
-    @property
-    def db(self):
-        """
-        Get the MongoDB database object. Reading this property will force
-        the internal states of :class:`MongoFS` to be initialized.
-
-        Returns:
-            Database: The MongoDB database object.
-        """
-        self.init()
-        return self._db
-
-    @property
-    def gridfs(self):
-        """
-        Get the MongoDB GridFS client. Reading this property will force
-        the internal states of :class:`MongoFS` to be initialized.
-
-        Returns:
-            GridFS: The MongoDB GridFS client.
-        """
-        self.init()
-        return self._gridfs
-
-    @property
-    def gridfs_bucket(self):
-        """
-        Get the MongoDB GridFS bucket. Reading this property will force
-        the internal states of :class:`MongoFS` to be initialized.
-
-        Returns:
-            GridFSBucket: The MongoDB GridFS bucket.
-        """
-        self.init()
-        return self._gridfs_bucket
-
-    @property
-    def collection(self):
-        """
-        Get the MongoDB collection object. Reading this property will force
-        the internal states of :class:`MongoFS` to be initialized.
-
-        Returns:
-            Collection: The MongoDB collection object.
-        """
-        self.init()
-        return self._collection
-
-    def _init(self):
-        self._client = MongoClient(self._conn_str)
-        self._db = self._client.get_database(self._db_name)
-        self._collection = self._db[self._coll_name]
-        self._gridfs = GridFS(self._db, self._coll_name)
-        self._gridfs_bucket = GridFSBucket(self._db, self._coll_name)
-
-    def _close(self):
-        self._active_files.close_all()
-        try:
-            if self._client is not None:
-                self._client.close()
-        finally:
-            self._gridfs = None
-            self._db = None
-            self._client = None
-
     def clone(self):
         return MongoFS(self.conn_str, self.db_name, self.coll_name,
                        strict=self.strict)
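
Note: the refactored constructor above delegates all connection bookkeeping to mltoolkit.utils.MongoBinder, whose source is not part of this diff. The snippet below is only a rough, hypothetical sketch of the kind of lazily initialized binder that the removed properties and _init/_close logic appear to have been consolidated into; the class name and exact interface are assumptions, not the actual implementation.

# Hypothetical sketch only: the real mltoolkit.utils.MongoBinder is not
# shown in this diff, and its interface may differ.
from gridfs import GridFS, GridFSBucket
from pymongo import MongoClient


class MongoBinderSketch(object):

    def __init__(self, conn_str, db_name, coll_name):
        self._conn_str = conn_str
        self._db_name = db_name
        self._coll_name = coll_name
        self._client = None  # created lazily on first use

    @property
    def client(self):
        # Connect on first access, mirroring the removed _init() logic.
        if self._client is None:
            self._client = MongoClient(self._conn_str)
        return self._client

    @property
    def db(self):
        return self.client.get_database(self._db_name)

    @property
    def gridfs(self):
        return GridFS(self.db, self._coll_name)

    @property
    def gridfs_bucket(self):
        return GridFSBucket(self.db, self._coll_name)

    def close(self):
        if self._client is not None:
            self._client.close()
            self._client = None
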
19 changes: 13 additions & 6 deletions mltoolkit/report/report.py
@@ -1,8 +1,9 @@
-import codecs
 import os
 
+import jinja2
+import six
 
-from mltoolkit.report.container import Container
+from .container import Container
 
 __all__ = ['Report']
 
@@ -78,12 +79,18 @@ def to_html(self):
             styles=styles, scripts=scripts
         )
 
-    def save(self, path):
+    def save(self, path_or_file):
         """
         Save the rendered HTML as file.
 
         Args:
-            path (str): The path of the HTML file.
+            path_or_file: The file path, or a file-like object to write.
         """
-        with codecs.open(path, 'wb', 'utf-8') as f:
-            f.write(self.to_html())
+        if hasattr(path_or_file, 'write'):
+            s = self.to_html()
+            if isinstance(s, six.text_type):
+                s = s.encode('utf-8')
+            path_or_file.write(s)
+        else:
+            with open(path_or_file, 'wb') as f:
+                self.save(f)
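
With this change, save accepts either a file system path or a writable file-like object. Below is a minimal usage sketch, assuming `report` is an already constructed mltoolkit.report.Report instance (its constructor arguments are not shown in this diff).

import io

# Save to a path: the file is opened in binary mode and written as UTF-8.
report.save('report.html')

# Save to any object exposing a .write() method, e.g. an in-memory buffer.
buf = io.BytesIO()
report.save(buf)
html_bytes = buf.getvalue()
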
9 changes: 7 additions & 2 deletions mltoolkit/utils/__init__.py
@@ -1,10 +1,15 @@
-from . import (concepts, doc_inherit, imported)
+from . import (concepts, doc_inherit, exec_proc, file_utils, imported,
+               mongo_binder)
 
 __all__ = sum(
-    [m.__all__ for m in [concepts, doc_inherit, imported]],
+    [m.__all__ for m in [concepts, doc_inherit, exec_proc, file_utils, imported,
+                         mongo_binder]],
     []
 )
 
 from .concepts import *
 from .doc_inherit import *
+from .exec_proc import *
+from .file_utils import *
 from .imported import *
+from .mongo_binder import *
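
Net effect of the re-exports above, together with the import changes in the datafs modules: the shared helpers now live under mltoolkit.utils rather than mltoolkit.datafs.utils, so a single import serves the whole code base, e.g.:

# Helper names taken from the imports shown elsewhere in this diff.
from mltoolkit.utils import (ActiveFiles, MongoBinder, exec_proc,
                             iter_files, maybe_close)
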
131 changes: 131 additions & 0 deletions mltoolkit/utils/exec_proc.py
@@ -0,0 +1,131 @@
+import os
+import signal
+import subprocess
+import sys
+import time
+from contextlib import contextmanager
+from threading import Thread
+
+__all__ = ['timed_wait_proc', 'exec_proc']
+
+
+if sys.version_info[:2] >= (3, 3):
+    def timed_wait_proc(proc, timeout):
+        try:
+            return proc.wait(timeout)
+        except subprocess.TimeoutExpired:
+            return None
+else:
+    def timed_wait_proc(proc, timeout):
+        itv = min(timeout * .1, .5)
+        tot = 0.
+        exit_code = None
+        while tot + 1e-7 < timeout and exit_code is None:
+            exit_code = proc.poll()
+            if exit_code is None:
+                time.sleep(itv)
+                tot += itv
+        return exit_code
+
+
+@contextmanager
+def exec_proc(args, on_stdout=None, on_stderr=None, stderr_to_stdout=False,
+              buffer_size=16*1024, ctrl_c_timeout=3, kill_timeout=60, **kwargs):
+    """
+    Execute an external program within a context.
+
+    Args:
+        args: Arguments of the program.
+        on_stdout ((bytes) -> None): Callback for capturing stdout.
+        on_stderr ((bytes) -> None): Callback for capturing stderr.
+        stderr_to_stdout (bool): Whether or not to redirect stderr to
+            stdout? If specified, `on_stderr` will be ignored.
+            (default :obj:`False`)
+        buffer_size (int): Size of buffers for reading from stdout and stderr.
+        ctrl_c_timeout (int): Seconds to wait for the program to
+            respond to CTRL+C signal. (default 3)
+        kill_timeout (int): Seconds to wait for the program to terminate after
+            being killed. (default 60)
+        **kwargs: Other named arguments passed to :func:`subprocess.Popen`.
+
+    Yields:
+        subprocess.Popen: The process object.
+    """
+    # check the arguments
+    if stderr_to_stdout:
+        kwargs['stderr'] = subprocess.STDOUT
+        on_stderr = None
+    if on_stdout is not None:
+        kwargs['stdout'] = subprocess.PIPE
+    if on_stderr is not None:
+        kwargs['stderr'] = subprocess.PIPE
+
+    # output reader
+    def reader_func(fd, action):
+        while not giveup_waiting[0]:
+            buf = os.read(fd, buffer_size)
+            if not buf:
+                break
+            action(buf)
+
+    def make_reader_thread(fd, action):
+        th = Thread(target=reader_func, args=(fd, action))
+        th.daemon = True
+        th.start()
+        return th
+
+    # internal flags
+    giveup_waiting = [False]
+
+    # launch the process
+    stdout_thread = None  # type: Thread
+    stderr_thread = None  # type: Thread
+    proc = subprocess.Popen(args, **kwargs)
+
+    try:
+        if on_stdout is not None:
+            stdout_thread = make_reader_thread(proc.stdout.fileno(), on_stdout)
+        if on_stderr is not None:
+            stderr_thread = make_reader_thread(proc.stderr.fileno(), on_stderr)
+
+        try:
+            yield proc
+        except KeyboardInterrupt:  # pragma: no cover
+            if proc.poll() is None:
+                # Wait for a while to ensure the program has properly dealt
+                # with the interruption signal. This will help to capture
+                # the final output of the program.
+                # TODO: use signal.signal instead for better treatment
+                _ = timed_wait_proc(proc, 1)
+
+    finally:
+        if proc.poll() is None:
+            # First, try to interrupt the process with Ctrl+C signal
+            ctrl_c_signal = (signal.SIGINT if sys.platform != 'win32'
+                             else signal.CTRL_C_EVENT)
+            os.kill(proc.pid, ctrl_c_signal)
+            if timed_wait_proc(proc, ctrl_c_timeout) is None:
+                # If the Ctrl+C signal does not work, terminate it.
+                proc.kill()
+                # Finally, wait for at most 60 seconds
+                if timed_wait_proc(proc, kill_timeout) is None:  # pragma: no cover
+                    giveup_waiting[0] = True
+
+        # Close the pipes such that the reader threads will ensure to exit,
+        # if we decide to give up waiting.
+        def close_pipes():
+            for f in (proc.stdout, proc.stderr, proc.stdin):
+                if f is not None:
+                    f.close()
+
+        if giveup_waiting[0]:  # pragma: no cover
+            close_pipes()
+
+        # Wait for the reader threads to exit
+        for th in (stdout_thread, stderr_thread):
+            if th is not None:
+                th.join()
+
+        # Ensure all the pipes are closed.
+        if not giveup_waiting[0]:
+            close_pipes()
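
A minimal usage sketch of the new exec_proc context manager; the child command below is only an illustration, and the import path follows the re-export added to mltoolkit/utils/__init__.py in this commit.

import sys

from mltoolkit.utils import exec_proc

chunks = []  # stdout fragments delivered by the background reader thread

# Run a short-lived child process and capture its stdout via the callback.
with exec_proc([sys.executable, '-c', 'print("hello")'],
               on_stdout=chunks.append) as proc:
    exit_code = proc.wait()  # wait for the child inside the context

print(exit_code, b''.join(chunks))
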
File renamed without changes.
