From 5d7eccbb15beee702b468f6b5a4b7b65829d00f9 Mon Sep 17 00:00:00 2001 From: Mark Harfouche Date: Mon, 23 Dec 2013 20:34:59 -0800 Subject: [PATCH 1/3] Reorganized the code into two different files. I don't know if this is a good idea. Added support for direct access to the numpy array. Before the array would be a ctype, then cast as a list, then as a numpy array. This was slow. You can do the whole thing in numpy. More consistent parameter return values as tuples and not lists (I think it makes more sense). --- picoscope.py | 493 ++++++++++++++++++++++----------------------------- ps6000.py | 182 +++++++++++++++++++ 2 files changed, 397 insertions(+), 278 deletions(-) create mode 100644 ps6000.py diff --git a/picoscope.py b/picoscope.py index 3ab74d3..7bc4a4b 100644 --- a/picoscope.py +++ b/picoscope.py @@ -1,4 +1,12 @@ # -*- coding: utf-8 +# Edited by Mark Harfouche +# +# My idea is that this file should be a python only file with no use of ctypes +# or knowledge that it is being used +# All ctypes stuff should be in the daughter files +# +# I think that the names of the functions should resemple the picoscope function +# names as much as possible # # Colin O'Flynn, Copyright (C) 2013. All Rights Reserved. # @@ -16,13 +24,8 @@ # which was adapted from http://www.picotech.com/support/topic4926.html -import math -import time import inspect -#Probably bad? -from ctypes import * - import numpy as np class CaseInsensitiveDict(dict): @@ -34,7 +37,7 @@ def __getitem__(self, key): class PSBase(object): #Specify Library - LIBNAME = "ps4000.dll" + LIBNAME = "ps6000" ###You must reimplement this in device specific classes #Maximum Limits @@ -52,22 +55,221 @@ class PSBase(object): {"rangeV":500E-3, "apivalue":5, "rangeStr":"500 mV"}, {"rangeV":1.0, "apivalue":6, "rangeStr":"1 V"}, {"rangeV":2.0, "apivalue":7, "rangeStr":"2 V"}, + {"rangeV":5.0, "apivalue":8, "rangeStr":"5 V"}, ] - ###You must reimplement this in device specific classes + ###You must reimplement this in device specific classes NUM_CHANNELS = 2 + CHANNELS = {"A":0, "B":1} ###You must reimplement this in device specific classes - CHANNEL_COUPLINGS = {"DC":1, "AC":0} #Other option: DC50:2 + CHANNEL_COUPLINGS = {"DC50": 2, "DC":1, "AC":0} - THRESHOLD_TYPE = CaseInsensitiveDict( - {"Above":0, + # For some reason this isn't working with me :S + #THRESHOLD_TYPE = CaseInsensitiveDict( + THRESHOLD_TYPE = {"Above":0, "Below":1, "Rising":2, "Falling":3, - "RiseOrFall":4}) + "RiseOrFall":4} + + ### getUnitInfo parameter types + UNIT_INFO_TYPES = { "DRIVER_VERSION" : 0, + "USB_VERSION" : 1, + "HARDWARE_VERSION" : 2, + "VARIANT_INFO" : 3, + "BATCH_AND_SERIAL" : 4, + "CAL_DATE" : 5, + "KERNEL_VERSION" : 6, + "DIGITAL_HARDWARE_VERSION" : 7, + "ANALOGUE_HARDWARE_VERSION": 8} + + def __init__(self): + # Should be defined in child + self.lib = None + self.handle = None + + self.CHRange = [None]*self.NUM_CHANNELS + self.CHOffset = [None]*self.NUM_CHANNELS + + def checkResult(self, ec): + """Check result of function calls, raise exception if not 0""" + if ec == 0: + return + + else: + ecName = self.errorNumToName(ec) + ecDesc = self.errorNumToDesc(ec) + raise IOError('Error calling %s: %s (%s)'%(inspect.stack()[1][3], ecName, ecDesc)) + + def errorNumToName(self, num): + """Convert error number to name""" + for t in self.ERROR_CODES: + if t[0] == num: + return t[1] + + def errorNumToDesc(self, num): + """Convert error number to description""" + for t in self.ERROR_CODES: + if t[0] == num: + try: + return t[2] + except IndexError: + return "" + + def getUnitInfo(self, info): + """ returns a string containing the requested information """ + if not isinstance(info, int): + info = self.UNIT_INFO_TYPES[info] + return self._lowLevelGetUnitInfo(info) + + def printUnitInfo(self): + """ Prints the unit information in a human readible way """ + print("Driver version " + self.getUnitInfo("DRIVER_VERSION")) + print("USB version " + self.getUnitInfo("USB_VERSION")) + print("Hardware version " + self.getUnitInfo("HARDWARE_VERSION")) + print("Found Picoscope " + self.getUnitInfo("VARIANT_INFO")) + print("Serial number " + self.getUnitInfo("BATCH_AND_SERIAL")) + print("Calibrated on " + self.getUnitInfo("CAL_DATE")) + print("Kernel version " + self.getUnitInfo("KERNEL_VERSION")) + print("Digital version " + self.getUnitInfo("DIGITAL_HARDWARE_VERSION")) + print("Analog version " + self.getUnitInfo("ANALOGUE_HARDWARE_VERSION")) + + + def setChannel(self, chNum=0, coupling="AC", VRange=2.0, VOffset=0.0, enabled=True, BWLimited=False): + """ Sets up a specific channel """ + if enabled: + enabled = 1 + else: + enabled = 0 + + coupling = self.CHANNEL_COUPLINGS[coupling] + + try: + VRangeAPI = (item for item in self.CHANNEL_RANGE if item["rangeV"] == VRange).next() + VRangeAPI = VRangeAPI["apivalue"] + except StopIteration: + rstr = "" + for t in self.CHANNEL_RANGE: rstr += "%f (%s), "%(t["rangeV"], t["rangeStr"]) + raise ValueError("%f is invalid range. Valid ranges: %s"%(VRange, rstr)) + + if BWLimited: + BWLimited = 1 + else: + BWLimited = 0 + + self._lowLevelSetChannel(chNum, enabled, coupling, VRangeAPI, VOffset, BWLimited) + + # if all was successful, save the parameters + self.CHRange[chNum] = VRange + self.CHOffset[chNum] = VOffset + + def runBlock(self, pretrig=0.0): + """Runs a single block, must have already called setSampling for proper setup""" + self._lowLevelRunBlock(int(self.maxSamples*pretrig), int(self.maxSamples*(1-pretrig)),self.timebase, self.oversample, self.segmentIndex) + + def isReady(self): + """Check if scope done""" + return self._lowLevelIsReady() + + def setSamplingInterval(self, sampleInterval, noSamples, oversample=0, segmentIndex=0): + """Returns [actualSampleInterval, maxSamples]""" + self.oversample = oversample + self.segmentIndex = segmentIndex + self.timebase = self.getTimeBaseNum(sampleInterval) + + m = self._lowLevelGetTimebase(self.timebase, noSamples, oversample, segmentIndex) + + self.sampleInterval = m[0] + self.sampleRate = 1.0/m[0] + self.maxSamples = m[1] + return (self.sampleInterval, self.maxSamples) + + def setSamplingFrequency(self, sampleFreq, noSamples, oversample=0, segmentIndex=0): + """Returns [actualSampleFreq, maxSamples]""" + self.setSamplingInterval(1.0/sampleFreq, noSamples, oversample, segmentIndex) + return (self.sampleRate, self.maxSamples) + + def setSampling(self, sampleFreq, noSamples, oversample=0, segmentIndex=0): + """Kept for backwards compatibilit, use setSamplingInterval or + setSamplingFrequency instead""" + return self.setSamplingFrequency(sampleFreq, noSamples, oversample, segmentIndex) + + def setSimpleTrigger(self, trigSrc, threshold_V=0, direction="Rising", delay=0, timeoutms=100, enabled=True): + """Simple trigger setup, only allows level""" + + if enabled: + enabled = 1 + else: + enabled = 0 + + + direction = self.THRESHOLD_TYPE[direction] + if not isinstance(trigSrc, int): + trigSrc = self.CHANNELS[trigSrc] + + if trigSrc >= self.NUM_CHANNELS: + print("We do not support AUX triggering yet...") + return -1 + + a2v = self.CHRange[trigSrc] / self.MAX_VALUE + threshold_adc = int(threshold_V / a2v) + + self._lowLevelSetSimpleTrigger(enabled, trigSrc, threshold_adc, direction, delay, timeoutms) + + def flashLed(self, times=5, start=False, stop=False): + """Flash the front panel LEDs""" + + if start: + times = -1 + + if stop: + times = 0 + + self._lowLevelFlashLed(times) + + def getDataV(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): + (data, numSamplesReturned, overflow) = self.getDataRaw(channel, numSamples, startIndex, downSampleRatio, downSampleMode) + + a2v = self.CHRange[channel] / self.MAX_VALUE + data = data * a2v + self.CHOffset[channel] + return (data, numSamplesReturned, overflow) + + def getDataRaw(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): + + if not isinstance(channel, int): + trigSrc = self.CHANNELS[channel] + + if numSamples == 0: + # maxSamples is probably huge, 1Gig Sample can be HUGE.... + numSamples = min(self.maxSamples, 4096) + + data = np.empty(numSamples, dtype=np.int16) + + self._lowLevelSetDataBuffer(channel, data, downSampleMode) + (numSamplesReturned, overflow) = self._lowLevelGetValues(numSamples, startIndex, downSampleRatio, downSampleMode) + + # No we should not warn about this. + # I'm going to assume the user is advanced and knows to check this himself + # if things are weird with data. + #if overflow != 0: + #print "WARNING: Overflow detected. Should we raise exception?" + + return (data, numSamplesReturned, overflow) + + def open(self, sn=None): + """Open the scope, if sn is None just opens first one found""" + + self._lowLevelOpenUnit(sn) + + def close(self): + """Close the scope""" + self._lowLevelCloseUnit() + + def __del__(self): + self.close() ###Error codes - copied from PS6000 programmers manual. I think they are fairly unviersal though, just ignore ref to 'PS6000'... #To get formatting correct do following copy-replace in Programmers Notepad @@ -75,8 +277,8 @@ class PSBase(object): #2. Copy/replace '\r' with '"],\r' (enable slash expressions when doing this) #3. Copy/replace '^([0-9A-F]{2} ){1}' with '0x\1, "' (enable regex when doing this) #4. Copy/replace '^([0-9A-F]{3} ){1}' with '0x\1, "' (enable regex when doing this) - #5. Copy/repplace '0x' with '[0x' - + #5. Copy/repplace '0x' with '[0x' + ERROR_CODES = [[0x00 , "PICO_OK", "The PicoScope 6000 is functioning correctly."], [0x01 , "PICO_MAX_UNITS_OPENED", "An attempt has been made to open more than PS6000_MAX_UNITS."], [0x02 , "PICO_MEMORY_FAIL", "Not enough memory could be allocated on the host machine."], @@ -188,268 +390,3 @@ class PSBase(object): [0x122 , "PICO_CHANNEL_DISABLED_DUE_TO_USB_POWERED", "USB Power not sufficient to power all channels."], ] - def __init__(self): - """Load DLL etc""" - self.lib = windll.LoadLibrary(self.LIBNAME) - self.handle = None - - self.CHRange = [None]*self.NUM_CHANNELS - self.CHOffset = [None]*self.NUM_CHANNELS - - def checkResult(self, ec): - """Check result of function calls, raise exception if not 0""" - if ec == 0: - return - - else: - ecName = self.errorNumToName(ec) - ecDesc = self.errorNumToDesc(ec) - raise IOError('Error calling %s: %s (%s)'%(inspect.stack()[1][3], ecName, ecDesc)) - - def errorNumToName(self, num): - """Convert error number to name""" - for t in self.ERROR_CODES: - if t[0] == num: - return t[1] - - def errorNumToDesc(self, num): - """Convert error number to description""" - for t in self.ERROR_CODES: - if t[0] == num: - try: - return t[2] - except IndexError: - return "" - - def setChannel(self, chNum=0, coupling="AC", VRange=2.0, VOffset=0.0, enabled=True, BWLimited=False): - """ Sets up a specific channel """ - - #Will save these if successful - VRangeNum = VRange - VOffsetNum = VOffset - - if enabled: - enabled = 1 - else: - enabled = 0 - - coupling = self.CHANNEL_COUPLINGS[coupling] - - try: - VRange = (item for item in self.CHANNEL_RANGE if item["rangeV"] == VRange).next() - VRange = VRange["apivalue"] - except StopIteration: - rstr = "" - for t in self.CHANNEL_RANGE: rstr += "%f (%s), "%(t["rangeV"], t["rangeStr"]) - raise ValueError("%f is invalid range. Valid ranges: %s"%(VRange, rstr)) - - if BWLimited: - BWLimited = 1 - else: - BWLimited = 0 - - VOffset =c_float(VOffset) - - self._lowLevelSetChannel(chNum, enabled, coupling, VRange, VOffset, BWLimited) - - self.CHRange[chNum] = VRangeNum - self.CHOffset[chNum] = VOffsetNum - - def runBlock(self, pretrig=0.0): - """Runs a single block, must have already called setSampling for proper setup""" - self._lowLevelRunBlock(int(self.maxSamples*pretrig), int(self.maxSamples*(1-pretrig)),self.timebase, self.oversample, self.segmentIndex) - - def isReady(self): - """Check if scope done""" - return self._lowLevelIsReady() - - def setSampling(self, sampleFreq, noSamples, oversample=1, segmentIndex=0): - """Returns [actualSampleFreq, actualNoSamples]""" - - self.oversample = oversample - self.segmentIndex = segmentIndex - tb = self.getTimeBaseNum(1.0/sampleFreq) - self.timebase = tb - - m = self._lowLevelGetTimebase(tb, noSamples, oversample, segmentIndex) - self.sampleRate = 1.0/m[0] - self.maxSamples = m[1] - return [1.0/m[0], m[1]] - - def setTriggerSimple(self, trigSrc, threshold=0, direction="Rising", delay=0, timeoutms=100, enabled=True): - """Simple trigger setup, only allows level""" - - if enabled: - enabled = 1 - else: - enabled = 0 - - direction = self.THRESHOLD_TYPE[direction] - self._lowLevelSetSimpleTrigger(enabled, trigSrc, threshold, direction, delay, timeoutms) - - def flashLed(self, times=5, start=False, stop=False): - """Flash the front panel LEDs""" - - if start: - times = -1 - - if stop: - times = 0 - - self._lowLevelFlashLed(times) - - - def getData(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): - if numSamples == 0: - numSamples = self.maxSamples - - dataPointer = ( c_short * (numSamples) )() - - self._lowLevelSetDataBuffer(channel, dataPointer, numSamples, downSampleMode) - [numSamplesReturned, overflow] = self._lowLevelGetValues(numSamples, startIndex, downSampleRatio, downSampleMode) - - - if overflow != 0: - print "WARNING: Overflow detected. Should we raise exception?" - - newData = np.asarray(list(dataPointer)) - a2v = self.CHRange[channel] / self.MAX_VALUE - newData = newData * a2v + self.CHOffset[channel] - return newData - - - - return self._lowLevelGetData(channel, numSamples,startIndex,downSampleRatio,downSampleMode) - - def open(self, sn=None): - """Open the scope, if sn is None just opens first one found""" - handlePointer = c_short() - if sn is None: - serialNullTermStr = c_char_p(None) - else: - serialNullTermStr = c_char_p( bytes(sn,'ascii') ) - - self._lowLevelOpenUnit( handlePointer, serialNullTermStr ) - self.handle = handlePointer - - def close(self): - """Close the scope""" - self._lowLevelCloseUnit() - - def __del__(self): - self.close() - - #### The following are low-level functions which should be reimplemented in all classes - - - - -class PS6000(PSBase): - CHANNEL_COUPLINGS = {"AC":0, "DC":1, "DC50":2} - NUM_CHANNELS = 4 - LIBNAME = "ps6000.dll" - - def _lowLevelSetChannel(self, chNum, enabled, coupling, VRange, VOffset, BWLimited): - m = self.lib.ps6000SetChannel(self.handle, chNum, enabled, coupling, VRange, VOffset, BWLimited) - self.checkResult(m) - - def _lowLevelOpenUnit(self, handle, sn): - m = self.lib.ps6000OpenUnit(byref(handle), sn) - self.checkResult(m) - - def _lowLevelCloseUnit(self): - m = self.lib.ps6000CloseUnit(self.handle) - self.checkResult(m) - - def _lowLevelFlashLed(self, times): - m = self.lib.ps6000FlashLed(self.handle, times) - self.checkResult(m) - - def _lowLevelSetSimpleTrigger(self, enabled, trigsrc, threshold, direction, delay, auto): - m = self.lib.ps6000SetSimpleTrigger(self.handle, enabled, trigsrc, threshold, direction, delay, auto) - self.checkResult(m) - - def _lowLevelRunBlock(self, numPreTrigSamples, numPostTrigSamples, timebase, oversample, segmentIndex): - timeIndisposedMs = c_long() - pParameter = c_void_p() - m = self.lib.ps6000RunBlock( self.handle, numPreTrigSamples, numPostTrigSamples, timebase, oversample, byref(timeIndisposedMs), segmentIndex, None, byref(pParameter) ) - self.checkResult(m) - - def _lowLevelIsReady(self): - ready = c_short() - m = self.lib.ps6000IsReady( self.handle, byref(ready) ) - if ready.value: - isDone = True - else: - isDone = False - self.checkResult(m) - return isDone - - def _lowLevelGetTimebase(self, tb, noSamples, oversample, segmentIndex): - maxSamples = c_long() - sampleRate = c_float() - - m = self.lib.ps6000GetTimebase2(self.handle, tb, noSamples, byref(sampleRate), oversample, byref(maxSamples), segmentIndex) - self.checkResult(m) - return [sampleRate.value/1.0E9, maxSamples.value] - - def getTimeBaseNum(self, sampleTimeS): - """Convert sample time in S to something to pass to API Call""" - maxSampleTime = (((2**32 - 1) - 4) / 156250000) - - if sampleTimeS < 6.4E-9: - if sampleTimeS < 200E-12: - st = 0 - else: - st = math.floor(1/math.log(2, (sampleTimeS * 5E9))) - - else: - #Otherwise in range 2^32-1 - if sampleTimeS > maxSampleTime: - sampleTimeS = maxSampleTime - - st = math.floor((sampleTimeS * 156250000) + 4) - - return int(st) - - def _lowLevelSetDataBuffer(self, channel, dataPointer, numSamples, downSampleMode): - m = self.lib.ps6000SetDataBuffer( self.handle, channel, byref(dataPointer), numSamples, downSampleMode ) - self.checkResult(m) - - def _lowLevelGetValues(self,numSamples,startIndex,downSampleRatio,downSampleMode): - numSamplesReturned = c_long() - numSamplesReturned.value = numSamples - overflow = c_short() - m = self.lib.ps6000GetValues( self.handle, startIndex, byref(numSamplesReturned), downSampleRatio, downSampleMode, self.segmentIndex, byref(overflow) ) - self.checkResult(m) - return [numSamplesReturned.value, overflow.value] - -def examplePS6000(): - print "Attempting to open..." - ps = PS6000() - ps.open() - - #Use to ID unit - #ps.flashLed(10) - #time.sleep(5) - - #Example of 'nice' printing of errors - #ps.setChannel(0, "AC", 1, VOffset=-100) - - #Example of simple capture - res = ps.setSampling(100E6, 1000) - print "Sampling @ %f MHz, %d samples"%(res[0]/1E6, res[1]) - ps.setChannel(0, "AC", 50E-3) - - ps.runBlock() - while(ps.isReady() == False): time.sleep(0.01) - - #TODO: Doesn't work on larger arrays - data = ps.getData(0, 4096) - - print data[0:100] - - ps.close() - -if __name__ == "__main__": - examplePS6000() diff --git a/ps6000.py b/ps6000.py new file mode 100644 index 0000000..adc226b --- /dev/null +++ b/ps6000.py @@ -0,0 +1,182 @@ +#/usr/bin/env python2.7 +# vim: set ts=4 sw=4 tw=0 et : + +import math +import time +import inspect + +# to load the proper dll +import platform + +from ctypes import byref, c_long, c_void_p, c_short, c_float, create_string_buffer, c_ulong, POINTER, cdll + +from picoscope import PSBase + +#### The following are low-level functions which should be reimplemented in all classes +class PS6000(PSBase): + LIBNAME = "ps6000" + + NUM_CHANNELS = 4 + CHANNELS = {"A":0, "B":1, "C":2, "D":3, + "EXTERNAL":4, "MAX_CHANNELS":4, "TRIGGER_AUX":5, "MAX_TRIGGER_SOURCES":6} + + def __init__(self): + super(PS6000, self).__init__() + + if platform.system() == 'Linux': + self.lib = cdll.LoadLibrary("lib" + self.LIBNAME + ".so") + else: + """Load DLL etc""" + # This was windll before, I don't know what the difference is. + # can't test it now + self.lib = cdll.LoadLibrary(self.LIBNAME + ".dll") + + def _lowLevelSetChannel(self, chNum, enabled, coupling, VRange, VOffset, BWLimited): + #VOffset = ctypes.c_float(VOffset) + m = self.lib.ps6000SetChannel(self.handle, chNum, enabled, coupling, VRange, + c_float(VOffset), BWLimited) + self.checkResult(m) + + def _lowLevelOpenUnit(self, sn): + handlePointer = c_short() + if sn is not None: + serialNullTermStr = create_string_buffer(sn) + + # Passing None is the same as passing NULL + m = self.lib.ps6000OpenUnit(byref(handlePointer), sn) + self.checkResult(m) + self.handle = handlePointer + + def _lowLevelCloseUnit(self): + m = self.lib.ps6000CloseUnit(self.handle) + self.checkResult(m) + + def _lowLevelGetUnitInfo(self, info): + s = create_string_buffer(256) + requiredSize = c_short(0); + + m = self.lib.ps6000GetUnitInfo(self.handle, byref(s), len(s), byref(requiredSize), info); + if requiredSize.value > len(s): + s = create_string_buffer(requiredSize.value + 1) + m = self.lib.ps6000GetUnitInfo(self.handle, byref(s), len(s), byref(requiredSize), info); + + self.checkResult(m) + return s.value + + def _lowLevelFlashLed(self, times): + m = self.lib.ps6000FlashLed(self.handle, times) + self.checkResult(m) + + def _lowLevelSetSimpleTrigger(self, enabled, trigsrc, threshold_adc, direction, delay, auto): + m = self.lib.ps6000SetSimpleTrigger(self.handle, enabled, trigsrc, threshold_adc, + direction, delay, auto) + self.checkResult(m) + + def _lowLevelRunBlock(self, numPreTrigSamples, numPostTrigSamples, timebase, oversample, segmentIndex): + timeIndisposedMs = c_long() + pParameter = c_void_p() + m = self.lib.ps6000RunBlock( self.handle, numPreTrigSamples, numPostTrigSamples, + timebase, oversample, byref(timeIndisposedMs), segmentIndex, None, byref(pParameter) ) + self.checkResult(m) + + def _lowLevelIsReady(self): + ready = c_short() + m = self.lib.ps6000IsReady( self.handle, byref(ready) ) + if ready.value: + isDone = True + else: + isDone = False + self.checkResult(m) + return isDone + + def _lowLevelGetTimebase(self, tb, noSamples, oversample, segmentIndex): + """ returns [timeIntervalSeconds, maxSamples] """ + maxSamples = c_long() + sampleRate = c_float() + + m = self.lib.ps6000GetTimebase2(self.handle, tb, noSamples, byref(sampleRate), + oversample, byref(maxSamples), segmentIndex) + self.checkResult(m) + + return (sampleRate.value/1.0E9, maxSamples.value) + + def getTimeBaseNum(self, sampleTimeS): + """Convert sample time in S to something to pass to API Call""" + maxSampleTime = (((2**32 - 1) - 4) / 156250000) + + if sampleTimeS < 6.4E-9: + if sampleTimeS < 200E-12: + st = 0 + else: + st = math.floor(1/math.log(2, (sampleTimeS * 5E9))) + + else: + #Otherwise in range 2^32-1 + if sampleTimeS > maxSampleTime: + sampleTimeS = maxSampleTime + + st = math.floor((sampleTimeS * 156250000) + 4) + + return int(st) + + def _lowLevelSetDataBuffer(self, channel, data, downSampleMode): + """ data should be a numpy array""" + dataPtr = data.ctypes.data_as(POINTER(c_short)) + numSamples = len(data) + m = self.lib.ps6000SetDataBuffer( self.handle, channel, dataPtr, + numSamples, downSampleMode ) + self.checkResult(m) + + def _lowLevelGetValues(self,numSamples,startIndex,downSampleRatio,downSampleMode): + numSamplesReturned = c_long() + numSamplesReturned.value = numSamples + overflow = c_short() + m = self.lib.ps6000GetValues( self.handle, startIndex, byref(numSamplesReturned), + downSampleRatio, downSampleMode, self.segmentIndex, byref(overflow) ) + self.checkResult(m) + return (numSamplesReturned.value, overflow.value) + +def examplePS6000(): + import textwrap + print(textwrap.fill("This demo will use the AWG to generate a gaussian pulse and measure " + + "it on Channel A. To run this demo connect the AWG output of the PS6000 channel A.")) + + print("Attempting to open Picoscope 6000...") + + # see page 13 of the manual to understand how to work this beast + ps = PS6000() + ps.open() + + ps.printUnitInfo() + + #Use to ID unit + ps.flashLed(4) + time.sleep(2) + + ps.setChannel(0, 'DC', 2.0) + + #Example of simple capture + #res = ps.setSampling(100E6, 1000) + #print("Sampling @ %f MHz, maximum samples = %d"%(res[0]/1E6, res[1])) + #print("Sampling delta = %f ns"%(1/res[0]*1E9)) + res = ps.setSamplingInterval(10E-9, 1000) + print("Sampling @ %f MHz, maximum samples = %d"%(1/res[0]/1E6, res[1])) + print("Sampling delta = %f ns"%(res[0]*1E9)) + + ps.setSimpleTrigger('A', 1.0, 'Rising') + + ps.runBlock() + while(ps.isReady() == False): time.sleep(0.01) + + #TODO: Doesn't work on larger arrays + (data, numSamplesReturned, overflow) = ps.getDataV(0, 4096) + + print("We read %d samples from the requested %d"%(numSamplesReturned, 4096)) + print("Overflow value is %d"%(overflow)) + print("The first 100 datapoints are ") + print(data[0:100]) + + ps.close() + +if __name__ == "__main__": + examplePS6000() From 0d089533c859818ebcea7aa67440dbda5419b791 Mon Sep 17 00:00:00 2001 From: Mark Harfouche Date: Mon, 23 Dec 2013 22:20:23 -0800 Subject: [PATCH 2/3] Modified how the setSamplingInterval function works. As well as the getTimebase function. The getTimebase now returns two things, the raw timebase number required by the picoscope, and a human readible time interval. This used by the setSampling interval function to compute the total number of samples needed. It returns this to the user. Gah, the setSigGenBuiltIn function is messed up, and doesn't really work. I will have to followup on the bug report I posted earlier. --- picoscope.py | 63 ++++++++++++++++++++++++++++++++++++++----- ps6000.py | 75 ++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 117 insertions(+), 21 deletions(-) diff --git a/picoscope.py b/picoscope.py index 7bc4a4b..ca1a782 100644 --- a/picoscope.py +++ b/picoscope.py @@ -25,6 +25,7 @@ import inspect +import time import numpy as np @@ -64,6 +65,15 @@ class PSBase(object): ###You must reimplement this in device specific classes CHANNEL_COUPLINGS = {"DC50": 2, "DC":1, "AC":0} + has_sig_gen = False + + + # If we don't get this CaseInsentiveDict working, I would prefer to stick + # with their spelling of archaic C all caps for this. I know it is silly, + # but it removes confusion for certain things like + # DC_VOLTAGE = DCVoltage or DcVoltage or DC_Voltage + # or even better + # SOFT_TRIG = SoftwareTrigger vs SoftTrig # For some reason this isn't working with me :S #THRESHOLD_TYPE = CaseInsensitiveDict( @@ -136,15 +146,23 @@ def printUnitInfo(self): print("Analog version " + self.getUnitInfo("ANALOGUE_HARDWARE_VERSION")) - def setChannel(self, chNum=0, coupling="AC", VRange=2.0, VOffset=0.0, enabled=True, BWLimited=False): + def setChannel(self, channel='A', coupling="AC", VRange=2.0, VOffset=0.0, enabled=True, BWLimited=False): """ Sets up a specific channel """ if enabled: enabled = 1 else: enabled = 0 + if not isinstance(channel, int): + chNum = self.CHANNELS[channel] + else: + chNum = channel + coupling = self.CHANNEL_COUPLINGS[coupling] + # I don't know if I like the fact that you are comparing floating points + # I think this should just be a string, because then we know the user is + # in charge of properly formatting it try: VRangeAPI = (item for item in self.CHANNEL_RANGE if item["rangeV"] == VRange).next() VRangeAPI = VRangeAPI["apivalue"] @@ -166,28 +184,43 @@ def setChannel(self, chNum=0, coupling="AC", VRange=2.0, VOffset=0.0, enabled=Tr def runBlock(self, pretrig=0.0): """Runs a single block, must have already called setSampling for proper setup""" - self._lowLevelRunBlock(int(self.maxSamples*pretrig), int(self.maxSamples*(1-pretrig)),self.timebase, self.oversample, self.segmentIndex) + + # getting max samples is riddiculous. 1GS buffer means it will take so long + nSamples = min(self.noSamples, self.maxSamples) + + self._lowLevelRunBlock(int(nSamples*pretrig), int(nSamples*(1-pretrig)), self.timebase, + self.oversample, self.segmentIndex) def isReady(self): """Check if scope done""" return self._lowLevelIsReady() - def setSamplingInterval(self, sampleInterval, noSamples, oversample=0, segmentIndex=0): - """Returns [actualSampleInterval, maxSamples]""" + def waitReady(self): + while(self.isReady() == False): time.sleep(0.01) + + + def setSamplingInterval(self, sampleInterval, duration, oversample=0, segmentIndex=0): + """Returns [actualSampleInterval, noSamples, maxSamples]""" self.oversample = oversample self.segmentIndex = segmentIndex - self.timebase = self.getTimeBaseNum(sampleInterval) + (self.timebase, timebase_dt) = self.getTimeBaseNum(sampleInterval) + print timebase_dt + + noSamples = int(round(duration / timebase_dt)) m = self._lowLevelGetTimebase(self.timebase, noSamples, oversample, segmentIndex) self.sampleInterval = m[0] self.sampleRate = 1.0/m[0] self.maxSamples = m[1] - return (self.sampleInterval, self.maxSamples) + self.noSamples = noSamples + return (self.sampleInterval, self.noSamples, self.maxSamples) def setSamplingFrequency(self, sampleFreq, noSamples, oversample=0, segmentIndex=0): """Returns [actualSampleFreq, maxSamples]""" - self.setSamplingInterval(1.0/sampleFreq, noSamples, oversample, segmentIndex) + sampleInterval = 1.0 * sampleFreq + duration = noSamples * sampleInterval + self.setSamplingInterval(sampleInterval, duration, oversample, segmentIndex) return (self.sampleRate, self.maxSamples) def setSampling(self, sampleFreq, noSamples, oversample=0, segmentIndex=0): @@ -219,6 +252,7 @@ def setSimpleTrigger(self, trigSrc, threshold_V=0, direction="Rising", delay=0, self._lowLevelSetSimpleTrigger(enabled, trigSrc, threshold_adc, direction, delay, timeoutms) + def flashLed(self, times=5, start=False, stop=False): """Flash the front panel LEDs""" @@ -230,6 +264,9 @@ def flashLed(self, times=5, start=False, stop=False): self._lowLevelFlashLed(times) + def getData(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): + return self.getDataV(channel, numSamples, startIndex, downSampleRatio, downSampleMode) + def getDataV(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): (data, numSamplesReturned, overflow) = self.getDataRaw(channel, numSamples, startIndex, downSampleRatio, downSampleMode) @@ -259,6 +296,16 @@ def getDataRaw(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, dow return (data, numSamplesReturned, overflow) + def setSigGenSimple(self, offsetVoltage=0, pkToPk=2, waveType=None, frequency=1E6, + shots=1, triggerType=None, triggerSource=None): + """ I don't expose all the options from setSigGenBuiltIn so I'm not + calling it that. Their signal generator can actually do quite fancy + things that I don't care for""" + if self.has_sig_gen == False: + raise NotImplementedError() + self._lowLevelSetSigGenSimple(offsetVoltage, pkToPk, waveType, frequency, + shots, triggerType, triggerSource) + def open(self, sn=None): """Open the scope, if sn is None just opens first one found""" @@ -267,6 +314,8 @@ def open(self, sn=None): def close(self): """Close the scope""" self._lowLevelCloseUnit() + def stop(self): + self._lowLevelStop() def __del__(self): self.close() diff --git a/ps6000.py b/ps6000.py index adc226b..9e3a59f 100644 --- a/ps6000.py +++ b/ps6000.py @@ -18,7 +18,15 @@ class PS6000(PSBase): NUM_CHANNELS = 4 CHANNELS = {"A":0, "B":1, "C":2, "D":3, - "EXTERNAL":4, "MAX_CHANNELS":4, "TRIGGER_AUX":5, "MAX_TRIGGER_SOURCES":6} + "External":4, "MaxChannels":4, "TriggerAux":5} + + + has_sig_gen = True + WAVE_TYPES = {"Sine":0, "Square":1, "Triangle":2, "RampUp":3, "RampDown":4, + "Sinc":5, "Gaussian":6, "HalfSine":7, "DCVoltage": 8, "WhiteNoise": 9} + + SIGGEN_TRIGGER_TYPES = {"Rising":0, "Falling":1, "GateHigh":2, "GateLow":3} + SIGGEN_TRIGGER_SOURCES = {"None":0, "ScopeTrig":1, "AuxIn":2, "ExtIn":3, "SoftTrig":4, "TriggerRaw":5} def __init__(self): super(PS6000, self).__init__() @@ -51,6 +59,10 @@ def _lowLevelCloseUnit(self): m = self.lib.ps6000CloseUnit(self.handle) self.checkResult(m) + def _lowLevelStop(self): + m = self.lib.ps6000Stop(self.handle) + self.checkResult(m) + def _lowLevelGetUnitInfo(self, info): s = create_string_buffer(256) requiredSize = c_short(0); @@ -117,7 +129,14 @@ def getTimeBaseNum(self, sampleTimeS): st = math.floor((sampleTimeS * 156250000) + 4) - return int(st) + st = int(st) + + if st < 5: + dt = float(2.**st)/5E9 + else: + dt = float(st - 4.) / 156250000. + + return (st, dt) def _lowLevelSetDataBuffer(self, channel, data, downSampleMode): """ data should be a numpy array""" @@ -136,6 +155,27 @@ def _lowLevelGetValues(self,numSamples,startIndex,downSampleRatio,downSampleMode self.checkResult(m) return (numSamplesReturned.value, overflow.value) + def _lowLevelSetSigGenSimple(self, offsetVoltage, pkToPk, waveType, frequency, shots, + triggerType, triggerSource): + if waveType is None: + waveType = self.WAVE_TYPES["Sine"] + if triggerType is None: + triggerType = self.SIGGEN_TRIGGER_TYPES["Rising"] + if triggerSource is None: + triggerSource = self.SIGGEN_TRIGGER_SOURCES["None"] + + if not isinstance(waveType, int): + waveType = self.WAVE_TYPES[waveType] + if not isinstance(triggerType, int): + triggerType = self.SIGGEN_TRIGGER_TYPES[triggerType] + if not isinstance(triggerSource, int): + triggerSource = self.SIGGEN_TRIGGER_SOURCES[triggerSource] + + m = self.lib.ps6000SetSigGenBuiltIn(self.handle, int(offsetVoltage * 1E6), + int(pkToPk * 1E6), waveType, c_float(frequency), c_float(frequency), + 0, 0, 0, 0, shots, 0, triggerType, triggerSource, 0) + self.checkResult(m) + def examplePS6000(): import textwrap print(textwrap.fill("This demo will use the AWG to generate a gaussian pulse and measure " + @@ -155,23 +195,30 @@ def examplePS6000(): ps.setChannel(0, 'DC', 2.0) - #Example of simple capture - #res = ps.setSampling(100E6, 1000) - #print("Sampling @ %f MHz, maximum samples = %d"%(res[0]/1E6, res[1])) - #print("Sampling delta = %f ns"%(1/res[0]*1E9)) - res = ps.setSamplingInterval(10E-9, 1000) - print("Sampling @ %f MHz, maximum samples = %d"%(1/res[0]/1E6, res[1])) - print("Sampling delta = %f ns"%(res[0]*1E9)) + (interval, nSamples, maxSamples) = ps.setSamplingInterval(10E-9, 1E-3) + print("Sampling interval = %f ns"%(interval*1E9)); + print("Taking samples = %d"%nSamples) + print("Maximum samples = %d"%maxSamples) - ps.setSimpleTrigger('A', 1.0, 'Rising') + ps.setChannel('A', 'DC', 50E-3, 0.0, True, False) + ps.setSimpleTrigger('A', 0.0, 'Rising') + + + # Technically, this should generate a a 2.2 V peak to peak waveform, but there is a bug + # with the picoscope, causing it to only generate a useless waveform.... + ps.setSigGenSimple(0, 2.2, "Square", 10E6) ps.runBlock() - while(ps.isReady() == False): time.sleep(0.01) + #while(ps.isReady() == False): time.sleep(0.01) + ps.waitReady() + + # returns a numpy array + (data, numSamplesReturned, overflow) = ps.getDataV(0, nSamples) - #TODO: Doesn't work on larger arrays - (data, numSamplesReturned, overflow) = ps.getDataV(0, 4096) + # call this when you are done taking data + ps.stop() - print("We read %d samples from the requested %d"%(numSamplesReturned, 4096)) + print("We read %d samples from the requested %d"%(numSamplesReturned, nSamples)) print("Overflow value is %d"%(overflow)) print("The first 100 datapoints are ") print(data[0:100]) From 9beebbc24ad4d74ed193141ba3d9bf40d9e75bd1 Mon Sep 17 00:00:00 2001 From: Mark Harfouche Date: Mon, 23 Dec 2013 23:37:46 -0800 Subject: [PATCH 3/3] I have no clue why it is working now :S Also fixed some other bugs --- picoscope.py | 4 ++-- ps6000.py | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/picoscope.py b/picoscope.py index ca1a782..6f76b75 100644 --- a/picoscope.py +++ b/picoscope.py @@ -270,14 +270,14 @@ def getData(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSa def getDataV(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): (data, numSamplesReturned, overflow) = self.getDataRaw(channel, numSamples, startIndex, downSampleRatio, downSampleMode) - a2v = self.CHRange[channel] / self.MAX_VALUE + a2v = self.CHRange[channel] / float(self.MAX_VALUE) data = data * a2v + self.CHOffset[channel] return (data, numSamplesReturned, overflow) def getDataRaw(self, channel, numSamples=0, startIndex=0, downSampleRatio=1, downSampleMode=0): if not isinstance(channel, int): - trigSrc = self.CHANNELS[channel] + channel = self.CHANNELS[channel] if numSamples == 0: # maxSamples is probably huge, 1Gig Sample can be HUGE.... diff --git a/ps6000.py b/ps6000.py index 9e3a59f..0d9f6f5 100644 --- a/ps6000.py +++ b/ps6000.py @@ -86,9 +86,8 @@ def _lowLevelSetSimpleTrigger(self, enabled, trigsrc, threshold_adc, direction, def _lowLevelRunBlock(self, numPreTrigSamples, numPostTrigSamples, timebase, oversample, segmentIndex): timeIndisposedMs = c_long() - pParameter = c_void_p() m = self.lib.ps6000RunBlock( self.handle, numPreTrigSamples, numPostTrigSamples, - timebase, oversample, byref(timeIndisposedMs), segmentIndex, None, byref(pParameter) ) + timebase, oversample, byref(timeIndisposedMs), segmentIndex, None, None) self.checkResult(m) def _lowLevelIsReady(self): @@ -193,8 +192,6 @@ def examplePS6000(): ps.flashLed(4) time.sleep(2) - ps.setChannel(0, 'DC', 2.0) - (interval, nSamples, maxSamples) = ps.setSamplingInterval(10E-9, 1E-3) print("Sampling interval = %f ns"%(interval*1E9)); print("Taking samples = %d"%nSamples)