In [1]:
import asyncio
import io
import os
import subprocess

def execute_test(command, timeout):
    """Run ``command`` through the shell with test X509 variables and report the outcome.

    The child receives a copy of the current environment with ``X509_USER_CERT``
    and ``X509_USER_KEY`` added; ``os.environ`` itself is never modified.

    Args:
        command: Command line handed to the shell (``shell=True``).
        timeout: Seconds to wait for the child before it is killed.

    Returns:
        A 4-tuple ``(process, (stdout, stderr, returncode), env_after, env_for_child)``:
        the ``subprocess.Popen`` object, the captured output bytes with the exit
        status, a snapshot of ``os.environ`` taken after the child finished, and a
        snapshot of the environment that was given to the child.
    """
    # Parent environment plus the two credentials, built in one step.
    child_env = dict(os.environ, X509_USER_CERT='cert', X509_USER_KEY='key')

    # Independent snapshot, so the returned dict is not the object handed to Popen.
    modified_snapshot = child_env.copy()

    process = subprocess.Popen(
        command,
        shell=True,
        env=child_env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    try:
        captured = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Too slow: kill the child, then collect whatever output it left behind.
        process.kill()
        captured = process.communicate()
    out, err = captured

    parent_snapshot = dict(os.environ)

    return (process, (out, err, process.returncode), parent_snapshot, modified_snapshot)

async def async_execute_test(command, timeout):
    """Asynchronously run ``command`` through the shell with test X509 variables.

    The child receives a copy of the current environment with ``X509_USER_CERT``
    and ``X509_USER_KEY`` added. The variables are passed through the ``env``
    argument, so ``os.environ`` is never modified and the values are not visible
    to other tasks or inherited by other children while this coroutine is
    suspended.

    Args:
        command: Command line handed to the shell.
        timeout: Seconds to wait for the child before it is killed.

    Returns:
        A 4-tuple ``(process, (stdout, stderr, returncode), env_after, env_for_child)``:
        the ``asyncio.subprocess.Process`` object, the captured output bytes with
        the exit status, a snapshot of ``os.environ`` taken after the child
        finished, and a snapshot of the environment that was given to the child.
    """
    # create_subprocess_shell forwards extra keyword arguments such as `env` to
    # subprocess.Popen, so the child's environment can be set exactly as in the
    # synchronous version, without touching the process-wide os.environ.
    ssh_env = os.environ.copy()

    ssh_env['X509_USER_CERT'] = 'cert'
    ssh_env['X509_USER_KEY'] = 'key'

    env_value_modified = dict(ssh_env)

    proc = await asyncio.create_subprocess_shell(command,
                                                 stdout=asyncio.subprocess.PIPE,
                                                 stderr=asyncio.subprocess.PIPE,
                                                 env=ssh_env)
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Too slow: kill the child, then collect whatever output it left behind.
        proc.kill()
        stdout, stderr = await proc.communicate()
    returncode = proc.returncode

    env_value_blank = dict(os.environ)

    return (proc, (stdout, stderr, returncode), env_value_blank, env_value_modified)

In [2]:
def execute_dummy(command):
    """Run ``command`` through the shell with test X509 variables, waiting at most 10 seconds.

    The child receives a copy of the current environment with ``X509_USER_CERT``
    and ``X509_USER_KEY`` added; ``os.environ`` itself is never modified.

    Args:
        command: Command line handed to the shell (``shell=True``).

    Returns:
        ``(stdout, stderr, returncode)`` — the captured output as bytes and the
        child's exit status.
    """
    # Parent environment plus the two credentials.
    child_env = dict(os.environ, X509_USER_CERT='cert', X509_USER_KEY='key')

    process = subprocess.Popen(
        command,
        shell=True,
        env=child_env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    try:
        out, err = process.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        # Too slow: kill the child, then collect whatever output it left behind.
        process.kill()
        out, err = process.communicate()

    return (out, err, process.returncode)

async def async_execute_dummy(command):
    """Asynchronously run ``command`` through the shell with test X509 variables.

    Waits at most 10 seconds for the child. The child receives a copy of the
    current environment with ``X509_USER_CERT`` and ``X509_USER_KEY`` added.
    The variables are passed through the ``env`` argument, so ``os.environ`` is
    never modified and the values are not visible to other tasks or inherited
    by other children while this coroutine is suspended.

    Args:
        command: Command line handed to the shell.

    Returns:
        ``(stdout, stderr, returncode)`` — the captured output as bytes and the
        child's exit status.
    """
    # create_subprocess_shell forwards extra keyword arguments such as `env` to
    # subprocess.Popen, so the child's environment can be set exactly as in the
    # synchronous version, without touching the process-wide os.environ.
    ssh_env = os.environ.copy()

    ssh_env['X509_USER_CERT'] = 'cert'
    ssh_env['X509_USER_KEY'] = 'key'

    proc = await asyncio.create_subprocess_shell(command,
                                                 stdout=asyncio.subprocess.PIPE,
                                                 stderr=asyncio.subprocess.PIPE,
                                                 env=ssh_env)
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
    except asyncio.TimeoutError:
        # Too slow: kill the child, then collect whatever output it left behind.
        proc.kill()
        stdout, stderr = await proc.communicate()
    returncode = proc.returncode

    return (stdout, stderr, returncode)

In [3]:
# Inputs shared by every run below, so the synchronous and asynchronous variants
# are exercised with exactly the same command and timeouts.
# NOTE(review): with a shell in between, this starts `/bin/sh` with `echo` as its
# first operand, which sh treats as a script file name rather than a command. If
# printing "test" was the intent, the command should probably be `echo "test"`.
# Left unchanged here — confirm.
TEST_COMMAND = '/bin/sh echo "test"'
GENEROUS_TIMEOUT = 10
# Intended to be far shorter than any process start-up, to force the timeout branch.
IMMEDIATE_TIMEOUT = 0.0000000000000000000000000001

# Use a private event loop instead of asyncio.get_event_loop(): closing the
# thread's default loop would make this cell fail on re-run with
# "Event loop is closed" and would break any later asyncio use in the session.
# NOTE(review): in a kernel that already runs its own event loop,
# run_until_complete() cannot be used; `await` the coroutines directly there.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    stdout, stderr, returncode = execute_dummy(command=TEST_COMMAND)

    test_results_no_timeout = execute_test(command=TEST_COMMAND,
                                           timeout=GENEROUS_TIMEOUT)

    test_results_timeout = execute_test(command=TEST_COMMAND,
                                        timeout=IMMEDIATE_TIMEOUT)

    async_stdout, async_stderr, async_returncode = loop.run_until_complete(
        async_execute_dummy(command=TEST_COMMAND))

    async_test_results_no_timeout = loop.run_until_complete(
        async_execute_test(command=TEST_COMMAND, timeout=GENEROUS_TIMEOUT))

    async_test_results_timeout = loop.run_until_complete(
        async_execute_test(command=TEST_COMMAND, timeout=IMMEDIATE_TIMEOUT))

finally:
    # Release the private loop and leave no closed loop registered as current.
    loop.close()
    asyncio.set_event_loop(None)

In [4]:
##### All basic smoke tests seem to pass

# Each *_test result is a 4-tuple:
#   [0] the process object
#   [1] (stdout, stderr, returncode)
#   [2] snapshot of os.environ taken after the run
#   [3] snapshot of the environment with the X509 variables added

### Test that the asynchronous and synchronous versions give the same final result without any timeout

# stdout is the same
assert stdout == async_stdout
# stderr is the same
assert stderr == async_stderr
# returncode is the same
assert returncode == async_returncode

# parent environment after the run is the same
assert test_results_no_timeout[2] == async_test_results_no_timeout[2]
# modified environment variables are the same
assert test_results_no_timeout[3] == async_test_results_no_timeout[3]

### Test that the asynchronous and synchronous versions give the same final result with a timeout

# stdout, stderr, and returncode are all the same
assert test_results_timeout[1] == async_test_results_timeout[1]

# parent environment after the run is the same
assert test_results_timeout[2] == async_test_results_timeout[2]
# modified environment variables are the same
assert test_results_timeout[3] == async_test_results_timeout[3]

### Test that the test functions give the same final results as the original dummies

# Test that the definition of the synchronous test function is correct
assert (stdout, stderr, returncode) == test_results_no_timeout[1]

# Test that the definition of the asynchronous test function is correct
assert (async_stdout, async_stderr, async_returncode) == async_test_results_no_timeout[1]

### Test that the correct object type for the process is being used

# Test that the synchronous function uses the correct object type for the process
sync_process_no_timeout = test_results_no_timeout[0]
sync_process_timeout = test_results_timeout[0]

assert isinstance(sync_process_no_timeout, subprocess.Popen)
assert isinstance(sync_process_timeout, subprocess.Popen)

# Test that the asynchronous function uses the correct object type for the process
async_process_no_timeout = async_test_results_no_timeout[0]
async_process_timeout = async_test_results_timeout[0]

assert isinstance(async_process_no_timeout, asyncio.subprocess.Process)
assert isinstance(async_process_timeout, asyncio.subprocess.Process)

### Test that the processes have the basic expected attributes

# Test that the synchronous processes have the basic expected attributes.
# The stream class is taken from `io` directly; `subprocess.io` only resolves
# because subprocess happens to import io internally.
for sync_process in (sync_process_no_timeout, sync_process_timeout):
    assert sync_process.stdin is None
    assert isinstance(sync_process.stdout, io.BufferedReader)
    assert isinstance(sync_process.stderr, io.BufferedReader)
    assert isinstance(sync_process.pid, int)
    assert isinstance(sync_process.returncode, int)

# Test that the asynchronous processes have the basic expected attributes
for async_process in (async_process_no_timeout, async_process_timeout):
    assert async_process.stdin is None
    assert isinstance(async_process.stdout, asyncio.StreamReader)
    assert isinstance(async_process.stderr, asyncio.StreamReader)
    assert isinstance(async_process.pid, int)
    assert isinstance(async_process.returncode, int)



In [5]:
# Basic boundary tests seem to pass

# The synchronous result is checked first, then the asynchronous one. Each must
# fail in the expected way when the timeout fires: nothing captured on stdout or
# stderr, and a return code of -9 (presumably "terminated by signal 9" after
# proc.kill() — POSIX only).
for timed_out_result in (test_results_timeout, async_test_results_timeout):
    captured_stdout, captured_stderr, exit_status = timed_out_result[1]
    assert captured_stdout == captured_stderr == b''
    assert exit_status == -9