Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix issues in review and improvements
- Loading branch information
Showing
4 changed files
with
152 additions
and
137 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,157 +1,166 @@ | ||
# Licensed under a 3-clause BSD style license - see LICENSE.rst | ||
"""Command line tool to perform devel management actions on jupyter notebooks.""" | ||
"""Command line tool to perform actions on jupyter notebooks.""" | ||
|
||
from __future__ import absolute_import, division, print_function, unicode_literals | ||
from black import format_str | ||
import click | ||
import logging | ||
import nbformat | ||
import subprocess | ||
import sys | ||
import time | ||
from ..extern.pathlib import Path | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
try: | ||
import nbformat | ||
except ImportError: | ||
log.error("Jupyter package not presen, 'jupyter' option is not available.") | ||
sys.exit() | ||
|
||
@click.command(name='black') | ||
|
||
@click.command(name='execute') | ||
@click.pass_context | ||
def cli_jupyter_black(ctx): | ||
"""Format code cells with black.""" | ||
def cli_jupyter_execute(ctx): | ||
"""Execute Jupyter notebooks.""" | ||
|
||
for path in build_nblist(ctx): | ||
rawnb = read_notebook(path) | ||
for path in ctx.obj['paths']: | ||
run_notebook(path) | ||
|
||
for cell in rawnb.cells: | ||
fmt = cell['source'] | ||
if cell['cell_type'] == 'code': | ||
try: | ||
fmt = '\n'.join(tag_magics(fmt)) | ||
has_semicolon = fmt.endswith(';') | ||
fmt = format_str(src_contents=fmt, | ||
line_length=79).rstrip() | ||
if has_semicolon: | ||
fmt += ';' | ||
except Exception as ex: | ||
logging.info(ex) | ||
fmt = fmt.replace("###-MAGIC COMMAND-", "") | ||
cell['source'] = fmt | ||
|
||
if rawnb: | ||
nbformat.write(rawnb, str(path)) | ||
log.info('Jupyter notebook {} painted in black.'.format(str(path))) | ||
def run_notebook(path, loglevel=20): | ||
"""Execute a Jupyter notebook.""" | ||
|
||
try: | ||
t = time.time() | ||
subprocess.call( | ||
"jupyter nbconvert " | ||
"--allow-errors " | ||
"--log-level={} " | ||
"--ExecutePreprocessor.timeout=None " | ||
"--ExecutePreprocessor.kernel_name=python3 " | ||
"--to notebook " | ||
"--inplace " | ||
"--execute '{}'".format(loglevel, path), | ||
shell=True) | ||
t = (time.time() - t) / 60 | ||
log.info(' ... Executing duration: {:.2f} mn'.format(t)) | ||
except Exception as ex: | ||
log.error('Error executing file {}'.format(str(path))) | ||
log.error(ex) | ||
|
||
|
||
@click.command(name='stripout') | ||
@click.pass_context | ||
def cli_jupyter_stripout(ctx): | ||
"""Strip output cells.""" | ||
|
||
for path in build_nblist(ctx): | ||
rawnb = read_notebook(path) | ||
for path in ctx.obj['paths']: | ||
rawnb = nbformat.read(str(path), as_version=nbformat.NO_CONVERT) | ||
|
||
for cell in rawnb.cells: | ||
if cell['cell_type'] == 'code': | ||
cell['outputs'] = [] | ||
|
||
if rawnb: | ||
nbformat.write(rawnb, str(path)) | ||
log.info('Jupyter notebook {} output stripped.'.format(str(path))) | ||
|
||
|
||
@click.command(name='execute') | ||
@click.pass_context | ||
def cli_jupyter_execute(ctx): | ||
"""Execute jupyter notebook.""" | ||
|
||
for path in build_nblist(ctx): | ||
run_notebook(path) | ||
nbformat.write(rawnb, str(path)) | ||
log.info('Jupyter notebook {} stripped out.'.format(str(path))) | ||
|
||
|
||
@click.command(name='test') | ||
@click.command(name='black') | ||
@click.pass_context | ||
def cli_jupyter_test(ctx): | ||
"""Check if Jupyter notebook is broken.""" | ||
|
||
for path in build_nblist(ctx): | ||
run_notebook(path) | ||
rawnb = read_notebook(path) | ||
|
||
if rawnb: | ||
report = "" | ||
passed = True | ||
log.info(" ... Testing {}".format(str(path))) | ||
for cell in rawnb.cells: | ||
if 'outputs' in cell.keys(): | ||
for output in cell['outputs']: | ||
if output['output_type'] == 'error': | ||
passed = False | ||
traceitems = ["--TRACEBACK: "] | ||
for o in output['traceback']: | ||
traceitems.append("{}".format(o)) | ||
traceback = "\n".join(traceitems) | ||
infos = "\n\n{} in cell [{}]\n\n" \ | ||
"--SOURCE CODE: \n{}\n\n".format( | ||
output['ename'], | ||
cell['execution_count'], | ||
cell['source'] | ||
) | ||
report = report + infos + traceback | ||
if passed: | ||
log.info(" ... {} Passed".format(str(path))) | ||
else: | ||
log.info(" ... {} Failed".format(str(path))) | ||
log.info(report) | ||
def cli_jupyter_black(ctx): | ||
"""Format code cells with black.""" | ||
try: | ||
import black | ||
except ImportError: | ||
log.error("Black package not present, 'black' option is not available.") | ||
sys.exit() | ||
|
||
for path in ctx.obj['paths']: | ||
rawnb = nbformat.read(str(path), as_version=nbformat.NO_CONVERT) | ||
blacknb = BlackNotebook(rawnb) | ||
blacknb.blackformat() | ||
rawnb = blacknb.rawnb | ||
nbformat.write(rawnb, str(path)) | ||
log.info('Jupyter notebook {} blacked.'.format(str(path))) | ||
|
||
def tag_magics(cellcode): | ||
"""Comment magic commands when formatting cells.""" | ||
|
||
lines = cellcode.splitlines(False) | ||
for line in lines: | ||
if line.startswith("%") or line.startswith("!"): | ||
magic_line = "###-MAGIC COMMAND-" + line | ||
yield magic_line | ||
else: | ||
yield line | ||
class BlackNotebook: | ||
"""Manage the process of black formatting.""" | ||
|
||
MAGIC_TAG = "###-MAGIC TAG-" | ||
|
||
def build_nblist(ctx): | ||
"""Fill list of notebooks in a the given folder.""" | ||
def __init__(self, rawnb): | ||
|
||
if ctx.obj['file']: | ||
yield Path(ctx.obj['file']) | ||
else: | ||
for f in Path(ctx.obj['fold']).iterdir(): | ||
if f.name.endswith('.ipynb'): | ||
yield f | ||
self.rawnb = rawnb | ||
|
||
def blackformat(self): | ||
"""Format code cells.""" | ||
from black import format_str | ||
for cell in self.rawnb.cells: | ||
fmt = cell['source'] | ||
if cell['cell_type'] == 'code': | ||
try: | ||
fmt = '\n'.join(self.tag_magics(fmt)) | ||
has_semicolon = fmt.endswith(';') | ||
fmt = format_str(src_contents=fmt, | ||
line_length=79).rstrip() | ||
if has_semicolon: | ||
fmt += ';' | ||
except Exception as ex: | ||
logging.info(ex) | ||
fmt = fmt.replace(self.MAGIC_TAG, "") | ||
cell['source'] = fmt | ||
|
||
def run_notebook(path): | ||
"""Execute a Jupyter notebook.""" | ||
def tag_magics(self, cellcode): | ||
"""Comment magic commands.""" | ||
|
||
try: | ||
t = time.time() | ||
subprocess.call( | ||
"jupyter nbconvert " | ||
"--allow-errors " | ||
"--ExecutePreprocessor.timeout=None " | ||
"--ExecutePreprocessor.kernel_name=python3 " | ||
"--to notebook " | ||
"--inplace " | ||
"--execute '{}'".format(path), | ||
shell=True) | ||
t = (time.time() - t) / 60 | ||
log.info('Executing duration: {:.2f} mn'.format(t)) | ||
except Exception as ex: | ||
log.error('Error executing file {}'.format(str(path))) | ||
log.error(ex) | ||
lines = cellcode.splitlines(False) | ||
for line in lines: | ||
if line.startswith("%") or line.startswith("!"): | ||
magic_line = self.MAGIC_TAG + line | ||
yield magic_line | ||
else: | ||
yield line | ||
|
||
|
||
def read_notebook(path): | ||
"""Read a Jupyter notebook raw structure.""" | ||
try: | ||
return nbformat.read(str(path), as_version=nbformat.NO_CONVERT) | ||
except Exception as ex: | ||
log.error('Error parsing file {}'.format(str(path))) | ||
log.error(ex) | ||
return | ||
@click.command(name='test') | ||
@click.pass_context | ||
def cli_jupyter_test(ctx): | ||
"""Check if Jupyter notebooks are broken.""" | ||
|
||
for path in ctx.obj['paths']: | ||
test_notebook(path, nbformat) | ||
|
||
|
||
def test_notebook(path): | ||
"""Execute and parse a Jupyter notebook exposing broken cells.""" | ||
|
||
passed = True | ||
log.info(" ... TESTING {}".format(str(path))) | ||
run_notebook(path, 30) | ||
rawnb = nbformat.read(str(path), as_version=nbformat.NO_CONVERT) | ||
|
||
for cell in rawnb.cells: | ||
if 'outputs' in cell.keys(): | ||
for output in cell['outputs']: | ||
if output['output_type'] == 'error': | ||
passed = False | ||
traceitems = ["--TRACEBACK: "] | ||
for o in output['traceback']: | ||
traceitems.append("{}".format(o)) | ||
traceback = "\n".join(traceitems) | ||
infos = "\n\n{} in cell [{}]\n\n" \ | ||
"--SOURCE CODE: \n{}\n\n".format( | ||
output['ename'], | ||
cell['execution_count'], | ||
cell['source'] | ||
) | ||
report = infos + traceback | ||
break | ||
if not passed: | ||
break | ||
|
||
if passed: | ||
log.info(" ... PASSED {}".format(str(path))) | ||
else: | ||
log.info(" ... FAILED {}".format(str(path))) | ||
log.info(report) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters