In [None]:
from __future__ import unicode_literals

import sys, os
# Append the directory three levels above the working directory to the module
# search path (presumably so that the PyHEADTAIL package imported below
# resolves -- confirm against the repository layout).
# NOTE(review): expanduser() only expands a leading "~", so this relative path
# is passed through unchanged.
BIN = os.path.expanduser("../../../")
sys.path.append(BIN)

import numpy as np
from scipy.constants import m_p, c, e, pi
import matplotlib.pyplot as plt
%matplotlib inline

import copy
import itertools

# Local helpers shared by the feedback test notebooks.
from test_tools import Machine, generate_objects, BunchTracker, track, compare_traces
from test_tools import compare_beam_projections, plot_debug_data

# Feedback map and the signal processors used to build the feedback models.
from PyHEADTAIL.feedback.feedback import OneboxFeedback, Kicker, PickUp
from PyHEADTAIL.feedback.processors.multiplication import ChargeWeighter
from PyHEADTAIL.feedback.processors.linear_transform import Averager
from PyHEADTAIL.feedback.processors.misc import Bypass
from PyHEADTAIL.feedback.processors.register import Register, TurnDelay, TurnFIRFilter
from PyHEADTAIL.feedback.processors.convolution import Lowpass, Gaussian, FIRFilter
from PyHEADTAIL.feedback.processors.resampling import DAC, HarmonicADC, BackToOriginalBins
from PyHEADTAIL.feedback.processors.addition import NoiseGenerator
# Seed NumPy's global random generator so that draws made through np.random
# are repeatable when the notebook is re-run from a fresh kernel.
np.random.seed(0)

# 008 Detailed bunch-by-bunch feedback model

In this test/example, signal processors for a detailed feedback model are tested.

## Basic parameters and elements for the simulations

In [None]:
%%capture

n_macroparticles = 1000
n_slices = 50
n_segments = 5
n_sigma_z = 3
# n_sigma_z = 6

n_turns = 3*120

machine = Machine(n_segments= n_segments)

# harmonic frequency of the bunches (f_RF/every_n_bucket_filled)
f_RF = 1./(machine.circumference/c/(float(machine.h_RF)))
f_harmonic = f_RF/10.


first_index = 10 #[buckets]
batch_spacing = 100  #[buckets]
n_batches = 2
n_bunches_per_batch = 20
bunch_spacing = 10 #[buckets]

batch_separation = batch_spacing+n_bunches_per_batch* bunch_spacing

filling_scheme = []
for j in xrange(n_batches):
    for i in xrange(n_bunches_per_batch):
        filling_scheme.append(first_index + i * bunch_spacing + j*batch_separation)


print filling_scheme

beam_ref, slicer_ref,trans_map, long_map = generate_objects(machine, n_macroparticles, n_slices,n_sigma_z,
                                                             filling_scheme=filling_scheme)

In [None]:
# Report the derived frequencies and how many bunches the reference beam holds.
print('f_RF: ' + str(f_RF))
print('f_harmonic: ' + str(f_harmonic))
print('Number of bunches: ' + str(len(beam_ref.split())))

## Initial bunch kick
Creates an artificially kicked beam, which is then damped by the different feedback implementations.

In [None]:
# Split the reference beam into bunches so each one can be displaced on its own.
# NOTE: this cell overwrites beam_ref, so re-running it without re-creating
# the beam applies the displacement a second time.
bunch_list = beam_ref.split()
n_bunches = len(bunch_list)

# Sinusoidal displacement pattern along z: amplitude and oscillation frequency
# (converted to a wavelength with the speed of light).
amplitude = 0.003
f_osc = 1e6
wavelength = 1./f_osc*c

# NOTE(review): kick_x and kick_y are not used in the cells visible here. They
# are kept because each rand() call advances the global NumPy random state,
# which later random draws in the notebook may depend on.
kick_x = amplitude*(-1.0+2*np.random.rand(n_bunches))
kick_y = amplitude*(-1.0+2*np.random.rand(n_bunches))

# Shift every bunch in x and y by the sine pattern evaluated at its mean z.
for bunch in bunch_list:
    offset = amplitude * np.sin(2. * np.pi * bunch.mean_z() / wavelength)
    bunch.x = bunch.x + offset
    bunch.y = bunch.y + offset

# Merge the displaced bunches back into a single beam object.
beam_ref = sum(bunch_list)

## Feedback settings

In [None]:
# --- Feedback loop ---
feedback_gain = 0.1
# feedback_gain = (0.1,0.4)

# Number of turns by which the pickup signal is delayed before it is used in
# the correction kick calculation.
delay = 1

# Number of values used to calculate the correction signal.
n_values = 2

# RMS noise level handed to the NoiseGenerator processor.
RMS_noise_level = 1e-6

# --- Analog bandwidth ---
fc = 1e6  # cut-off frequency of the power amplifier

# --- ADC ---
# NOTE(review): ADC_f was described as "multiplier of the sampling rate from
# the harmonic frequency"; it and ADC_n_samples are not referenced in the
# cells visible here -- confirm their meaning before relying on them.
ADC_f = 40e9
ADC_n_samples = 50
ADC_bits = 16
ADC_range = (-3e-3, 3e-3)

# --- DAC ---
DAC_bits = 14
DAC_range = (-3e-3, 3e-3)

## Reference data
Tracks a bunch by using an ideal bunch-by-bunch feedback presented in the first test (001_ideal_feedbacks.ipynb). The data is used as baseline data for the detailed feedback model. 

In [None]:
# Work on private copies so the kicked reference beam and slicer stay
# available, unmodified, for the detailed model.
beam_ref_data = copy.deepcopy(beam_ref)
slicer_ref_data = copy.deepcopy(slicer_ref)
tracker_ref_data = BunchTracker(beam_ref_data)


def _ideal_bunch_processors():
    """Return a fresh processor chain: charge weighting followed by averaging."""
    return [
        ChargeWeighter(normalization='segment_average'),
        Averager(),
    ]


# Each plane gets its own processor instances.
processors_bunch_x = _ideal_bunch_processors()
processors_bunch_y = _ideal_bunch_processors()

feedback_map = OneboxFeedback(feedback_gain, slicer_ref_data,
                              processors_bunch_x, processors_bunch_y,
                              mpi=True)
# Transverse map segments first, then the feedback (longitudinal map left out).
one_turn_map = list(trans_map) + [feedback_map]  # + [long_map]

track(n_turns, beam_ref_data, one_turn_map, tracker_ref_data)

## Detailed feedback model
Signal processors required for a detailed model of the transverse feedback system are tested here. Note that the tested system does not accurately describe any existing system, and the details of the models might significantly affect the simulation results.

### FIR filter coefficients for the bandwidth phase correction
The bandwidth of this model is limited by using a 1st order lowpass filter, which has a non-linear phase response. The phase of the filter is linearized by using a FIR filter. The filter coefficients are from Ref. https://accelconf.web.cern.ch/accelconf/e08/papers/thpc122.pdf . Note that this is a test example and the coefficients are neither designed nor tuned to work perfectly with this model.

In [None]:
# Coefficients from https://accelconf.web.cern.ch/accelconf/e08/papers/thpc122.pdf
FIR_filter = np.array([
    0.0096, 0.0192, 0.0481, 0.0673, 0.0769, 0.1154, 0.1442, 0.1442,
    0.2115, 0.2403, 0.2596, 0.3077, 0.3558, 0.3846, 0.4519, 0.5192,
    0.6346, 0.75, 0.9519, 1.2019, 1.6346, 2.6346, 7.0192, -5.1923,
    -1.4135, -0.6827, -0.3942, -0.2308, -0.1442, -0.096, -0.0192, -0.0096,
])
# Normalise the taps so that they add up to one.
FIR_filter = FIR_filter / sum(FIR_filter)

### Turn-by-turn FIR filter for the betatron phase correction
The betatron phase correction is implemented manually by using a turn-by-turn FIR filter. Coefficients are from Ref. http://accelconf.web.cern.ch/AccelConf/IPAC2011/papers/mopo013.pdf, but they are not optimized for this model.

In [None]:
def _turn_fir_coefficients(phase_shift):
    """Return the seven turn-by-turn FIR taps for a phase shift in radians.

    The centre tap is cos(phase_shift). The taps one and three positions away
    from the centre are -/+ 2*sin(phase_shift)/(pi*n) with n = 1 and 3, and
    the taps two positions away are zero.
    """
    sin_term = np.sin(phase_shift)
    return [-2. * sin_term/(pi * 3.),
            0,
            -2. * sin_term/(pi * 1.),
            np.cos(phase_shift),
            2. * sin_term/(pi * 1.),
            0,
            2. * sin_term/(pi * 3.)]


# The total (group) delay to the middle coefficient is four turns, so the
# phase to correct is four turns of betatron phase advance (tune * 2*pi).
# NOTE(review): presumably three turns to the middle tap of the seven-tap
# filter plus the one-turn delay given to TurnFIRFilter -- confirm.
phase_shift_x = -4. * machine.Q_x * 2.* pi
turn_FIR_filter_x = _turn_fir_coefficients(phase_shift_x)

phase_shift_y = -4. * machine.Q_y * 2.* pi
turn_FIR_filter_y = _turn_fir_coefficients(phase_shift_y)


### The model
This model includes elements for digital signal processing (ADC, FIR filters and DAC) and power amplifier/kicker bandwidth limitation. A model for the pickup could be built, but in this test a perfect 𝛥/𝛴-signal is used (read more about this choice in the previous test).

In [None]:
# Detailed feedback model: track a private copy of the kicked reference beam
# through a processor chain made of (in order) a bypass used for debug data,
# charge weighting, noise, ADC, FIR filter, turn-by-turn FIR filter, DAC,
# lowpass filter and a conversion back to the original bins.
beam_detailed = copy.deepcopy(beam_ref)
tracker_detailed = BunchTracker(beam_detailed)
slicer_detailed = copy.deepcopy(slicer_ref)

# Changes with respect to the previous version of this cell:
#  * The DAC is built from DAC_bits/DAC_range. It used to receive the ADC
#    settings, so the dedicated DAC settings were silently ignored.
#  * The turn delay and the ADC frequency use the `delay` and `f_harmonic`
#    settings instead of repeating their values (1 and f_RF/10.) inline.
processors_detailed_x = [
        Bypass(debug=True),
        ChargeWeighter(normalization = 'segment_average', debug=False),
        NoiseGenerator(RMS_noise_level, debug=False),
        HarmonicADC(2*f_harmonic, ADC_bits, ADC_range,
                    n_extras=10, debug=False),
        FIRFilter(FIR_filter, zero_tap = 23, debug=False),
        TurnFIRFilter(turn_FIR_filter_x, machine.Q_x, delay = delay, debug=True),
        DAC(DAC_bits, DAC_range, debug=True),
        Lowpass(fc, f_cutoff_2nd=4*fc, debug=True),
        BackToOriginalBins(debug=True),
]

# Same chain for the vertical plane; here every processor stores debug data.
processors_detailed_y = [
        Bypass(debug=True),
        ChargeWeighter(normalization = 'segment_average', debug=True),
        NoiseGenerator(RMS_noise_level, debug=True),
        HarmonicADC(2*f_harmonic, ADC_bits, ADC_range,
                    n_extras=10, debug=True),
        FIRFilter(FIR_filter, zero_tap = 23, debug=True),
        TurnFIRFilter(turn_FIR_filter_y, machine.Q_y, delay = delay, debug=True),
        DAC(DAC_bits, DAC_range, debug=True),
        Lowpass(fc, f_cutoff_2nd=4*fc, debug=True),
        BackToOriginalBins(debug=True),
]


feedback_map = OneboxFeedback(feedback_gain,slicer_detailed,
                              processors_detailed_x,processors_detailed_y, mpi = True)
# Feedback first, then the transverse map segments (longitudinal map left out).
one_turn_map = [feedback_map] + [i for i in trans_map] # + [long_map]

track(n_turns, beam_detailed, one_turn_map, tracker_detailed)
# Plot the output signals stored by the debug-enabled horizontal processors.
plot_debug_data(processors_detailed_x, source = 'output')

In [None]:
# Compare the ideal bunch-by-bunch reference with the detailed model. The
# legend labels previously read "Ideal bunch-by-slice feedback" and
# "Minimal model", which do not describe the two data sets plotted here.
comparison_labels = ['Ideal bunch-by-bunch\nfeedback', 'Detailed model']

compare_traces([tracker_ref_data, tracker_detailed], comparison_labels)
compare_beam_projections([beam_ref_data, beam_detailed], comparison_labels)

Jani Komppula, CERN, 2017