# <font color='gray'> Smoke View for gen3_sdk</font>



<font color='gray'>Import the gen3_sdk library from its platform-specific path, and import all the dependencies used in this project, such as the serial library for the connection and matplotlib for plotting.</font>

In [None]:
import sys,os,serial,threading, time,struct, glob
# Add the directory holding the platform-specific gen3_sdk Python binding to
# the module search path so that `import gen3_sdk` below can find it.
if sys.platform == "linux" or sys.platform == "linux2":
    sys.path.append('./../../../bin/linux/python/')
elif sys.platform == "darwin":
    sys.path.append('./../../../bin/macOS/python/')
elif sys.platform == "win32":
    # struct.calcsize("P") is the pointer size in bytes of the running
    # interpreter, so pointer size * 8 == 64 identifies a 64-bit Python.
    if ((struct.calcsize("P") * 8) == 64):
        sys.path.append('./../../../bin/windows/x64/python/')  # 64 bit python
    else:
        sys.path.append('./../../../bin/windows/Win32/python/')  # 32 bit python
# NOTE(review): CPython reports "win32" on every Windows build, so this branch
# looks unreachable; the 64-bit case is already handled above.
elif sys.platform == "win64":
    sys.path.append('./../../../bin/windows/x64/python/')
import gen3_sdk
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'svg'   # For clarity of graph image
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets


In [None]:
# Names of the status codes reported on the command streams, keyed by the
# numeric status code. The position of a name in the tuple is its code.
# NOTE(review): this name shadows the builtin `dict`; it is kept because the
# transmit callback's alert handler looks the table up under this name.
dict = {
    code: name
    for code, name in enumerate((
        'M2M2_APP_COMMON_STATUS_OK',
        'M2M2_APP_COMMON_STATUS_ERROR',
        'M2M2_APP_COMMON_STATUS_STREAM_STARTED',
        'M2M2_APP_COMMON_STATUS_STREAM_STOPPED',
        'M2M2_APP_COMMON_STATUS_STREAM_IN_PROGRESS',
        'M2M2_APP_COMMON_STATUS_STREAM_DEACTIVATED',
        'M2M2_APP_COMMON_STATUS_STREAM_COUNT_DECREMENT',
        'M2M2_APP_COMMON_STATUS_STREAM_NOT_STARTED',
        'M2M2_APP_COMMON_STATUS_STREAM_NOT_STOPPED',
        'M2M2_APP_COMMON_STATUS_SUBSCRIBER_ADDED',
        'ADI_SDK_PACKET_TIMED_OUT',
    ))
}


#  <font color='brown'>TX Callback </font>

The SDK requires the user to provide functions for transmitting data to, and receiving data from, the hardware.

Transmission is done via this callback function – which is called by the SDK when data is to be transmitted to the watch.

SDK will form the m2m2 packets and will call this API to transmit the data over the physical layer 


In [None]:
class serial_tx_cb(gen3_sdk.watch_phy_callback):
    """Physical-layer callback the SDK uses to transmit over the serial port.

    `call` is invoked with the bytes to send; `sys_alert_call` is invoked
    with a numeric status code, which is printed by name.
    """

    def __init__(self, serial_object):
        """Keep the serial object that outgoing data is written to."""
        gen3_sdk.watch_phy_callback.__init__(self)
        self.serial = serial_object

    def call(self, data):
        """Write the data handed over by the SDK to the serial port."""
        self.serial.write(data)

    def sys_alert_call(self, alert):
        """Print the name of an SDK alert code.

        A code missing from the status table is reported as
        UNKNOWN_STATUS_<code> instead of raising KeyError inside the
        callback.
        """
        status = dict.get(alert, 'UNKNOWN_STATUS_{}'.format(alert))
        print("SDK ALERT : {}".format(status))

#  <font color='brown'> RX Callback </font>

For reception, a thread is generally used to receive data from the physical layer.

When an m2m2 packet has been fully received from the physical layer, it is dispatched to the SDK.

In [None]:
class serial_rx_cb():
    """Background receiver that reads m2m2 packets from a serial port.

    A daemon thread is started on construction. It reads an 8-byte header,
    takes the total packet length from header bytes 4 and 5 (big-endian),
    reads the rest of the packet and hands header + body to the
    module-level `active_watch` via `dispatch`.
    """

    HEADER_SIZE = 8  # bytes read before the packet length is known

    def __init__(self, serial_obj):
        """Store the serial object and start the receive thread."""
        self.serial = serial_obj
        self.thread_run = True
        self.thread_rx = True
        self.rx_thread = threading.Thread(target=self.rx_fn)
        self.rx_thread.daemon = True
        self.rx_thread.start()

    def close(self):
        """Ask the receive loop to stop after its current iteration."""
        self.thread_run = False
        self.thread_rx = False

    def rx_fn(self):
        """Receive loop: read one packet per iteration and dispatch it."""
        while self.thread_rx:
            try:
                # Read from the port given to the constructor rather than
                # from whatever the global `ser` happens to be.
                hdr = self.serial.read(self.HEADER_SIZE)
                if len(hdr) < self.HEADER_SIZE:
                    # Nothing (or only a partial header) arrived; the length
                    # bytes are not available, so the fragment is dropped.
                    continue
                length = (hdr[4] << 8) + hdr[5]
                # `length` counts the header too; never request a negative
                # number of bytes if the length field is corrupt.
                body = self.serial.read(max(length - self.HEADER_SIZE, 0))
                pkt = hdr + body
                # The watch object is created after this thread is started,
                # so look it up at dispatch time and skip the packet while
                # it does not exist instead of dying with a NameError.
                watch = globals().get('active_watch')
                if watch is not None:
                    watch.dispatch(pkt)
            except serial.SerialException as e:
                print(e)

    def thread_start(self):
        """Make sure the receive thread is running.

        Does nothing while the thread started by the constructor is still
        alive; otherwise re-arms the run flags and starts a fresh thread,
        because a finished thread object cannot be started again.
        """
        if self.rx_thread.is_alive():
            return
        self.thread_run = True
        self.thread_rx = True
        self.rx_thread = threading.Thread(target=self.rx_fn)
        self.rx_thread.daemon = True
        self.rx_thread.start()

#  <font color='brown'> Serial Connection</font>

The user needs to connect the VSM watch to this system for communication.

So here we list down the number of serial ports that are available for connection.

Enter the correct serial port of the watch for communication.


In [None]:
# Build the list of candidate serial devices for this platform, then keep
# only the ones that can actually be opened.
ser = serial.Serial()
result = []
if sys.platform.startswith('win'):
    ports = ['COM%s' % number for number in range(1, 257)]
elif sys.platform.startswith(('linux', 'cygwin')):
    # this excludes your current terminal "/dev/tty"
    ports = glob.glob('/dev/tty[A-Za-z]*')
elif sys.platform.startswith('darwin'):
    ports = glob.glob('/dev/tty.*')
else:
    raise EnvironmentError('Unsupported platform')

for port in ports:
    try:
        # Opening and immediately closing the device is the availability test.
        s = serial.Serial(port)
        s.close()
    except (OSError, serial.SerialException):
        continue
    result.append(port)

print("Available serial ports are:")
for p in result:
    print("==> {}".format(p))
    
def connect(arg):
    """Open the shared serial object `ser` on port `arg` at 921600 baud.

    If the port is already open, a message is printed and nothing else is
    done; previously the function went on to re-open the port anyway.
    A SerialException raised while opening is reported on stdout and
    swallowed, so the function always returns None.
    """
    if ser.isOpen():
        print("Port Already Connected,Please disconnect and try again")
        return

    try:
        print('Connecting to Motherboard...')
        ser.baudrate = 921600
        ser.port = arg
        ser.open()
        print('Connected to Motherboard: ' + ser.port)
    except serial.serialutil.SerialException as e:
        print("Error opening the serial device!")
        print("The port might be already selected,or have given an incorrect serial device identifier.")
        print("Error was:\n\t{}".format(e))
        return
    
# Ask for the port name (the example in the prompt depends on the platform),
# open it, and start the receiver thread on the shared serial object.
if sys.platform.startswith('win'):
    port = input("Enter the port (ex COM30) and press Enter: ")
elif sys.platform.startswith(('linux', 'cygwin', 'darwin')):
    port = input("Enter the port (ex /dev/tty.usbserial-DM3HUW9M) and press Enter: ")
else:
    raise EnvironmentError('Unsupported platform')

connect(port)
rx_cb = serial_rx_cb(ser)

#  <font color='brown'> Creating SDK instance</font>

Create an instance of the watch class, passing the transmission callback to its constructor.

Then set the platform of the watch to python

In [None]:
# Create the SDK watch object around a transmit callback bound to the serial
# port. NOTE(review): __disown__() presumably hands ownership of the callback
# object to the SDK (SWIG director convention) - confirm against the binding.
active_watch = gen3_sdk.watch(serial_tx_cb(ser).__disown__())
# Select the SDK's "python" platform setting.
active_watch.set_platform(gen3_sdk.watch.python)

#  <font color='brown'> Load configuration</font>

Load the configuration for the ADPD4000 from a dcfg file.

Read the register address and value from the file.

Then will write the address and value through SDK

In [None]:
import string

def LoadCfg(filename):
    """Write every register/value pair listed in a dcfg file to the device.

    Each meaningful line holds a hexadecimal register address followed by a
    hexadecimal value, separated by any whitespace. Everything from '#' to
    the end of the line is a comment; blank and comment-only lines are
    skipped. Each pair is sent with active_watch.adpd_app.register_write.

    Args:
        filename: path of the configuration file to read.

    Raises:
        ValueError: if a line has fewer than two fields, or a field is not
            valid hexadecimal.
    """
    with open(filename) as file:
        for line_number, raw_line in enumerate(file, start=1):
            # partition() leaves a line without '#' untouched. The previous
            # slice on find('#') == -1 cut off the last character, which
            # corrupted the value on a final line without a newline.
            fields = raw_line.partition('#')[0].split()
            if not fields:
                continue
            if len(fields) < 2:
                print("Error read config file: {0}".format(filename))
                raise ValueError(
                    'LoadCfgV1: line {}: {!r}'.format(line_number, raw_line))
            reg_op = [[int(fields[0], 16), int(fields[1], 16)]]
            ret = active_watch.adpd_app.register_write(reg_op)

# Uncomment the code below (inside the loop) to print the address and value
# that was written by the SDK

#             for i in ret:
#                 print("address: {} value: {}".format(hex(i[0]), hex(i[1])))
                    


#  <font color='brown'> CTR Calculation</font>


In [None]:
import math

# Build the path with os.path.join so it resolves on every platform; the
# previous literal used a Windows-only backslash and relied on '\A' not being
# treated as an escape sequence.
LoadCfg(os.path.join('config', 'ADPD188BIZ-SK_Smoke_OpenAir_01.dcfg'))

# --- Scalars used by Calculate_CTR to derive the per-LSB factor ------------
CAPACITANCE = 6.3 * math.pow(10, -12)
VOLTAGE = 1.2
CONSTANT = 8192
UNIT_CONVERSION_ENABLED = 1

# Slot identifiers passed around as `ActiveSlot`.
ENABLED_SLOT = {'SLOTA': 0, 'SLOTB': 1}

# Symbolic names for bit positions 0..15 of a 16-bit register.
RegisterClass = {'BIT_POSITION_{}'.format(bit): bit for bit in range(16)}

# Band-pass-filter gain table: one row per pulse width (1..4 us), one column
# per TIA gain setting (200K, 100K, 50K).
bpfLookUpTable = [
    # 200K  100K  50K
    [0.79, 0.89, 0.91],  # 1 us
    [0.83, 0.89, 0.90],  # 2 us
    [0.78, 0.81, 0.82],  # 3 us
    [0.69, 0.71, 0.72],  # 4 us
]

# --- Register addresses read by the helper functions -----------------------
AFE_TRIM_A = 0x42
AFE_TRIM_B = 0x44
PULSE_PERIOD_A = 0x31
PULSE_PERIOD_B = 0x36
PULSE_OFFSET_A = 0x30
PULSE_OFFSET_B = 0x35
AFE_TEST_A = 0x43
AFE_TEST_B = 0x45
PD_SELECT = 0x14

# --- Resistance values in ohms ---------------------------------------------
KILO_OHM_400 = 400000
KILO_OHM_200 = 200000
KILO_OHM_100 = 100000
KILO_OHM_50 = 50000
KILO_OHM_25 = 25000

# --- Band-pass filter handling ---------------------------------------------
MAX_CTR_PULSEWIDTH = 4
DEFAULT_BPFGain = 1
BPFEnableRegValue = 0xADA5
BPFDisableRegValue_1 = 0xAE65
BPFDisableRegValue_2 = 0xB065

# Column indices into bpfLookUpTable / codes of the TIA gain field.
TIA_GAIN_200K = 0
TIA_GAIN_100K = 1
TIA_GAIN_50K = 2
TIA_GAIN_25K = 3

# LED selected for each slot; updated by GetFinalLedCurrent.
ledSelectA = 0
ledSelectB = 0

# LED drive registers.
LED1Coarse = 0x23
LED2Coarse = 0x24
LED3Coarse = 0x22
LEDFine = 0x25

# TIA gain field value per slot, consumed by GetBPFGainValue_LUT.
tiaGainAIndex = 0
tiaGainBIndex = 0

def GetIntegratorOhmsValue(x):
    """Map an integrator resistor field value to ohms.

    0 gives 400 kOhm and 1 gives 200 kOhm; 2 and every other value give
    100 kOhm.
    """
    if x == 0:
        return KILO_OHM_400
    if x == 1:
        return KILO_OHM_200
    return KILO_OHM_100

def GetTIAGainValue(x):
    """Map a TIA gain field value (0..3) to ohms; any other value gives 0."""
    ohms_by_code = (KILO_OHM_200, KILO_OHM_100, KILO_OHM_50, KILO_OHM_25)
    for code, ohms in enumerate(ohms_by_code):
        if x == code:
            return ohms
    return 0

def getBitValues(x, startBit, endBit):
    """Return bits startBit..endBit (inclusive) of the 16-bit value x.

    The value is shifted left until endBit sits at bit 15, truncated to
    16 bits to drop everything above endBit, then shifted right so that
    startBit ends up at bit 0.
    """
    lift = 15 - endBit
    aligned = (x << lift) & 0xffff
    return (aligned >> (lift + startBit)) & 0xffff


def GetFinalLedCurrent_X(ledSelect):
    """Compute the drive current of the LED chosen by `ledSelect`.

    ledSelect 0 means no LED and yields 0. For LED 1, 2 or 3 the coarse
    code (bits 0-3) and the scale bit (bit 13) are read from that LED's
    coarse register and the fine code from that LED's field of the LEDFine
    register. The result is coarse * fine * scale, where the scale formula
    depends on the chip id read from register 0x08.
    """
    if ledSelect == 0:
        return 0

    def read_field(register, low_bit, high_bit):
        # Field of the last entry returned for `register`; 0 if none came back.
        field = 0
        for entry in active_watch.adpd_app.register_read([register]):
            field = getBitValues(entry[1], RegisterClass[low_bit], RegisterClass[high_bit])
        return field

    # LED number -> (coarse register, first and last bit of its fine field).
    led_layout = {
        1: (LED1Coarse, 'BIT_POSITION_0', 'BIT_POSITION_4'),
        2: (LED2Coarse, 'BIT_POSITION_6', 'BIT_POSITION_10'),
        3: (LED3Coarse, 'BIT_POSITION_11', 'BIT_POSITION_15'),
    }

    coarse_code = 0
    fine_code = 0
    scale_code = 0
    if ledSelect in led_layout:
        coarse_register, fine_low, fine_high = led_layout[ledSelect]
        coarse_code = read_field(coarse_register, 'BIT_POSITION_0', 'BIT_POSITION_3')
        fine_code = read_field(LEDFine, fine_low, fine_high)
        scale_code = read_field(coarse_register, 'BIT_POSITION_13', 'BIT_POSITION_13')

    coarse = 50.3 + 19.8 * coarse_code
    fine = 0.74 + 0.022 * fine_code

    chip_id = 0
    for entry in active_watch.adpd_app.register_read([0x08]):
        chip_id = entry[1]

    if chip_id >= 0x0916:   # added to support 0x0A16
        scale = 0.1 + 0.9 * scale_code
    else:
        scale = 0.4 + 0.6 * scale_code

    return coarse * fine * scale

def GetBPFValueFromLookUpTable(LedPulse, TIAGain):
    """Look up the band-pass-filter gain for a pulse width and TIA gain.

    Pulse widths above MAX_CTR_PULSEWIDTH have no table entry and yield 0.
    A pulse width of 0 uses the last row. The 25K TIA gain has no column of
    its own and uses the 50K column.
    """
    if LedPulse > MAX_CTR_PULSEWIDTH:
        return 0

    row = LedPulse - 1 if LedPulse != 0 else MAX_CTR_PULSEWIDTH - 1
    column = TIA_GAIN_50K if TIAGain == TIA_GAIN_25K else TIAGain
    return bpfLookUpTable[row][column]


def GetPulseNumber(ActiveSlot):
    """Return the pulse count (bits 8-15 of the pulse-period register) of a slot.

    Slot A reads PULSE_PERIOD_A; any other slot value reads PULSE_PERIOD_B.
    If the register read yields no entries, the result variable is never
    assigned and an UnboundLocalError is raised.
    """
    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        register = PULSE_PERIOD_A
    else:
        register = PULSE_PERIOD_B

    for entry in active_watch.adpd_app.register_read([register]):
        pulse_count = getBitValues(
            entry[1], RegisterClass['BIT_POSITION_8'], RegisterClass['BIT_POSITION_15'])
    return pulse_count
        
def GetIntegratorResisterValue(ActiveSlot):
    """Return the integrator resistor value of a slot in ohms.

    Bits 8-9 of AFE_TRIM_A (slot A) or AFE_TRIM_B (any other slot) are
    translated with GetIntegratorOhmsValue. If the register read yields no
    entries, the field variable is never assigned and an UnboundLocalError
    is raised.
    """
    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        register = AFE_TRIM_A
    else:
        register = AFE_TRIM_B

    for entry in active_watch.adpd_app.register_read([register]):
        resistor_code = getBitValues(
            entry[1], RegisterClass['BIT_POSITION_8'], RegisterClass['BIT_POSITION_9'])
    return GetIntegratorOhmsValue(resistor_code)
    
def GetTIAFeedbackResisterValue(ActiveSlot):
    """Return the TIA feedback resistor value of a slot in ohms.

    Bits 0-1 of AFE_TRIM_A (slot A) or AFE_TRIM_B (any other slot) are read
    and translated with GetTIAGainValue. The raw field value is also stored
    in the module-level tiaGainAIndex / tiaGainBIndex, which
    GetBPFGainValue_LUT uses as the lookup-table column. Previously the
    assignment only created a local of the same name, so the module-level
    index stayed at 0. If the register read yields no entries the index is
    set to 0.
    """
    global tiaGainAIndex, tiaGainBIndex

    def read_gain_index(register):
        # Field of the last entry returned for `register`; 0 if none came back.
        index = 0
        for entry in active_watch.adpd_app.register_read([register]):
            index = getBitValues(
                entry[1], RegisterClass['BIT_POSITION_0'], RegisterClass['BIT_POSITION_1'])
        return index

    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        tiaGainAIndex = read_gain_index(AFE_TRIM_A)
        return GetTIAGainValue(tiaGainAIndex)

    tiaGainBIndex = read_gain_index(AFE_TRIM_B)
    return GetTIAGainValue(tiaGainBIndex)
    
def GetPulseWidth(ActiveSlot):
    """Return the LED pulse width field (bits 8-12 of the pulse-offset register).

    Slot A reads PULSE_OFFSET_A; any other slot value reads PULSE_OFFSET_B.
    If the register read yields no entries, the result variable is never
    assigned and an UnboundLocalError is raised.
    """
    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        register = PULSE_OFFSET_A
    else:
        register = PULSE_OFFSET_B

    for entry in active_watch.adpd_app.register_read([register]):
        pulse_width = getBitValues(
            entry[1], RegisterClass['BIT_POSITION_8'], RegisterClass['BIT_POSITION_12'])
    return pulse_width
    
def GetBPFGainValue_LUT(ActiveSlot):
    """Return the band-pass-filter gain to use for a slot.

    AFE_TEST_A (slot A) or AFE_TEST_B (any other slot) is read. When it
    equals BPFEnableRegValue the gain comes from the lookup table, using
    that slot's pulse width and TIA gain index; otherwise DEFAULT_BPFGain
    is used. Returns 0 if the register read yields no entries.

    The slot B branch now uses ledPulseWidthB; it previously looked the
    gain up with slot A's pulse width.

    Reads the module-level ledPulseWidthA/ledPulseWidthB and
    tiaGainAIndex/tiaGainBIndex, so those must be set before the call.
    """
    is_slot_a = ActiveSlot == ENABLED_SLOT['SLOTA']
    test_register = AFE_TEST_A if is_slot_a else AFE_TEST_B

    bpfGain = 0
    for entry in active_watch.adpd_app.register_read([test_register]):
        if entry[1] != BPFEnableRegValue:
            bpfGain = DEFAULT_BPFGain
        elif is_slot_a:
            bpfGain = GetBPFValueFromLookUpTable(ledPulseWidthA, tiaGainAIndex)
        else:
            bpfGain = GetBPFValueFromLookUpTable(ledPulseWidthB, tiaGainBIndex)
    return bpfGain
    
def GetFinalLedCurrent(ActiveSlot):
    """Return the LED drive current of a slot and record which LED it uses.

    PD_SELECT is read; bits 0-1 select the LED of slot A and bits 2-3 the
    LED of any other slot. The selection is stored in the module-level
    ledSelectA / ledSelectB (left unchanged if the read yields no entries)
    and passed to GetFinalLedCurrent_X.
    """
    global ledSelectA, ledSelectB

    selection = active_watch.adpd_app.register_read([PD_SELECT])

    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        for entry in selection:
            ledSelectA = getBitValues(
                entry[1], RegisterClass['BIT_POSITION_0'], RegisterClass['BIT_POSITION_1'])
        return GetFinalLedCurrent_X(ledSelectA)

    for entry in selection:
        ledSelectB = getBitValues(
            entry[1], RegisterClass['BIT_POSITION_2'], RegisterClass['BIT_POSITION_3'])
    return GetFinalLedCurrent_X(ledSelectB)

#######################___SLOT_A___###################
# Read the slot A parameters once, up front; Calculate_CTR uses these
# module-level values for every sample. The order matters: the BPF gain
# lookup reads ledPulseWidthA, so the pulse width must be fetched first.

# Number of pulses for slot A
numberOfPulsesA = GetPulseNumber(ENABLED_SLOT['SLOTA']);

# Integrator resistor value for slot A
integratorValueA = GetIntegratorResisterValue(ENABLED_SLOT['SLOTA']);

# TIA feedback resistor value (ohms) for slot A
tiaGainA = GetTIAFeedbackResisterValue(ENABLED_SLOT['SLOTA']);

# LED pulse width for slot A
ledPulseWidthA = GetPulseWidth(ENABLED_SLOT['SLOTA']);

# BPF gain for slot A (looked up from the pulse width when the BPF is enabled)
bpfGain_A = GetBPFGainValue_LUT(ENABLED_SLOT['SLOTA']);


# LED drive current for slot A; also records the selected LED in ledSelectA
finalLedCurrent_A = GetFinalLedCurrent(ENABLED_SLOT['SLOTA'])

#######################___SLOT_B____###################
# Same sequence for slot B.

# Number of pulses for slot B
numberOfPulsesB = GetPulseNumber(ENABLED_SLOT['SLOTB']);

# Integrator resistor value for slot B
integratorValueB = GetIntegratorResisterValue(ENABLED_SLOT['SLOTB']);

# TIA feedback resistor value (ohms) for slot B
tiaGainB = GetTIAFeedbackResisterValue(ENABLED_SLOT['SLOTB']);

# LED pulse width for slot B
ledPulseWidthB = GetPulseWidth(ENABLED_SLOT['SLOTB']);

# BPF gain for slot B
bpfGain_B = GetBPFGainValue_LUT(ENABLED_SLOT['SLOTB']);

# LED drive current for slot B; also records the selected LED in ledSelectB
finalLedCurrent_B = GetFinalLedCurrent(ENABLED_SLOT['SLOTB'])

def Calculate_CTR(Sample, ActiveSlot):
    """Convert one raw sample of a slot into its CTR value.

    Uses the per-slot module-level parameters (integrator resistor, TIA
    gain, BPF gain, LED pulse width, number of pulses, LED current) that
    were read from the device beforehand.

    Args:
        Sample: raw sample value of the slot.
        ActiveSlot: ENABLED_SLOT['SLOTA'] for slot A; any other value
            selects slot B.

    Returns:
        The CTR value, or 0 when it is undefined because the LED current or
        any divisor (TIA gain, BPF gain, pulse width, pulse count) is zero.
        Zero divisors used to raise ZeroDivisionError, e.g. when the BPF
        lookup returned 0 for an unsupported pulse width.
    """
    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        integrator, tia_gain, bpf_gain = integratorValueA, tiaGainA, bpfGain_A
        pulse_width, pulse_count = ledPulseWidthA, numberOfPulsesA
        led_current = finalLedCurrent_A
    else:
        integrator, tia_gain, bpf_gain = integratorValueB, tiaGainB, bpfGain_B
        pulse_width, pulse_count = ledPulseWidthB, numberOfPulsesB
        led_current = finalLedCurrent_B

    if 0 in (tia_gain, bpf_gain, pulse_width, pulse_count, led_current):
        return 0

    fc_LSB = (CAPACITANCE * VOLTAGE) / CONSTANT
    sample_constant = fc_LSB * Sample
    R4_2Rf = integrator / (2 * tia_gain)
    bpfGainFactor = 1 / bpf_gain
    # The pulse width field is scaled by 1e-6 before taking its reciprocal.
    t_PW = 1 / (pulse_width * (math.pow(10, -6)))
    sample_numpulse = sample_constant / pulse_count
    I_PD = sample_numpulse * R4_2Rf * bpfGainFactor * t_PW
    return (I_PD * (math.pow(10, 9))) / led_current
  

#  <font color='brown'> PTR Calculation</font>


In [None]:
# LED selector codes compared against ledSelectA / ledSelectB.
BLUE_LED = 1
IR = 3

# Photodetector response and nominal LED output, per LED colour.
BLUE_PDET_AMPS_PER_WATT = 0.26
IR_PDET_AMPS_PER_WATT = 0.41
BLUE_NOMINAL_WATTS_PER_AMP = 0.38
IR_NOMINAL_WATTS_PER_AMP = 0.22

# Coefficients of the cubic polynomial in getDeratingFactorBlue
# (A0 + A1*I + A2*I^2 + A3*I^3).
A0 = 9.8976e-01
A1 = -5.1448e-03
A2 = 2.0287e-05
A3 = -2.9645e-08

# Scalers that default to "no correction".
partToPartScalerBlue = 1.0
partToPartScalerIR = 1.0
IR_DERATING_FACTOR = 1.0

def getNominalWattsPerAmp(ActiveSlot):
    """Return the nominal watts-per-amp of the LED selected for a slot.

    The LED comes from ledSelectA for slot A and ledSelectB otherwise.
    Returns 0 when that LED is neither BLUE_LED nor IR.
    """
    led = ledSelectA if ActiveSlot == ENABLED_SLOT['SLOTA'] else ledSelectB
    if led == BLUE_LED:
        return BLUE_NOMINAL_WATTS_PER_AMP
    if led == IR:
        return IR_NOMINAL_WATTS_PER_AMP
    return 0

def getDeratingFactorBlue(ActiveSlot):
    """Evaluate the blue-LED derating polynomial at the slot's LED current.

    The current is finalLedCurrent_A for slot A and finalLedCurrent_B
    otherwise; the result is A0 + A1*I + A2*I^2 + A3*I^3.
    """
    if ActiveSlot == ENABLED_SLOT['SLOTA']:
        current = finalLedCurrent_A
    else:
        current = finalLedCurrent_B

    linear = A1 * current
    quadratic = A2 * math.pow(current, 2)
    cubic = A3 * math.pow(current, 3)
    return A0 + linear + quadratic + cubic
    
def getPartToPartScaler(ActiveSlot):
    """Return the part-to-part scaler of the LED selected for a slot.

    The LED comes from ledSelectA for slot A and ledSelectB otherwise.
    Returns 0 when that LED is neither BLUE_LED nor IR.
    """
    led = ledSelectA if ActiveSlot == ENABLED_SLOT['SLOTA'] else ledSelectB
    if led == BLUE_LED:
        return partToPartScalerBlue
    if led == IR:
        return partToPartScalerIR
    return 0


def getPdetAmpsPerWatt(ActiveSlot):
    """Return the photodetector amps-per-watt for the LED selected for a slot.

    The LED comes from ledSelectA for slot A and ledSelectB otherwise.
    Returns 0 when that LED is neither BLUE_LED nor IR.
    """
    led = ledSelectA if ActiveSlot == ENABLED_SLOT['SLOTA'] else ledSelectB
    if led == BLUE_LED:
        return BLUE_PDET_AMPS_PER_WATT
    if led == IR:
        return IR_PDET_AMPS_PER_WATT
    return 0

def getLedWattsPerAmp(ActiveSlot):
    """Return the effective LED watts-per-amp for a slot.

    This is nominal watts-per-amp * derating factor * part-to-part scaler.
    The derating factor is the current-dependent polynomial for the blue
    LED and the constant IR_DERATING_FACTOR for the IR LED. Returns 0 when
    the slot's LED is neither BLUE_LED nor IR.
    """
    led = ledSelectA if ActiveSlot == ENABLED_SLOT['SLOTA'] else ledSelectB

    if led == BLUE_LED:
        nominal = getNominalWattsPerAmp(ActiveSlot)
        derating = getDeratingFactorBlue(ActiveSlot)
    elif led == IR:
        nominal = getNominalWattsPerAmp(ActiveSlot)
        derating = IR_DERATING_FACTOR
    else:
        return 0

    return nominal * derating * getPartToPartScaler(ActiveSlot)


def CTR_to_PTR(CTR,ActiveSlot):
    """Convert a CTR value of a slot to PTR.

    PTR = CTR / (photodetector amps-per-watt) / (LED watts-per-amp).
    Returns 0 when either factor is 0, i.e. when no supported LED is
    selected for the slot.
    """
    detector_response = getPdetAmpsPerWatt(ActiveSlot)
    led_efficiency = getLedWattsPerAmp(ActiveSlot)
    if detector_response == 0 or led_efficiency == 0:
        return 0
    return CTR * (1 / detector_response) * (1 / led_efficiency)
    

#  <font color='brown'> ADPD Stream Callback</font>

SDK will provide the streams in the callback function. 

So before starting adpd streams, we need to initialize the adpd_cb callback to receive the adpd data streams.

In [None]:
# Collected samples per slot; each entry is [raw sample, CTR, PTR].
slotA_arr = []
slotB_arr = []


class adpd_cb(gen3_sdk.adpd_stream_callback):
    """Stream callback that converts incoming SUM32 samples to CTR and PTR.

    Creating an instance empties both sample buffers; every call appends
    one [raw, CTR, PTR] entry per recognised sample.
    """

    def __init__(self):
        gen3_sdk.adpd_stream_callback.__init__(self)
        for buffer in (slotA_arr, slotB_arr):
            buffer.clear()

    def call(self, data, sequence_num):
        """Process one batch of stream samples delivered by the SDK."""
        for sample in data:
            # NOTE(review): samples typed ..._B_SUM_32b are filed under slot A
            # and ..._A_SUM_32b under slot B. This mirrors the existing
            # behaviour; confirm against the SDK whether the mapping is
            # intentional or swapped.
            if sample.datatype == gen3_sdk.RAW_DATA_ADPD_B_SUM_32b:
                slot, buffer = ENABLED_SLOT['SLOTA'], slotA_arr
            elif sample.datatype == gen3_sdk.RAW_DATA_ADPD_A_SUM_32b:
                slot, buffer = ENABLED_SLOT['SLOTB'], slotB_arr
            else:
                continue

            raw = sample.sum32_adpd_data
            ctr = Calculate_CTR(raw, slot)
            buffer.append([raw, ctr, CTR_to_PTR(ctr, slot)])

def deserialize_adpd(slot_arr):
    """Split a list of [raw, CTR, PTR] entries into three parallel lists.

    Returns:
        (adpd, ctr, ptr): the first, second and third element of every
        entry, in input order. Three empty lists for empty input.
    """
    rows = list(slot_arr)
    adpd = [row[0] for row in rows]
    ctr = [row[1] for row in rows]
    ptr = [row[2] for row in rows]
    return adpd, ctr, ptr
    
    
        

#  <font color='brown'> Commands to start ADPD</font>


There are two command structures for starting any stream:

  1) L1 commands - Each L1 command is mapped to an equivalent M2M2 command
        
  Ex :<font color='olive'>active_watch.adpd_app.adpd_stream.subscribe(adpd_cb().__disown__())
    
   active_watch.adpd_app.adpd_stream.start() </font>
                 
  2) L2 commands - L2 commands encapsulate a group of L1 commands to make the API easier to use.
        
  Ex :<font color='olive'> active_watch.start_adpd(adpd_cb().__disown__()) </font>

In [None]:

# Configure the device, stream ADPD data for five seconds into the sample
# buffers through adpd_cb, then stop the stream.
if ser.isOpen():

    print("ADPD stream started")

    # Build the path with os.path.join so it resolves on every platform; the
    # previous literal used a Windows-only backslash and relied on '\A' not
    # being treated as an escape sequence.
    LoadCfg(os.path.join('config', 'ADPD188BIZ-SK_Smoke_OpenAir_01.dcfg'))
    active_watch.adpd_app.calibrate_clock()
    active_watch.adpd_app.set_slot(active_watch.adpd_app.SLOTMODE_SUM32b, active_watch.adpd_app.SLOTMODE_SUM32b)
    active_watch.adpd_app.adpd_stream.subscribe(adpd_cb().__disown__())
    active_watch.adpd_app.adpd_stream.start()

    print("Waiting for the samples to be Collected")
    time.sleep(5)

    active_watch.stop_adpd()

    print("ADPD stream stopped")
else:
    print("Serial port is not connected")



#  <font color='brown'> Plotting</font>

Plot the adpd data, CTR data and PTR data based on the graph selection.

In [None]:

# Split the collected entries of each slot into raw/CTR/PTR series and build
# the matching x axis (sample index 0..n-1).
adpd_a, ctr_a, ptr_a = deserialize_adpd(slotA_arr)
samples_slotA = list(range(len(adpd_a)))

adpd_b, ctr_b, ptr_b = deserialize_adpd(slotB_arr)
samples_slotB = list(range(len(adpd_b)))
        

def f(Select_Graph):
    """Plot the slot A and slot B series chosen by `Select_Graph`.

    'CTR' and 'PTR' plot the corresponding series; any other value plots
    the raw ADPD data. Reads the module-level sample indices and series
    prepared beforehand.

    The three formerly duplicated branches now share one plotting path, and
    the redundant first plt.legend() call (immediately replaced by the
    anchored legend) is gone.
    """
    if Select_Graph == 'CTR':
        title = 'CTR'
        series = ((ctr_a, "SLOT_A_SUM32", "red"),
                  (ctr_b, "SLOT_B_SUM32", "green"))
    elif Select_Graph == 'PTR':
        title = 'PTR'
        series = ((ptr_a, "slot_A_SUM32", "brown"),
                  (ptr_b, "slot_B_SUM32", "black"))
    else:
        title = 'ADPD'
        series = ((adpd_a, "slot_A_SUM32", "blue"),
                  (adpd_b, "slot_B_SUM32", "orange"))

    x_axes = (samples_slotA, samples_slotB)
    for x_values, (y_values, label, color) in zip(x_axes, series):
        plt.plot(x_values, y_values, label=label, color=color)

    plt.title(title)
    plt.ylabel('amplitude')
    plt.xlabel('Samples')
    # Place the legend outside the axes, to the right of the plot.
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.show()
        
# Choices offered by the selection widget; each is passed to f() as its
# Select_Graph argument (the keyword below must match f's parameter name).
Graph_view  = ['ADPD','CTR','PTR']

interact(f , Select_Graph = Graph_view)



#  <font color='brown'> Disconnect</font>

Disconnect the serial port and stop the receiver thread.

In [None]:
# Clear the receiver's run flags first so its loop stops iterating, then
# close the serial port.
rx_cb.close()
ser.close()
print("Disconnected")