# Biologic Programs

In [1]:
import os
import math
import time

from abc import ABC
from collections import namedtuple

from .lib import ec_lib as ecl
from .lib import data_parser as dp

ModuleNotFoundError: No module named '__main__.lib'; '__main__' is not a package

In [None]:
# One batch of readings retrieved from the device:
#   data   - the parsed readings
#   info   - the raw data info the batch was retrieved with
#   values - the raw current values the batch was retrieved with
DataSegment = namedtuple( 'DataSegment', ( 'data', 'info', 'values' ) )

In [None]:
#--- helper function ---
def cast( lst, kind ):
    """
    Casts each element of a list into the given type.
    
    :param lst: Iterable of elements.
    :param kind: Cast type. Any callable accepting a single element, 
        e.g. float.
    :returns: New list with each element cast to kind, in the original order.
    """
    # previously returned an empty list regardless of the input
    return [ kind( element ) for element in lst ]

In [2]:
class BiologicProgram( ABC ): 
    """
    Abstract base for programs executed on a Biologic device.
    
    Keeps the run configuration together with the data gathered by a run.
    """
    
    def __init__( self, device, channel, params, autoconnect = True ):
        """
        Stores the run configuration and sets up empty data containers.
        
        :param device: BiologicDevice the program runs on.
        :param channel: Channel of the device the program runs on.
        :param params: Dictionary of parameters for the program.
        :param autoconnect: Whether to connect to and disconnect from the 
            device automatically during a run.
            [Default: True]
        """
        # run configuration
        self.params  = params
        self.channel = channel
        self.device  = device
        self.autoconnect = autoconnect
        
        # description and storage of results
        self.field_titles = [] # column titles used when saving data
        self._data        = [] # collected data
        self._techniques  = [] # techniques used by the program
        
        # set by the concrete programs
        self._data_fields = None # fields used to parse technique data
        self._fields      = None # record type of the program data
        
    
    
    #--- properties ---
    
    @property
    def data( self ):
        """
        :returns: The data collected by the program.
        """
        return self._data
    
    
    @property
    def status( self ):
        """
        :returns: Status of the program. Not implemented, always None.
        """
        return None
    
    
    @property
    def fields( self ):
        """
        :returns: The fields object of the program.
        """
        return self._fields
    
    
    @property
    def techniques( self ):
        """
        :returns: Technique(s) used by the program.
        """
        return self._techniques
    
    #--- public methods ---
    
    
    def run( self ):
        """
        Runs the program. Does nothing in the base class.
        """
        return None
    
    
    def save_data( self, file, append = False ):
        """
        Writes the collected data to a CSV file, one datum per line.
        
        :param file: File path.
        :param append: True to add to the end of the file, 
            False to replace its content. The title line is only written
            when not appending.
            [Default: False]
        """
        if append:
            mode = 'a'
            
        else:
            mode = 'w'
        
        with open( file, mode ) as out:
            if mode == 'w':
                # a fresh file starts with the column titles
                out.write( ', '.join( self.field_titles ) + '\n' )

            for datum in self.data:
                cells = [ str( value ) for value in datum ]
                out.write( ', '.join( cells ) + '\n' )
    
    
    #--- protected methods ---
    
    
    def _connect( self ):
        """
        Connects the device unless it is already connected.
        """
        if self.device.is_connected():
            return
        
        self.device.connect()
            
            
    def _disconnect( self ):
        """
        Disconnects the device if it is currently connected.
        """
        if not self.device.is_connected():
            return
        
        self.device.disconnect()
            
    
    def _retrieve_data( self, interval = 1 ):
        """
        Polls the device for data until the channel reports it has stopped.
        Each batch of data is parsed with the technique fields of the program.
        
        :param interval: Time between polls in seconds.
            [Default: 1]      
        :returns: A list of DataSegments with properties [ data, info, values ].
        """
        stopped  = ecl.ChannelState.STOP
        segments = []
        
        state = None # nothing polled yet, forces at least one poll
        while state is not stopped:
            time.sleep( interval )
            
            raw   = self.device.get_data( self.channel )
            state = ecl.ChannelState( raw.values.State )
            
            readings = dp.parse( raw.data, raw.info, self._data_fields )
            segments.append( DataSegment( readings, raw.info, raw.values ) )

        return segments

In [None]:
class OCV( BiologicProgram ):
    """
    Runs an open circuit voltage scan.
    """
    
    def __init__( self, device, channel, params, autoconnect = True ):
        """
        Params are
        time: Run time in seconds.
        time_interval: Maximum time between readings.
        voltage_interval: Maximum interval between voltage readings.
        
        :param device: A BiologicDevice to run the program on.
        :param channel: The channel to run the program on.
        :param params: Dictionary of the parameters listed above.
        :param autoconnect: Automatically connect and disconnect to device 
            during run.
            [Default: True]
        """
        # autoconnect must be forwarded, otherwise the base class always 
        # falls back to its default of True
        super().__init__( device, channel, params, autoconnect )
        
        self._techniques = [ 'ocv' ]
        self._fields = namedtuple( 'OCV_Datum', [ 'time', 'voltage' ] )
        self.field_titles = [ 'Time [s]', 'Voltage [V]' ]
        self._data_fields = dp.SP300_Fields.OCV
        
        
    def run( self ):
        """
        Loads and runs the 'ocv' technique, then stores the collected 
        readings as ( time, voltage ) records in the program data.
        
        The device is only connected and disconnected here if autoconnect 
        is set, so a caller managing the connection itself keeps it open.
        """
        params = {
            'Rest_time_T':     self.params[ 'time' ],
            'Record_every_dE': self.params[ 'voltage_interval' ],
            'Record_every_dT': self.params[ 'time_interval' ]
        }
        
        # run technique
        if self.autoconnect:
            self._connect()
            
        try:
            self.device.load_technique( self.channel, 'ocv', params )
            self.device.start_channel( self.channel )
            data = self._retrieve_data()
            
        finally:
            # release the device even if the technique failed
            if self.autoconnect:
                self._disconnect()
        
        # parse data
        self._data = [
            self._fields(
                dp.calculate_time( # time
                    datum.t_high, 
                    datum.t_low, 
                    segment.info, 
                    segment.values 
                ),
                
                datum.voltage
            )
            
            for segment in data 
            for datum in segment.data   
        ]

In [None]:
class CA( BiologicProgram ):
    """
    Runs a chronoamperometry technique.
    """
    
    def __init__( self, device, channel, params, autoconnect = True ):
        """
        Params are
        voltages: List of voltages.
        durations: List of times in seconds.
        time_interval: Maximum time interval between points.
        current_interval: Maximum current change between points.
        
        :param device: A BiologicDevice to run the program on.
        :param channel: The channel to run the program on.
        :param params: Dictionary of the parameters listed above.
            The voltages entry is replaced by a list of floats.
        :param autoconnect: Automatically connect and disconnect to device 
            during run.
            [Default: True]
        """
        # autoconnect must be forwarded, otherwise the base class always 
        # falls back to its default of True
        super().__init__( device, channel, params, autoconnect )
        
        self.params[ 'voltages' ] = [ 
            float( v ) for v in self.params[ 'voltages' ]
        ]
        
        # was misspelled `_technqiues`, leaving `techniques` empty
        self._techniques = [ 'ca' ]
        self._fields = namedtuple( 'CA_Datum', [
            'time', 'voltage', 'current', 'power', 'cycle'
        ] )
        
        self.field_titles = [ 
            'Time [s]', 
            'Voltage [V]', 
            'Current [A]', 
            'Power [W]', 
            'Cycle' 
        ]
        
        self._data_fields = dp.SP300_Fields.CA
        
        
    def run( self ):
        """
        Loads and runs the 'ca' technique with one step per voltage, then 
        stores the collected readings as 
        ( time, voltage, current, power, cycle ) records in the program data.
        Power is calculated as voltage times current.
        
        The device is only connected and disconnected here if autoconnect 
        is set, so a caller managing the connection itself keeps it open.
        """
        steps = len( self.params[ 'voltages' ] )
        
        params = {
            'Voltage_step':      self.params[ 'voltages' ],
            # key was misspelled 'vs_inital'; spelled as in the JV scan
            # NOTE(review): confirm against the technique field names 
            # expected by the device library
            'vs_initial':        [ True ]* steps,
            'Duration_step':     self.params[ 'durations' ],
            'Step_number':       steps - 1,
            'Record_every_dT':   self.params[ 'time_interval' ],
            'Record_every_dI':   self.params[ 'current_interval' ],
            'N_Cycles':          0
        }

        # run technique
        if self.autoconnect:
            self._connect()
            
        try:
            self.device.load_technique( self.channel, 'ca', params )
            self.device.start_channel( self.channel )
            data = self._retrieve_data()
            
        finally:
            # release the device even if the technique failed
            if self.autoconnect:
                self._disconnect()

        # technique complete
        self._data = [
            self._fields(
                dp.calculate_time( 
                    datum.t_high, 
                    datum.t_low, 
                    segment.info, 
                    segment.values 
                ),

                datum.voltage,
                datum.current,
                datum.voltage* datum.current, # power
                datum.cycle
            )
            
            for segment in data
            for datum in segment.data
        ]

In [None]:
class JV_Scan( BiologicProgram ):
    """
    Runs a JV scan.
    """
    def __init__( self, device, channel, params, autoconnect = True ):
        """
        Params are
        start: Start voltage. [Default: 0]
        end: End voltage.
        step: Voltage step.
        rate: Scan rate in mV/s.
        average: Average over points. [Default: False]
        
        :param device: A BiologicDevice to run the program on.
        :param channel: The channel to run the program on.
        :param params: Dictionary of the parameters listed above.
            Missing defaults are written into this dictionary.
        :param autoconnect: Automatically connect and disconnect to device 
            during run.
            [Default: True]
        """
        # defaults, only applied if not provided
        params.setdefault( 'start', 0.0 )
        params.setdefault( 'average', False )
        
        # autoconnect must be forwarded, otherwise the base class always 
        # falls back to its default of True
        super().__init__( device, channel, params, autoconnect )
        
        self._techniques = [ 'cv' ]
        self._fields = namedtuple( 'CV_Datum', [
           'voltage', 'current', 'power' 
        ] )
        self.field_titles = [ 'Voltage [V]', 'Current [A]', 'Power [W]' ]
        self._data_fields = dp.SP300_Fields.CV
    
    
    def run( self ):
        """
        Loads and runs the 'cv' technique, then stores the collected 
        readings as ( voltage, current, power ) records in the program data.
        Power is calculated as voltage times current.
        
        The device is only connected and disconnected here if autoconnect 
        is set, so a caller managing the connection itself keeps it open.
        """
        # setup scan profile ( start -> end -> start )
        voltage_profile = [ self.params[ 'start' ] ]* 5
        voltage_profile[ 1 ] = self.params[ 'end' ]
        
        params = {
            'vs_initial':   [ True ]* 5,
            'Voltage_step': voltage_profile,
            'Scan_Rate':    [ self.params[ 'rate' ] ]* 5,
            'Scan_number':  2,
            'Record_every_dE':   self.params[ 'step' ],
            'Average_over_dE':   self.params[ 'average' ],
            'N_Cycles':          0,
            'Begin_measuring_I': 0.0, # start measurement at beginning of interval
            'End_measuring_I':   1.0 # finish measurement at end of interval
        }
        
        # run technique
        if self.autoconnect:
            self._connect()
            
        try:
            self.device.load_technique( self.channel, 'cv', params )
            self.device.start_channel( self.channel )
            data = self._retrieve_data()
            
        finally:
            # release the device even if the technique failed
            if self.autoconnect:
                self._disconnect()

        # technique complete
        self._data = [ 
            self._fields( 
                datum.voltage, 
                datum.current, 
                datum.voltage* datum.current # power
            ) 
            
            for segment in data
            for datum in segment.data
        ]

In [1]:
class MPP( BiologicProgram ):
    """
    Run MPP tracking.
    
    Measures the open circuit voltage, finds the initial maximum power point 
    with a JV scan, then holds the cell there with a CA technique while 
    repeatedly probing a neighboring voltage and following it if it gives 
    more power.
    """
    
    # one CA reading, in the column order of the tracking file
    _Datum = namedtuple( 'MPP_Datum', [
        'time', 'voltage', 'current', 'power', 'cycle'
    ] )
    
    
    def __init__( self, device, channel, params, autoconnect = True ):
        """
        Params are
        probe_step: Voltage step for probe. [Default: 0.01]
        probe_interval: Length of one hold and probe cycle in seconds. 
            The probe takes 5 seconds, the rest is spent holding.
        run_time: Run time, passed on as the duration of the CA step.
            NOTE(review): originally documented as minutes, while CA 
            documents its durations in seconds. Confirm the units.
            
        :param device: A BiologicDevice to run the program on.
        :param channel: The channel to run the program on.
        :param params: Dictionary of the parameters listed above.
        :param autoconnect: Automatically connect and disconnect to device 
            during run.
            [Default: True]
        """
        # autoconnect must be forwarded, otherwise the base class always 
        # falls back to its default of True
        super().__init__( device, channel, params, autoconnect )
        
        self._techniques = [ 'ocv', 'cv', 'ca' ]
        self.field_titles = []
        
        
    def run( self, data = 'data' ):
        """
        Runs the tracking and writes the results into the data folder:
        voc.csv for the OCV scan, jv.csv for the JV scan and mpp.csv for 
        the tracking data.
        
        :param data: Data folder path. [Default: 'data']
        :raises RuntimeError: If the OCV or JV scan returns no data.
        """
        # create folder path if needed
        os.makedirs( data, exist_ok = True )
            
        ocv_path = os.path.join( data, 'voc.csv' )
        jv_path  = os.path.join( data, 'jv.csv' )
        mpp_path = os.path.join( data, 'mpp.csv' )
        
        # read parameters up front, so a missing one fails before any scan
        probe_step = self.params.get( 'probe_step', 0.01 ) # 10 mV default
        run_time   = self.params[ 'run_time' ]
        probe_time = 5 # seconds
        
        # a cycle shorter than the probe leaves no time to hold,
        # sleeping for a negative time is an error
        hold_time = max( self.params[ 'probe_interval' ] - probe_time, 0 )
        
        if self.autoconnect:
            self._connect()
        
        try:
            voc   = self._measure_voc( ocv_path )
            v_mpp = self._initial_mpp( voc, jv_path )
            
            self._track( 
                v_mpp, probe_step, run_time, hold_time, probe_time, mpp_path 
            )
            
        finally:
            # release the device even if a step failed
            if self.autoconnect:
                self._disconnect()
                
    
    #--- protected methods ---
    
    
    def _measure_voc( self, path ):
        """
        Runs a one second OCV scan and saves it.
        
        :param path: File path to save the scan to.
        :returns: Mean voltage of the scan.
        :raises RuntimeError: If the scan returns no data.
        """
        ocv_params = {
            'time': 1,
            'time_interval': 0.1,
            'voltage_interval': 0.001
        }
        
        ocv_pg = OCV( 
            self.device, 
            self.channel, 
            ocv_params, 
            autoconnect = False 
        )
        
        ocv_pg.run()
        ocv_pg.save_data( path )
        
        voltages = [ datum.voltage for datum in ocv_pg.data ]
        if not voltages:
            raise RuntimeError( 'OCV scan returned no data.' )
        
        return sum( voltages )/ len( voltages )
    
    
    def _initial_mpp( self, voc, path ):
        """
        Runs a JV scan from v_oc to 0 V and saves it.
        
        :param voc: Open circuit voltage.
        :param path: File path to save the scan to.
        :returns: Voltage of the reading with the lowest power.
        :raises RuntimeError: If the scan returns no data.
        """
        jv_params = {
            'start': 0.0, # relative to v_oc
            'end':   -voc, # scan to 0 V
            'step':  0.01,
            'rate':  10.0,
            'average': False
        }
        
        jv_pg = JV_Scan( 
            self.device, 
            self.channel, 
            jv_params,
            autoconnect = False
        )
        
        jv_pg.run()
        jv_pg.save_data( path )
        
        if not jv_pg.data:
            raise RuntimeError( 'JV scan returned no data.' )
        
        # power of interest is negative
        mpp = min( jv_pg.data, key = lambda d: d.power ) 
        return mpp.voltage
    
    
    def _track( self, v_mpp, probe_step, run_time, hold_time, probe_time, path ):
        """
        Holds the cell with a CA technique until the channel stops, 
        probing a neighboring voltage after every hold.
        
        If the probe gives more power than the hold, the probe voltage 
        becomes the new hold voltage. Otherwise the voltage is set back 
        and the next probe goes in the other direction.
        
        :param v_mpp: Initial hold voltage.
        :param probe_step: Initial voltage step of the probe.
        :param run_time: Duration of the CA step.
        :param hold_time: Time to hold before each probe in seconds.
        :param probe_time: Time to probe in seconds.
        :param path: File path to save the tracking data to.
        """
        # init file
        ca_titles = [ 
            'Time [s]', 
            'Voltage [V]', 
            'Current [A]', 
            'Power [W]', 
            'Cycle' 
        ]
        with open( path, 'w' ) as file:
            file.write( ', '.join( ca_titles ) )
            file.write( '\n' )
        
        # run ca for whole run time
        ca_params = {
            'Voltage_step':      [ v_mpp ],
            'vs_initial':        [ False ], # key was misspelled 'vs_inital'
            'Duration_step':     [ run_time ], # one entry per step
            'Step_number':       0,
            'Record_every_dT':   1.0, # 1 s
            'Record_every_dI':   0.001, # 1 mA
            'N_Cycles':          0
        }

        # the scans before may have dropped the connection,
        # this does nothing if the device is still connected
        self._connect()
        self.device.load_technique( self.channel, 'ca', ca_params )
        self.device.start_channel( self.channel )
        
        v_hold = v_mpp
        while True:
            # hold
            time.sleep( hold_time )
            hold_raw  = self.device.get_data( self.channel )
            hold_data = self._parse_ca( hold_raw )
            
            if self._is_stopped( hold_raw ):
                # technique finished while holding, nothing left to probe
                self._append_rows( path, hold_data )
                break
            
            # probe, voltages are absolute as vs_initial is False
            v_probe = v_hold + probe_step
            self._set_voltage( v_probe )
            
            time.sleep( probe_time )
            probe_raw  = self.device.get_data( self.channel )
            probe_data = self._parse_ca( probe_raw )
            
            # write data
            self._append_rows( path, hold_data + probe_data )
            
            if self._is_stopped( probe_raw ):
                break
            
            if not ( hold_data and probe_data ):
                # nothing to compare, go back and try again
                self._set_voltage( v_hold )
                continue
            
            # compare probe and hold data,
            # taking the same number of points for both
            hold_power  = self._mean_power( hold_data[ -len( probe_data ): ] )
            probe_power = self._mean_power( probe_data )
            
            # NOTE(review): magnitudes are compared as the power of 
            # interest is negative. Confirm the sign convention.
            if abs( probe_power ) < abs( hold_power ):
                # probe moved in wrong direction, go back and reverse
                probe_step = -probe_step
                self._set_voltage( v_hold )
                
            else:
                # probe is at least as good, stay there
                v_hold = v_probe
    
    
    def _set_voltage( self, voltage ):
        """
        Updates the voltage of the running CA technique.
        
        :param voltage: New voltage.
        """
        self.device.update_parameters( 
            self.channel, 
            'ca', 
            { 'Voltage_step': [ voltage ] } 
        )
    
    
    def _parse_ca( self, raw ):
        """
        Parses raw CA data into readings.
        
        :param raw: Raw data as returned by the device.
        :returns: List of ( time, voltage, current, power, cycle ) readings.
            Power is calculated as voltage times current.
        """
        parsed = dp.parse( raw.data, raw.info, dp.SP300_Fields.CA )
        
        return [
            self._Datum(
                dp.calculate_time( 
                    datum.t_high, 
                    datum.t_low, 
                    raw.info, 
                    raw.values 
                ),
                
                datum.voltage,
                datum.current,
                datum.voltage* datum.current, # power
                datum.cycle
            )
            
            for datum in parsed
        ]
    
    
    @staticmethod
    def _is_stopped( raw ):
        """
        :param raw: Raw data as returned by the device.
        :returns: True if the channel state of the data is STOP.
        """
        state = ecl.ChannelState( raw.values.State )
        return ( state is ecl.ChannelState.STOP )
    
    
    @staticmethod
    def _mean_power( readings ):
        """
        :param readings: Non-empty list of readings.
        :returns: Mean power of the readings.
        """
        return sum( datum.power for datum in readings )/ len( readings )
    
    
    @staticmethod
    def _append_rows( path, rows ):
        """
        Appends readings to a CSV file, one per line.
        
        :param path: File path.
        :param rows: List of readings.
        """
        with open( path, 'a' ) as file:
            for row in rows:
                file.write( ', '.join( map( str, row ) ) )
                file.write( '\n' )

NameError: name 'BiologicProgram' is not defined

In [None]:
class MPP_JV():
    """
    Unfinished sketch of MPP tracking with JV scans at regular intervals.
    
    Reads run_time and jv_interval from self.params.
    NOTE(review): the class has no initializer, self.params must be 
    provided before run is called.
    """
    
    @staticmethod
    def _scan_file_template( run_time, jv_interval ):
        """
        Creates the file name template for the numbered JV scans.
        The scan number is zero padded to the number of digits of the 
        total number of scans.
        
        :param run_time: Total run time.
        :param jv_interval: Time between JV scans, in the same units as 
            run_time, or a false value for no JV scans.
        :returns: Format string taking the scan number, 
            e.g. 'jv-scan-{:02d}.csv'.
        """
        jv_padding = 1
        if jv_interval:
            # calculate number of JV scans to occur
            jv_scan_total = math.ceil( run_time/ jv_interval )
            
            # count the digits directly, ceil( log10( n ) ) is one short
            # for powers of ten and invalid for fewer than one scan
            jv_padding = len( str( jv_scan_total ) )
            
        return 'jv-scan-{:0' + str( jv_padding ) + 'd}.csv'
    
    
    def run( self ):
        """
        Runs a JV scan and saves it to a numbered file, 
        if a JV interval is set.
        """
        jv_interval = self.params[ 'jv_interval' ]
        jv_base = self._scan_file_template( 
            self.params[ 'run_time' ], 
            jv_interval 
        )
         
        jv_scan_num = 0
        
        # jv scan
        if jv_interval:
            # NOTE(review): `jv_pg` and `data` are not defined in this 
            # scope, this raises a NameError until they are provided
            jv_pg.run()
            jv_scan_num += 1
            jv_pg.save_data( 
                os.path.join( 
                    data, 
                    jv_base.format( jv_scan_num ) 
                )
            )

In [None]:
def mpp_base( delta_v, probe_step = 0.01 ):
    """
    A single mpp tracking sequence.
    
    Runs a CA program holding at delta_v, then probing at 
    delta_v + probe_step, appends its data to the tracking file and 
    compares the mean power of both steps.
    
    NOTE(review): `self`, `data` and `mpp_file` are not defined in this 
    scope. This looks like it is meant to be nested in a method that 
    provides them. Confirm before use.
    
    :param delta_v: Initial voltage change.
    :param probe_step: Probe step in volts. The sign gives the direction.
        [Default: 0.01]
    :returns: Next probe step. The given step if the probe gave more power 
        than the hold, or if no data was collected. Otherwise its negative.
    """
    # use the magnitude, a reversed step is negative 
    # and no reading could ever be within a negative threshold
    voltage_threshold = abs( probe_step )* 0.2 # V
    
    # 1 second or 10% of hold time
    probe_interval = self.params[ 'probe_interval' ]
    probe_time = min( 0.1* probe_interval, 1 ) 
    hold_time  = probe_interval - probe_time
    
    ca_params = {
        'voltages':  [ delta_v, delta_v + probe_step ],
        'durations': [ hold_time, probe_time ],
        'time_interval':    1.0,
        'current_interval': 0.001
    }

    ca_pg = CA( 
        self.device,
        self.channel,
        ca_params
    )
    
    ca_pg.run()
    ca_pg.save_data( 
        os.path.join( data, mpp_file ), 
        append = True 
    )
    
    if not ca_pg.data:
        # nothing to compare, keep the direction
        return probe_step
    
    # calculate new v_mpp
    # compare hold and probe voltage
    v_mpp = ca_pg.data[ 0 ].voltage
    
    hold_power = [ 
        datum.power 
        for datum in ca_pg.data
        if abs( datum.voltage - v_mpp ) < voltage_threshold
    ]

    probe_power = [ 
        datum.power 
        for datum in ca_pg.data
        if abs( datum.voltage - ( v_mpp + probe_step ) ) < voltage_threshold
    ]
    
    # back up if no reading was close enough to the expected voltage,
    # keeping a list of powers so the means below stay valid
    if not hold_power:
        hold_power = [ ca_pg.data[ 0 ].power ]
    
    if not probe_power:
        probe_power = [ ca_pg.data[ -1 ].power ]

    hold_power  = hold_power[ :len( probe_power ) ] # normalize lengths
    hold_power  = sum( hold_power )  / len( hold_power )
    probe_power = sum( probe_power ) / len( probe_power )

    # NOTE(review): a greater signed power is taken as better here. 
    # Confirm against the sign convention of the measured power.
    if probe_power > hold_power:
        return probe_step
    
    return -probe_step