Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spatial_maps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
from .stats import (
sparsity, selectivity, information_rate, information_specificity,
prob_dist)
from .tools import autocorrelation, fftcorrelate2d
from .tools import autocorrelation, fftcorrelate2d, nancorrelate2d
5 changes: 4 additions & 1 deletion spatial_maps/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def find_peaks(image):
indices = np.arange(1, num_objects+1)
peaks = ndimage.maximum_position(image, labels=labels, index=indices)
peaks = np.array(peaks)
center = np.array(image.shape) / 2
center = (np.array(image.shape) - 1) / 2
distances = np.linalg.norm(peaks - center, axis=1)
peaks = peaks[distances.argsort()]
return peaks
Expand Down Expand Up @@ -211,6 +211,9 @@ def calculate_field_centers(rate_map, labels, center_method='maxima'):
else:
raise ValueError(
"invalid center_method flag '{}'".format(center_method))
if not bc:
# empty list
return bc
bc = np.array(bc)
bc[:,[0, 1]] = bc[:,[1, 0]] # y, x -> x, y
return bc
Expand Down
131 changes: 66 additions & 65 deletions spatial_maps/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ def _adjust_bin_size(box_size, bin_size=None, bin_count=None):
box_size = np.array([box_size, box_size])

if bin_size is None:
bin_size = np.array([box_size[0] / bin_count[0],
box_size[1] / bin_count[1]])
bin_size = np.array([box_size[0] / bin_count[0], box_size[1] / bin_count[1]])

# round bin size of to closest requested bin size
bin_size = np.array([box_size[0] / int(box_size[0] / bin_size[0]),
box_size[1] / int(box_size[1] / bin_size[1])])
bin_size = np.array(
[
box_size[0] / int(box_size[0] / bin_size[0]),
box_size[1] / int(box_size[1] / bin_size[1]),
]
)

return box_size, bin_size

Expand All @@ -42,115 +45,113 @@ def smooth_map(rate_map, bin_size, smoothing, **kwargs):
def _occupancy_map(x, y, t, xbins, ybins):
t_ = np.append(t, t[-1] + np.median(np.diff(t)))
time_in_bin = np.diff(t_)
values, _, _ = np.histogram2d(
x, y, bins=[xbins, ybins], weights=time_in_bin)
values, _, _ = np.histogram2d(x, y, bins=[xbins, ybins], weights=time_in_bin)
return values


def _spike_map(x, y, t, spike_times, xbins, ybins):
t_ = np.append(t, t[-1] + np.median(np.diff(t)))
spikes_in_bin, _ = np.histogram(spike_times, t_)
values, _, _ = np.histogram2d(
x, y, bins=[xbins, ybins], weights=spikes_in_bin)
values, _, _ = np.histogram2d(x, y, bins=[xbins, ybins], weights=spikes_in_bin)
return values


def interpolate_nan_2D(array, method='nearest'):
def interpolate_nan_2D(array, method="nearest"):
from scipy import interpolate

x = np.arange(0, array.shape[1])
y = np.arange(0, array.shape[0])
#mask invalid values
# mask invalid values
array = np.ma.masked_invalid(array)
xx, yy = np.meshgrid(x, y)
#get only the valid values
# get only the valid values
x1 = xx[~array.mask]
y1 = yy[~array.mask]
newarr = array[~array.mask]

return interpolate.griddata(
(x1, y1), newarr.ravel(), (xx, yy),
method=method, fill_value=0)
(x1, y1), newarr.ravel(), (xx, yy), method=method, fill_value=0
)


class SpatialMap:
def __init__(self, x, y, t, spike_times, box_size, bin_size, bin_count=None):
def __init__(
self, smoothing=0.05, box_size=[1.0, 1.0], bin_size=0.02, bin_count=None
):
"""
Parameters
----------
smoothing : float
Smoothing of spike_map and occupancy_map before division
box_size : Sequence-like
Size of box in x and y direction
bin_size : float
Resolution of spatial maps
"""
box_size, bin_size = _adjust_bin_size(box_size, bin_size, bin_count)
xbins, ybins = _make_bins(box_size, bin_size)
self.spike_pos = _spike_map(x, y, t, spike_times, xbins, ybins)
self.time_pos = _occupancy_map(x, y, t, xbins, ybins)
assert all(self.spike_pos[self.time_pos == 0] == 0)

self.smoothing = smoothing
self.bin_size = bin_size
self.box_size = box_size
self.xbins = xbins
self.ybins = ybins

def spike_map(self, smoothing, mask_zero_occupancy=False, **kwargs):
if smoothing == 0.0:
spmap = copy(self.spike_pos)
else:
spmap = smooth_map(self.spike_pos, self.bin_size, smoothing, **kwargs)

def spike_map(self, x, y, t, spike_times, mask_zero_occupancy=True, **kwargs):
spmap = _spike_map(x, y, t, spike_times, self.xbins, self.ybins)
spmap = (
smooth_map(spmap, self.bin_size, self.smoothing, **kwargs)
if self.smoothing
else spmap
)
if mask_zero_occupancy:
spmap[self.time_pos == 0] = np.nan

spmap[_occupancy_map(x, y, t, self.xbins, self.ybins) == 0] = np.nan
return spmap

def occupancy_map(self, smoothing, mask_zero_occupancy=False, threshold=None, **kwargs):
'''Compute occupancy map as a histogram of postition
weighted with time spent.
Parameters:
-----------
mask_zero_occupancy : bool
Set zero occupancy to nan
threshold : float
Set low occupancy times to zero as not to get spurious
large values in rate.

'''
ocmap = copy(self.time_pos)
if threshold is not None:
scmap[self.time_pos <= threshold] = 0
if smoothing > 0:
ocmap = smooth_map(ocmap, self.bin_size, smoothing, **kwargs)

def occupancy_map(self, x, y, t, mask_zero_occupancy=True, **kwargs):
ocmap = _occupancy_map(x, y, t, self.xbins, self.ybins)
ocmap_copy = copy(ocmap) # to mask zero occupancy after smoothing
ocmap = (
smooth_map(ocmap, self.bin_size, self.smoothing, **kwargs)
if self.smoothing
else ocmap
)
if mask_zero_occupancy:
ocmap[self.time_pos == 0] = np.nan

ocmap[ocmap_copy == 0] = np.nan
return ocmap

def rate_map(self, smoothing, mask_zero_occupancy=False, interpolate_invalid=False, threshold=None, **kwargs):
'''Calculate rate map as spike_map / occupancy_map
def rate_map(
self,
x,
y,
t,
spike_times,
mask_zero_occupancy=True,
interpolate_invalid=False,
**kwargs
):
"""Calculate rate map as spike_map / occupancy_map
Parameters
----------
smoothing : float
Smoothing of spike_map and occupancy_map before division
mask_zero_occupancy : bool
Set pixels of zero occupancy to nan
interpolate_invalid : bool
Interpolate rate_map after division to remove invalid values,
if False, and mask_zero_occupancy is False,
invalid values are set to zero.
threshold : float
Low occupancy can produce spurious high rate, a threshold sets
occupancy below this to zero.
kwargs : key word arguments to scipy.interpolate, when smoothing > 0
Returns
-------
rate_map : array
'''
spike_map = self.spike_map(smoothing=smoothing, **kwargs)
occupancy_map = self.occupancy_map(
smoothing=smoothing, threshold=threshold, **kwargs)
if mask_zero_occupancy:
# to avoid infinity (x/0) we set zero occupancy to nan
# this can be handy when analyzing low occupancy maps
rate_map = spike_map / occupancy_map
rate_map[self.time_pos == 0] = np.nan
"""
spike_map = self.spike_map(
x, y, t, spike_times, mask_zero_occupancy=mask_zero_occupancy, **kwargs
)
# to avoid infinity (x/0) we set zero occupancy to nan
occupancy_map = self.occupancy_map(x, y, t, mask_zero_occupancy=True, **kwargs)
rate_map = spike_map / occupancy_map
if not mask_zero_occupancy:
rate_map[np.isnan(rate_map)] = 0
elif interpolate_invalid:
rate_map = spike_map / occupancy_map
rate_map = interpolate_nan_2D(rate_map)
else:
rate_map = spike_map / occupancy_map
rate_map[np.isinf(rate_map)] = 0
return rate_map
Loading