6 changes: 6 additions & 0 deletions README.md
@@ -45,3 +45,9 @@ Creating a free and open source framework that contains the generic algorithms a
2. [Filter samples](docs/clipping.md#62-filter-samples)
3. [Filter channels](docs/clipping.md#63-filter-channels)
4. [Filter individual channels](docs/clipping.md#64-filter-individual-channels)
7. [Pipeline](docs/pipeline.md)
1. [Introduction](docs/pipeline.md#71-introduction)
2. [Read rows](docs/pipeline.md#72-read-rows)
3. [Read n rows](docs/pipeline.md#73-read-n-rows)
4. [Read static](docs/pipeline.md#74-read-static)
5. [Measure methods](docs/pipeline.md#75-measure-methods)
23 changes: 10 additions & 13 deletions clipping/clipping.py
@@ -13,19 +13,21 @@ def clipping(channels, samples):
# remove all rows(samples) with noise
samples = filter_samples(samples)
# remove all columns(channels) with noise, select first max n samples
channels, samples = filter_channels(channels, samples[:n_samples])
bad_channels = filter_channels(samples[:n_samples])
# remove bad channels from samples
channels = np.delete(channels, bad_channels)
samples = np.delete(samples, bad_channels, axis=1)
# remove all individual cells with noise
samples = filter_indv_channels(samples)
return channels, samples


def filter_samples(samples):
def filter_samples(samples, factor=11):
"""
Calculate mean power of all frequencies per time sample
and remove samples with significantly high power
"""
factor = 1.3
new_samples = []
new_samples = list()
# calculate mean intensity per sample
avg_sample = np.sum(samples)/len(samples)
# remove samples with significant high power
@@ -35,13 +37,12 @@ def filter_samples(samples):
return np.array(new_samples)


def filter_channels(channels, samples):
def filter_channels(samples, factor=9):
"""
Calculate mean power of all time samples per frequency
and remove frequencies with significantly high power
"""
factor = 1.3
bad_channels = []
bad_channels = list()
# calculate the mean power per channel
avg_power_chan = samples.mean(axis=0)
# calculate the standard deviation per channel
@@ -52,18 +53,14 @@ def filter_channels(channels, samples):
for i, (avg_channel, sd_channel) in enumerate(zip(avg_power_chan, sd_power_chan)):
if avg_channel >= (avg_power * factor) or sd_channel >= (avg_power * factor):
bad_channels.append(i)
# remove bad channels from samples
new_channels = np.delete(channels, bad_channels)
new_samples = np.delete(samples, bad_channels, axis=1)
return new_channels, new_samples
return bad_channels


def filter_indv_channels(samples):
def filter_indv_channels(samples, factor=9):
"""
Calculate mean power per frequency
and remove samples with significantly high power
"""
factor = 1.3
new_samples = np.zeros((len(samples), len(samples[0])))
# calculate the mean power for each sample per channel
avg_power_chan = samples.mean(axis=0)
55 changes: 55 additions & 0 deletions docs/pipeline.md
@@ -0,0 +1,55 @@
# 7. Pipeline

## 7.1 Introduction

The Pipeline module executes the other modules of the library in a fixed order.
There are currently three options for running the pipeline.

These options include:
* read multiple rows, `read_n_rows`
* read single rows, `read_rows`
* read all rows, `read_static`

The Pipeline constructor determines which of these methods to run by inspecting the arguments it is given.

| Parameter | Description |
| --- | --- |
| filename | The path to the filterbank file. |
| as_stream | Decides whether the filterbank should be read as a stream. |
| DM | The dispersion measure (DM) is used for performing dedispersion. |
| scale | The scale used when downsampling the time series. |
| n | The chunk size (in rows) used when reading the filterbank as a stream. |
| size | The number of time samples to read when reading the filterbank statically. |

After selecting a read method, the pipeline measures the time each processing step takes using `measure_methods`. Once all steps have run, the constructor appends the results (a dictionary) to a text file.
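The dispatch and result-appending behaviour described above can be sketched as follows. `write_result` is a hypothetical helper name used purely for illustration; in the actual constructor this logic is inline:

```python
def write_result(result, as_stream=False, n=None):
    """Append a result dict to the output file matching the chosen
    read method (a simplified sketch of the constructor's logic)."""
    if as_stream:
        # chunked streaming when n is given, otherwise row-by-row
        out_name = "n_rows_filterbank.txt" if n else "rows_filterbank.txt"
    else:
        out_name = "static_filterbank.txt"
    with open(out_name, "a+") as out_file:
        out_file.write(str(result) + ",")
    return out_name
```

Because the results are appended with a trailing comma, repeated pipeline runs accumulate in the same file.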

## 7.2 Read rows

The `read_rows` method reads the filterbank data row by row. Because it only reads one row at a time, it cannot execute most of the processing methods. The alternative is the `read_n_rows` method, which can run all of them.

```python
pipeline.Pipeline(<filterbank_file>, as_stream=True)
```
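Internally, `read_rows` keeps calling `next_row` until it returns `False` at end of file, timing the whole loop. The pattern can be sketched with a stand-in reader; `FakeReader` below is purely illustrative and not part of the library:

```python
from timeit import default_timer as timer

class FakeReader:
    """Illustrative stand-in for Filterbank: yields rows, then False at EOF."""
    def __init__(self, rows):
        self.rows = list(rows)

    def next_row(self):
        return self.rows.pop(0) if self.rows else False

def time_row_reads(reader):
    """Read rows one at a time; return (row count, elapsed seconds)."""
    count = 0
    start = timer()
    while True:
        row = reader.next_row()
        if isinstance(row, bool):  # next_row returns False at EOF
            break
        count += 1
    return count, timer() - start
```

The `isinstance(row, bool)` check mirrors the EOF test used in the pipeline's own `read_rows`, since a data row is never a plain boolean.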

## 7.3 Read n rows

The `read_n_rows` method first splits the filterbank data into chunks of `n` samples, then runs the pipeline modules on each chunk. Remaining data that does not fill a complete chunk is currently ignored.

The chunk size `n` should be a power of two multiplied by the given downsampling scale.

```python
pipeline.Pipeline(<filterbank_file>, n=<size>, as_stream=True)
```
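The chunk-size rule above can be checked numerically. With the default scale of 3, `2**14 * 3` gives 49152, the same `sample_size` used in `examples/run_pipeline.py`:

```python
def chunk_size(power, scale):
    """Return a chunk size that is a power of two times the downsampling scale."""
    return (2 ** power) * scale

# scale=3 is the Pipeline default; 2**14 * 3 == 49152,
# matching the sample_size in examples/run_pipeline.py
n = chunk_size(14, 3)
```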

## 7.4 Read static

The `read_static` method reads the entire filterbank at once and applies each method to the full dataset. If the filterbank file is too large to process in memory, use `read_n_rows` instead.

```python
pipeline.Pipeline(<filterbank_file>)
```

## 7.5 Measure methods

The `measure_methods` method runs for each of the read methods above and measures how long each processing step takes. For each step it creates a key named after the step and stores the elapsed time as the value.
It returns a dictionary of all these keys and values.
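The timing pattern behind `measure_methods` can be sketched generically; `run_steps` and the lambda steps in the test below are illustrative stand-ins for the real modules (clipping, dedispersion, and so on):

```python
from timeit import default_timer as timer

def run_steps(steps):
    """Time each named step and collect the results in a dict,
    mirroring the stopwatch dict built by measure_methods."""
    stopwatch = {}
    for name, func in steps:
        start = timer()
        func()  # run the step, e.g. clipping or dedispersion
        stopwatch["time_" + name] = timer() - start
    return stopwatch
```

Each key is derived from the step's name, so the returned dictionary carries one timing entry per module, just as the pipeline records `time_clipping`, `time_dedisp`, and the other keys listed in `pipeline.py`.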
25 changes: 25 additions & 0 deletions examples/run_pipeline.py
@@ -0,0 +1,25 @@
"""
Script for running the pipeline
"""
#pylint: disable-all
import os,sys,inspect
CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
PARENT_DIR = os.path.dirname(CURRENT_DIR)
sys.path.insert(0, PARENT_DIR)
from pipeline.pipeline import Pipeline

# init filterbank filename
fil_name = os.path.abspath("filterbank.fil")
# init filterbank sample size
sample_size = 49152
# init times the pipeline should run
n_times = 10

# run the filterbank n times
for i in range(n_times):
# read static
Pipeline(filename=fil_name, size=sample_size)
# read stream, row per row
Pipeline(filename=fil_name, as_stream=True)
# read stream, n rows
Pipeline(filename=fil_name, as_stream=True, n=sample_size)
5 changes: 5 additions & 0 deletions filterbank/__init__.py
@@ -0,0 +1,5 @@
"""
Export filterbank methods
"""
from .filterbank import Filterbank
from .filterbank import read_header
16 changes: 7 additions & 9 deletions filterbank/filterbank.py
@@ -25,8 +25,7 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
"""
if not os.path.isfile(filename):
raise FileNotFoundError(filename)
# iterator for stream
self.stream_iter = 0
# header values
self.data, self.freqs, self.n_chans_selected = None, None, None
self.filename = filename
self.header = read_header(filename)
@@ -51,6 +50,8 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
self.fil.seek(int(self.ii_start * self.n_bytes * self.n_ifs * self.n_chans), 1)
# find possible channels
self.i_0, self.i_1 = self.setup_chans(freq_range)
# number of stream iterations
self.stream_iter = (self.n_samples * self.n_ifs)
# read filterbank at once
if read_all:
self.read_filterbank()
@@ -84,8 +85,8 @@ def next_row(self):

returns False if EOF
"""
if self.stream_iter < (self.n_samples * self.n_ifs):
self.stream_iter += 1
if self.stream_iter > 0:
self.stream_iter -= 1
# skip bytes
self.fil.seek(self.n_bytes * self.i_0, 1)
# read row of data
@@ -104,11 +105,8 @@ def next_n_rows(self, n_rows):

returns False if EOF
"""
if self.stream_iter < (self.n_samples * self.n_ifs):
# more rows requested than available
if self.stream_iter + n_rows >= self.n_samples * self.n_ifs:
n_rows = self.n_samples * self.n_ifs - self.stream_iter
self.stream_iter += n_rows
if self.stream_iter - n_rows > 0:
self.stream_iter -= n_rows
# init array of n rows
data = np.zeros((n_rows, self.n_chans_selected), dtype=self.dd_type)
for row in range(n_rows):
2 changes: 1 addition & 1 deletion fourier/__init__.py
@@ -1,4 +1,4 @@
"""
import file for fourier.py
"""
from .fourier import fft_freq, fft_vectorized, dft_slow, fft_matrix, fft_shift
from .fourier import fft_freq, fft_vectorized, dft_slow, fft_matrix, fft_shift, ifft
4 changes: 4 additions & 0 deletions pipeline/__init__.py
@@ -0,0 +1,4 @@
"""
Export pipeline methods
"""
from .pipeline import Pipeline
146 changes: 146 additions & 0 deletions pipeline/pipeline.py
@@ -0,0 +1,146 @@
"""
Pipeline for running all the modules in order
"""
# pylint: disable=wrong-import-position
import os
import sys
import inspect
CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
PARENT_DIR = os.path.dirname(CURRENT_DIR)
sys.path.insert(0, PARENT_DIR)
from timeit import default_timer as timer
import filterbank.filterbank
import timeseries.timeseries
import clipping
import dedisperse
import fourier

# pylint: disable=too-many-locals
# pylint: disable=too-many-arguments
# pylint: disable=invalid-name
# pylint: disable=no-self-use

class Pipeline:
"""
The Pipeline combines the functionality of all modules
in the library.
"""

def __init__(self, filename=None, as_stream=False, DM=230, scale=3, n=None, size=None):
"""
Initialize Pipeline object

Args:
as_stream, read the filterbank data as stream
"""
if as_stream:
if n:
result = self.read_n_rows(n, filename, DM, scale)
file = open("n_rows_filterbank.txt", "a+")
else:
result = self.read_rows(filename)
file = open("rows_filterbank.txt", "a+")
else:
result = self.read_static(filename, DM, scale, size)
file = open("static_filterbank.txt", "a+")
file.write(str(result) + ",")
file.close()


def read_rows(self, filename):
"""
Read the filterbank data as stream
and measure the time
"""
# init filterbank as stream
fil = filterbank.Filterbank(filename)
time_start = timer()
while True:
fil_data = fil.next_row()
if isinstance(fil_data, bool):
break
time_stop = timer() - time_start
return time_stop


def read_n_rows(self, n, filename, DM, scale):
"""
Read the filterbank data as stream
and measure the time
"""
fil = filterbank.Filterbank(filename)
stopwatch_list = list()
while True:
stopwatch = dict.fromkeys(['time_read', 'time_select', 'time_clipping', 'time_dedisp',
'time_t_series', 'time_downsample', 'time_fft_vect',
'time_dft', 'time_ifft', 'time_fft_freq'])
time_start = timer()
fil_data = fil.next_n_rows(n)
# break if EOF
if isinstance(fil_data, bool):
break
stopwatch['time_read'] = timer() - time_start
# run methods
stopwatch = self.measure_methods(stopwatch, fil_data, fil.freqs, DM, scale)
stopwatch_list.append(stopwatch)
return stopwatch_list


def read_static(self, filename, DM, scale, size):
"""
Read the filterbank data at once
and measure the time per function/class
"""
stopwatch = dict.fromkeys(['time_read', 'time_select', 'time_clipping', 'time_dedisp',
'time_t_series', 'time_downsample', 'time_fft_vect', 'time_dft',
'time_ifft', 'time_fft_freq'])
time_start = timer()
# init filterbank
fil = filterbank.Filterbank(filename, read_all=True, time_range=(0, size))
stopwatch['time_read'] = timer() - time_start
# select data
time_select = timer()
freqs, fil_data = fil.select_data()
stopwatch['time_select'] = timer() - time_select
# run methods
stopwatch = self.measure_methods(stopwatch, fil_data, freqs, DM, scale)
return stopwatch


def measure_methods(self, stopwatch, fil_data, freqs, DM, scale):
"""
Run and time all methods/modules
"""
# clipping
time_clipping = timer()
_, _ = clipping.clipping(freqs, fil_data)
stopwatch['time_clipping'] = timer() - time_clipping
# dedisperse
time_dedisp = timer()
fil_data = dedisperse.dedisperse(fil_data, DM)
stopwatch['time_dedisp'] = timer() - time_dedisp
# timeseries
time_t_series = timer()
time_series = timeseries.Timeseries(fil_data)
stopwatch['time_t_series'] = timer() - time_t_series
# downsample
time_downsamp = timer()
time_series = time_series.downsample(scale)
stopwatch['time_downsample'] = timer() - time_downsamp
# fft vect
time_fft_vect = timer()
fourier.fft_vectorized(time_series)
stopwatch['time_fft_vect'] = timer() - time_fft_vect
# dft
time_dft = timer()
fourier.dft_slow(time_series)
stopwatch['time_dft'] = timer() - time_dft
# ifft
time_ifft = timer()
fourier.ifft(time_series)
stopwatch['time_ifft'] = timer() - time_ifft
# fft freq
time_fft_freq = timer()
fourier.fft_freq(10)
stopwatch['time_fft_freq'] = timer() - time_fft_freq
return stopwatch
1 change: 0 additions & 1 deletion plot/waterfall.py
@@ -89,7 +89,6 @@ def update_plot_labels(self):
else:
freq_range = ((center_freq - sample_freq/2)/1e6,\
(center_freq + sample_freq*(self.scans_per_sweep - 0.5))/1e6)
print(self.image)
self.image.set_extent(freq_range + (0, 1))
self.fig.canvas.draw_idle()

1 change: 1 addition & 0 deletions tests/context.py
@@ -14,3 +14,4 @@
import dedisperse
import filterbank.header as header
import filterbank.filterbank as filterbank
import pipeline.pipeline as pipeline
5 changes: 1 addition & 4 deletions tests/test_clipping.py
@@ -51,12 +51,9 @@ def test_filter_channels(self):
When filtering samples, all channels
with a relative high power should be removed
"""
result_channels, result_samples = clipping.filter_channels(
self.fil_chans, np.array(self.fil_vector))
result_channels = np.array(clipping.filter_channels(np.array(self.fil_vector)))
expect_channels = np.delete(self.fil_chans, self.bad_channels)
expect_samples = np.delete(self.fil_vector, self.bad_channels, axis=1)
self.assertEqual(result_channels.all(), expect_channels.all())
self.assertEqual(result_samples.all(), expect_samples.all())

def test_filter_indv_channels(self):
"""