
upgrade to 0.6.8

1 parent 7ad933e commit a490a255efd0553cca4454d79ed83b777aae8888 @chineking committed Jan 5, 2017
Showing with 1,239 additions and 203 deletions.
  1. +113 −1 docs/source/df-sort-distinct-apply-zh.rst
  2. +1 −1 odps/_version.py
  3. +2 −1 odps/df/backends/context.py
  4. +64 −28 odps/df/backends/core.py
  5. +151 −86 odps/df/backends/odpssql/codegen.py
  6. +37 −12 odps/df/backends/odpssql/compiler.py
  7. +57 −1 odps/df/backends/odpssql/context.py
  8. +51 −2 odps/df/backends/odpssql/engine.py
  9. +26 −0 odps/df/backends/odpssql/models.py
  10. +8 −1 odps/df/backends/odpssql/rewriter.py
  11. +24 −1 odps/df/backends/odpssql/tests/test_codegen.py
  12. +97 −5 odps/df/backends/odpssql/tests/test_compiler.py
  13. +157 −2 odps/df/backends/odpssql/tests/test_engine.py
  14. +2 −0 odps/df/backends/optimize/core.py
  15. +5 −5 odps/df/backends/pd/compiler.py
  16. +7 −1 odps/df/backends/pd/engine.py
  17. +120 −17 odps/df/backends/pd/tests/test_engine.py
  18. +2 −3 odps/df/backends/selecter.py
  19. +23 −0 odps/df/backends/tests/test_mixed_engine.py
  20. +2 −1 odps/df/backends/utils.py
  21. +114 −1 odps/df/expr/collections.py
  22. +3 −0 odps/df/expr/dynamic.py
  23. +25 −2 odps/df/expr/expressions.py
  24. +1 −3 odps/df/expr/groupby.py
  25. +1 −1 odps/df/expr/merge.py
  26. +1 −1 odps/df/expr/reduction.py
  27. +1 −1 odps/df/expr/tests/test_utils.py
  28. +4 −3 odps/df/expr/utils.py
  29. +5 −5 odps/df/ui.py
  30. +9 −2 odps/df/utils.py
  31. +21 −0 odps/errors.py
  32. +60 −0 odps/lib/xnamedtuple.py
  33. +3 −1 odps/ml/adapter/mixin.py
  34. +1 −1 odps/models/instance.py
  35. +4 −4 odps/runner/df/adapter.py
  36. +15 −0 odps/runner/utils.py
  37. +2 −2 odps/static/ui/src/common.js
  38. +8 −8 odps/static/ui/target/main.js
  39. +12 −0 odps/tests/core.py
@@ -185,7 +185,7 @@
Data scaling
------------
-DataFrame supports scaling data by the maximum/minimum or by the mean/standard deviation. For example,
+DataFrame supports scaling data by the maximum/minimum or by the mean/standard deviation. For example, given the data
.. code:: python
@@ -248,6 +248,118 @@ std_scale adjusts data according to the standard normal distribution. For example,
std_scale also supports the preserve parameter to keep the original columns; see min_max_scale for details, which are not repeated here.
+Handling null values
+--------------------
+
+DataFrame supports filtering out null values as well as filling them. For example, given the data
+
+.. code:: python
+
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 NaN 3.0 4.0
+ 1 1 name1 2.0 NaN NaN 1.0
+ 2 2 name1 3.0 4.0 1.0 NaN
+ 3 3 name1 NaN 1.0 2.0 3.0
+ 4 4 name1 1.0 NaN 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+ 6 6 name1 NaN NaN NaN NaN
+
+Use dropna to drop rows that contain null values in the subset columns:
+
+.. code:: python
+
+ >>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 5 name1 1.0 2.0 3.0 4.0
+
+To drop a row only when all of its subset values are null, that is, to keep any row containing a non-null value, use how='all':
+
+.. code:: python
+
+ >>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 NaN 3.0 4.0
+ 1 1 name1 2.0 NaN NaN 1.0
+ 2 2 name1 3.0 4.0 1.0 NaN
+ 3 3 name1 NaN 1.0 2.0 3.0
+ 4 4 name1 1.0 NaN 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+
+You can also use the thresh parameter to specify the minimum number of non-null values a row must contain. For example:
+
+.. code:: python
+
+ >>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 NaN 3.0 4.0
+ 2 2 name1 3.0 4.0 1.0 NaN
+ 3 3 name1 NaN 1.0 2.0 3.0
+ 4 4 name1 1.0 NaN 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+
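As a side note for cross-checking: the dropna semantics documented above mirror pandas' DataFrame.dropna, so the example outputs can be reproduced locally. A minimal sketch, assuming pandas and numpy are installed; the frame below is rebuilt from the sample data above:

.. code:: python

    import numpy as np
    import pandas as pd

    # Rebuild the sample frame from the examples above.
    df = pd.DataFrame({
        'id': range(7),
        'name': ['name1'] * 7,
        'f1': [1.0, 2.0, 3.0, np.nan, 1.0, 1.0, np.nan],
        'f2': [np.nan, np.nan, 4.0, 1.0, np.nan, 2.0, np.nan],
        'f3': [3.0, np.nan, 1.0, 2.0, 3.0, 3.0, np.nan],
        'f4': [4.0, 1.0, np.nan, 3.0, 4.0, 4.0, np.nan],
    })

    subset = ['f1', 'f2', 'f3', 'f4']
    df.dropna(subset=subset)             # drop rows with any null in subset
    df.dropna(how='all', subset=subset)  # drop only rows where all are null
    df.dropna(thresh=3, subset=subset)   # keep rows with at least 3 non-nulls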
+Use fillna to fill null values with a constant or with an existing column. Below is an example of filling with a constant:
+
+.. code:: python
+
+ >>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 100.0 3.0 4.0
+ 1 1 name1 2.0 100.0 100.0 1.0
+ 2 2 name1 3.0 4.0 1.0 100.0
+ 3 3 name1 100.0 1.0 2.0 3.0
+ 4 4 name1 1.0 100.0 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+ 6 6 name1 100.0 100.0 100.0 100.0
+
+You can also fill null values with an existing column. For example:
+
+.. code:: python
+
+ >>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 NaN 3.0 4.0
+ 1 1 name1 2.0 NaN NaN 1.0
+ 2 2 name1 3.0 4.0 1.0 4.0
+ 3 3 name1 1.0 1.0 2.0 3.0
+ 4 4 name1 1.0 NaN 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+ 6 6 name1 NaN NaN NaN NaN
+
+In particular, DataFrame supports forward / backward filling within each row. This is enabled by setting the method parameter to one of the following values:
+
+================== ============================================
+ Value              Meaning
+================== ============================================
+ bfill / backfill   backward fill (use the next valid value)
+ ffill / pad        forward fill (use the previous valid value)
+================== ============================================
+
+For example:
+
+.. code:: python
+
+ >>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 3.0 3.0 4.0
+ 1 1 name1 2.0 1.0 1.0 1.0
+ 2 2 name1 3.0 4.0 1.0 NaN
+ 3 3 name1 1.0 1.0 2.0 3.0
+ 4 4 name1 1.0 3.0 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+ 6 6 name1 NaN NaN NaN NaN
+ >>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
+ id name f1 f2 f3 f4
+ 0 0 name1 1.0 1.0 3.0 4.0
+ 1 1 name1 2.0 2.0 2.0 1.0
+ 2 2 name1 3.0 4.0 1.0 1.0
+ 3 3 name1 NaN 1.0 2.0 3.0
+ 4 4 name1 1.0 1.0 3.0 4.0
+ 5 5 name1 1.0 2.0 3.0 4.0
+ 6 6 name1 NaN NaN NaN NaN
+
+You can also use the ffill / bfill methods to simplify the code: ffill is equivalent to fillna(method='ffill'),
+and bfill is equivalent to fillna(method='bfill').
+
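To round out the shortcut forms just described, a hedged sketch; it assumes ffill and bfill accept the same subset argument as fillna, which the stated equivalence suggests but which should be verified against the release:

.. code:: python

    >>> df.ffill(subset=['f1', 'f2', 'f3', 'f4'])  # same as fillna(method='ffill')
    >>> df.bfill(subset=['f1', 'f2', 'f3', 'f4'])  # same as fillna(method='bfill')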
Calling custom functions on all rows/columns
--------------------------------------------
@@ -15,5 +15,5 @@
# specific language governing permissions and limitations
# under the License.
-version_info = (0, 6, 7)
+version_info = (0, 6, 8)
__version__ = '.'.join(map(str, version_info[:3])) + ''.join(version_info[3:])
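Just to confirm the resulting version string, a quick check of the construction above (the trailing ''.join handles optional pre-release tags, empty here):

.. code:: python

    version_info = (0, 6, 8)
    __version__ = '.'.join(map(str, version_info[:3])) + ''.join(version_info[3:])
    assert __version__ == '0.6.8'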
@@ -23,7 +23,8 @@ def __init__(self):
self._expr_id_cached_data = dict()
def cache(self, expr, data):
- self._expr_id_cached_data[expr._id] = data
+ if data is not None:
+ self._expr_id_cached_data[expr._id] = data
def is_cached(self, expr):
return expr._id in self._expr_id_cached_data
@@ -24,8 +24,9 @@
import time
import types
import sys
+import threading
-from ...compat import Enum
+from ...compat import six, Enum
from ...dag import DAG
from ...utils import init_progress_ui
from ...ui.progress import create_instance_group
@@ -200,47 +201,73 @@ def _run(self, ui):
return [results[result_idx[idx]] for idx in sorted(result_idx)]
def _run_in_parallel(self, ui, n_parallel, async=False, timeout=None):
+ submits_lock = threading.RLock()
submits = dict()
+ user_wait = dict()
results = dict()
calls = self.topological_sort()
def close_ui(*_):
- if all(call in submits and results[call] is not None for call in calls):
- ui.close()
+ with submits_lock:
+ if all(call in submits and results[call] is not None for call in calls):
+ ui.close()
executor = futures.ThreadPoolExecutor(max_workers=n_parallel)
for call in calls:
- def run(func):
- prevs = self.predecessors(func)
- if prevs:
- fs = [submits[p] for p in prevs]
- futures.wait(fs)
- for f in fs:
- if f.exception():
- raise RuntimeError('Node execution failed due to failure '
- 'of previous node, exception: %s' % f.exception())
- res = func(ui=ui, progress_proportion=1.0 / len(calls))
- results[func] = res
- return res
-
- future = executor.submit(run, call)
+ future = futures.Future()
if call.result_index is not None:
future.add_done_callback(close_ui)
- submits[call] = future
+ user_wait[call] = future
+
+ for call in calls:
+ def run(func):
+ try:
+ prevs = self.predecessors(func)
+ if prevs:
+ fs = [user_wait[p] for p in prevs]
+ for f in fs:
+ if f.exception():
+ raise RuntimeError('Node execution failed due to failure '
+ 'of previous node, exception: %s' % f.exception())
+
+ user_wait[func].set_running_or_notify_cancel()
+ res = func(ui=ui, progress_proportion=1.0 / len(calls))
+ results[func] = res
+ user_wait[func].set_result(res)
+ return res
+ except:
+ e, tb = sys.exc_info()[1:]
+ if six.PY2:
+ user_wait[func].set_exception_info(e, tb)
+ else:
+ user_wait[func].set_exception(e)
+ raise
+ finally:
+ with submits_lock:
+ for f in self.successors(func):
+ if f in submits:
+ continue
+ prevs = self.predecessors(f)
+ if all(p in submits and user_wait[p].done() for p in prevs):
+ submits[f] = executor.submit(run, f)
+
+ if not self.predecessors(call):
+ with submits_lock:
+ submits[call] = executor.submit(run, call)
if not async:
- dones, _ = futures.wait(submits.values())
+ dones, _ = futures.wait(user_wait.values())
for done in dones:
done.result()
return [results[c] for c in
sorted([c for c in calls if c.result_index is not None],
key=lambda x: x.result_index)]
if timeout:
- futures.wait(submits.values(), timeout=timeout)
- return [submits[c] for c in
+ futures.wait(user_wait.values(), timeout=timeout)
+ return [user_wait[c] for c in
sorted([c for c in calls if c.result_index is not None],
key=lambda x: x.result_index)]
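This hunk is the core of the change: previously every node was submitted up front and worker threads blocked on futures.wait for their predecessors, tying up pool slots; now each node gets a hand-managed futures.Future and is submitted to the executor only once all of its predecessors have completed. A minimal standalone sketch of the same scheduling pattern; run_dag, predecessors, and successors are illustrative names, not the codebase's API:

.. code:: python

    import threading
    from concurrent import futures

    def run_dag(calls, predecessors, successors, n_parallel=4):
        """Run DAG nodes (callables) so that a node is submitted only
        after every one of its predecessors has finished."""
        lock = threading.RLock()
        submits = {}  # node -> executor future, i.e. "already scheduled"
        user_wait = {node: futures.Future() for node in calls}  # node -> result
        executor = futures.ThreadPoolExecutor(max_workers=n_parallel)

        def run(node):
            try:
                for p in predecessors(node):
                    # Predecessors are guaranteed done before we are submitted.
                    if user_wait[p].exception() is not None:
                        raise RuntimeError('node failed due to failure of a '
                                           'previous node: %r'
                                           % user_wait[p].exception())
                user_wait[node].set_running_or_notify_cancel()
                result = node()
                user_wait[node].set_result(result)
                return result
            except BaseException as exc:
                user_wait[node].set_exception(exc)
                raise
            finally:
                # Schedule every successor whose predecessors are all done.
                with lock:
                    for succ in successors(node):
                        if succ in submits:
                            continue
                        if all(p in submits and user_wait[p].done()
                               for p in predecessors(succ)):
                            submits[succ] = executor.submit(run, succ)

        with lock:
            for node in calls:  # kick off the roots
                if not predecessors(node):
                    submits[node] = executor.submit(run, node)

        futures.wait(user_wait.values())
        executor.shutdown(wait=True)
        return [user_wait[node].result() for node in calls]

Note that a node whose predecessor failed still runs its scheduling step in the finally block, so downstream futures are completed with an error instead of hanging.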
@@ -307,14 +334,19 @@ def _dispatch(self, expr_dag, expr, ctx):
if not ctx.is_cached(expr):
def h():
def inner(*args, **kwargs):
- data = self._cache(*args, **kwargs)
- if data:
+ ret = self._cache(*args, **kwargs)
+ if ret:
+ data, node = ret
ctx.cache(expr, data)
+ return node
return inner
return h()
else:
- expr_dag.substitute(expr, ctx.get_cached(expr))
- elif expr._deps is not None:
+ cached = ctx.get_cached(expr)
+ expr_dag.substitute(expr, cached,
+ parents=[e for e in expr_dag.successors(expr)
+ if e is not cached])
+ elif expr._deps:
return self._handle_dep
def _new_analyzer(self, expr_dag, on_sub=None):
@@ -348,6 +380,7 @@ def _new_execute_node(self, expr_dag):
def _handle_dep(self, expr_dag, dag, expr, **kwargs):
root = expr_dag.root
+ execute_nodes = []
for dep in root._deps:
if isinstance(dep, tuple):
if len(dep) == 3:
@@ -367,6 +400,9 @@ def dep_callback(res):
execute_node = getattr(self, action)(ExprDAG(node, dag=expr_dag), dag, node,
analyze=False, **kwargs)
execute_node.callback = dep_callback
+ execute_nodes.append(execute_node)
+
+ return execute_nodes
def compile(self, *expr_args_kwargs):
ctx = ExecuteContext() # expr -> new_expr
@@ -572,10 +608,10 @@ def _add_node(self, dag_node, dag):
dag.add_node(dag_node)
for node in nodes:
node_expr = node.expr
- if node_expr.is_ancestor(dag_node.expr):
- dag.add_edge(dag_node, node)
- elif dag_node.expr.is_ancestor(node_expr):
+ if dag_node.expr.is_ancestor(node_expr):
dag.add_edge(node, dag_node)
+ elif node_expr.is_ancestor(dag_node.expr):
+ dag.add_edge(dag_node, node)
@classmethod
def _execute_dag(cls, dag, ui=None, async=False, n_parallel=1, timeout=None):