In [17]:
%matplotlib inline
import numpy as np
from scipy import stats
from blimpy import Waterfall
from blimpy.utils import rebin
from matplotlib import pyplot as plt
from bisect import bisect_left
from tqdm import tqdm
import dask.array as da
import h5py
from time import time

MAX_PLT_POINTS      = 65536                  # Max number of points in matplotlib plot
MAX_IMSHOW_POINTS   = (8192, 4096)           # Max number of points in imshow plot

fil_path = "data/filterbanks/"
h5_path = "data/h5/"

test_fil = fil_path + "blc20_guppi_57991_48899_3C161_0007.gpuspec.0000.fil"

fri_obs = h5_path + "GBT_57532_09539_HIP56445_fine.h5"

plt_args = {
            'aspect':'auto',
            'origin':'lower',
            'rasterized':True,
            'interpolation':'nearest',
            'cmap':'viridis'
            }
try:
    client.close()
except NameError:
    pass
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=3,
                n_workers=2, memory_limit='4GB')
client

0,1
Client  Scheduler: inproc://10.142.38.73/12833/1312  Dashboard: http://10.142.38.73/12833/1312:8787/status,Cluster  Workers: 2  Cores: 6  Memory: 8.00 GB


In [12]:
def norm_test(arr):
    return stats.normaltest(arr.flatten())

In [13]:
def show_stamp(window, i):
    test_data = window[:, i:i+200]
    plt.figure()
    plt.imshow(test_data, **plt_args)
    
def show_stamp_f(freqs, data, f):
    ind = bisect_left(freqs, f)
    test_data = data[:, ind:ind+200]
    plt.figure()
    plt.imshow(test_data, **plt_args)
    
def plot_segment(plot_data):
    dec_fac_x, dec_fac_y = 1, 1
    if plot_data.shape[0] > MAX_IMSHOW_POINTS[0]:
        dec_fac_x = int(plot_data.shape[0] / MAX_IMSHOW_POINTS[0])

    if plot_data.shape[1] > MAX_IMSHOW_POINTS[1]:
        dec_fac_y = int(plot_data.shape[1] / MAX_IMSHOW_POINTS[1])

    print(f'Downsampling by a factor of ({dec_fac_x}, {dec_fac_y})')
    plot_data = rebin(plot_data, dec_fac_x, dec_fac_y)
    plt.figure(figsize=(10, 6))
    plt.imshow(plot_data, **plt_args)

In [66]:
# Show Info
wf = Waterfall(fri_obs, load_data=False)
wf.info()

blimpy.file_wrapper INFO     Skipping loading data ...

--- File Info ---
b'DIMENSION_LABELS' : [b'frequency' b'feed_id' b'time']
     b'az_start' :                              0.0
    b'data_type' :                                1
         b'fch1' :                    1926.26953125
         b'foff' :           -2.835503418452676e-06
   b'machine_id' :                               20
        b'nbits' :                               32
       b'nchans' :                        318230528
         b'nifs' :                                1
  b'source_name' :                      b'HIP56445'
      b'src_dej' :                     3d03m34.006s
      b'src_raj' :                    11h34m21.699s
 b'telescope_id' :                                6
        b'tsamp' :                     17.986224128
       b'tstart' :                57532.11040509259
     b'za_start' :                              0.0

Num ints in file :                               16
      File shape :               (16,

  self.h5 = h5py.File(self.filename)


In [67]:
wf = Waterfall(fri_obs, f_stop=1080, max_load=2)
wf.info()


--- File Info ---
b'DIMENSION_LABELS' : [b'frequency' b'feed_id' b'time']
     b'az_start' :                              0.0
    b'data_type' :                                1
         b'fch1' :                    1926.26953125
         b'foff' :           -2.835503418452676e-06
   b'machine_id' :                               20
        b'nbits' :                               32
       b'nchans' :                        318230528
         b'nifs' :                                1
  b'source_name' :                      b'HIP56445'
      b'src_dej' :                     3d03m34.006s
      b'src_raj' :                    11h34m21.699s
 b'telescope_id' :                                6
        b'tsamp' :                     17.986224128
       b'tstart' :                57532.11040509259
     b'za_start' :                              0.0

Num ints in file :                               16
      File shape :               (16, 1, 318230528)
--- Selection Info ---
Data selection sh

In [69]:
channel_len = np.int(np.round(187.5/64/abs(wf.header[b'foff'])))
channel_len

1033216

In [None]:
plt.figure()
wf.plot_waterfall(f_start=0, f_stop=1080)

In [None]:
freqs, data = wf.grab_data(f_start=0, f_stop=1080)
print(data.shape)
freqs = freqs[::-1]
data = data[:,::-1]

In [None]:
len(freqs)/channel_len

In [None]:
data = data[:, :16 * 1033216]
freqs = freqs[:16 * 1033216]

integrated = np.sum(data, axis=0)/len(data)
for n in np.nonzero(integrated > 10**13):
    integrated[n] = (integrated[n-1] + integrated[n+1]) /2
plt.figure()
plt.plot(freqs, np.log(integrated))


In [None]:
c = 6

window = data[:, channel_len*(c):channel_len*(c+1)]
plot_segment(np.log(window))

In [None]:
window

In [None]:
res = []
for chan in tqdm(range(15)):
    res.append([])
    window = data[:, channel_len*(chan):channel_len*(chan+1)]
    window_f = freqs[channel_len*(chan):channel_len*(chan+1)]
    for i in range(0, (len(window[0])//200*200), 100):
        test_data = window[:, i:i+200]
        s, p = norm_test(test_data)
        if p < 1e-25:
            res[chan].append((window_f[i], s, p))

In [None]:
from multiprocessing import Pool
from time import time

import warnings
warnings.filterwarnings("ignore")


def threshold_hits(chan):
    res = list()
    window = data[:, channel_len*(chan):channel_len*(chan+1)]
    window_f = freqs[channel_len*(chan):channel_len*(chan+1)]
    for i in range(0, (len(window[0])//200*200), 100):
        test_data = window[:, i:i+200]
        s, p = norm_test(test_data)
        if p < 1e-25:
            res.append((window_f[i], s, p))
    return res

start = time()
with Pool(12) as p:
    chan_hits = p.map(threshold_hits, range(16))
end = time()
print(end-start)

In [None]:
show_stamp_f(freqs, data, 1025.0100805927195)

In [None]:
hits = [len(e) for e in chan_hits]
print(hits)
print(sum(hits))

In [None]:
sorted_hits = sorted(res[7], key=lambda x: x[2])
sorted_hits

In [None]:
top = [x[0] for x in sorted_hits[:20]]
top

In [None]:
for i in top:
    print(i)
    show_stamp(window, i)

In [None]:
res[:15]

In [None]:
plt.figure()
plt.imshow(data[:, 8:8+128])

In [None]:
with open("pfb512coef.txt", "r") as f:
    coef_file = f.read()

In [None]:
lines = coef_file.splitlines()
filter_coefs = []
for line in lines:
    filter_coefs.append(float(line))

In [None]:
coefs = np.array(filter_coefs)/2**17

In [None]:
plt.figure()
plt.plot(coefs)

In [None]:
from numpy import fft
l = 2**16
f = fft.fft(coefs, l)
plt.figure()
plt.plot(np.log(np.abs(f)**2))

In [None]:
stacked = np.reshape(integrated, (15, 1033216))
plt.figure()
for i in range(15):
    plt.plot(np.log(stacked[i]))

In [None]:
rebined = rebin(stacked, 1, 64)

In [None]:
plt.figure()
plt.imshow(np.log(rebined), **plt_args)

In [None]:
model_shape = np.sum(stacked, axis=0)/15
model_shape[model_shape > 10**10] = np.mean(model_shape)

In [None]:
plt.figure()
plt.plot(model_shape)

In [88]:
h5_file = h5py.File(fri_obs, "r")
a = da.from_array(h5_file["data"], chunks=(2, 1, channel_len*14))
a = a
a

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,177 Tasks,176 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 20.37 GB 115.72 MB Shape (16, 1, 318230528) (2, 1, 14465024) Count 177 Tasks 176 Chunks Type float32 numpy.ndarray",318230528  1  16,

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,177 Tasks,176 Chunks
Type,float32,numpy.ndarray


In [89]:
start = time()
means = da.mean(a, axis=2)
# means_data = means.compute()
# means_data
end = time()

print(f"{end-start}")

0.0032460689544677734


In [90]:
means = da.from_array(np.reshape(means_data, (16,1,1)))
means

Unnamed: 0,Array,Chunk
Bytes,64 B,64 B
Shape,"(16, 1, 1)","(16, 1, 1)"
Count,1 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 64 B 64 B Shape (16, 1, 1) (16, 1, 1) Count 1 Tasks 1 Chunks Type float32 numpy.ndarray",1  1  16,

Unnamed: 0,Array,Chunk
Bytes,64 B,64 B
Shape,"(16, 1, 1)","(16, 1, 1)"
Count,1 Tasks,1 Chunks
Type,float32,numpy.ndarray


In [91]:
a

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,177 Tasks,176 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 20.37 GB 115.72 MB Shape (16, 1, 318230528) (2, 1, 14465024) Count 177 Tasks 176 Chunks Type float32 numpy.ndarray",318230528  1  16,

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,177 Tasks,176 Chunks
Type,float32,numpy.ndarray


In [92]:
308/14

22.0

In [93]:
normalized_a = da.divide(a.T, means.T).T
normalized_a

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,724 Tasks,176 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 20.37 GB 115.72 MB Shape (16, 1, 318230528) (2, 1, 14465024) Count 724 Tasks 176 Chunks Type float32 numpy.ndarray",318230528  1  16,

Unnamed: 0,Array,Chunk
Bytes,20.37 GB,115.72 MB
Shape,"(16, 1, 318230528)","(2, 1, 14465024)"
Count,724 Tasks,176 Chunks
Type,float32,numpy.ndarray


In [94]:
da.to_npy_stack('normalized_a/', normalized_a, axis=2)



In [63]:
print(normalized_a.shape)
print(normalized_a.chunks)

(16, 1, 318230528)
((2, 2, 2, 2, 2, 2, 2, 2), (1,), (14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 14680064, 9949184))


In [7]:
start = time()
normalized_a_data = normalized_a.compute()
end = time()
print(f"{end-start}")

23.68931007385254


In [9]:
start = time()

original_a_data = a[:, 0, :16 * 1033216].compute()

end = time()
print(f"{end-start}")

22.857927083969116


In [10]:
read_a = da.from_array(original_a_data)
read_and_normalize = da.divide(read_a, means_data)

start = time()
normalized_a_data = read_and_normalize.compute()
end = time()
print(f"{end-start}")

# 1.6420118808746338

1.949164867401123


In [11]:
del normalized_a_data

In [12]:
start = time()

numpy_normalized_a_data = original_a_data / means_data

end = time()
print(f"{end-start}")

# 0.6659681797027588

0.5543172359466553


In [14]:
del numpy_normalized_a_data

In [None]:
numpy_normalized_a_data.shape

In [None]:
plot_segment(normalized_a_data)

In [None]:
plot_segment(original_a_data)

In [None]:
print(normalized_a_data.shape == numpy_normalized_a_data.shape)
print(np.allclose(normalized_a_data, numpy_normalized_a_data))