# Power Spectrum

imports

In [None]:
import os
import re
import gc
import sys
import glob
import yaml
import math
from scipy import stats
import numpy as np
import pandas as pd
from astropy.io import fits
import aicspylibczi as aplc
import matplotlib.pyplot as plt
from collections import defaultdict
from matplotlib_scalebar.scalebar import ScaleBar


from turbustat.statistics import PowerSpectrum


In [None]:
%load_ext autoreload
%autoreload 2
functions_path = 'functions'

sys.path.append(cluster + functions_path)

import image_plots as ip

In [None]:
def get_resolution(fn, dim='X'):
    czi = aplc.CziFile(fn)
    for n in czi.meta.iter():
        if 'Scaling' in n.tag:
            if dim in n.tag:
                resolution = float(n.text)
    return resolution

def _image_figure(dims, dpi=500):
    fig = plt.figure(figsize=(dims[0], dims[1]))
    ax = plt.Axes(fig, [0., 0., 1., 1.], )
    # ax.set_axis_off()
    fig.add_axes(ax)
    return(fig, ax)

def plot_image(
            im, im_inches=5, cmap='inferno', clims=('min','max'), zoom_coords=(), scalebar_resolution=0,
            axes_off=True, discrete=False, cbar_ori='horizontal', dpi=500,
            norm=None
        ):
    s = im.shape
    dims = (im_inches*s[1]/np.max(s), im_inches*s[0]/np.max(s))
    fig, ax = _image_figure(dims, dpi=dpi)
    im_ = im[~np.isnan(im)]
    llim = np.min(im_) if clims[0]=='min' else clims[0]
    ulim = np.max(im_) if clims[1]=='max' else clims[1]
    clims = (llim, ulim)
    if len(s) > 2:
        ax.imshow(im, interpolation="none")
    else:
        ax.imshow(im, cmap=cmap, clim=clims, interpolation="none", norm=norm)
    zc = zoom_coords if zoom_coords else (0,im.shape[0],0,im.shape[1])
    ax.set_ylim(zc[1],zc[0])
    ax.set_xlim(zc[2],zc[3])
    if axes_off:
        ax.set_axis_off()
    if scalebar_resolution:
        scalebar = ScaleBar(
                scalebar_resolution, 'um', frameon = False,
                color = 'white', box_color = 'white'
            )
        plt.gca().add_artist(scalebar)
    cbar = []
    fig2 = []
    if len(s) == 2:
        if cbar_ori == 'horizontal':
            fig2 = plt.figure(figsize=(dims[0], dims[0]/10))
        elif cbar_ori == 'vertical':
            fig2 = plt.figure(figsize=(dims[1]/10, dims[1]))
        if discrete:
            vals = np.sort(np.unique(im))
            vals = vals[~np.isnan(vals)]
            vals = vals[(vals>=clims[0]) & (vals<=clims[1])]
            cbar = get_discrete_colorbar(vals, cmap)
        else:
            image=plt.imshow(im, cmap=cmap, clim=clims, norm=norm)
            plt.gca().set_visible(False)
            cbar = plt.colorbar(image,orientation=cbar_ori)
    return(fig, ax, [fig2, cbar])


Get workdir

In [None]:
cluster = ""
workdir = ""
os.chdir(cluster + workdir)


In [None]:
os.getcwd()

Get czi filenames

In [None]:
input_table_fn = "input_table_all.csv"
input_table = pd.read_csv(input_table_fn)
filenames = input_table["filenames"]

In [None]:
dict_date_sn_fns = defaultdict(lambda: defaultdict(list))
for fn in filenames:
    bn = os.path.split(fn)[1]
    date, bn = re.split("(?<=^\d{4}_\d{2}_\d{2})_", bn)
    sn, ext = re.split("(?<=fov_\d{2})", bn)
    dict_date_sn_fns[date][sn].append(fn)
dict_date_sn_fns

Get processed filename formats

In [None]:
out_dir = "../outputs/{date}/{date}_{sn}"
out_fmt_classif = out_dir + "/classif"
centroid_sciname_fmt = out_fmt_classif + "/{date}_{sn}_centroid_sciname.csv"

Get color dict

In [None]:
sciname_list = [
    "Corynebacterium",
    "Actinomyces",
    "Rothia",
    "Capnocytophaga",
    "Prevotella",
    "Porphyromonas",
    "Streptococcus",
    "Gemella",
    "Veillonella",
    "Selenomonas",
    "Lautropia",
    "Neisseriaceae",
    "Pasteurellaceae",
    "Campylobacter",
    "Fusobacterium",
    "Leptotrichia",
    "Treponema",
    "TM7",
]
colors = plt.get_cmap("tab20").colors
# colors = [c + (1,) for c in colors]
dict_sciname_color = dict(zip(sciname_list, colors))
dict_sciname_color["Neisseria"] = dict_sciname_color["Neisseriaceae"]
dict_sciname_color["Saccharibacteria"] = dict_sciname_color["TM7"]
dict_sciname_color["TM"] = dict_sciname_color["TM7"]

## Test image

In [None]:
date = "2023_02_08"
sn = "hsdm_group_1_sample_12_fov_01"

centroid_sciname_fn = centroid_sciname_fmt.format(date=date, sn=sn)
centroid_sciname = pd.read_csv(centroid_sciname_fn)
coords = np.array([eval(c) for c in centroid_sciname["coord"].values])
scinames = centroid_sciname["sciname"].values
scn_unq = np.unique(scinames)


### Veillonella

Plot spot distribution

In [None]:
scn = 'Veillonella'

col = dict_sciname_color[scn]
spot_size = 1

res_umpix = get_resolution(dict_date_sn_fns[date][sn][0]) * 10**6

xlim = (0, np.max(coords[:, 1]))
ylim = (0, np.max(coords[:, 0]))

coords_scn = coords[scinames == scn]

fig, ax = plt.subplots(figsize=(10,10))
ax.scatter(coords_scn[:,1], coords_scn[:,0], s=spot_size, color=col)
ax.set_xlim(xlim[0], xlim[1])
ax.set_ylim(ylim[0], ylim[1])
ax.patch.set_color('k')
fig.patch.set_facecolor('k')
ax.invert_yaxis()
ax.set_aspect('equal')

plt.axis('off')
scalebar = ScaleBar(res_umpix, 'um', frameon = False, color = 'white', box_color = 'white')
plt.gca().add_artist(scalebar)

spatial_dir = out_dir + '/spatial_statistics'
cl_size_dir = spatial_dir + '/power_spectrum'
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_scatter.pdf'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)


Get density plot

In [None]:
radius_um = 10
step_um = 5

step = step_um / res_umpix
radius = radius_um / res_umpix
xs = np.arange(xlim[0],xlim[1], step)
ys = np.arange(ylim[0],ylim[1], step)

density_arr = np.zeros((len(rs), len(cs)))
for j, x in enumerate(xs): 
    eboolx = (
        (x > (xlim[0] + radius)) 
        * (x < (xlim[1] - radius))
    )
    if eboolx:
        for i, y in enumerate(ys):
            ebooly = (
                (y > (ylim[0] + radius)) 
                * (y < (ylim[1] - radius))
            )
            if ebooly:
                bools = (
                    (coords_scn[:,1] > (x - radius))
                    * (coords_scn[:,1] < (x + radius))
                    * (coords_scn[:,0] > (y - radius))
                    * (coords_scn[:,0] < (y + radius))
                )
                density_arr[i,j] = sum(bools) / (radius_um**2)


In [None]:
fig, ax, cbar = plot_image(
    density_arr, cmap='inferno', im_inches=10, cbar_ori='vertical'
)
plt.figure(fig)
spatial_dir = out_dir + '/spatial_statistics'
cl_size_dir = spatial_dir + '/power_spectrum'
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_density.png'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)

plt.figure(cbar[0])
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_density_cbar.pdf'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)

Get power spectrum

In [None]:
hdr = fits.PrimaryHDU().header
hdr['CDELT1'] = step_um
hdr['CDELT2'] = step_um


In [None]:
pspec = PowerSpectrum(density_arr, header=hdr)
pspec.run(verbose=True)

In [None]:
pspec.ps1D

### Selenomonas

Plot spot distribution

In [None]:
scn = 'Selenomonas'

col = dict_sciname_color[scn]
spot_size = 1

res_umpix = get_resolution(dict_date_sn_fns[date][sn][0]) * 10**6

xlim = (0, np.max(coords[:, 1]))
ylim = (0, np.max(coords[:, 0]))

coords_scn = coords[scinames == scn]

fig, ax = plt.subplots(figsize=(10,10))
ax.scatter(coords_scn[:,1], coords_scn[:,0], s=spot_size, color=col)
ax.set_xlim(xlim[0], xlim[1])
ax.set_ylim(ylim[0], ylim[1])
ax.patch.set_color('k')
fig.patch.set_facecolor('k')
ax.invert_yaxis()
ax.set_aspect('equal')

plt.axis('off')
scalebar = ScaleBar(res_umpix, 'um', frameon = False, color = 'white', box_color = 'white')
plt.gca().add_artist(scalebar)


spatial_dir = out_dir + '/spatial_statistics'
cl_size_dir = spatial_dir + '/power_spectrum'
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_scatter.pdf'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)



Get density plot

In [None]:
radius_um = 10
step_um = 5

step = step_um / res_umpix
radius = radius_um / res_umpix
xs = np.arange(xlim[0],xlim[1], step)
ys = np.arange(ylim[0],ylim[1], step)

density_arr = np.zeros((len(rs), len(cs)))
for j, x in enumerate(xs): 
    eboolx = (
        (x > (xlim[0] + radius)) 
        * (x < (xlim[1] - radius))
    )
    if eboolx:
        for i, y in enumerate(ys):
            ebooly = (
                (y > (ylim[0] + radius)) 
                * (y < (ylim[1] - radius))
            )
            if ebooly:
                bools = (
                    (coords_scn[:,1] > (x - radius))
                    * (coords_scn[:,1] < (x + radius))
                    * (coords_scn[:,0] > (y - radius))
                    * (coords_scn[:,0] < (y + radius))
                )
                density_arr[i,j] = sum(bools) / (radius_um**2)


In [None]:
fig, ax, cbar = plot_image(
    density_arr, cmap='inferno', im_inches=10, cbar_ori='vertical'
)
plt.figure(fig)
spatial_dir = out_dir + '/spatial_statistics'
cl_size_dir = spatial_dir + '/power_spectrum'
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_density.png'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)

plt.figure(cbar[0])
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn}_density_cbar.pdf'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn=scn)
ip.check_dir(out_fn)
ip.save_fig(out_fn, transp=False)

Get power spectrum

In [None]:
hdr = fits.PrimaryHDU().header
hdr['CDELT1'] = step_um
hdr['CDELT2'] = step_um


In [None]:
pspec = PowerSpectrum(density_arr, header=hdr)
pspec.run(verbose=True)

Gemella

Plot spot distribution

In [None]:
scn = 'Prevotella'

col = dict_sciname_color[scn]
spot_size = 1

res_umpix = get_resolution(dict_date_sn_fns[date][sn][0]) * 10**6

xlim = (0, np.max(coords[:, 1]))
ylim = (0, np.max(coords[:, 0]))

coords_scn = coords[scinames == scn]

fig, ax = plt.subplots(figsize=(10,10))
ax.scatter(coords_scn[:,1], coords_scn[:,0], s=spot_size, color=col)
ax.set_xlim(xlim[0], xlim[1])
ax.set_ylim(ylim[0], ylim[1])
ax.patch.set_color('k')
fig.patch.set_facecolor('k')
ax.invert_yaxis()
ax.set_aspect('equal')

plt.axis('off')
scalebar = ScaleBar(res_umpix, 'um', frameon = False, color = 'white', box_color = 'white')
plt.gca().add_artist(scalebar)



Get density plot

In [None]:
radius_um = 10
step_um = 5

step = step_um / res_umpix
radius = radius_um / res_umpix
xs = np.arange(xlim[0],xlim[1], step)
ys = np.arange(ylim[0],ylim[1], step)

density_arr = np.zeros((len(rs), len(cs)))
for j, x in enumerate(xs): 
    eboolx = (
        (x > (xlim[0] + radius)) 
        * (x < (xlim[1] - radius))
    )
    if eboolx:
        for i, y in enumerate(ys):
            ebooly = (
                (y > (ylim[0] + radius)) 
                * (y < (ylim[1] - radius))
            )
            if ebooly:
                bools = (
                    (coords_scn[:,1] > (x - radius))
                    * (coords_scn[:,1] < (x + radius))
                    * (coords_scn[:,0] > (y - radius))
                    * (coords_scn[:,0] < (y + radius))
                )
                density_arr[i,j] = sum(bools) / (radius_um**2)


In [None]:
plot_image(density_arr, cmap='inferno', im_inches=10)

Get power spectrum

In [None]:
hdr = fits.PrimaryHDU().header
hdr['CDELT1'] = step_um
hdr['CDELT2'] = step_um


In [None]:
pspec = PowerSpectrum(density_arr, header=hdr)
pspec.run(verbose=True)

### Gemella

Plot spot distribution

In [None]:
scn = 'Gemella'

col = dict_sciname_color[scn]
spot_size = 1

res_umpix = get_resolution(dict_date_sn_fns[date][sn][0]) * 10**6

xlim = (0, np.max(coords[:, 1]))
ylim = (0, np.max(coords[:, 0]))

coords_scn = coords[scinames == scn]

fig, ax = plt.subplots(figsize=(10,10))
ax.scatter(coords_scn[:,1], coords_scn[:,0], s=spot_size, color=col)
ax.set_xlim(xlim[0], xlim[1])
ax.set_ylim(ylim[0], ylim[1])
ax.patch.set_color('k')
fig.patch.set_facecolor('k')
ax.invert_yaxis()
ax.set_aspect('equal')

plt.axis('off')
scalebar = ScaleBar(res_umpix, 'um', frameon = False, color = 'white', box_color = 'white')
plt.gca().add_artist(scalebar)



Get density plot

In [None]:
radius_um = 10
step_um = 5

step = step_um / res_umpix
radius = radius_um / res_umpix
xs = np.arange(xlim[0],xlim[1], step)
ys = np.arange(ylim[0],ylim[1], step)

density_arr = np.zeros((len(rs), len(cs)))
for j, x in enumerate(xs): 
    eboolx = (
        (x > (xlim[0] + radius)) 
        * (x < (xlim[1] - radius))
    )
    if eboolx:
        for i, y in enumerate(ys):
            ebooly = (
                (y > (ylim[0] + radius)) 
                * (y < (ylim[1] - radius))
            )
            if ebooly:
                bools = (
                    (coords_scn[:,1] > (x - radius))
                    * (coords_scn[:,1] < (x + radius))
                    * (coords_scn[:,0] > (y - radius))
                    * (coords_scn[:,0] < (y + radius))
                )
                density_arr[i,j] = sum(bools) / (radius_um**2)


In [None]:
plot_image(density_arr, cmap='inferno', im_inches=10)

Get power spectrum

In [None]:
hdr = fits.PrimaryHDU().header
hdr['CDELT1'] = step_um
hdr['CDELT2'] = step_um


In [None]:
pspec = PowerSpectrum(density_arr, header=hdr)
pspec.run(verbose=True)

### Comopare curves

In [None]:
def get_density_arr(coords_scn, step, radius):
    xs = np.arange(xlim[0],xlim[1], step)
    ys = np.arange(ylim[0],ylim[1], step)

    density_arr = np.zeros((len(ys), len(xs)))
    for j, x in enumerate(xs): 
        eboolx = (
            (x > (xlim[0] + radius)) 
            * (x < (xlim[1] - radius))
        )
        if eboolx:
            for i, y in enumerate(ys):
                ebooly = (
                    (y > (ylim[0] + radius)) 
                    * (y < (ylim[1] - radius))
                )
                if ebooly:
                    bools = (
                        (coords_scn[:,1] > (x - radius))
                        * (coords_scn[:,1] < (x + radius))
                        * (coords_scn[:,0] > (y - radius))
                        * (coords_scn[:,0] < (y + radius))
                    )
                    density_arr[i,j] = sum(bools) / (radius_um**2)
    return density_arr

In [None]:
radius_um = 10
step_um = 5

res_umpix = get_resolution(dict_date_sn_fns[date][sn][0]) * 10**6
step = step_um / res_umpix
radius = radius_um / res_umpix

xlim = (0, np.max(coords[:, 1]))
ylim = (0, np.max(coords[:, 0]))

hdr = fits.PrimaryHDU().header
hdr['CDELT1'] = step_um
hdr['CDELT2'] = step_um

dims = (2,1.5)
ft=6
lw=1
s=2

fmin = 1e-2
fmax = 1e-1

fig, ax = ip.general_plot(dims=dims, ft=ft, lw=lw)
scns = ['Veillonella','Selenomonas']
for scn in scns:
    # Setup
    col = dict_sciname_color[scn]
    coords_scn = coords[scinames == scn]

    # Get density
    density_arr = get_density_arr(coords_scn, step, radius)
    pspec = PowerSpectrum(density_arr, header=hdr)
    _ = pspec.run()

    # Plot 
    freqs = pspec.freqs.value / step_um
    bool_l = freqs > fmin
    bool_u = freqs < fmax
    freqs_clip = freqs[bool_l * bool_u]
    ps1D_clip = pspec.ps1D[bool_l * bool_u]

    ax.scatter(freqs_clip, ps1D_clip, color=col, s=s)

    lnc = np.log(freqs_clip)
    lnp = np.log(ps1D_clip)
    slope, intercept, r_value, p_value, std_err = stats.linregress(lnc, lnp)

    # intercept, slope = pspec.fit.params
    x = np.array([fmin, fmax])
    # x = np.array([pspec.low_cut.value, pspec.high_cut.value])
    y = math.exp(intercept) * x**slope
    ax.plot(x,y, color=col, lw=lw)
    print('Slope:',slope, "rsquared:", pspec.fit.rsquared)    

ax.set_yscale("log")
ax.set_xscale("log")
ax.set_xticks([1e-2, 1e-1], labels=[r"$10^{2}$", r"$10^{1}$"])

spatial_dir = out_dir + '/spatial_statistics'
cl_size_dir = spatial_dir + '/power_spectrum'
cluster_slope_fmt = cl_size_dir + '/plots/{date}_{sn}_scinames_{scn0}_{scn1}_power_spectrum.pdf'
out_fn = cluster_slope_fmt.format(date=date, sn=sn, scn0=scns[0], scn1=scns[1])
ip.check_dir(out_fn)
ip.save_fig(out_fn)

In [None]:
intercept


In [None]:
pspec.low_cut.value

In [None]:
dir(pspec)

In [None]:
pspec.fit.rsquared

In [None]:
dir(pspec.fit)