In [1]:
GITHUB_USERNAME = "paxtonfitzpatrick"#"$GITHUB_USERNAME$"
GITHUB_REF = "62cba0f23e89e5cb838ac84cd8ed88939bc4d784"#"$GITHUB_REF$"
NOTEBOOK_TYPE = "jupyter"#"$NOTEBOOK_TYPE$"
PYTHON_VERSION = "3.8"#"$PYTHON_VERSION$"
IPYTHON_VERSION = "latest"#"$IPYTHON_VERSION$"

In [2]:
import warnings
from pathlib import Path

import requests


warnings.filterwarnings('error', module='davos')

if NOTEBOOK_TYPE == 'colab':
    # utils module doesn't exist on colab VM, so get current version from GitHub
    utils_module = Path('utils.py').resolve()
    response = requests.get(f'https://raw.githubusercontent.com/{GITHUB_USERNAME}/davos/{GITHUB_REF}/tests/utils.py')
    utils_module.write_text(response.text)
    # also need to install davos locally
    from utils import install_davos
    install_davos(source='github', ref=GITHUB_REF, fork=GITHUB_USERNAME)

In [3]:
import sys
import types
from contextlib import redirect_stdout
from io import StringIO
from pathlib import Path
from textwrap import dedent

import davos

from utils import is_imported, mark, raises, run_tests

In [4]:
IPYTHON_SHELL = get_ipython()

# tests for `davos.core.core`
**Note**: regular expressions defined in `davos.core.regexps` but used in `davos.core.core` are tested in `test_regexps.ipynb`

In [5]:
def test_capture_stdout_multistream():
    """
    should write correct output to stdout (captured for test), 
    in-memory object, and file streams simultaneously
    """
    msg = ("Strictly speaking, I didn't do the theiving. That would be the "
           "pirates. I just moved what they stole from one place to another")
    tmpfile = Path('tmpfile.txt')
    try:
        with redirect_stdout(StringIO()) as mock_stdout:
            with davos.core.core.capture_stdout(
                StringIO(), tmpfile.open('w')
            ) as (mem_stream, file_stream):
                print(msg, end='')
                in_mem = mem_stream.getvalue()
            
            in_stdout = mock_stdout.getvalue()
        
        in_file = tmpfile.read_text()
        
        assert mem_stream.closed, "StringIO object was not closed"
        assert file_stream.closed, "file stream was not closed"

        assert in_stdout == msg, (
            "content written to stdout doesn't match original. Expected:\n"
            f"\"{msg}\"\n\nFound:\n\"{in_stdout}\""
        )
        assert in_mem == msg, (
            "content written to StringIO object doesn't match original. "
            f"Expected:\n\"{msg}\"\n\nFound:\n\"{in_stdout}\""
        )
        assert in_file == msg, (
            "content written to file doesn't match original. Expected:\n"
            f"\"{msg}\"\n\nFound:\n\"{in_file}\""
        )
    finally:
        if not mem_stream.closed:
            mem_stream.close()
        if not file_stream.closed:
            file_stream.close()
        if tmpfile.is_file():
            tmpfile.unlink()

In [6]:
def test_capture_stdout_not_closing():
    """
    passing 'closing=True' should prevent streams from closing when 
    exiting context block
    """
    msg = ("Strictly speaking, I didn't do the theiving. That would be the "
           "pirates. I just moved what they stole from one place to another")
    try:
        with davos.core.core.capture_stdout(StringIO(), closing=False) as mem_stream:
            print(msg, end='')
                
        assert not mem_stream.closed, (
            "StringIO stream was closed despite passing 'closing=False'"
        )
    finally:
        mem_stream.close()

In [7]:
@mark.ipython_pre7
def test_check_conda_ipython_pre7():
    """
    conda is not available on Google Colaboratory and not yet supported 
    for IPython<7.0.0
    """
    davos.core.core.check_conda()
    assert davos.config.conda_avail is False
    assert davos.config.conda_env is None
    assert davos.config.conda_envs_dirs is None

In [8]:
@mark.ipython_post7
def test_check_conda_ipython_post7():
    """basic test for expected values"""
    davos.core.core.check_conda()
    assert davos.config.conda_avail is True
    assert davos.config.conda_env == 'kernel-env'
    assert isinstance(davos.config.conda_envs_dirs, dict)

In [9]:
def test_check_conda_stdout_parsing():
    """
    check that 'conda_avail', 'conda_env', and 'conda_envs_dirs' config 
    fields are set correctly. Temporarily overloads 
    '_check_conda_avail_helper' and 'run_shell_command' functions with 
    functions defined below to compare expected values & results.
    """
    old_conda_avail = davos.config._conda_avail
    old_conda_env = davos.config._conda_env
    old_conda_envs_dirs = davos.config._conda_envs_dirs
    old__check_conda_avail_helper = davos.core.core._check_conda_avail_helper
    old_run_shell_command = davos.core.core.run_shell_command
    
    mock_conda_list_output = dedent("""\
    # packages in environment at /path/to/anaconda3/envs/mock-current-env:
    #
    # Name                    Version                   Build  Channel
    ipython                   X.X.X                    YYYYYY    ZZZZ
    etc...""")
    mock_conda_info_output = dedent("""\
    base /path/to/anaconda3
    mock-current-env /path/to/anaconda3/envs/mock-current-env
    mock-other-env /path/to/anaconda3/envs/mock-other-env""")
    
    expected_conda_envs_dirs = {
        'base': '/path/to/anaconda3',
        'mock-current-env': '/path/to/anaconda3/envs/mock-current-env',
        'mock-other-env': '/path/to/anaconda3/envs/mock-other-env'
    }
    
    def _mock_check_conda_avail_helper():
        return mock_conda_list_output
    
    def _mock_run_shell_command(command, live_stdout=None):
        if command == "conda info --envs | grep -E '^\w' | sed -E 's/ +\*? +/ /g'":
            return mock_conda_info_output
        else:
            return old_run_shell_command(command, live_stdout=live_stdout)
    
    try:
        davos.core.core._check_conda_avail_helper = _mock_check_conda_avail_helper
        davos.core.core.run_shell_command = _mock_run_shell_command
        
        davos.core.core.check_conda()
        
        assert davos.config.conda_avail is True
        assert davos.config.conda_env == 'mock-current-env', (
            f"Found: {davos.config.conda_env}"
        )
        assert davos.config.conda_envs_dirs == expected_conda_envs_dirs, (
            f"Expected: {expected_conda_envs_dirs}\nFound: "
            f"{davos.config.conda_envs_dirs}"
        )
    finally:
        davos.config._conda_avail = old_conda_avail
        davos.config._conda_env = old_conda_env
        davos.config._conda_envs_dirs = old_conda_envs_dirs
        davos.core.core._check_conda_avail_helper = old__check_conda_avail_helper
        davos.core.core.run_shell_command = old_run_shell_command

In [10]:
def test_check_conda_bad_info_cmd():
    """
    when parsing 'conda list ipython' output is successful but parsing 
    'conda info --envs' output fails, 'conda_avail' and 'conda_env' 
    config fields should still be assigned correctly, but 
    'conda_envs_dirs' field should be 'None'
    """
    old_conda_avail = davos.config._conda_avail
    old_conda_env = davos.config._conda_env
    old_conda_envs_dirs = davos.config._conda_envs_dirs
    old__check_conda_avail_helper = davos.core.core._check_conda_avail_helper
    old_run_shell_command = davos.core.core.run_shell_command
    
    mock_conda_list_output = dedent("""\
    # packages in environment at /path/to/anaconda3/envs/mock-current-env:
    #
    # Name                    Version                   Build  Channel
    ipython                   X.X.X                    YYYYYY    ZZZZ
    etc...""")
    
    def _mock_check_conda_avail_helper():
        return mock_conda_list_output
    
    def _mock_run_shell_command(command, live_stdout=None):
        if command == "conda info --envs | grep -E '^\w' | sed -E 's/ +\*? +/ /g'":
            raise Exception
        else:
            return old_run_shell_command(command, live_stdout=live_stdout)
    
    try:
        davos.core.core._check_conda_avail_helper = _mock_check_conda_avail_helper
        davos.core.core.run_shell_command = _mock_run_shell_command
        
        davos.core.core.check_conda()
        
        assert davos.config.conda_avail is True
        assert davos.config.conda_env == 'mock-current-env', (
            f"Found: {davos.config.conda_env}"
        )
        assert davos.config.conda_envs_dirs is None
    finally:
        davos.config._conda_avail = old_conda_avail
        davos.config._conda_env = old_conda_env
        davos.config._conda_envs_dirs = old_conda_envs_dirs
        davos.core.core._check_conda_avail_helper = old__check_conda_avail_helper
        davos.core.core.run_shell_command = old_run_shell_command

In [11]:
def test_check_conda_bad_list_cmd_raises():
    """
    when 'conda list ipython' output is parsed successfully, but yields 
    an environment name not in the dict parsed from the output of 
    'conda info --envs', 'check_conda' function should raise a 
    'DavosError'
    """
    old_conda_avail = davos.config._conda_avail
    old_conda_env = davos.config._conda_env
    old_conda_envs_dirs = davos.config._conda_envs_dirs
    old__check_conda_avail_helper = davos.core.core._check_conda_avail_helper
    old_run_shell_command = davos.core.core.run_shell_command
    
    mock_conda_list_output = dedent("""\
    # packages in environment at bad-env-path:
    #
    # Name                    Version                   Build  Channel
    ipython                   X.X.X                    YYYYYY    ZZZZ
    etc...""")
    mock_conda_info_output = dedent("""\
    base /path/to/anaconda3
    mock-current-env /path/to/anaconda3/envs/mock-current-env
    mock-other-env /path/to/anaconda3/envs/mock-other-env""")
    
    expected_conda_envs_dirs = {
        'base': '/path/to/anaconda3',
        'mock-current-env': '/path/to/anaconda3/envs/mock-current-env',
        'mock-other-env': '/path/to/anaconda3/envs/mock-other-env'
    }
    
    def _mock_check_conda_avail_helper():
        return mock_conda_list_output
    
    def _mock_run_shell_command(command, live_stdout=None):
        if command == "conda info --envs | grep -E '^\w' | sed -E 's/ +\*? +/ /g'":
            return mock_conda_info_output
        else:
            return old_run_shell_command(command, live_stdout=live_stdout)
    
    try:
        davos.core.core._check_conda_avail_helper = _mock_check_conda_avail_helper
        davos.core.core.run_shell_command = _mock_run_shell_command
        
        with raises(davos.core.exceptions.DavosError):
            davos.core.core.check_conda()

    finally:
        davos.config._conda_avail = old_conda_avail
        davos.config._conda_env = old_conda_env
        davos.config._conda_envs_dirs = old_conda_envs_dirs
        davos.core.core._check_conda_avail_helper = old__check_conda_avail_helper
        davos.core.core.run_shell_command = old_run_shell_command

In [12]:
def test_get_previously_imported_pkgs():
    """
    should return names of packages imported during this interpreter 
    session, whether from within the current notebook (davos, requests)
    or elsewhere (IPython, urllib3), but ignore all other names 
    (scikit-learn, selenium). Uses mock 'pip-install'-like output to set 
    up expected results. Tests against real output in 
    'test_regexps.ipynb'.
    """
    mock_stdout = ("Successfully installed davos-0.0.0 IPython-1.1.1 "
              "requests-2.2.2 scikit-learn-3.3.3 selenium-4.4.4 urllib3-5.5.5")
    expected = ['davos', 'IPython', 'requests', 'urllib3']
    result = davos.core.core.get_previously_imported_pkgs(mock_stdout, 'pip')
    assert result == expected, f"Expected:\n'{expected}'\nFound:\n'{result}'"

In [13]:
def test_get_previously_imported_pkgs_conda_raises():
    """passing 'installer-"conda"' should raise 'NotImplementedError'"""
    with raises(NotImplementedError):
        davos.core.core.get_previously_imported_pkgs('', 'conda')

In [14]:
def test_import_name_pkg():
    """test importing a top-level package."""
    try:
        assert not is_imported('scipy')
        scipy = davos.core.core.import_name('scipy')
        assert isinstance(scipy, types.ModuleType), (
            f"type(scipy): {type(scipy)}\nMRO: {type(scipy).mro()}"
        )
        assert not hasattr(scipy, 'stats'), (
            "scipy.stats submodule should not be loaded"
        )
        assert 'scipy' in sys.modules
    finally:
        for mod_name in tuple(sys.modules.keys()):
            if 'scipy' in mod_name:
                sys.modules.pop(mod_name)

In [15]:
def test_import_name_module():
    """test importing a module from a package."""
    try:
        assert not is_imported('scipy')
        stats = davos.core.core.import_name('scipy.stats')
        assert isinstance(stats, types.ModuleType), (
            f"type(stats): {type(stats)}\nMRO: {type(stats).mro()}"
        )
        assert not hasattr(stats, 'tests'), (
            "scipy.stats.tests submodule should not be loaded"
        )
        assert 'scipy' in sys.modules
        assert 'scipy.stats' in sys.modules
    finally:
        for mod_name in tuple(sys.modules.keys()):
            if 'scipy' in mod_name:
                sys.modules.pop(mod_name)

In [15]:
def test_import_name_function():
    """test importing a function by its qualified name"""
    try:
        assert not is_imported('scipy')
        ttest_ind = davos.core.core.import_name('scipy.stats.ttest_ind')
        assert isinstance(ttest_ind, types.FunctionType), (
            f"type(scipy): {type(ttest_ind)}\nMRO: {type(ttest_ind).mro()}"
        )
        assert ttest_ind.__module__ == 'scipy.stats.stats', (
            f"Expected: 'scipy.stats.stats'\nFound: '{ttest_ind.__module__}'"
        )
        assert 'scipy' in sys.modules
        assert 'scipy.stats' in sys.modules
        assert 'scipy.stats.stats' in sys.modules
    finally:
        for mod_name in tuple(sys.modules.keys()):
            if 'scipy' in mod_name:
                sys.modules.pop(mod_name)

In [None]:
run_tests()