In [1]:
%load_ext autoreload
%autoreload 2

In [4]:
from blond import beam
import inspect
inspect.getmembers(beam, inspect.isclass)


[('Beam', blond.beam.beam.Beam),
 ('Particle', blond.beam.beam.Particle),
 ('Positron', blond.beam.beam.Positron),
 ('Proton', blond.beam.beam.Proton)]

In [1]:
from __future__ import division, print_function

import os
from builtins import range

import matplotlib as mpl
import numpy as np

from blond.beam.beam import Beam, Proton
from blond.beam.distributions import bigaussian, parabolic
from blond.beam.profile import CutOptions, FitOptions, Profile
from blond.input_parameters.rf_parameters import RFStation
#  BLonD Imports
from blond.input_parameters.ring import Ring
from blond.monitors.monitors import BunchMonitor
from blond.plots.plot import Plot
from blond.trackers.tracker import RingAndRFTracker

from blond.utils import timing

---------- Using the C++ computational backend ----------


In [2]:

# Simulation parameters -------------------------------------------------------
# Bunch parameters
N_b = 1e9           # Intensity
N_p = 50000         # Macro-particles
tau_0 = 0.4e-9          # Initial bunch length, 4 sigma [s]

# Machine and RF parameters
C = 26658.883        # Machine circumference [m]
p_i = 450e9         # Synchronous momentum [eV/c]
p_f = 460.005e9      # Synchronous momentum, final
h = 35640            # Harmonic number
V = 6e6                # RF voltage [V]
dphi = 0             # Phase modulation/offset
gamma_t = 55.759505  # Transition gamma
alpha = 1. / gamma_t / gamma_t        # First order mom. comp. factor

# Tracking details
N_t = 2000           # Number of turns to track
dt_plt = 200         # Time steps between plots


# Simulation setup ------------------------------------------------------------
print("Setting up the simulation...")
print("")


# Define general parameters
ring = Ring(C, alpha, np.linspace(p_i, p_f, 2001), Proton(), N_t)

# Define beam and distribution
beam = Beam(ring, N_p, N_b)


# Define RF station parameters and corresponding tracker
rf = RFStation(ring, [h], [V], [dphi])
long_tracker = RingAndRFTracker(rf, beam)


bigaussian(ring, rf, beam, tau_0 / 4, reinsertion=True, seed=1)
# parabolic(ring, rf, beam, tau_0, seed=1)


# Need slices for the Gaussian fit
profile = Profile(beam, CutOptions(n_slices=100),
                  FitOptions(fit_option='gaussian'))


Setting up the simulation...



In [3]:
import importlib
import inspect
from blond.utils.track_iteration import TrackIteration

class Assembler:
    '''
    Assembler class
    '''

    @staticmethod
    def is_trackable(elem):
        return hasattr(elem, 'track') and callable(elem.track)

    @staticmethod
    def is_dictionary(elem):
        return isinstance(elem, dict)
    
    @staticmethod
    def is_callable_tuple(elem):
        # Check if elem is a tuple in the form (func, (func_args)) 
        return isinstance(elem, tuple) and len(elem) == 2 and callable(elem[0]) and isinstance(elem[1], tuple)
    
    @staticmethod
    def split_args_kwargs(arguments):
        '''
        Split arguments into positional and keyword arguments
        Arguments can be in the form:
        - dictionary
        - tuple in the form (positional_args, keyword_args)
        - tuple in the form (positional_args)
        Warning! If only positional arguments are given, and the last argument is a dictionary,
        it will be interpreted as keyword arguments
        '''
        if Assembler.is_dictionary(arguments):
            return (), arguments
        elif isinstance(arguments, tuple) and len(arguments) == 2 and isinstance(arguments[0], tuple) and isinstance(arguments[1], dict):
            return arguments[0], arguments[1]
        elif isinstance(arguments, tuple):
            return arguments, {}

    @staticmethod
    def discover_blond_distributions():
        '''
        Discover all blond distributions
        '''
        


        modules = ['blond.beam.distributions', 'blond.beam.distributions_multibunch']

        blond_distributions = {}
        for module_name in modules:
            try:
                module = importlib.import_module(module_name)
            except (ModuleNotFoundError, ImportError) as e:
                print(f'Error importing module {module_name}: {e}')
                continue

            for name, member in inspect.getmembers(module):
                if inspect.isfunction(member) and not name.startswith('_') :
                    blond_distributions[name] = member
        
        # print('Discovered the following blond distributions:')
        # for name in blond_distributions.keys():
        #     print(f'- {name}')
        return blond_distributions
    
    @staticmethod
    def discover_blond_classes():
        '''
        Discover all blond classes
        '''
        
        import importlib
        import inspect

        modules = [
            'blond.beam.beam',
            'blond.beam.coasting_beam',
            'blond.beam.distributions_multibunch',
            'blond.beam.distributions',
            'blond.beam.profile',
            'blond.beam.sparse_slices',
            'blond.gpu.butils_wrap_cupy',
            'blond.impedances.impedance',
            'blond.impedances.impedance_sources',
            'blond.impedances.induced_voltage_analytical',
            'blond.impedances.music',
            'blond.input_parameters.rf_parameters_options',
            'blond.input_parameters.rf_parameters',
            'blond.input_parameters.ring_options',
            'blond.input_parameters.ring',
            'blond.llrf.beam_feedback',
            'blond.llrf.cavity_feedback',
            'blond.llrf.impulse_response',
            'blond.llrf.notch_filter',
            'blond.llrf.offset_frequency',
            'blond.llrf.rf_modulation',
            'blond.llrf.rf_noise',
            'blond.llrf.signal_processing',
            'blond.monitors.monitors',
            'blond.plots.plot_beams',
            'blond.plots.plot_impedance',
            'blond.plots.plot_llrf',
            'blond.plots.plot_parameters',
            'blond.plots.plot',
            'blond.plots.plot_slices',
            'blond.synchrotron_radiation.synchrotron_radiation',
            'blond.toolbox.action',
            'blond.toolbox.diffusion',
            'blond.toolbox.filters_and_fitting',
            'blond.toolbox.logger',
            'blond.toolbox.next_regular',
            'blond.toolbox.parameter_scaling',
            'blond.toolbox.tomoscope',
            'blond.trackers.tracker',
            'blond.trackers.utilities',
            'blond.utils.bmath',
            'blond.utils.butils_wrap_cpp',
            'blond.utils.butils_wrap_python',
            'blond.utils.data_check',
            'blond.utils.exceptions',
            'blond.utils.mpi_config',
            'blond.utils.timing',
            'blond.utils.track_iteration',
        ]

        blond_classes = {}
        for module_name in modules:
            try:
                module = importlib.import_module(module_name)
            except (ModuleNotFoundError, ImportError) as e:
                print(f'Error importing module {module_name}: {e}')
                continue
            
            # print(f'In module {module_name}, discovered the following blond classes:')
            for name, member in inspect.getmembers(module):
                if inspect.isclass(member) and not name.startswith('_') :
                    # print(f'- {name}')
                    blond_classes[name] = member
        
        # for name in blond_classes.keys():
        #     print(f'- {name}')
        return blond_classes


    def __init__(self, element_list=[]):
        '''
        Initialize assembler

        element_list: list of elements to be tracked
        '''

        self.objects = {}
        self.pipeline = []
        self.pipeline_tracker = None
        self.element_list = element_list
        self.active_objects = {}
        self.is_built=False

        self.blond_distributions = Assembler.discover_blond_distributions()
        self.blond_classes = Assembler.discover_blond_classes()


    def init_object_from_dict(self, elem_class, elem_args, elem_kwargs):
        return self.blond_classes[elem_class](*elem_args, **elem_kwargs)
    

    def promote_to_attributes(self, elem_dict):
        
        # delete all old references from self.__dict__
        for key in self.active_objects.keys():
            if key in self.__dict__:
                del self.__dict__[key]
        
        # add the new objects to the self.__dict__
        self.__dict__.update(elem_dict)
        self.active_objects = elem_dict
    
    def replace_object_references(self, args, kwargs):
        # Check for values that are pointing to previously initialized objects
        # and replace them with references to the initialized objects
        
        # First check kwargs
        for k, v in kwargs.items():
            if isinstance(v, str) and v in self.objects:
                kwargs[k] = self.objects[v]
        # Then check args. Convert to list to support assignment
        args = list(args)
        for i, arg in enumerate(args):
            if isinstance(arg, str) and arg in self.objects:
                args[i] = self.objects[arg]
        return tuple(args), kwargs

    
    def build_pipeline(self):
        # elements can be either dictionaries
        # Or objects of certain classes
        # or custom objects with track method

        for elem in self.element_list:
            if Assembler.is_dictionary(elem):
                assert len(elem) == 1, 'Dictionary elements must be in the form: {classname: {arg1: val1, arg2:val2, ...}}'
                elem_class, (elem_all_args) = next(iter(elem.items()))
                elem_args, elem_kwargs = Assembler.split_args_kwargs(elem_all_args)
                print(f'Found dictionary element with {len(elem_args)} positional arguments and {len(elem_kwargs)} kwargs')
                # Replace reference kwargs objects
                elem_args, elem_kwargs = self.replace_object_references(elem_args, elem_kwargs)
                elem = self.init_object_from_dict(elem_class, elem_args, elem_kwargs)
            
            elem_class = type(elem).__name__
            print(f'Found element of type: {elem_class}')
            
            if Assembler.is_trackable(elem):
                # if trackable, need to add in pipeline
                self.pipeline.append(elem)
            
            # Since the assembler also has a record of all objects, need to store object in correct attribute 
            if elem_class in self.objects:
                if not isinstance(self.objects[elem_class], list):
                    self.objects[elem_class] = list(self.objects[elem_class])
                self.objects[elem_class].append(elem)
            else:
                self.objects[elem_class] = elem
        
        self.promote_to_attributes(self.objects)
        
        self.pipeline_tracker = TrackIteration(self.pipeline)
        self.is_built = True
            
            
    def track(self, num_turns=1, with_timing=False):
        if not self.is_built:
            print('Warning: Object not built. Call build method')
            return

        if with_timing:
            for turn in range(num_turns):
                for element in self.pipeline:
                    with timing.timed_region(region_name=type(element).__name__) as tr:
                        element.track()
        else:
            for turn in range(num_turns):
                for element in self.pipeline:
                        element.track()
            
    def insert_element(self, element):
        self.element_list.append(element)
        self.is_built = False

    def build_distribution(self, distribution):
        if Assembler.is_dictionary(distribution):
            assert len(distribution) == 1, 'Dictionary elements must be in the form: {disrtibution_type: {arg1: val1, arg2:val2, ...}}'
            distr_type, (distr_all_args) = next(iter(distribution.items()))
            assert distr_type in self.blond_distributions, f'Distribution type not recognized: {distr_type}'
            
            distr_args, distr_kwargs = Assembler.split_args_kwargs(distr_all_args)
            print(f'Found dictionary element with {len(distr_args)} positional arguments and {len(distr_kwargs)} kwargs')

            distr_args, distr_kwargs = self.replace_object_references(distr_args, distr_kwargs)
            self.blond_distributions[distr_type](*distr_args, **distr_kwargs)
        else:
            print(f'Distribution not recognized: {distribution}')
            
    def to_yaml(self):
        pass
    
    @classmethod
    def from_yaml(cls, file_name):
        import yaml
        
        # convert from yaml to dict
        with open(file_name, 'r') as file:
            element_dict = yaml.safe_load(file)
        
        # then call init method
        return cls(element_dict)
    
    def report_timing(self):
        timing.report()


In [4]:
print(isinstance((), tuple))
print(isinstance((1, 2), tuple))
print(isinstance((1, (1, 2)), tuple))
print(isinstance('gds', tuple))
print(isinstance(1, tuple))
# print(isinstance(1, ({'a': 1}), tuple))

print(len((1, (2, 3, 4))))
a = (print, (1, 2, 3))
a[0](a[1])

True
True
True
False
False
2
(1, 2, 3)


In [5]:
# Use case 1: Initialize from object list
assembler = Assembler(element_list=[ring, beam, rf, long_tracker, profile])
assembler.build_pipeline()
print(assembler.pipeline)
print(assembler.objects)

In module blond.beam.beam, discovered the following blond classes:
- Beam
- Electron
- Particle
- Positron
- Proton
In module blond.beam.coasting_beam, discovered the following blond classes:
- str
In module blond.beam.distributions_multibunch, discovered the following blond classes:
- Beam
- range
In module blond.beam.distributions, discovered the following blond classes:
- CutOptions
- Profile
- range
- str
In module blond.beam.profile, discovered the following blond classes:
- CutOptions
- FilterOptions
- FitOptions
- OtherSlicesOptions
- Profile
In module blond.beam.sparse_slices, discovered the following blond classes:
- CutOptions
- Profile
- SparseSlices
- range
Error importing module blond.gpu.butils_wrap_cupy: No module named 'cupy'
In module blond.impedances.impedance, discovered the following blond classes:
- InducedVoltageFreq
- InducedVoltageResonator
- InducedVoltageTime
- InductiveImpedance
- TotalInducedVoltage
- range
In module blond.impedances.impedance_sources, disco

In [6]:
# Use case 2: Initialize from dictionary
assembler2 = Assembler(element_list=[
    {'Ring': ((C, alpha), {'synchronous_data': np.linspace(p_i, p_f, 2001), 'Particle': Proton(), 'n_turns': N_t})},
    {'Beam': ('Ring', N_p, N_b)},
    {'RFStation': {'Ring': 'Ring', 'harmonic': [h], 'voltage': [V], 'phi_rf_d': [dphi]}},
    {'RingAndRFTracker': {'RFStation': 'RFStation', 'Beam': 'Beam'}},
    {'Profile': {'Beam': 'Beam', 'CutOptions': CutOptions(n_slices=100), 'FitOptions': FitOptions(fit_option='gaussian')}}
])

assembler2.build_pipeline()
assembler2.build_distribution(distribution={
    'bigaussian': {'Ring': 'Ring', 'RFStation': 'RFStation', 'Beam': 'Beam', 'sigma_dt': tau_0 / 4,
                   'reinsertion': True, 'seed': 1}
})

print(assembler2.pipeline)
print(assembler2.objects)

In module blond.beam.beam, discovered the following blond classes:
- Beam
- Electron
- Particle
- Positron
- Proton
In module blond.beam.coasting_beam, discovered the following blond classes:
- str
In module blond.beam.distributions_multibunch, discovered the following blond classes:
- Beam
- range
In module blond.beam.distributions, discovered the following blond classes:
- CutOptions
- Profile
- range
- str
In module blond.beam.profile, discovered the following blond classes:
- CutOptions
- FilterOptions
- FitOptions
- OtherSlicesOptions
- Profile
In module blond.beam.sparse_slices, discovered the following blond classes:
- CutOptions
- Profile
- SparseSlices
- range
Error importing module blond.gpu.butils_wrap_cupy: No module named 'cupy'
In module blond.impedances.impedance, discovered the following blond classes:
- InducedVoltageFreq
- InducedVoltageResonator
- InducedVoltageTime
- InductiveImpedance
- TotalInducedVoltage
- range
In module blond.impedances.impedance_sources, disco

  return A * np.exp(-(x - x0)**2 / 2. / sx**2)
  return A * np.exp(-(x - x0)**2 / 2. / sx**2)


In [None]:
custom_order = ['Ring', 'Beam', 'RFStation', 'Profile', 'SynchrotronRadiation', 'Feedbacks', 'RingAndRFTracker']

In [None]:
print(assembler2.Beam)

In [None]:
assembler.track(100, with_timing=True)

In [None]:
print(assembler.objects['RingAndRFTracker'].counter[0])

In [None]:
assembler.report_timing()

In [None]:
print(timing.report())

In [None]:
timing.times

In [6]:
import numpy as np
import inspect

inspect.getmembers(np, inspect.isfunction)


[('__dir__', <function numpy.__dir__()>),
 ('__getattr__', <function numpy.__getattr__(attr)>),
 ('_pyinstaller_hooks_dir', <function numpy._pyinstaller_hooks_dir()>),
 ('add_newdoc',
  <function numpy.core.function_base.add_newdoc(place, obj, doc, warn_on_python=True)>),
 ('all',
  <function numpy.all(a, axis=None, out=None, keepdims=<no value>, *, where=<no value>)>),
 ('allclose',
  <function numpy.allclose(a, b, rtol=1e-05, atol=1e-08, equal_nan=False)>),
 ('alltrue', <function numpy.alltrue(*args, **kwargs)>),
 ('amax',
  <function numpy.amax(a, axis=None, out=None, keepdims=<no value>, initial=<no value>, where=<no value>)>),
 ('amin',
  <function numpy.amin(a, axis=None, out=None, keepdims=<no value>, initial=<no value>, where=<no value>)>),
 ('angle', <function numpy.angle(z, deg=False)>),
 ('any',
  <function numpy.any(a, axis=None, out=None, keepdims=<no value>, *, where=<no value>)>),
 ('append', <function numpy.append(arr, values, axis=None)>),
 ('apply_along_axis',
  <func