In [1]:
import asyncio
import concurrent.futures
import logging
import sys
import time

import numpy as np

import lsst.sal as sal

class Butler():
    """Stand-in butler that fabricates data rather than reading it"""

    def __init__(self):
        """Nothing to set up; the fake butler keeps no state"""

    def get(self, dataType, dataId):
        """Return a fabricated dataset of type dataType for dataId

        Only "raw" is understood: the result is a 100x200 array in which
        every element equals dataId["ccd"].

        Raises RuntimeError for any other dataType
        """
        if dataType == "raw":
            blank = np.zeros((100, 200))
            return blank + dataId["ccd"]

        raise RuntimeError("Unknown dataType: %s" % dataType)
            
# The single fake butler instance; used later to fetch the exposure
butler = Butler()

# Send INFO-and-above log records to stdout, prefixing each one with the
# name of the originating thread followed by the logger name.
logging.basicConfig(
    stream=sys.stdout,
    format='%(threadName)10s %(name)18s: %(message)s',
    level=logging.INFO,
)

In [2]:
def turnOnLaser():
    """Issue the SAL turnOnLaser command and wait for it to complete

    This call can block, so it is a plain function rather than a coroutine;
    it could be rewritten to use asyncio (cf. waitForCompletion_cameraIntegrate)

    Returns whatever cameraMgr.waitForCompletion_turnOnLaser returns
    """
    command = sal.camera_command_turnOnLaserC()
    commandId = cameraMgr.issueCommand_turnOnLaser(command)

    return cameraMgr.waitForCompletion_turnOnLaser(commandId, timeout=10)

async def turnOffLaser():
    """Pretend to switch the laser off, then report it on stderr"""

    await asyncio.sleep(0.1)

    # The timestamp is taken after the sleep, i.e. when the laser is "off"
    print("%s Laser is off" % (time.asctime(),), file=sys.stderr, flush=True)

def setFilter(filterName):
    """Issue the SAL setFilter command for filterName and wait for completion

    This call can block, so it is a plain function rather than a coroutine;
    it could be rewritten to use asyncio (cf. waitForCompletion_cameraIntegrate)

    Returns whatever cameraMgr.waitForCompletion_setFilter returns
    """
    command = sal.camera_command_setFilterC(filterName)
    commandId = cameraMgr.issueCommand_setFilter(command)

    return cameraMgr.waitForCompletion_setFilter(commandId, timeout=10)

async def setLaserWavelength(wavelength):
    """Pretend to tune the laser to wavelength, reporting progress on stderr

    Raises RuntimeError (after the simulated delay) if wavelength > 1000
    """

    def report(fmt, *values):
        # Timestamped progress line, flushed immediately
        print(fmt % ((time.asctime(),) + values), file=sys.stderr, flush=True)

    report("%s Setting wavelength to %g", wavelength)

    await asyncio.sleep(2.0)

    tooLarge = wavelength > 1000
    if tooLarge:
        raise RuntimeError("%g is too large (> 1000nm)" % wavelength)

    report("%s Set laser to %g", wavelength)

    
async def checkDiodeCurrent(sem, dt=0.5):
    """Check the diode current every dt seconds, until sem is released

    sem: asyncio semaphore; sampling continues for as long as sem.locked()
         is true and stops at the first check after it has been released
    dt:  number of seconds to sleep between samples

    The set of currents is returned in checkDiodeCurrent.currents, which is
    reset to an empty list at the start of every call

    N.b. neither thread safe nor reentrant (the results live in a single
    function attribute)
    """
    checkDiodeCurrent.currents = []
    while sem.locked():
        # Placeholder "current" derived from a clock.  time.clock() was removed
        # in Python 3.8; process_time() is the documented replacement for the
        # CPU-time value it returned on Unix.
        checkDiodeCurrent.currents.append(time.process_time() % 10)

        await asyncio.sleep(dt)

async def cameraIntegrate(sem, expTime, pollTime=0.1):
    """Integrate for the specified exposure time

    sem:      semaphore used to indicate that the exposure is finished; it is
              released when this coroutine exits, whether or not it succeeded
    expTime:  exposure time, passed to sal.camera_command_cameraIntegrateC
    pollTime: currently unused; retained so existing callers keep working

    Returns whatever awaiting cameraMgr.waitForCompletion_cameraIntegrate yields
    """
    try:
        myData = sal.camera_command_cameraIntegrateC(expTime)
        cmdId = cameraMgr.issueCommand_cameraIntegrate(myData)

        return await cameraMgr.waitForCompletion_cameraIntegrate(cmdId)
    finally:
        # Release even on failure or cancellation: anything polling
        # sem.locked() would otherwise never see the exposure end.
        sem.release()

In [3]:
async def run_blocking(executor, loop, func, *args):
    """Hand the blocking call func(*args) to executor and await its result"""
    pendingResult = loop.run_in_executor(executor, func, *args)
    outcome = await pendingResult
    return outcome

def runTasks(executor, actions, timeout=None, throwExceptions=True):
    """Run a set of actions defining tasks which may or may not block

    Return all completed tasks (a set of asyncio tasks; empty if there were
    no actions)

    executor: Executor to handle blocking tasks (i.e. those that don't use asyncio)
    actions:  List of actions to perform, each element is (func, *args),
              or just func if there are no arguments
    timeout:  Maximum time to wait, passed to asyncio.wait; None means no limit
    throwExceptions: If true, re-raise the exception of a completed task that
              failed (via task.result())

    For each action, if it's a coroutine simply add it to the list of tasks.
    If it isn't, assume that it can block and execute it using the executor

    If timeout isn't None and is exceeded, pending tasks are cancelled
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Pythons no longer create a loop implicitly
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    tasks = []
    for a in actions:
        try:
            func, args = a[0], a[1:]
        except TypeError:
            # A bare callable rather than a (func, *args) sequence
            func, args = a, []

        if asyncio.iscoroutinefunction(func):
            coro = func(*args)
        else:
            coro = run_blocking(executor, loop, func, *args)

        # asyncio.wait() rejects bare coroutine objects from Python 3.11 on,
        # so wrap each one in a Task explicitly.
        tasks.append(loop.create_task(coro))

    if not tasks:
        return set()

    done, pending = loop.run_until_complete(asyncio.wait(tasks, timeout=timeout))

    if pending:
        for task in pending:
            print("Task %s is still pending; cancelling" % task._coro)  # _coro is Internal detail, but only debugging
            task.cancel()

        # cancel() only requests cancellation; run the loop once more so the
        # tasks actually receive it instead of being left pending forever.
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

    if throwExceptions:
        for task in done:
            task.result()

    return done

In [5]:
# Driver cell: initialise SAL, switch on the lamps, take one exposure while
# monitoring the diode current, fetch the (fake) data, then clean up.
print("Initialising SAL\n", file=sys.stderr)
cameraMgr = sal.SAL_camera()

try:
    cameraMgr.salCommand("setFilter")    # Enable the camera commands
    cameraMgr.salCommand("turnOnLaser")
    cameraMgr.salCommand("cameraIntegrate")

    cameraMgr.salEvent("exposureId")
    #
    # Turn on lamps
    #
    # Flip the condition to request 1260, which setLaserWavelength rejects (> 1000)
    wavelength = 656 if True else 1260

    print("%s Step 0" % (time.asctime()), file=sys.stderr)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        done = runTasks(executor,
                        [
                            (setFilter, 'g'),
                             turnOnLaser,
                            (setLaserWavelength, wavelength),
                        ])
    #
    # Take an exposure
    #
    # Starts locked (value 0); cameraIntegrate releases it when the exposure ends
    cameraSem = asyncio.Semaphore(0)
    exposureIdEv = sal.camera_logevent_exposureIdC()

    print("\n%s Step 1" % (time.asctime()), file=sys.stderr)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        runTasks(executor,
                 [
                     (checkDiodeCurrent, cameraSem, 0.5),
                     (cameraIntegrate, cameraSem, 2),
                 ])
    print("Diode currents:", checkDiodeCurrent.currents, file=sys.stderr)
    #
    # Retrieve data
    #
    print("\n%s Step 2" % (time.asctime()), file=sys.stderr)
    # NOTE(review): the event enabled above is "exposureId" and exposureIdEv is an
    # exposureId event object -- confirm getEvent_SummaryState is the intended getter
    if cameraMgr.getEvent_SummaryState(exposureIdEv) != 0:
        raise RuntimeError("Visit number is not available")

    dataId = dict(visit=exposureIdEv.visit, ccd=1)
    exp = butler.get("raw", dataId)

    print("Mean of exposure is %.1f" % exp.mean(), file=sys.stderr)
    #
    # Clean up
    #
    print("\n%s Step 3" % (time.asctime()), file=sys.stderr)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        runTasks(executor,
                 [
                      turnOffLaser,
                 ])
finally:
    # Always shut SAL down, even when one of the steps above raised
    cameraMgr.salShutdown()
    print("\nSAL is shutdown", file=sys.stderr)

Initialising SAL

Mon Apr 23 16:16:10 2018 Step 0
Mon Apr 23 16:16:10 2018 Setting wavelength to 656
Mon Apr 23 16:16:10 2018 Setting g
Mon Apr 23 16:16:10 2018 turning laser on
Mon Apr 23 16:16:12 2018 Set filter to g
Mon Apr 23 16:16:12 2018 Set laser to 656
Mon Apr 23 16:16:12 2018 laser is warm

Mon Apr 23 16:16:12 2018 Step 1
Mon Apr 23 16:16:12 2018 opening shutter
Mon Apr 23 16:16:14 2018 Shutter closed
Diode currents: [3.020164, 3.027743, 3.029497, 3.031387, 3.033149]

Mon Apr 23 16:16:15 2018 Step 2
Mean of exposure is 1.0

Mon Apr 23 16:16:15 2018 Step 3
Mon Apr 23 16:16:15 2018 Laser is off

SAL is shutdown
