diff --git a/bin/dipy_tracking_gui b/bin/dipy_tracking_gui deleted file mode 100755 index b460e29142..0000000000 --- a/bin/dipy_tracking_gui +++ /dev/null @@ -1,20 +0,0 @@ -#!python -import sys -import pickle -from dipy.tracking.gui_tools import gui_track -from dipy.tracking.interfaces import ShmTrackingInterface - -w = ShmTrackingInterface() - -if len(sys.argv) == 2: - input = sys.argv[1] - if input[-2:] == '.p': - w = pickle.load(open(input,'rb')) - try: - gui_track(w) - sys.exit(0) - except AttributeError: - raise ValueError('this track was not created using the gui') - else: - execfile(sys.argv[1]) -gui_track(w) diff --git a/dipy/tracking/gui_tools.py b/dipy/tracking/gui_tools.py deleted file mode 100644 index 91f78265fa..0000000000 --- a/dipy/tracking/gui_tools.py +++ /dev/null @@ -1,78 +0,0 @@ -from warnings import warn - -# Import traits as optional package -try: - from traitsui.api import Item, Group, View, ArrayEditor -except ImportError: - from ..utils.optpkg import OptionalImportError - raise OptionalImportError("You must have traits to use this module") -from .interfaces import InputData - -from ..tracking.interfaces import InputData, ShmTrackingInterface - -I = InputData() -iview = I.trait_view() -iview.resizable = True -iview.width = 600 -I.trait_view('traits_view', iview) - -main_view = View(Group(Group( - Item('dwi_images'), - Item('all_inputs'), - Item('min_signal'), - Item('seed_roi'), - Item('seed_density', editor=ArrayEditor()), - show_border=True), - Group( - Item('smoothing_kernel_type'), - Item('smoothing_kernel'), - show_border=True), - Group( - Item('interpolator'), - Item('model_type'), - Item('sh_order'), - Item('Lambda'), - Item('sphere_coverage'), - Item('min_peak_spacing'), - Item('min_relative_peak'), - show_border=True), - Group( - Item('probabilistic'), - show_border=True), - Group( - # Item( 'integrator' ), - Item('seed_largest_peak',), - Item('track_two_directions'), - Item('start_direction', editor=ArrayEditor(), - enabled_when='not (seed_largest_peak and ' - 'track_two_directions)'), - Item('fa_threshold'), - Item('max_turn_angle'), - show_border=True), - Group( - Item('stop_on_target'), - Item('targets'), - show_border=True), - Group( - Item('save_streamlines_to'), - Item('save_counts_to'), - show_border=True), - orientation='vertical'), - buttons=['OK', 'Cancel'], width=600, close_result=False, - resizable=True, scrollable=True) - - -def gui_track(interface=None): - if interface is None: - interface = ShmTrackingInterface() - if not interface.configure_traits(view=main_view): - return - if interface.save_streamlines_to == '' and interface.save_counts_to == '': - raise IOError('must provide filename where to save results') - streamlines = interface.track_shm() - if interface.save_streamlines_to and interface.save_counts_to: - streamlines = list(streamlines) - if interface.save_streamlines_to: - interface.save_streamlines(streamlines, interface.save_streamlines_to) - if interface.save_counts_to: - interface.save_counts(streamlines, interface.save_counts_to) diff --git a/dipy/tracking/interfaces.py b/dipy/tracking/interfaces.py deleted file mode 100644 index 0724d44bfb..0000000000 --- a/dipy/tracking/interfaces.py +++ /dev/null @@ -1,316 +0,0 @@ -from nose import SkipTest - - -import pickle -import string -import os.path as path - -import numpy as np -from scipy.ndimage import convolve - -# Import traits as optional package -try: - import traits.api as T -except ImportError: - from ..utils.optpkg import OptionalImportError - raise OptionalImportError("You must have traits to use this module") - -import nibabel as nib -from nibabel.trackvis import write, empty_header - -from ..reconst.shm import (SlowAdcOpdfModel, MonoExpOpdfModel, QballOdfModel, - normalize_data, ClosestPeakSelector, - ResidualBootstrapWrapper, hat, lcr_matrix, - bootstrap_data_array, NND_ClosestPeakSelector) -from ..reconst.interpolate import (TriLinearInterpolator, - NearestNeighborInterpolator) -from ..tracking.integration import (BoundryIntegrator, FixedStepIntegrator, - generate_streamlines) -from ..tracking.utils import (seeds_from_mask, target, merge_streamlines, - density_map) -from ..io.bvectxt import (read_bvec_file, orientation_to_string, - reorient_vectors) - - -############################################################################# -# Remove this when the module becomes functional again -class ThisIsBroken(SkipTest): - pass -raise ThisIsBroken("this module is undergoing a major overhaul as therefore " - "does not currently work") -############################################################################# - -nifti_file = T.File(filter=['Nifti Files', '*.nii.gz', - 'Nifti Pair or Analyze Files', '*.img.gz', - 'All Files', '*']) - - -def read_roi(file, threshold=0, shape=None): - img = nib.load(file) - if shape is not None: - if shape != img.shape: - raise IOError('The roi image does not have the right shape, ' + - 'expecting '+str(shape)+' got '+str(img.shape)) - img_data = img.get_data() - if img_data.max() > 1: - raise ValueError('this does not seem to be a mask') - mask = img_data > threshold - return mask - - -class InputData(T.HasTraits): - dwi_images = nifti_file - fa_file = nifti_file - bvec_file = T.File(filter=['*.bvec']) - bvec_orientation = T.String('IMG', minlen=3, maxlen=3) - min_signal = T.Float(1) - - @T.on_trait_change('dwi_images') - def update_files(self): - dir, file = path.split(self.dwi_images) - base = string.split(file, path.extsep, 1)[0] - if self.fa_file == '': - self.fa_file = path.join(dir, base+'_fa.nii.gz') - if self.bvec_file == '': - self.bvec_file = path.join(dir, base+'.bvec') - - def read_data(self): - data_img = nib.load(self.dwi_images) - affine = data_img.get_affine() - voxel_size = data_img.get_header().get_zooms() - voxel_size = voxel_size[:3] - fa_img = nib.load(self.fa_file) - assert data_img.shape[:-1] == fa_img.shape - bvec, bval = read_bvec_file(self.bvec_file) - data_ornt = nib.io_orientation(affine) - if self.bvec_orientation != 'IMG': - bvec = reorient_vectors(bvec, self.bvec_orientation, data_ornt) - fa = fa_img.get_data() - data = data_img.get_data() - return data, voxel_size, affine, fa, bvec, bval - - -class GausianKernel(T.HasTraits): - sigma = T.Float(1, label='sigma (in voxels)') - shape = T.Array('int', shape=(3,), value=[1, 1, 1], - label='shape (in voxels)') - - def get_kernel(self): - raise NotImplementedError - # will get to this soon - - -class BoxKernel(T.HasTraits): - shape = T.Array('int', shape=(3,), value=[3, 3, 3], - label='shape (in voxels)') - - def get_kernel(self): - kernel = np.ones(self.shape)/self.shape.prod() - kernel.shape += (1,) - return kernel - - -def lazy_index(index): - """Produces a lazy index - - Returns a slice that can be used for indexing an array, if no slice can be - made index is returned as is. - """ - index = np.asarray(index) - assert index.ndim == 1 - if index.dtype == np.bool: - index = index.nonzero()[0] - if len(index) == 1: - return slice(index[0], index[0] + 1) - step = np.unique(np.diff(index)) - if len(step) != 1 or step[0] == 0: - return index - else: - return slice(index[0], index[-1] + 1, step[0]) - - -def closest_start(seeds, peak_finder, best_start): - starts = np.empty(seeds.shape) - best_start = np.asarray(best_start, 'float') - best_start /= np.sqrt((best_start*best_start).sum()) - for i in xrange(len(seeds)): - try: - starts[i] = peak_finder.next_step(seeds[i], best_start) - except StopIteration: - starts[i] = best_start - return starts - -all_kernels = {None: None, 'Box': BoxKernel, 'Gausian': GausianKernel} -all_interpolators = {'NearestNeighbor': NearestNeighborInterpolator, - 'TriLinear': TriLinearInterpolator} -all_shmodels = {'QballOdf': QballOdfModel, 'SlowAdcOpdf': SlowAdcOpdfModel, - 'MonoExpOpdf': MonoExpOpdfModel} -all_integrators = {'Boundry': BoundryIntegrator, - 'FixedStep': FixedStepIntegrator} - - -class ShmTrackingInterface(T.HasStrictTraits): - - dwi_images = T.DelegatesTo('all_inputs') - all_inputs = T.Instance(InputData, args=()) - min_signal = T.DelegatesTo('all_inputs') - seed_roi = nifti_file - seed_density = T.Array(dtype='int', shape=(3,), value=[1, 1, 1]) - - smoothing_kernel_type = T.Enum(None, all_kernels.keys()) - smoothing_kernel = T.Instance(T.HasTraits) - - @T.on_trait_change('smoothing_kernel_type') - def set_smoothing_kernel(self): - if self.smoothing_kernel_type is not None: - kernel_factory = all_kernels[self.smoothing_kernel_type] - self.smoothing_kernel = kernel_factory() - else: - self.smoothing_kernel = None - - interpolator = T.Enum('NearestNeighbor', all_interpolators.keys()) - model_type = T.Enum('SlowAdcOpdf', all_shmodels.keys()) - sh_order = T.Int(4) - Lambda = T.Float(0, desc="Smoothing on the odf") - sphere_coverage = T.Int(5) - min_peak_spacing = T.Range(0., 1, np.sqrt(.5), desc="as a dot product") - min_relative_peak = T.Range(0., 1, .25) - - probabilistic = T.Bool(False, label='Probabilistic (Residual Bootstrap)') - bootstrap_input = T.Bool(False) - bootstrap_vector = T.Array(dtype='int', value=[]) - - # integrator = Enum('Boundry', all_integrators.keys()) - seed_largest_peak = T.Bool(False, desc="Ignore sub-peaks and start follow " - "the largest peak at each seed") - start_direction = T.Array(dtype='float', shape=(3,), value=[0, 0, 1], - desc="Prefered direction from seeds when " - "multiple directions are available. " - "(Mostly) doesn't matter when 'seed " - "largest peak' and 'track two directions' " - "are both True", - label="Start direction (RAS)") - track_two_directions = T.Bool(False) - fa_threshold = T.Float(1.0) - max_turn_angle = T.Range(0., 90, 0) - - stop_on_target = T.Bool(False) - targets = T.List(nifti_file, []) - - # will be set later - voxel_size = T.Array(dtype='float', shape=(3,)) - affine = T.Array(dtype='float', shape=(4, 4)) - shape = T.Tuple((0, 0, 0)) - - # set for io - save_streamlines_to = T.File('') - save_counts_to = nifti_file - - # io methods - def save_streamlines(self, streamlines, save_streamlines_to): - trk_hdr = empty_header() - voxel_order = orientation_to_string(nib.io_orientation(self.affine)) - trk_hdr['voxel_order'] = voxel_order - trk_hdr['voxel_size'] = self.voxel_size - trk_hdr['vox_to_ras'] = self.affine - trk_hdr['dim'] = self.shape - trk_tracks = ((ii, None, None) for ii in streamlines) - write(save_streamlines_to, trk_tracks, trk_hdr) - pickle.dump(self, open(save_streamlines_to + '.p', 'wb')) - - def save_counts(self, streamlines, save_counts_to): - counts = density_map(streamlines, self.shape, self.voxel_size) - if counts.max() < 2**15: - counts = counts.astype('int16') - nib.save(nib.Nifti1Image(counts, self.affine), save_counts_to) - - # tracking methods - def track_shm(self, debug=False): - if self.sphere_coverage > 7 or self.sphere_coverage < 1: - raise ValueError("sphere coverage must be between 1 and 7") - verts, edges, faces = create_half_unit_sphere(self.sphere_coverage) - verts, pot = disperse_charges(verts, 10, .3) - - data, voxel_size, affine, fa, bvec, bval = self.all_inputs.read_data() - self.voxel_size = voxel_size - self.affine = affine - self.shape = fa.shape - - model_type = all_shmodels[self.model_type] - model = model_type(self.sh_order, bval, bvec, self.Lambda) - model.set_sampling_points(verts, edges) - - data = np.asarray(data, dtype='float', order='C') - if self.smoothing_kernel is not None: - kernel = self.smoothing_kernel.get_kernel() - convolve(data, kernel, out=data) - - normalize_data(data, bval, self.min_signal, out=data) - dmin = data.min() - data = data[..., lazy_index(bval > 0)] - if self.bootstrap_input: - if self.bootstrap_vector.size == 0: - n = data.shape[-1] - self.bootstrap_vector = np.random.randint(n, size=n) - H = hat(model.B) - R = lcr_matrix(H) - data = bootstrap_data_array(data, H, R, self.bootstrap_vector) - data.clip(dmin, out=data) - - mask = fa > self.fa_threshold - targets = [read_roi(tgt, shape=self.shape) for tgt in self.targets] - if self.stop_on_target: - for target_mask in targets: - mask = mask & ~target_mask - - seed_mask = read_roi(self.seed_roi, shape=self.shape) - seeds = seeds_from_mask(seed_mask, self.seed_density, voxel_size) - - if ((self.interpolator == 'NearestNeighbor' and not - self.probabilistic and not debug)): - using_optimze = True - peak_finder = NND_ClosestPeakSelector(model, data, mask, - voxel_size) - else: - using_optimze = False - interpolator_type = all_interpolators[self.interpolator] - interpolator = interpolator_type(data, voxel_size, mask) - peak_finder = ClosestPeakSelector(model, interpolator) - - # Set peak_finder parameters for start steps - peak_finder.angle_limit = 90 - model.peak_spacing = self.min_peak_spacing - if self.seed_largest_peak: - model.min_relative_peak = 1 - else: - model.min_relative_peak = self.min_relative_peak - - data_ornt = nib.io_orientation(self.affine) - best_start = reorient_vectors(self.start_direction, 'ras', data_ornt) - start_steps = closest_start(seeds, peak_finder, best_start) - - if self.probabilistic: - interpolator = ResidualBootstrapWrapper(interpolator, model.B, - min_signal=dmin) - peak_finder = ClosestPeakSelector(model, interpolator) - elif using_optimze and self.seed_largest_peak: - peak_finder.reset_cache() - - # Reset peak_finder parameters for tracking - peak_finder.angle_limit = self.max_turn_angle - model.peak_spacing = self.min_peak_spacing - model.min_relative_peak = self.min_relative_peak - - integrator = BoundryIntegrator(voxel_size, overstep=.1) - streamlines = generate_streamlines(peak_finder, integrator, seeds, - start_steps) - if self.track_two_directions: - start_steps = -start_steps - streamlinesB = generate_streamlines(peak_finder, integrator, seeds, - start_steps) - streamlines = merge_streamlines(streamlines, streamlinesB) - - for target_mask in targets: - streamlines = target(streamlines, target_mask, voxel_size) - - return streamlines diff --git a/dipy/tracking/markov.py b/dipy/tracking/markov.py deleted file mode 100644 index 2008f32340..0000000000 --- a/dipy/tracking/markov.py +++ /dev/null @@ -1,465 +0,0 @@ -# -*- coding: utf-8 -*- -"""Implemention of various Tractography methods. - -these tools are meant to be paired with diffusion reconstruction methods from -dipy.reconst - -This module uses the trackvis coordinate system, for more information about -this coordinate system please see dipy.tracking.utils -The following modules also use this coordinate system: -dipy.tracking.utils -dipy.tracking.integration -dipy.reconst.interpolate -""" -from __future__ import division, print_function, absolute_import - -from ..utils.six.moves import xrange - -import numpy as np -from ..reconst.interpolate import OutsideImage, NearestNeighborInterpolator -from dipy.direction.peaks import default_sphere, peak_directions -from . import utils - - -class DirectionFinder(object): - - sphere = default_sphere - relative_peak_threshold = .5 - min_seperation_angle = 45 - - def __call__(self, fit): - discrete_odf = fit.odf(self.sphere) - directions, _, _ = peak_directions(discrete_odf, self.sphere, - self.relative_peak_threshold, - self.min_seperation_angle) - return directions - - -class BoundaryStepper(object): - """Steps along a direction past the closest voxel boundary - - Parameters - ---------- - voxel_size : array-like - Size of voxels in data volume - overstep : float - A small number used to prevent the track from getting stuck at the - edge of a voxel. - - """ - def __init__(self, voxel_size=(1, 1, 1), overstep=.1): - self.overstep = overstep - self.voxel_size = np.array(voxel_size, 'float') - - def __call__(self, location, step): - """takes a step just past the edge of the next voxel along step - - given a location and a step, finds the smallest step needed to move - into the next voxel - - Parameters - ---------- - location : ndarray, (3,) - location to integrate from - step : ndarray, (3,) - direction in 3 space to integrate along - """ - step_sizes = self.voxel_size * (~np.signbit(step)) - step_sizes -= location % self.voxel_size - step_sizes /= step - smallest_step = min(step_sizes) + self.overstep - return location + smallest_step * step - - -class FixedSizeStepper(object): - """A stepper that uses a fixed step size""" - def __init__(self, step_size=.5): - self.step_size = step_size - - def __call__(self, location, step): - """Takes a step of step_size from location""" - new_location = self.step_size * step + location - return new_location - - -def markov_streamline(get_direction, take_step, seed, first_step, maxlen): - """Creates a streamline from seed - - Parameters - ---------- - get_direction : callable - This function should return a direction for the streamline given a - location and the previous direction. - take_step : callable - Take step should take a step from a location given a direction. - seed : array (3,) - The seed point of the streamline - first_step : array (3,) - A unit vector giving the direction of the first step - maxlen : int - The maximum number of segments allowed in the streamline. This is good - for preventing infinite loops. - - Returns - ------- - streamline : array (N, 3) - A streamline. - - """ - streamline = [] - location = seed - direction = first_step - - try: - for i in xrange(maxlen): - streamline.append(location) - location = take_step(location, direction) - direction = get_direction(location, direction) - if direction is None: - streamline.append(location) - break - except OutsideImage: - pass - - return np.array(streamline) - - -class MarkovIntegrator(object): - """An abstract class for fiber-tracking""" - - _get_directions = DirectionFinder() - - def __init__(self, model, interpolator, mask, take_step, angle_limit, - seeds, max_cross=None, maxlen=500, mask_voxel_size=None, - affine=None): - """Creates streamlines by using a Markov approach. - - Parameters - ---------- - model : model - The model used to fit diffusion data. - interpolator : interpolator - Diffusion weighted data wrapped in an interpolator. Data should be - normalized. - mask : array, 3D - Used to confine tracking, streamlines are terminated if the - tracking leaves the mask. - take_step : callable - Determines the length of each step. - angle_limit : float [0, 90] - Maximum angle allowed between successive steps of the streamline. - seeds : array (N, 3) - Points to seed the tracking. Seed points should be given in point - space of the track (see ``affine``). - max_cross : int or None - The maximum number of direction to track from each seed in crossing - voxels. By default track all peaks of the odf, otherwise track the - largest `max_cross` peaks. - maxlen : int - Maximum number of steps to track from seed. Used to prevent - infinite loops. - mask_voxel_size : array (3,) - Voxel size for the mask. `mask` should cover the same FOV as data, - but it can have a different voxel size. Same as the data by - default. - affine : array (4, 4) - Coordinate space for the streamline point with respect to voxel - indices of input data. - - """ - self.model = model - self.interpolator = interpolator - self.seeds = seeds - self.max_cross = max_cross - self.maxlen = maxlen - - voxel_size = np.asarray(interpolator.voxel_size) - self._tracking_space = tracking_space = np.eye(4) - tracking_space[[0, 1, 2], [0, 1, 2]] = voxel_size - tracking_space[:3, 3] = voxel_size / 2. - if affine is None: - self.affine = tracking_space.copy() - else: - self.affine = affine - - self._take_step = take_step - self._cos_similarity = np.cos(np.deg2rad(angle_limit)) - - if mask_voxel_size is None: - if mask.shape != interpolator.data.shape[:-1]: - raise ValueError("The shape of the mask and the shape of the " - "data do not match") - mask_voxel_size = interpolator.voxel_size - else: - mask_voxel_size = np.asarray(mask_voxel_size) - mask_FOV = mask_voxel_size * mask.shape - data_FOV = interpolator.voxel_size * interpolator.data.shape[:-1] - if not np.allclose(mask_FOV, data_FOV): - raise ValueError("The FOV of the data and the FOV of the mask " - "do not match") - self._mask = NearestNeighborInterpolator(mask.copy(), mask_voxel_size) - - def __iter__(self): - # Check that seeds are reasonable - seeds = np.asarray(self.seeds) - if seeds.ndim != 2 or seeds.shape[1] != 3: - raise ValueError("Seeds should be an (N, 3) array of points") - - # Compute affine from point space to tracking space, apply to seeds - inv_A = np.dot(self._tracking_space, np.linalg.inv(self.affine)) - tracking_space_seeds = np.dot(seeds, inv_A[:3, :3].T) + inv_A[:3, 3] - - # Make tracks, move them to point space and return - track = self._generate_streamlines(tracking_space_seeds) - return utils.move_streamlines(track, output_space=self.affine, - input_space=self._tracking_space) - - def _generate_streamlines(self, seeds): - """A streamline generator""" - for s in seeds: - directions = self._next_step(s, prev_step=None) - directions = directions[:self.max_cross] - for first_step in directions: - F = markov_streamline(self._next_step, self._take_step, s, - first_step, self.maxlen) - first_step = -first_step - B = markov_streamline(self._next_step, self._take_step, s, - first_step, self.maxlen) - yield np.concatenate([B[:0:-1], F], axis=0) - - -def _closest_peak(peak_directions, prev_step, cos_similarity): - """Return the closest direction to prev_step from peak_directions. - - All directions should be unit vectors. Antipodal symmetry is assumed, ie - direction x is the same as -x. - - Parameters - ---------- - peak_directions : array (N, 3) - N unit vectors. - prev_step : array (3,) or None - Previous direction. - cos_similarity : float - `cos(max_angle)` where `max_angle` is the maximum allowed angle between - prev_step and the returned direction. - - Returns - ------- - direction : array or None - If prev_step is None, returns peak_directions. Otherwise returns the - closest direction to prev_step. If no directions are close enough to - prev_step, returns None - """ - if prev_step is None: - return peak_directions - if len(peak_directions) == 0: - return None - - peak_dots = np.dot(peak_directions, prev_step) - closest_peak = abs(peak_dots).argmax() - dot_closest_peak = peak_dots[closest_peak] - if dot_closest_peak >= cos_similarity: - return peak_directions[closest_peak] - elif dot_closest_peak <= -cos_similarity: - return -peak_directions[closest_peak] - else: - return None - - -class ClosestDirectionTracker(MarkovIntegrator): - - def _next_step(self, location, prev_step): - """Returns the direction closest to prev_step at location - - Fits the data from location using model and returns the tracking - direction closest to prev_step. If prev_step is None, all the - directions are returned. - - Parameters - ---------- - location : point in space - location is passed to the interpolator in order to get data - prev_step : array_like (3,) - the direction of the previous tracking step - - """ - if not self._mask[location]: - return None - vox_data = self.interpolator[location] - fit = self.model.fit(vox_data) - directions = self._get_directions(fit) - return _closest_peak(directions, prev_step, self._cos_similarity) - - -class ProbabilisticOdfWeightedTracker(MarkovIntegrator): - """A stochastic (probabilistic) fiber tracking method - - Stochastically tracks streamlines by randomly choosing directions from - sphere. The likelihood of a direction being chosen is taken from - `model.fit(data).odf(sphere)`. Negative values are set to 0. If no - directions less than `angle_limit` degrees are from the incoming direction - have a positive likelihood, the streamline is terminated. - - Parameters - ---------- - model : model - The model used to fit diffusion data. - interpolator : interpolator - Diffusion weighted data wrapped in an interpolator. Data should be - normalized. - mask : array, 3D - Used to confine tracking, streamlines end when they leave the mask. - take_step : callable - Determines the length of each step. - angle_limit : float [0, 90] - The angle between successive steps in the streamlines cannot be more - than `angle_limit` degrees. - seeds : array (N, 3) - Points to seed the tracking. - sphere : Sphere - sphere used to evaluate the likelihood. A Sphere or a HemiSphere can be - used here. A HemiSphere is more efficient. - max_cross : int or None - Max number of directions to follow at each seed. By default follow all - peaks of the odf. - maxlen : int - Maximum number of segments to follow from seed. Used to prevent - infinite loops. - mask_voxel_size : array (3,) - Voxel size for the mask. `mask` should cover the same FOV as data, but - it can have a different voxel size. Same as the data by default. - - Notes - ----- - The tracker is based on a method described in [1]_ and [2]_ as fiber - orientation distribution (FOD) sampling. - - References - ---------- - .. [1] Jeurissen, B., Leemans, A., Jones, D. K., Tournier, J.-D., & - Sijbers, J. (2011). Probabilistic fiber tracking using the residual - bootstrap with constrained spherical deconvolution. Human Brain - Mapping, 32(3), 461-479. doi:10.1002/hbm.21032 - .. [2] J-D. Tournier, F. Calamante, D. G. Gadian, A. Connelly (2005). - Probabilistic fibre tracking through regions containing crossing - fibres. http://cds.ismrm.org/ismrm-2005/Files/01343.pdf - - """ - def __init__(self, model, interpolator, mask, take_step, angle_limit, - seeds, sphere, max_cross=None, maxlen=500, - mask_voxel_size=None, affine=None): - - MarkovIntegrator.__init__(self, model, interpolator, mask, take_step, - angle_limit, seeds, max_cross, maxlen, - mask_voxel_size, affine) - self.sphere = sphere - self._set_adjacency_matrix(sphere, self._cos_similarity) - self._get_directions.sphere = sphere - - def _set_adjacency_matrix(self, sphere, cos_similarity): - """A boolean array of where the angle between vertices i and j of - sphere is less than `angle_limit` apart.""" - matrix = np.dot(sphere.vertices, sphere.vertices.T) - matrix = abs(matrix) >= cos_similarity - keys = [tuple(v) for v in sphere.vertices] - adj_matrix = dict(zip(keys, matrix)) - keys = [tuple(-v) for v in sphere.vertices] - adj_matrix.update(zip(keys, matrix)) - self._adj_matrix = adj_matrix - - def _next_step(self, location, prev_step): - """Returns the direction closest to prev_step at location - - Fits the data from location using model and returns the tracking - direction closest to prev_step. If prev_step is None, all the - directions are returned. - - Parameters - ---------- - location : point in space - location is passed to the interpolator in order to get data - prev_step : array_like (3,) - the direction of the previous tracking step - - """ - if not self._mask[location]: - return None - vox_data = self.interpolator[location] - fit = self.model.fit(vox_data) - if prev_step is None: - return self._get_directions(fit) - odf = fit.odf(self.sphere) - odf.clip(0, out=odf) - cdf = (self._adj_matrix[tuple(prev_step)] * odf).cumsum() - if cdf[-1] == 0: - return None - random_sample = np.random.random() * cdf[-1] - idx = cdf.searchsorted(random_sample, 'right') - direction = self.sphere.vertices[idx] - if np.dot(direction, prev_step) > 0: - return direction - else: - return -direction - - -class CDT_NNO(ClosestDirectionTracker): - """ClosestDirectionTracker optimized for NearestNeighbor interpolator - - For use with Nearest Neighbor interpolation, directions at each voxel are - remembered to avoid recalculating. - - Parameters - ---------- - model : model - A model used to fit data. Should return a some fit object with - directions. - interpolator : interpolator - A NearestNeighbor interpolator, for other interpolators do not use this - class. - angle_limit : float [0, 90] - Maximum angle allowed between prev_step and next_step. - - """ - def __init__(self, model, interpolator, mask, take_step, angle_limit, - seeds, max_cross=None, maxlen=500, mask_voxel_size=None, - affine=None): - if not isinstance(interpolator, NearestNeighborInterpolator): - msg = ("CDT_NNO is an optimized version of " - "ClosestDirectionTracker that requires a " - "NearestNeighborInterpolator") - raise ValueError(msg) - - ClosestDirectionTracker.__init__(self, model, interpolator, mask, - take_step, angle_limit, seeds, - max_cross=max_cross, maxlen=maxlen, - mask_voxel_size=mask_voxel_size, - affine=None) - self._data = self.interpolator.data - self._voxel_size = self.interpolator.voxel_size - self.reset_cache() - - def reset_cache(self): - """Clear saved directions""" - lookup = np.empty(self._data.shape[:-1], 'int') - lookup.fill(-1) - self._lookup = lookup - self._peaks = [] - - def _next_step(self, location, prev_step): - """Returns the direction closest to prev_step at location""" - if not self._mask[location]: - return None - - vox_loc = tuple(location // self._voxel_size) - hash = self._lookup[vox_loc] - if hash >= 0: - directions = self._peaks[hash] - else: - vox_data = self._data[vox_loc] - fit = self.model.fit(vox_data) - directions = self._get_directions(fit) - self._lookup[vox_loc] = len(self._peaks) - self._peaks.append(directions) - - return _closest_peak(directions, prev_step, self._cos_similarity) diff --git a/dipy/tracking/tests/test_markov.py b/dipy/tracking/tests/test_markov.py deleted file mode 100644 index 396e2f31af..0000000000 --- a/dipy/tracking/tests/test_markov.py +++ /dev/null @@ -1,298 +0,0 @@ -from __future__ import division, print_function, absolute_import - -import numpy as np -from dipy.tracking import utils -from dipy.reconst.interpolate import NearestNeighborInterpolator -from dipy.tracking.markov import (BoundaryStepper, _closest_peak, - FixedSizeStepper, MarkovIntegrator, - markov_streamline, OutsideImage, - ClosestDirectionTracker, - ProbabilisticOdfWeightedTracker) -from dipy.core.sphere import HemiSphere, unit_octahedron -from numpy.testing import (assert_array_almost_equal, assert_array_equal, - assert_equal, assert_, assert_raises) - - -def test_BoundaryStepper(): - os = 1 - bi = BoundaryStepper(overstep=os) - loc = np.array([.5, .5, .5]) - step = np.array([1, 1, 1]) / np.sqrt(3) - assert_array_almost_equal(bi(loc, step), os * step + [1, 1, 1]) - assert_array_almost_equal(bi(loc, -step), -os * step) - - os = 2 - bi = BoundaryStepper((2, 3, 4), overstep=2) - assert_array_almost_equal(bi(loc, step), os * step + [2, 2, 2]) - assert_array_almost_equal(bi(loc, -step), -os * step) - - loc = np.array([7.5, 7.5, 7.5]) - assert_array_almost_equal(bi(loc, step), os * step + [8, 8, 8]) - assert_array_almost_equal(bi(loc, -step), [6, 6, 6] - os * step) - - -def test_FixedSizeStepper(): - fsi = FixedSizeStepper(step_size=2.) - loc = np.array([2, 3, 12]) - step = np.array([3, 2, 4]) / np.sqrt(3) - assert_array_almost_equal(fsi(loc, step), loc + 2. * step) - assert_array_almost_equal(fsi(loc, -step), loc - 2. * step) - - -def test_markov_streamline(): - - east = np.array([1, 0, 0]) - - class MoveEastWest(object): - def get_direction(self, location, prev_step): - if np.any(location < 0): - raise OutsideImage - elif np.any(location > 10.): - return None - if np.dot(prev_step, east) >= 0: - return east - else: - return -east - - seed = np.array([5.2, 0, 0]) - first_step = east - dir_getter = MoveEastWest() - stepper = FixedSizeStepper(.5) - - # The streamline terminates when it goes past (10, 0, 0). (10.2, 0, 0) - # should be the last point in the streamline - streamline = markov_streamline(dir_getter.get_direction, stepper, - seed, first_step, 100) - expected = np.zeros((11, 3)) - expected[:, 0] = np.linspace(5.2, 10.2, 11) - assert_array_almost_equal(streamline, expected) - - # OutsideImage gets raised when the streamline points become negative - # the streamline should end, and the negative points should not be part - # of the streamline - first_step = -east - streamline = markov_streamline(dir_getter.get_direction, stepper, - seed, first_step, 100) - expected = np.zeros((11, 3)) - expected[:, 0] = np.linspace(5.2, 0.2, 11) - assert_array_almost_equal(streamline, expected) - - -def test_MarkovIntegrator(): - - class KeepGoing(MarkovIntegrator): - def _next_step(self, location, prev_step): - if prev_step is None: - return np.array([[1., 0, 0], - [0, 1., 0], - [0, 0., 1]]) - if not self._mask[location]: - return None - else: - return prev_step - - data = np.ones((10, 10, 10, 65)) - data_interp = NearestNeighborInterpolator(data, (1, 1, 1)) - - seeds = [np.array([5.2, 5.2, 5.2])] - stepper = FixedSizeStepper(.5) - mask = np.ones((10, 10, 10), 'bool') - gen = KeepGoing(model=None, interpolator=data_interp, mask=mask, - take_step=stepper, angle_limit=0., seeds=seeds) - streamlines = list(gen) - assert_equal(len(streamlines), 3) - - expected = np.zeros((20, 3)) - for i in range(3): - expected[:] = 5.2 - expected[:, i] = np.arange(.2, 10, .5) - assert_array_almost_equal(streamlines[i], expected) - - # Track only the first (largest) peak for each seed - gen = KeepGoing(model=None, interpolator=data_interp, mask=mask, - take_step=stepper, angle_limit=0., seeds=seeds, - max_cross=1) - streamlines = list(gen) - assert_equal(len(streamlines), 1) - - expected = np.zeros((20, 3)) - expected[:] = 5.2 - expected[:, 0] = np.arange(.2, 10, .5) - assert_array_almost_equal(streamlines[0], expected) - - mask = np.ones((20, 20, 20), 'bool') - gen = KeepGoing(model=None, interpolator=data_interp, mask=mask, - take_step=stepper, angle_limit=0., seeds=seeds, - max_cross=1, mask_voxel_size=(.5, .5, .5)) - streamlines = list(gen) - assert_equal(len(streamlines), 1) - assert_array_almost_equal(streamlines[0], expected) - - # Test tracking with affine - affine = np.eye(4) - affine[:3, :] = np.random.random((3, 4)) - .5 - - seeds = [np.dot(affine[:3, :3], seeds[0] - .5) + affine[:3, 3]] - sl_affine = KeepGoing(model=None, interpolator=data_interp, mask=mask, - take_step=stepper, angle_limit=0., seeds=seeds, - max_cross=1, mask_voxel_size=(.5, .5, .5), affine=affine) - - default = np.eye(4) - default[:3, 3] = .5 - sl_default = list(utils.move_streamlines(sl_affine, default, affine)) - - assert_equal(len(sl_default), 1) - assert_array_almost_equal(sl_default[0], expected) - - -def test_closest_peak(): - peak_values = np.array([1, .9, .8, .7, .6, .2, .1]) - peak_points = np.array([[1., 0., 0.], - [0., .9, .1], - [0., 1., 0.], - [.9, .1, 0.], - [0., 0., 1.], - [1., 1., 0.], - [0., 1., 1.]]) - norms = np.sqrt((peak_points * peak_points).sum(-1)) - peak_points = peak_points / norms[:, None] - - prev = np.array([1, -.9, 0]) - prev = prev / np.sqrt(np.dot(prev, prev)) - cp = _closest_peak(peak_points, prev, 0.) - assert_array_equal(cp, peak_points[0]) - cp = _closest_peak(peak_points, -prev, 0.) - assert_array_equal(cp, -peak_points[0]) - - -def test_ClosestDirectionTracker(): - class MyModel(object): - def fit(self, data): - return MyFit() - - class MyFit(object): - pass - - class MyDirectionFinder(object): - - directions = np.array([[1., 0, 0], - [0, 1., 0], - [0, 0., 1]]) - - def __call__(self, fit): - return self.directions - - data = np.ones((10, 10, 10, 65)) - data_interp = NearestNeighborInterpolator(data, (1, 1, 1)) - - mask = np.ones((10, 10, 10), 'bool') - mask[0, 0, 0] = False - cdt = ClosestDirectionTracker(model=MyModel(), interpolator=data_interp, - mask=mask, take_step=None, - angle_limit=90., seeds=None) - - # We're going to use a silly set of directions for the test - cdt._get_directions = MyDirectionFinder() - - prev_step = np.array([[.9, .1, .1], - [.1, .9, .1], - [.1, .1, .9]]) - prev_step /= np.sqrt((prev_step * prev_step).sum(-1))[:, None] - a, b, c = prev_step - assert_array_equal(cdt._next_step([1., 1., 1.], a), [1, 0, 0]) - assert_array_equal(cdt._next_step([1., 1., 1.], b), [0, 1, 0]) - assert_array_equal(cdt._next_step([1., 1., 1.], c), [0, 0, 1]) - # Assert raises outside image - assert_raises(OutsideImage, cdt._next_step, [-1., 1., 1.], c) - # Returns None when mask is False - assert_equal(cdt._next_step([0, 0, 0], c), None) - - # Test Angle limit - cdt = ClosestDirectionTracker(model=MyModel(), interpolator=data_interp, - mask=mask, take_step=None, - angle_limit=45, seeds=None) - - # We're going to use a silly set of directions for the test - cdt._get_directions = MyDirectionFinder() - sq3 = np.sqrt(3) - a = np.array([sq3 / 2, 1. / 2, 0]) - b = np.array([1. / 2, sq3 / 2, 0]) - c = np.array([1, 1, 1]) / sq3 - - assert_array_equal(cdt._next_step([1., 1., 1.], a), [1, 0, 0]) - assert_array_equal(cdt._next_step([1., 1., 1.], b), [0, 1, 0]) - assert_array_equal(cdt._next_step([1., 1., 1.], c), None) - - -def test_ProbabilisticOdfWeightedTracker(): - sphere = HemiSphere.from_sphere(unit_octahedron) - - # A simple image with three possible configurations, a vertical tract, - # a horizontal tract and a crossing - odf_list = [np.array([0., 0., 0.]), - np.array([1., 0., 0.]), - np.array([0., 1., 0.]), - np.array([1., 1., 0.]), - ] - simple_image = np.array([[0, 1, 0, 0, 0, 0], - [0, 1, 0, 0, 0, 0], - [0, 3, 2, 2, 2, 0], - [0, 1, 0, 0, 0, 0], - [0, 1, 0, 0, 0, 0], - ]) - # Make the image 4d - simple_image = simple_image[..., None, None] - - # Simple model and fit for this image - class MyModel(): - def fit(self, data): - return MyFit(data) - - class MyFit(object): - def __init__(self, n): - self.n = int(n) - - def odf(self, sphere): - return odf_list[self.n] - - seeds = [np.array([1.5, 1.5, .5])] * 30 - model = MyModel() - mask = np.ones([5, 6, 1], dtype="bool") - stepper = FixedSizeStepper(1.) - interpolator = NearestNeighborInterpolator(simple_image, (1, 1, 1)) - - # These are the only two possible paths though the simple_image - pwt = ProbabilisticOdfWeightedTracker(model, interpolator, mask, - stepper, 90, seeds, sphere) - expected = [np.array([[0.5, 1.5, 0.5], - [1.5, 1.5, 0.5], - [2.5, 1.5, 0.5], - [2.5, 2.5, 0.5], - [2.5, 3.5, 0.5], - [2.5, 4.5, 0.5], - [2.5, 5.5, 0.5]]), - np.array([[0.5, 1.5, 0.5], - [1.5, 1.5, 0.5], - [2.5, 1.5, 0.5], - [3.5, 1.5, 0.5], - [4.5, 1.5, 0.5]]) - ] - - def allclose(x, y): - return x.shape == y.shape and np.allclose(x, y) - - path = [False, False] - for streamline in pwt: - if allclose(streamline, expected[0]): - path[0] = True - elif allclose(streamline, expected[1]): - path[1] = True - else: - raise AssertionError() - assert_(all(path)) - - # The first path is not possible if 90 degree turns are excluded - pwt = ProbabilisticOdfWeightedTracker(model, interpolator, mask, - stepper, 80, seeds, sphere) - for streamline in pwt: - assert_(np.allclose(streamline, expected[1]))