In [None]:
import serial
import numpy as np
from time import sleep
import sys
import json

COM = 'COM9'# /dev/ttyACM0 (Linux)
BAUD = 19200
# ser.close()
ser = serial.Serial(COM, BAUD, timeout = .1)

print(ser.name)

In [None]:
def get_and_print_responses(serial_device, print_response = False):
    '''
        Retrieve messages written by the teensy to the 
        serial connection under the assumption that the output is 
        utf-8 encoded. If this assumption is violated the resulting 
        exception is printed and an empty string is returned.
    '''
    response = 'No response'
    ret = ''
    while response != '':
        response  = serial_device.readline()
        try:
            response = response.decode('utf-8') 
            ret += response   
        except Exception as e:
            print(f'Exception: {e} \nResponse: {response}')
            ret = ''
            
        if print_response:
            print(response)
    return ret

help(get_and_print_responses)
print(get_and_print_responses(ser))
print('--------------------------------------------------------------')
#ser.write(f"Q0\n".encode('utf8'))   # Switch it back-off    
sleep(0.5)  # Sleep a while to make sure data is ready for processing

print(get_and_print_responses(ser))
print('--------------------------------------------------------------')


def getDataSlice(serial_device,adc_type, start_pos):
    '''
    This function can be used to get a slice of the measured data.
    
    Parameters:
    
        serial_device: an instance of Serial providing the connection with 
                       the teensy board.
        
        adc_type: 1. V1 
                  2. V2
                  3. DAC stimulus
            
        start_pos: sample at which to start the data request. 
        
    Output:
    
        dataslice: A dictionary containing three key-value pairs: 'start', 'end' and 'data'.
                   the value of 'start' is the value of the parameter start_pos, the value
                   of 'end' is the index of the first sample that is not returned (to 
                   be used as start_pos in the next call), the value of the 'data' property 
                   contains the measured data as a list.
    '''
                 
    commandstring = f'D{adc_type:2}_{start_pos}\n'
    serial_device.write(commandstring.encode('utf8'))
    
    json_data = get_and_print_responses(ser, print_response = False)
    data_slice = json.loads(json_data)
    
    return data_slice   


adc_np_type_map = {1:np.int16,2:np.uint16,3:np.uint16}

def getData(serial_device, stimulus_parameters, adc_type):
    '''
    This function can be used to get all the measured data for a single subtype.
    
    Parameters:
    
        serial_device: an instance of Serial providing the connection with 
                       the teensy board.
        
        stimulus_parameters: contains metadata on the stimulus.
        
        adc_type: 1. V1 
                  2. V2
                  3. DAC stimulus
        
    Output:
    
        data: a numpy array of the correct type (see adc_np_type_map) containing
        the full measurement sequence for a single type. 
    '''
            
    start_pos = 0
    dtype = adc_np_type_map[adc_type]
    length = stimulus_parameters['length']

    # Allocate memory to store the result
    data = np.zeros((length,), dtype=dtype)
    
    while True:
        data_slice = getDataSlice(serial_device,adc_type, start_pos) 
        
        if data_slice['end']<=data_slice['start']: # This signals that we try to read beyond the end of the avaialble data
            break
            
        data[data_slice['start']:data_slice['end']]=np.array(data_slice['data'],dtype=dtype)
        
        start_pos = data_slice['end']    
        
    return data        

def get_stimulus_parameters(serial_device):
    '''
    This function can be used to get all the stimulus parameters.
    
    Parameters:
    
        serial_device: an instance of Serial providing the connection with 
                       the teensy board.
    
    Output:
    
        stimulus_parameters: dictionary with metadata on the stimulus,
                             available keys:
                                 'stimulus_parameters_valid': 0 (False) or 1 (True) indicates whether data 
                                                             in buffers are from measurement for these 
                                                             stimulus settings
                                 'length': length of the stimulus and the measured data
                                 'digital_amplitude': Amplitude of DAC input
                                 'f_stimulus': stimulus frequency DAC
                                 'f_sampling': sampling frequency DAC and ADC
                                 'stimulus_duration': stimulus duraction
                                 'ADC_averaging_number': averaging over samples in ADC
    '''
    
    jsonStimulusParameters = None
    stimulus_parameters = None
    try:
        # Get the metadata
        serial_device.write("D00\n".encode('utf8'))
        jsonStimulusParameters = get_and_print_responses(ser)
        stimulus_parameters = json.loads(jsonStimulusParameters)
    except Exception as e:
        print("jsonStimulusParameters: ", jsonStimulusParameters)
        print("stimulus_parameters: ",stimulus_parameters)
        raise(e)
    
    
    if not stimulus_parameters['stimulus_parameters_valid']:
        raise RuntimeError(f"Stimulus parameters where changed after the last measurement: {stimulus_parameters}")
        
    
    return stimulus_parameters    
 

In [None]:


def estimate_impedance(Rs, f_stim, f_samp, V1, V2, DAC):
    ''' Estimate the frequency response at the stimulus frequency.
        
        Parameters:
            Rs: shunt resistance
            f_stim: stimulus frequency
            f_samp: sampling frequency
            V1: measured V1 data
            V2: measured V2 data
            DAC: signal provided to DAC
            
        Output:
            Z: impedance at frequency f_stim
            V1_1, V1_2, V1_0: fit parameters for equation 6 from referenced paper
            V2_1, V2_2, V2_0: fit parameters for equation 7 from referenced paper
        
        'Electrochemical Impedance Spectroscopy System Based on a Teensy Board',
        Leila Es Sebar, Leonardo Iannucci, Emma Angelini, Sabrina Grassini, and Marco Parvis,
        IEEE TRANSACTIONS ON INSTRUMENTATION AND MEASUREMENT, VOL. 70, 2021

        Here we implement equations 6, 7 and 8 of this paper.

        Still needs a resistor and stimulus amplitude correction.

    '''
    
    
    if not len(V1) == len(V2) or not len(V1) == len(DAC):
        
        raise ValueError(f'Incompatible input lengths: len V1: {len(V1)}, len V2: {len(V2)}, len Dac: {len(DAC)}')
        
    tstamps = np.arange(0, len(DAC), 1)/f_samp
    
    sine = np.sin(2*np.pi*f_stim*tstamps)
    cosine = np.cos(2*np.pi*f_stim*tstamps)
    
    A = np.vstack([cosine, sine, np.ones(len(DAC))]).T
    V1_1, V1_2, V1_0 = np.linalg.lstsq(A, V1, rcond=None)[0]  # Fit according equation 6
    V2_1, V2_2, V2_0 = np.linalg.lstsq(A, V2, rcond=None)[0]  # Fit according equation 7
    
    Z = (V2_1 + 1j* V2_2)/(V1_1 + 1j* V1_2)*Rs/2 # Apply equation 8
    
    return Z, V1_1, V1_2, V1_0, V2_1, V2_2, V2_0 


In [None]:
def setStimulusParameters(serial_device, 
                    f_stimulus, 
                    DC_offset = 2048,
                    A_stimulus = 0.6,
                    f_sampling = 10000,    
                    print_response = False):
    
     # "A" Change the output amplitude used for the measurement
    serial_device.write(f"A{A_stimulus}\n".encode('utf8'))
    response = get_and_print_responses(ser)
    
    if print_response:
        print(response)    
    
    # "G<samplefreq>\n" Acquire data at <samplefreq>
    serial_device.write(f"G{f_sampling}\n".encode('utf8'))
    response = get_and_print_responses(ser)
    
    if print_response:
        print(response) 
    
    # "Y<DC_Offset>\n" set average value stimulus (has to be between positive)
    serial_device.write(f"Y{DC_offset}\n".encode('utf8'))
    response = get_and_print_responses(ser)
    
    if print_response:
        print(response) 
        
    # Set the stimulus frequency
    measurement_string = f"F{f_stimulus}\n"
    serial_device.write(measurement_string.encode('utf8'))
    response = get_and_print_responses(ser)
    
    if print_response:
        print(response)
    
    
    
def measureSpectrum(serial_device, 
                    f_min, f_max, f_step, 
                    DC_offset = 2048,
                    A_stimulus = 0.6,
                    f_sampling = 10000,
                    Rs= 1000,
                    process_measurement= lambda Rs, f_stim, f_samp, V1, V2, DAC: f_stim ):
    '''
    
    '''
    
    f_range = np.arange(f_min, f_max, f_step)
    spectrum = [None]*len(f_range)
    
    for f_index, f in enumerate(f_range):
        print(f"\n--------------------------------------------")
        
        setStimulusParameters(serial_device, 
                    f, 
                    DC_offset = DC_offset,
                    A_stimulus = A_stimulus,
                    f_sampling = f_sampling,
                    print_response = True)
        
        print(f"Measure at frequency {f}")
        
        # Execute Measurement
        ser.write("M\n".encode('utf8'))
        sleep(0.1)
        response = get_and_print_responses(ser)
    
        while True:
            sleep(0.1)
            # Get the metadata
            serial_device.write("D00\n".encode('utf8'))
            jsonStimulusParameters = get_and_print_responses(ser)
            try:
                stimulus_parameters = json.loads(jsonStimulusParameters)
            except Exception as e:
                print('JSON STRING:', jsonStimulusParameters)
                raise e
                
            print('.',end='')
            # Check if measurement is finished
            if stimulus_parameters['stimulus_parameters_valid']: 
                break
        
        V1 = getData(serial_device, stimulus_parameters, 1)
        V2 = getData(serial_device, stimulus_parameters, 2)
        DAC = getData(serial_device, stimulus_parameters, 3)  
        
        spectrum[f_index] = process_measurement(Rs, f, f_sampling, V1, V2, DAC)
        # sleep(0.1)
        print(stimulus_parameters)
    
    print(f"\n--------------------------------------------")
    return spectrum
        
spectrum = measureSpectrum(ser, 50, 2000, 50, process_measurement=estimate_impedance)

print(spectrum)

In [None]:
np.savez('spectrum_1kO_1kO_xxxxx-10nov_newcircuit.npz',spectrum)


In [None]:
import numpy as np
spectrum_storage=np.load('spectrum_1kO_1kO_xxxxx-8nov.npz')
spectrum = spectrum_storage['arr_0']

In [None]:

stimulus_parameters = get_stimulus_parameters(ser)  
print('------------stimulus_parameters----------------')
print(stimulus_parameters) 
print('-----------------------------------------------')
print(getDataSlice(ser,1, 0))
print('-----------------------------------------------')
print(getDataSlice(ser,2, 100))
print('-----------------------------------------------')
print(getDataSlice(ser,3, 2001))  
sleep(0.1)
print('-----------------------------------------------')
print(get_and_print_responses(ser))
V1 = getData(ser, stimulus_parameters, 1)
V2 = getData(ser, stimulus_parameters, 2)
DAC = getData(ser, stimulus_parameters, 3)

In [None]:
%pylab

plt.figure()
#plt.plot(np.array(V1),np.array(V2),'g.')
plt.plot(np.array(V1),'go-')
plt.plot(np.array(V2),'ro-')
#plt.plot(8*np.array(DAC), 'k.')

In [None]:
%pylab inline

reals = [np.real(pars[0]) for pars in spectrum]
imaginaries = [np.imag(pars[0]) for pars in spectrum]
frequencies = np.arange(50, 2000, 50)

Zomega = 1000 + 1000/(1+1j*2*pi*frequencies*1000*160*10e-9)
Zreals = [np.real(Z) for Z in Zomega]
Zimaginaries = [np.imag(Z) for Z in Zomega]
plt.figure()
plt.subplot(3,1,1)
plt.plot(reals, imaginaries,'.')

plt.subplot(3,1,2)
plt.plot(Zreals, Zimaginaries,'.')

plt.subplot(3,1,3)
plt.plot(frequencies, imaginaries,'.')

plt.figure()
plt.subplot(2,1,1)
plt.plot(reals, Zreals, '.')

plt.subplot(2,1,2)
plt.plot( imaginaries, Zimaginaries,'.')


In [None]:
plt.figure()
plt.plot(frequencies, reals, 'b.')
plt.plot(frequencies, imaginaries,'r.')

In [None]:
    
f_stimulus = stimulus_parameters['f_stimulus']
f_sampling = stimulus_parameters['f_sampling']
print(f'f_stimulus {f_stimulus}, f_sampling: {f_sampling}')
shift = 0
print(f'v1: {len(V1)}, v2: {len(V2)}, v0: {len(DAC)}')

Z, V1_1, V1_2, V1_0, V2_1, V2_2, V2_0 = process_measurement(f_stimulus, f_sampling, V1[shift:], V2[shift:], DAC[shift:])
print(f'v1: {V1_1}, v2: {V1_2}, v0: {V1_0}')

tstamps = np.arange(0, len(DAC), 1)/f_sampling
sine = np.sin(2*np.pi*f_stimulus*tstamps)
cosine = np.cos(2*np.pi*f_stimulus*tstamps)

plt.figure()
plt.plot(V1_1*cosine+V1_2*sine+V1_0,'r')
plt.plot(V1, 'ko')

In [None]:
# "A" Change the output amplitude used for the measuremnt
ser.write("A0.2\n".encode('utf8'))
response = get_and_print_responses(ser)
print(response)

In [None]:
# "B" return hardware parameters out: Boardname, bus frequency and CPU frequency
ser.write("B\n".encode('utf8'))
response = get_and_print_responses(ser)
print(response)

In [None]:
print(get_and_print_responses(ser))

In [None]:
# "F<freq>\n" Set the DAC output frequency used during a measurement
ser.write("F25\n".encode('utf8'))
response = get_and_print_responses(ser)
print(response)

In [None]:
# "R\n" Get the ADC resolutions
ser.write("R\n".encode('utf8'))
response = get_and_print_responses(ser)
print(response)

In [None]:
# "G<samplefreq>\n" Acquire data at <samplefreq>
ser.write("G1000\n".encode('utf8'))
ser.write("M\n".encode('utf8'))
response = get_and_print_responses(ser)
print(response)

In [None]:
jsonstr = '{"stimulus":{"length":2000,"f_stimulus":50000.00,"f_sampling":0.00,"ADC_averaging_number":1}}'
d=loads(jsonstring[:-4])
print(jsonstring, d)

In [None]:

response = get_and_print_responses(ser)
print(response)

In [None]:
# "Q<level>\n"  
for Vout in range(5, 4096, 409):
    print(f"Q{Vout}\n".encode('utf8'))
    ser.write(f"Q{Vout}\n".encode('utf8'))
    sleep(1)
    print(get_and_print_responses(ser))
    
ser.write(f"Q0\n".encode('utf8'))   # Switch it back-off
print(get_and_print_responses(ser))

In [None]:
ser.write(f"Q0\n".encode('utf8'))   # Switch it back-off    
get_and_print_responses(ser)

In [None]:
hex(4095)

In [None]:
# "Z\n" Perform OCP measurement
ser.write("Z\n".encode('utf8'))
get_and_print_responses(ser)

In [None]:
# "R\n" 
ser.write("R2600\n".encode('utf8'))
get_and_print_responses(ser)

In [None]:

get_and_print_responses(ser)

In [None]:
Vout =4095
print(f"T{Vout:04}\n".encode('utf8'))
ser.write(f"T{Vout:04}\n".encode('utf8'))
get_and_print_responses(ser)
