This code optimizes DynaCAN eddy-current compensation waveforms for the Berkeley system using a combination of computational and experimental models for the eddy currents.

Notes for fixes needed in pyfunk (May 2014):

* TODO: deal with infinite waveforms in convolution

* NOTE: THE FIRST WAVEFORM ON May 15, 2014 WAS CREATED WHILE THE VOLTAGE CALCULATION ERRANEOUSLY RETURNED 0. THIS MAY HAVE AFFECTED THE RESULT

In [1]:
%matplotlib qt5
import matplotlib.pyplot as plt
import np
import sys

In [2]:
from __future__ import unicode_literals, division
from coils.eddycurrents0 import diagonalize, transient_amplitudes, SplitBoxRoom, UNIT
from coils.coils import ScalarCoilPlate, DipoleSet
from plottools import mpl_run_dynamic
import sequencer.pyfunk as pf

#from scipy.optimize import minimize
from scipy.optimize import fmin_bfgs
plt.ion()

In [3]:
#%qtconsole

In [4]:
class FunctionBasis(object):
    def __init__(self, basis_waveforms):
        self._basis = list(basis_waveforms)
    
    def get_waveform(self, coefficients):
        basis = self
        class LinearCombination(pf.Waveform):
            def __init__(self, coefficients):
                self._coefficients = coefficients
                self._basis = basis._basis
                self.bound_tags = \
                    tuple((max((b.bound_tags[0] for b in self._basis), key = lambda x: x.value),
                           min((b.bound_tags[1] for b in self._basis), key = lambda x: x.value)))
                
            def sample(self, positions):
                return np.sum([bf.sample(positions) * c 
                                   for bf, c in zip(self._basis, self._coefficients)], axis = 0)
        return LinearCombination(coefficients)

def inv_step_response(system, coil, field_p, field_dir, length, mode_selection = None):
    """Get inverse step response of an eddy-current system to pulsed coil."""
    
    def mode_responses():
        from functools import partial
        fun = lambda t, tau: np.exp(-t/tau)
        for lambda_, tau in zip(system.lambdas, system.tau):
            yield pf.Function(partial(fun, tau=tau), (0.0, length))
        
    basis = FunctionBasis(mode_responses())
    
    coil_c = coil.mutual_inductances(system).flatten()
    field_c = system.generated_fields(field_p).T.dot(field_dir.flatten())
    
    mode_input = system.V.T.dot(coil_c) / system.lambdas
    mode_output = system.V.T.dot(field_c)
    
    return basis.get_waveform(mode_input * mode_output)

def calc_response(pulse, resp_function):
    """Get system response to pulse, based on given inverse step response."""
    return -pulse.differentiate().convolve(resp_function)

def make_box(ramp_time, ramp_type = pf.LinearRamp):
    """Create a polarizing pulse with a given rampdown time and ramp type."""
    return pf.RampedBox(1.0, (-0.2, -ramp_time), (0.0, ramp_time), (pf.LinearRamp, ramp_type))


Functions for dealing with experimentally measured system responses.

In [5]:
class PeriodicWaveform(pf.Waveform):
    """A periodic waveform that repeats a given bounded waveform."""
    
    def __init__(self, period_waveform, repeat_range = (None, None), fix_levels = False):
        """Repeat periodic_waveform over repeat_range.
        
        Arguments:
            period_waveform: bounded waveform to repeat
            repeat_range: integer range (first and one before last index)
                None means extend to + or - infinity. (None, None) means
                repeat indefinitely in both directions. (0,1) simply gives
                an alias to period_waveform, and (-1,0,2) prepends one
                period_waveform before and appends one after period_waveform.
            fix_levels: Shifts period_waveform with a linear function to
                remove discontinuity between periods
        
        TODO merge this functionality as a boundary condition type
            (periodic boundary condition) This would mean finite repeat
            ranges should be emulated with ClipWaveform.
        
        TODO allow tags in repeat_range using some kind of RoundingTag to
        round to integer
        """
        from sequencer.tags import SumTag, MINUS_INF, PLUS_INF
        self._waveform = period_waveform
        self._repeat_range = tuple(None if r is None else int(round(r)) 
                                       for r in repeat_range)
        self._fix_levels = fix_levels
        
        def bound_gen():
            for n, t, r, inf in zip(("left_bound", "right_bound"),
                                    self._waveform.bound_tags, 
                                    self._repeat_range,
                                    (MINUS_INF, PLUS_INF)):
                if r is None:
                    yield inf
                else:
                    yield SumTag(n, t, r * self._waveform.duration)

        self.bound_tags = tuple(bound_gen())
        
    def sample(self, positions):
        inside = np.logical_and(positions >= self.bound_tags[0].value,
                                positions <= self.bound_tags[1].value)
        pos = positions[inside] - float(self._waveform.bound_tags[0])
        dur = self._waveform.duration
        ind = np.floor(pos / dur)
        pos = (pos % dur) + float(self._waveform.bound_tags[0])
        ret = np.empty(positions.shape)
        
        # delegate out-of-range samples to self._waveform
        # there should be a more elegant way when boundary types are defined
        lval_out = self._waveform.value_at(self._waveform.bound_tags[0].value - 1000.0)
        rval_out = self._waveform.value_at(self._waveform.bound_tags[1].value + 1000.0)
        
        # values at bounds
        lval = self._waveform.value_at(self._waveform.bound_tags[0].value)
        rval = self._waveform.value_at(self._waveform.bound_tags[1].value)
        
        ret[inside] = self._waveform.sample(pos)
        if self._fix_levels:
            ltime = self._waveform.bound_tags[0].value
            ret[inside] -= (rval - lval) * ((pos - ltime)/dur - 0.5)
            # adjust also outside value, although this may or may not make sense
            lval_out += (rval - lval) / 2
            rval_out -= (rval - lval) / 2
        
        ret[positions < self.bound_tags[0].value] = lval_out
        ret[positions > self.bound_tags[1].value] = rval_out
            
        return ret
    
    #TODO implement min, max, mean, etc.
    
def remove_periodic_noise(dirty_data, zero_signal_part, frequency):
    """Subtract periodic noise from a waveform.
    
    Arguments:
        dirty_data: The waveform that contains the noise.
        zero_signal_part: A part of the signal that contains the noise,
            but no signal (at least on average). The longer the better,
            while just one period of the noise is enough in principle.
            This waveform must be referenced to the same time variable
            so that the noise is aligned (n x period offset ok).
        frequency: Base frequency (1/period) of the periodic noise.
            Typically this is 50 or 60.
    
    Return:
        dirty_data - <estimate of periodic noise>
    """
    period = 1./frequency
    n_periods = int(np.floor(zero_signal_part.duration / period))
    if n_periods < 1:
        raise ValueError("The given zero_signal waveform should contain" \
                         "at least one period of noise.")
    
    zsp_left = zero_signal_part.bound_tags[0].value
    noise_wfs = (zero_signal_part.clip(zsp_left + i * period, 
                                       zsp_left + (i + 1)*period)
                     for i in xrange(n_periods))
    clips = [PeriodicWaveform(c, fix_levels = True) for c in noise_wfs]
    
    #TODO rewrite this using built-in pyfunk stuff (as soon as they exist)
    class SubAvgClips(pf.Waveform):
        def __init__(self, waveform, clips):
            self._waveform = waveform
            self._clips = clips
            self.bound_tags = tuple(self._waveform.bound_tags)
            
        def sample(self, positions):
            left_out = positions < self.bound_tags[0].value
            right_out = positions > self.bound_tags[1].value
            outside = np.logical_or(left_out, right_out)
            inside = np.logical_not(outside)
            ret = np.empty(positions.shape)
            ret[outside] = self._waveform.sample(positions[outside])
            ret[inside] = (self._waveform.sample(positions[inside]) - 
                           1.0/len(self._clips) *
                           np.sum([c.sample(positions[inside]) 
                                       for c in self._clips],axis = 0))
            
            return ret
    
    return SubAvgClips(dirty_data, clips)

def shift_to_zero(waveform, zero_range):
    return waveform - waveform.clip(*zero_range).mean()

def load_as_waveform(d, end_point = 200e-3):
    data = np.loadtxt(d["file"])
    wf = pf.Samples(data[:,1]*1e-9, data[1,0] - data[0,0])
    cleaned = remove_periodic_noise(wf, wf.clip(150e-3, None), 60)
    shifted = shift_to_zero(cleaned, (end_point - 2*1.0/60, end_point))
    return (shifted *( 1.0 / d["current"])).clip(d["t_lock"], end_point).clip(-20e-3) 
        #.delay(d["ramp_time"]/2)

Define the shielded room and coils (mostly for computational modeling).

In [17]:
newcube = SplitBoxRoom(detail_scale = UNIT.FOOT,
                       thickness = 10e-3)
diagonalize(newcube)

# polarizing coil
p_n = 240.0
p_r = 0.19
p_I = 200.0

p_center = np.array([0,0,-3.0 * 0.0254])
p_dipole = np.array([0,0,1.]) * p_n * np.pi * p_r**2
p_coil = DipoleSet(p_center, p_dipole)

# DynaCAN coil
c_side = 1.85
c_turns = 30
c_res = 100
c_R = 30./40*1.2
c_L = (30./40)**2*10e-3
c_center = -4*UNIT.FOOT + 1.125
c_coil = ScalarCoilPlate(c_center,
                         (np.array([c_side,0,0]), np.array([0,c_side,0])),
                         np.ones((1,c_res,c_res)) * c_turns,
                         label = "dynacan_coil")

Load experimental data traces and process them (if used).

In [18]:
use_experimental_isrs = False

# info about source data text files, each of the form:
#     first column = time; second column = amplitude

data =  [{"file": "huidata/Compcoil_1msRamp_SQUIDLock4msdelay.txt", 
          "current": 0.79,
          "coil": "canc",
          "ramp_time": 1e-3,
          "t_lock": 4e-3,
          "t_reliable": 0e-3},
         {"file": "huidata/Compcoil_10msRamp_SQUIDLock4msdelay.txt", 
          "current": 0.79,
          "coil": "canc",
          "ramp_time": 10e-3,
          "t_lock": 4e-3,
          "t_reliable": 0e-3}] + \
        [{"file": "huidata/Group2_BothCoils_LockAt10ms/Compcoil_{}msRamp_SQUIDLock10msdelay.txt"\
                      .format(ramp_time),
          "current": 2.05,
          "coil": "canc",
          "ramp_time": 1e-3 * ramp_time,
          "t_lock": 10e-3,
          "t_reliable": 0e-3} for ramp_time in (1, 2, 10)] + \
        [{"file": "huidata/Group2_BothCoils_LockAt10ms/Bpcoil_{}msRamp_SQUIDLock10msdelay.txt"\
                      .format(ramp_time),
          "current": 2.05,
          "coil": "pol",
          "ramp_time": 1e-3 * ramp_time,
          "t_lock": 10e-3,
          "t_reliable": 0e-3} for ramp_time in (1, 2, 10)] + \
        [{"file": "huidata/Group1_BpCoilOnly_LockAt3ms/Bpcoil_{}msRamp_SQUIDLock3msdelay.txt"\
                      .format(ramp_time),
          "current": 2.05,
          "coil": "pol",
          "ramp_time": 1e-3 * ramp_time,
          "t_lock": 3e-3,
          "t_reliable": 0e-3} for ramp_time in (1, 2, 10)]

if use_experimental_isrs:        
    for d in data:
        d["wf"] = load_as_waveform(d)

    p_resp_i = 8#9
    c_resp_i = 2#3

    p_resp_expr = data[p_resp_i]["wf"] \
        .delay(data[p_resp_i]["ramp_time"] / 2).clip().to_samples()
    c_resp_expr = data[c_resp_i]["wf"] \
        .delay(data[c_resp_i]["ramp_time"] / 2).clip().to_samples()


Define parameters for optimizing _DynaCAN_ compensation. Then define the objective function and start the optimization.

In [19]:
field_p = np.array([0,0,0])
field_dir = np.array([0,0,1.])

p_resp_calc = inv_step_response(newcube, p_coil, 
                                field_p, field_dir, 100e-3)
c_resp_calc = inv_step_response(newcube, c_coil, 
                                field_p, field_dir, 100e-3)

.....................

In [23]:
p_ramp_time = 11.6e-3
p_pulse = p_I * make_box(11.6e-3, pf.QuarterCosineRamp)

c_duration = 40e-3

pis = [1,2,3,4,5]
pows = [1,2,1,2,1]
tautaus = np.arange(2,25,3)*1e-3
#tautaus = np.array([1.2, 5.5, 9, 20])*1e-3
#tautaus = np.array([1,2,4,8,12,18, 23])*1e-3
n_basis = tautaus.size

#basis_wfs = [pf.Function(lambda x, tau = tau: np.exp(-x**4/tau**4)*(-x), (-c_duration, 0))
#                 for tau in tautaus]
basis_wfs = [pf.Function(lambda x, n = n: np.sin(-x/c_duration * n * np.pi), (-c_duration, 0))
                 for n in range(1,40)]

basis_wfs = [b / b.max() for b in basis_wfs]
basis = FunctionBasis(basis_wfs)

p_trans_calc = calc_response(p_pulse, p_resp_calc)
if use_experimental_isrs:
    p_trans_expr = calc_response(p_pulse, p_resp_expr)

penalty_instants_calc = np.arange(3.,13) * 1e-3
p_trans_at_instants_calc = p_trans_calc.sample(penalty_instants_calc)
if use_experimental_isrs:
    penalty_instants_expr = np.arange(13.,24) * 1e-3
    p_trans_at_instants_expr = p_trans_expr.sample(penalty_instants_expr)

max_V = 200.0 # maximum cancellation coil voltage

calc_weight = 5e5**2
expr_weight = 1e6**2
V_weight = 1e-1
E_weight = 0.0 # 1e-4

def penalty(x):
    waveform = basis.get_waveform(x).to_samples()
    
    c_trans_calc = calc_response(waveform, c_resp_calc)
    c_trans_at_instants_calc = c_trans_calc.sample(penalty_instants_calc)
    tot_calc = c_trans_at_instants_calc + p_trans_at_instants_calc
    value = 0.0
    value += np.sqrt(np.mean(np.abs(tot_calc)**4)) * calc_weight
    
    if use_experimental_isrs:
        c_trans_expr = calc_response(waveform, c_resp_expr)
        c_trans_at_instants_expr = c_trans_expr.sample(penalty_instants_expr)
        tot_expr = c_trans_at_instants_expr + p_trans_at_instants_expr
 
        value += np.sqrt(np.mean(np.abs(tot_expr)**4)) * expr_weight
    
    max_voltage = (c_R * waveform + c_L * waveform.differentiate()).abs().max()
    overvoltage = max_voltage - max_V
    
    energy = c_R * (waveform ** 2).def_integral()
    if overvoltage < 0:
        overvoltage = 0
    
    value += overvoltage**2 * V_weight
    value += energy * E_weight
    
#    value += P_weight * (waveform**2).max()
    return value

dynfig = plt.figure("Optimization")
dynax = dynfig.add_subplot(2,1,1)
t_dyn = np.linspace(-c_duration, 0, 200)
dynline, = dynax.plot(t_dyn, t_dyn * 0)
transax = dynfig.add_subplot(2,1,2)
p_trans_calc.clip(0.0).plot() # Leave non-canceled version for reference
if use_experimental_isrs:
    p_trans_expr.clip(12e-3).plot()
transline_c, = p_trans_calc.clip(0.0).plot()
if use_experimental_isrs:
    transline_e, = p_trans_expr.clip(12e-3).plot()
dynfig.canvas.manager.window.raise_()

def opt_waveform(dyn_update = lambda : None):
    def callback(x):
        callback.count += 1
        if callback.count % 1 == 0:
            #print callback.count
            waveform = basis.get_waveform(x)
            
            dynline.set_ydata(waveform.sample(t_dyn))
            
            plotdata = [(transline_c, p_trans_calc, c_resp_calc)]
            if use_experimental_isrs:
                plotdata.append((transline_e, p_trans_expr, c_resp_expr))

            for transline, p_trans, c_resp in plotdata:
                c_trans = calc_response(waveform, c_resp)
                t_trans = transline.get_xdata()
                trans_y = p_trans.sample(t_trans) + c_trans.sample(t_trans)
                transline.set_ydata(trans_y)
            
            dynax.set_title("Iteration #: {}".format(callback.count))
        dyn_update()
        
    callback.count = 0
    x0 = np.zeros(n_basis)
    from scipy.optimize import fmin_bfgs
    result = fmin_bfgs(penalty, x0, fprime = None, args = (),
                       gtol = 1e-5, maxiter = 300, disp = 1, 
                       callback = callback)
    x = result
    return basis.get_waveform(x), x

from plottools import mpl_run_dynamic

wf, x = mpl_run_dynamic(opt_waveform)



Optimization terminated successfully.
         Current function value: 0.000023
         Iterations: 16
         Function evaluations: 230
         Gradient evaluations: 23


In [21]:
from datetime import datetime
t_dyn = np.linspace(-c_duration, 0.000, int(c_duration/.1e-3))
np.savetxt(datetime.now().isoformat()[:10] + "canc40ms.txt", wf.sample(t_dyn))

In [None]:
def load_waveform(filename, length = 40e-3):
    data = np.loadtxt(filename)
    return pf.Samples(data, length/data.size, -length)

def analyze_waveform(wf):
    c_trans_calc = calc_response(wf, c_resp_calc)
    if use_experimental_isrs:
        c_trans_expr = calc_response(wf, c_resp_expr)

    dynfig = plt.figure("Analyze waveform")
    dynax = dynfig.add_subplot(2,1,1)
    wf.plot()
    transax = dynfig.add_subplot(2,1,2)
    p_trans_calc.clip(0.0).plot() # Leave non-canceled version for reference
    p_trans_expr.clip(12e-3).plot()
    transline_c, = (c_trans_calc + p_trans_calc).clip(0.0).plot()
    if use_experimental_isrs:
        transline_e, = (c_trans_expr + p_trans_expr).clip(12e-3).plot()    
    
#analyze_waveform(load_waveform("2014-05-14canc40ms_D.txt"))
#analyze_waveform(load_waveform("2014-05-14canc40ms_C.txt"))