Skip to content
77 changes: 52 additions & 25 deletions src/pypulseq/Sequence/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
if prev_type == 't':
last = 0
elif prev_type == 'g':
last = prev_lib['data'][5]
last = prev_lib['data'][2] # v150

# Check whether the difference between the last gradient value and
# the first value of the new gradient is achievable with the
Expand All @@ -245,7 +245,7 @@ def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
if next_type == 't':
first = 0
elif next_type == 'g':
first = next_lib['data'][4]
first = next_lib['data'][1] # v150
else:
first = 0

Expand All @@ -260,7 +260,7 @@ def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
raise RuntimeError('First gradient in the the first block has to start at 0.')

if (
grad_to_check.stop[1] > self.system.max_slew * self.system.grad_raster_time
abs(grad_to_check.stop[1]) > self.system.max_slew * self.system.grad_raster_time
and abs(grad_to_check.stop[0] - duration) > 1e-7
):
raise RuntimeError("A gradient that doesn't end at zero needs to be aligned to the block boundary.")
Expand Down Expand Up @@ -331,9 +331,9 @@ def get_block(self, block_index: int) -> SimpleNamespace:
grad.channel = grad_channels[i][1]
if grad.type == 'grad':
amplitude = lib_data[0]
shape_id = lib_data[1]
time_id = lib_data[2]
delay = lib_data[3]
shape_id = lib_data[3] # change in v150
time_id = lib_data[4] # change in v150
delay = lib_data[5] # change in v150
shape_data = self.shape_library.data[shape_id]
compressed.num_samples = shape_data[0]
compressed.data = shape_data[1:]
Expand All @@ -343,22 +343,36 @@ def get_block(self, block_index: int) -> SimpleNamespace:
if time_id == 0:
grad.tt = (np.arange(1, len(g) + 1) - 0.5) * self.grad_raster_time
t_end = len(g) * self.grad_raster_time
grad.area = sum(grad.waveform) * self.grad_raster_time
elif time_id == -1:
# Gradient with oversampling by a factor of 2
grad.tt = 0.5 * (np.arange(1, len(g) + 1)) * self.grad_raster_time
if len(grad.tt) != len(grad.waveform):
raise ValueError(
f'Mismatch between time shape length ({len(grad.tt)}) and gradient shape length ({len(grad.waveform)}).'
)
if len(grad.waveform) % 2 != 1:
raise ValueError('Oversampled gradient waveforms must have odd number of samples')
t_end = (len(g) + 1) * self.grad_raster_time
grad.area = sum(grad.waveform[::2]) * self.grad_raster_time # remove oversampling
else:
t_shape_data = self.shape_library.data[time_id]
compressed.num_samples = t_shape_data[0]
compressed.data = t_shape_data[1:]
grad.tt = decompress_shape(compressed) * self.grad_raster_time

assert len(grad.waveform) == len(grad.tt)
if len(grad.tt) != len(grad.waveform):
raise ValueError(
f'Mismatch between time shape length ({len(grad.tt)}) and gradient shape length ({len(grad.waveform)}).'
)
t_end = grad.tt[-1]
grad.area = 0.5 * sum((grad.tt[1:] - grad.tt[:-1]) * (grad.waveform[1:] + grad.waveform[:-1]))

grad.shape_id = shape_id
grad.time_id = time_id
grad.delay = delay
grad.shape_dur = t_end
if len(lib_data) > 5:
grad.first = lib_data[4]
grad.last = lib_data[5]
grad.first = lib_data[1] # change in v150 - we always have first/last now
grad.last = lib_data[2] # change in v150 - we always have first/last now
else:
grad.amplitude = lib_data[0]
grad.rise_time = lib_data[1]
Expand All @@ -368,6 +382,8 @@ def get_block(self, block_index: int) -> SimpleNamespace:
grad.area = grad.amplitude * (grad.flat_time + grad.rise_time / 2 + grad.fall_time / 2)
grad.flat_area = grad.amplitude * grad.flat_time

# TODO: add optional grad ID from raw_block

setattr(block, grad_channels[i], grad)

# ADC
Expand Down Expand Up @@ -572,37 +588,45 @@ def register_grad_event(self, event: SimpleNamespace) -> Union[int, Tuple[int, L
"""
may_exist = True
any_changed = False

if event.type == 'grad':
amplitude = np.abs(event.waveform).max()
amplitude = np.max(np.abs(event.waveform))
if amplitude > 0:
fnz = event.waveform[np.nonzero(event.waveform)[0][0]]
amplitude *= np.sign(fnz) if fnz != 0 else 1 # Workaround for np.sign(0) = 0
amplitude *= np.sign(fnz) if fnz != 0 else 1

# Shape ID initialization
if hasattr(event, 'shape_IDs'):
shape_IDs = event.shape_IDs
else:
shape_IDs = [0, 0]
if amplitude != 0:
g = event.waveform / amplitude
else:
g = event.waveform

# Shape for waveform
g = event.waveform / amplitude if amplitude != 0 else event.waveform
c_shape = compress_shape(g)
s_data = np.concatenate(([c_shape.num_samples], c_shape.data))
shape_IDs[0], found = self.shape_library.find_or_insert(s_data)
may_exist = may_exist & found
may_exist = may_exist and found
any_changed = any_changed or found

# Check whether tt == np.arange(len(event.tt)) * self.grad_raster_time + 0.5
tt_regular = (np.floor(event.tt / self.grad_raster_time) == np.arange(len(event.tt))).all()
# Shape for timing
c_time = compress_shape(event.tt / self.grad_raster_time)
t_data = np.concatenate(([c_time.num_samples], c_time.data))

if not tt_regular:
c_time = compress_shape(event.tt / self.grad_raster_time)
t_data = np.concatenate(([c_time.num_samples], c_time.data))
if len(c_time.data) == 4 and np.allclose(c_time.data, [0.5, 1, 1, c_time.num_samples - 3]):
# Standard raster → leave shape_IDs[1] as 0
pass
elif len(c_time.data) == 3 and np.allclose(c_time.data, [0.5, 0.5, c_time.num_samples - 2]):
# Half-raster → set to -1 as special flag
shape_IDs[1] = -1
else:
shape_IDs[1], found = self.shape_library.find_or_insert(t_data)
may_exist = may_exist & found
may_exist = may_exist and found
any_changed = any_changed or found

data = (amplitude, *shape_IDs, event.delay, event.first, event.last)
# Updated data layout to match MATLAB v1.5.0 ordering
data = (amplitude, event.first, event.last, *shape_IDs, event.delay)

elif event.type == 'trap':
data = (
event.amplitude,
Expand All @@ -625,6 +649,9 @@ def register_grad_event(self, event: SimpleNamespace) -> Union[int, Tuple[int, L
if self.use_block_cache and any_changed:
self.block_cache.clear()

if hasattr(event, 'name'):
self.grad_id_to_name_map[grad_id] = event.name

if event.type == 'grad':
return grad_id, shape_IDs
elif event.type == 'trap':
Expand Down
4 changes: 3 additions & 1 deletion src/pypulseq/Sequence/read_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ def read(self, path: str, detect_rf_use: bool = False, remove_duplicates: bool =
else: # 1.3.x and below
self.rf_library = __read_events(input_file, (1, 1, 1, 1e-6, 1, 1), event_library=self.rf_library)
elif section == '[GRADIENTS]':
if version_combined >= 1004000: # 1.4.x format
if version_combined >= 1005000: # 1.5.x format
self.grad_library = __read_events(input_file, (1, 1, 1, 1, 1, 1e-6), 'g', self.grad_library)
elif version_combined >= 1004000: # 1.4.x format
self.grad_library = __read_events(input_file, (1, 1, 1, 1e-6), 'g', self.grad_library)
else: # 1.3.x and below
self.grad_library = __read_events(input_file, (1, 1, 1e-6), 'g', self.grad_library)
Expand Down
3 changes: 2 additions & 1 deletion src/pypulseq/Sequence/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(self, system: Union[Opts, None] = None, use_block_cache: bool = Tru
self.signature_value = ''
self.rf_id_to_name_map = {}
self.adc_id_to_name_map = {}
self.grad_id_to_name_map = {}

self.block_durations = {}
self.extension_numeric_idx = []
Expand Down Expand Up @@ -1197,7 +1198,7 @@ def remove_duplicates(self, in_place: bool = False) -> Self:
for grad_id in seq_copy.grad_library.data:
if seq_copy.grad_library.type[grad_id] == 'g':
data = seq_copy.grad_library.data[grad_id]
new_data = (data[0],) + (mapping[data[1]], mapping[data[2]]) + data[3:]
new_data = data[0:3] + (mapping[data[3]], mapping[data[4]]) + (data[5],)
if data != new_data:
seq_copy.grad_library.update(grad_id, None, new_data)

Expand Down
10 changes: 5 additions & 5 deletions src/pypulseq/Sequence/write_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ def write(self, file_name: Union[str, Path], create_signature, remove_duplicates
output_file.write(
'# time_shape_id of 0 means default timing (stepping with grad_raster starting at 1/2 of grad_raster)\n'
)
output_file.write('# id amplitude amp_shape_id time_shape_id delay\n')
output_file.write('# .. Hz/m .. .. us\n')
output_file.write('# id amplitude first last amp_shape_id time_shape_id delay\n')
output_file.write('# .. Hz/m Hz/m Hz/m .. .. us\n')
output_file.write('[GRADIENTS]\n')
id_format_str = '{:.0f} {:12g} {:.0f} {:.0f} {:.0f}\n' # Refer lines 20-21
id_format_str = '{:.0f} {:12g} {:12g} {:12g} {:.0f} {:.0f} {:.0f}\n' # Refer lines 20-21
keys = np.array(list(self.grad_library.data.keys()))
for k in keys[arb_grad_mask]:
s = id_format_str.format(
k,
*self.grad_library.data[k][:3],
round(self.grad_library.data[k][3] * 1e6),
*self.grad_library.data[k][:5],
round(self.grad_library.data[k][5] * 1e6),
)
output_file.write(s)
output_file.write('\n')
Expand Down
42 changes: 32 additions & 10 deletions src/pypulseq/add_gradients.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,29 @@ def add_gradients(
return grad

# Find out the general delay of all gradients and other statistics
delays, firsts, lasts, durs, is_trap, is_arb = [], [], [], [], [], []
delays, firsts, lasts, durs, is_trap, is_arb, is_osa = [], [], [], [], [], [], []
for ii in range(len(grads)):
if grads[ii].channel != channel:
raise ValueError('Cannot add gradients on different channels.')

delays.append(grads[ii].delay)
firsts.append(grads[ii].first)
lasts.append(grads[ii].last)
durs.append(calc_duration(grads[ii]))
is_trap.append(grads[ii].type == 'trap')
if is_trap[-1]:
is_arb.append(False)
is_osa.append(False)
firsts.append(0.0)
lasts.append(0.0)
else:
tt_rast = grads[ii].tt / system.grad_raster_time - 0.5
is_arb.append(np.all(np.abs(tt_rast - np.arange(len(tt_rast)))) < eps)
tt_rast = grads[ii].tt / system.grad_raster_time
is_arb.append(np.all(np.abs(tt_rast + 0.5 - np.arange(1, len(tt_rast) + 1))) < eps)
is_osa.append(np.all(np.abs(tt_rast - 0.5 * np.arange(1, len(tt_rast) + 1)) < eps))
firsts.append(grads[ii].first)
lasts.append(grads[ii].last)

# Check if we only have arbitrary grads on irregular time samplings, optionally mixed with trapezoids
if np.all(np.logical_or(is_trap, np.logical_not(is_arb))):
is_etrap = np.logical_and.reduce((np.logical_not(is_trap), np.logical_not(is_arb), np.logical_not(is_osa)))
if np.all(np.logical_or(is_trap, is_etrap)):
# Keep shapes still rather simple
times = []
for ii in range(len(grads)):
Expand Down Expand Up @@ -161,21 +166,35 @@ def add_gradients(
# Convert to numpy.ndarray for fancy-indexing later on
firsts, lasts = np.array(firsts), np.array(lasts)
common_delay = np.min(delays)
total_duration = np.max(durs)
durs = np.array(durs)

# Convert everything to a regularly-sampled waveform
waveforms = {}
max_length = 0

if np.any(is_osa):
target_raster = 0.5 * system.grad_raster_time
else:
target_raster = system.grad_raster_time

for ii in range(len(grads)):
g = grads[ii]
if g.type == 'grad':
if is_arb[ii]:
waveforms[ii] = g.waveform
if is_arb[ii] or is_osa[ii]:
if np.any(is_osa) and is_arb[ii]: # Porting MATLAB here, maybe a bit ugly
# Interpolate missing samples
idx = np.arange(0, len(g.waveform) - 0.5 + eps, 0.5)
wf = g.waveform
interp_waveform = 0.5 * (wf[np.floor(idx).astype(int)] + wf[np.ceil(idx).astype(int)])
waveforms[ii] = interp_waveform
else:
waveforms[ii] = g.waveform
else:
waveforms[ii] = points_to_waveform(
amplitudes=g.waveform,
times=g.tt,
grad_raster_time=system.grad_raster_time,
grad_raster_time=target_raster,
)
elif g.type == 'trap':
if g.flat_time > 0: # Triangle or trapezoid
Expand All @@ -200,7 +219,7 @@ def add_gradients(
waveforms[ii] = points_to_waveform(
amplitudes=amplitudes,
times=times,
grad_raster_time=system.grad_raster_time,
grad_raster_time=target_raster,
)
else:
raise ValueError('Unknown gradient type')
Expand Down Expand Up @@ -228,6 +247,9 @@ def add_gradients(
max_slew=max_slew,
max_grad=max_grad,
delay=common_delay,
oversampling=np.any(is_osa),
first=np.sum(firsts[delays == common_delay]),
last=np.sum(lasts[durs == total_duration]),
)
# Fix the first and the last values
# First is defined by the sum of firsts with the minimal delay (common_delay)
Expand Down
7 changes: 5 additions & 2 deletions src/pypulseq/add_ramps.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def add_ramps(
max_slew: int = 0,
rf: Union[SimpleNamespace, None] = None,
system: Union[Opts, None] = None,
oversampling: bool = False,
) -> List[np.ndarray]:
"""
Add segments to the trajectory to ramp to and from the given trajectory.
Expand All @@ -32,6 +33,8 @@ def add_ramps(
Maximum gradient amplitude.
max_slew : int, default=0
Maximum slew rate.
oversampling : bool, default=False
Boolean flag to indicate if gradient is oversampled by a factor of 2.

Returns
-------
Expand Down Expand Up @@ -64,8 +67,8 @@ def add_ramps(
num_channels = k.shape[0]
k = np.vstack((k, np.zeros((3 - num_channels, k.shape[1])))) # Pad with zeros if needed

k_up, ok1 = calc_ramp(k0=np.zeros((3, 2)), k_end=k[:, :2], system=system)
k_down, ok2 = calc_ramp(k0=k[:, -2:], k_end=np.zeros((3, 2)), system=system)
k_up, ok1 = calc_ramp(k0=np.zeros((3, 2)), k_end=k[:, :2], system=system, oversampling=oversampling)
k_down, ok2 = calc_ramp(k0=k[:, -2:], k_end=np.zeros((3, 2)), system=system, oversampling=oversampling)
if not (ok1 and ok2):
raise RuntimeError('Failed to calculate gradient ramps')

Expand Down
8 changes: 7 additions & 1 deletion src/pypulseq/calc_ramp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def calc_ramp(
max_points: int = 500,
max_slew: Union[np.ndarray, None] = None,
system: Union[Opts, None] = None,
oversampling: bool = False,
) -> Tuple[np.ndarray, bool]:
"""
Join the points `k0` and `k_end` in three-dimensional k-space in minimal time, observing the gradient and slew
Expand All @@ -34,6 +35,8 @@ def calc_ramp(
Maximum total slew rate. Either a single value or one value for each coordinate, of shape `[3, 1]`.
system : Opts, default=Opts()
System limits.
oversampling : bool, default=False
Boolean flag to indicate if gradient is oversampled by a factor of 2.

Returns
-------
Expand Down Expand Up @@ -321,7 +324,10 @@ def __joinright1(k0, k_end, use_points, G0, G_end):
if np.all(np.where(max_slew <= 0)):
max_slew = [system.max_slew]

grad_raster = system.grad_raster_time
if oversampling:
grad_raster = 0.5 * system.grad_raster_time
else:
grad_raster = system.grad_raster_time

if len(max_grad) == 1 and len(max_slew) == 1:
mode = 0
Expand Down
Loading
Loading