From 8f6f7219afd66ba3ac3d09f3f364cafa27ca04d9 Mon Sep 17 00:00:00 2001 From: Balint Seeber Date: Mon, 24 Nov 2014 23:27:04 -0800 Subject: [PATCH] Added fft_tools and realtime_graph (for Cyberspectrum scanning tools) --- src/python/fft_tools.py | 108 +++++++++ src/python/realtime_graph.py | 416 +++++++++++++++++++++++++++++++++++ 2 files changed, 524 insertions(+) create mode 100644 src/python/fft_tools.py create mode 100644 src/python/realtime_graph.py diff --git a/src/python/fft_tools.py b/src/python/fft_tools.py new file mode 100644 index 0000000..8f3e3dc --- /dev/null +++ b/src/python/fft_tools.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# fft_tools.py +# +# Copyright 2014 Balint Seeber +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +# +# + +import numpy +import math + +import sys + +def mirror_fft(data): + pos_len = (len(data) + 1) / 2 + pos = data[:pos_len] + neg = data[len(data) - pos_len:] + return numpy.concatenate((neg, pos)) + +# Assumes input values [0,1] +def calc_fft(samps, num_bins=None, log_scale=True, step=1, window=numpy.hamming, pad=True, adjust=True, verbose=False): # FIXME: step (when it was floating point, for more flexible overlap) + if num_bins is None: + num_bins = len(samps) + num_ffts = len(samps)/num_bins + point_count = num_ffts * num_bins + if verbose: print "Processing %d FFTs" % (num_ffts / step) + left_over = len(samps) - point_count + if point_count != len(samps): + if not pad: + if verbose: print "Skipping %d tail samples for FFT" % (left_over) + + fft_sum = numpy.zeros(num_bins) + fft_max = numpy.zeros(num_bins) + fft_min = numpy.ones(num_bins) + if window is None: + #window_points = numpy.ones(num_bins) + window_points = None + else: + window_points = window(num_bins) + + if pad: + pad_amount = num_bins - left_over + if isinstance(samps, list): + samps += [0]*pad_amount + elif isinstance(samps, numpy.ndarray): + samps = numpy.concatenate((samps, numpy.zeros(pad_amount))) + else: + raise Exception("Cannot pad unknown type '%s': %s" % (type(samps), str(samps))) + + cnt = 0 + for i in range(0, num_ffts, step): + cnt += 1 + data = numpy.array(samps[i*num_bins:i*num_bins + num_bins]) + if window_points is not None: + data *= window_points + fft = numpy.fft.fft(data) + fft = mirror_fft(fft) + fft = numpy.abs(fft) + + fft = (fft * fft) + fft_sum += fft + fft_min = numpy.minimum(fft, fft_min) + fft_max = numpy.maximum(fft, fft_max) + + #sys.stdout.write("%d " % (i)) + #sys.stdout.flush() + + #print + + fft_avg = fft_sum / float(cnt) + + if log_scale: + if verbose: + sys.stdout.write("Running logarithm...") + sys.stdout.flush() + adjust_amount = 0.0 + if adjust: + ref_scale = 2 + adjust_amount =(-20.0 * math.log10(num_bins) # Adjust for number of bins + -20.0 * math.log10(ref_scale/2)) # Adjust for reference scale + if window_points is not None: + window_power = sum(map(lambda x: x*x, window_points)) + adjust_amount += (-10.0 * math.log10(window_power/num_bins)) # Adjust for windowing loss + + fft_avg = (10.0 * numpy.log10(fft_avg)) + adjust_amount + fft_max = (10.0 * numpy.log10(fft_max)) + adjust_amount + fft_min = (10.0 * numpy.log10(fft_min)) + adjust_amount + + if verbose: + print "done." + + return (cnt, fft_avg, fft_min, fft_max) diff --git a/src/python/realtime_graph.py b/src/python/realtime_graph.py new file mode 100644 index 0000000..13b984d --- /dev/null +++ b/src/python/realtime_graph.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# realtime_graph.py +# +# Copyright 2014 Balint Seeber +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. +# +# + +# FIXME: +# Detect window close (e.g. wx._core.PyDeadObjectError) +# Replace horizontal line code with MPL's in-built one + +import numpy +import matplotlib +import matplotlib.pyplot as pyplot + +class realtime_graph(): + def __init__(self, title="Real-time Graph", sub_title="", x_range=None, show=False, parent=None, manual=False, pos=111, redraw=True, figsize=None, padding=None, y_limits=None, gui_timeout=0.1, data=None, x=None): + self.parent = parent + + if isinstance(x_range, float) or isinstance(x_range, int): + x_range = (0, x_range-1) + + self.title_text = title + self.sub_title_text = sub_title + self.x_range = x_range + self.y_limits = y_limits + + self.figsize = figsize + self.pos = pos + self.padding = padding + + self.figure = None + self.title = None + #self.plot = None + self.plots = [] + self.subplot = None # Axes + self.points = [] + + self._gui_timeout = gui_timeout + + self._horz_lines = [] + self._horz_lines_map = {} + + self._vert_lines = [] + self._vert_lines_map = {} + + if show: + self._create_figure(data=data, x=x, manual=manual, redraw=redraw) + + def _calc_agg_x_range(self, xx): + if len(xx) == 0: + return None + + agg_x_range = [None, None] + for _x in xx: + _x_range = (min(_x), max(_x)) + if agg_x_range[0] is None or _x_range[0] < agg_x_range[0]: + agg_x_range[0] = _x_range[0] + if agg_x_range[1] is None or _x_range[1] < agg_x_range[1]: + agg_x_range[1] = _x_range[1] + + return agg_x_range + + def _fuse_coords(self, data, x): + xx = [] + dd = [] + + if data is not None: + if not isinstance(data, list): + data = [data] + + for d in data: + if isinstance(d, tuple): + dd += [d[0]] + xx += [d[1]] + else: + dd += [d] + manual_x = False + if x is not None: + if isinstance(x, list): + if len(dd) <= len(x): + _x = x[len(dd)-1] + if _x: + xx += [_x] + else: + manual_x = True + else: + manual_x = True + else: + xx += [x] + else: + manual_x = True + + if manual_x: + xx += [numpy.linspace(0, len(d) - 1, len(d))] + + return xx, dd + + def clear(self, redraw=True): + for plot in self.plots: + self.subplot.lines.remove(plot) + self.plots = [] + if redraw: + self._redraw() + + def _create_figure(self, data=None, x=None, meta={}, redraw=True, manual=False): + if self.parent is None: + pyplot.ion() # Must be here + + kwds = {} + if self.figsize is not None: + kwds['figsize'] = self.figsize + self.figure = pyplot.figure(**kwds) # num=X + + if self.padding is not None: + self.figure.subplots_adjust(**self.padding) + + self.title = self.figure.suptitle(self.title_text) + if manual == False: + self.subplot = self.figure.add_subplot(self.pos) + else: + self.subplot = self.parent.figure.add_subplot(self.pos) + + if self.subplot is not None: + self.subplot.grid(True) + self.subplot.set_title(self.sub_title_text) + + xx, dd = self._fuse_coords(data, x) + + if self.x_range is None and len(xx) > 0: + self.x_range = self._calc_agg_x_range(xx) + + #if x is None: + #x = numpy.array([0]) + #if self.x_range is None and data is not None: + # self._calc_x_range(data) + #if self.x_range is not None: + # x = numpy.linspace(self.x_range[0], self.x_range[1], self.x_range[1]-self.x_range[0]) + + # if data is not None: + # self._calc_x_range(data) + # x = numpy.linspace(self.x_range[0], self.x_range[1], len(data[0])) + #else: + # self.x_range = (min(x), max(x)) # FIXME: Only if x_range is not None? + + #if data is None: + # data = numpy.array([0]*len(x)) + + #if data is not None and x is not None: + #self.plot, = pyplot.plot(x, data) + #self.plot, = self.subplot.plot(x, data) + + #self.plots += self.subplot.plot([(x, _y) for _y in data]) # FIXME + # for d in data: + # self.plots += self.subplot.plot(x, d) + + cnt = 0 + _meta = meta + for d in dd: + if isinstance(meta, list): + _meta = meta[cnt] + self.plots += self.subplot.plot(xx[cnt], d, **_meta) + cnt += 1 + + # This was moved left one indent level ('_apply_axis_limits' is safe) + + #self.plot.axes.grid(True) + #self.plot.axes.set_title(self.sub_title_text) + + #self.plot.axes.set_xlim([min(x),max(x)]) + self._apply_axis_limits() + + if redraw: + self._redraw() + + def _apply_axis_limits(self): + if self.x_range is not None: + #self.plot.axes.set_xlim(self.x_range) + self.subplot.set_xlim(self.x_range) + if self.y_limits is not None: + #self.plot.axes.set_ylim(self.y_limits) + self.subplot.set_ylim(self.y_limits) + + def _calc_x_range(self, data, store=True): + if isinstance(data, list): + #data = data[0] + max_len = 1 + for d in data: + if isinstance(d, tuple): + d = d[0] + max_len = max(max_len, len(d)) + x_range = (0, max_len - 1) + else: + if isinstance(data, tuple): + data = data[0] + + x_range = (0, len(data) - 1) + + if store: + self.x_range = x_range + + return x_range + + def set_y_limits(self, y_limits): + self.y_limits = y_limits + + def set_data(self, data, x=None, meta={}, auto_x_range=True, x_range=None, autoscale=True, redraw=False): # Added auto_x_range/x_range/autoscale before redraw + if data is None: + return + elif not isinstance(data, list): + data = [data] + + #self.figure.canvas.flush_events() + + if x_range is not None: + self.x_range = x_range + + xx, dd = self._fuse_coords(data, x) + + #if self.x_range is None: + # self._calc_x_range(data) + if (self.x_range is None or auto_x_range) and len(xx) > 0: + self.x_range = self._calc_agg_x_range(xx) + #print "Calculated agg X range:", self.x_range + self.subplot.set_xlim(self.x_range) + + #if x is None: + # x = numpy.linspace(self.x_range[0], self.x_range[1], len(data[0])) + #elif auto_x_range and x_range is None: + # self.x_range = (min(x), max(x)) + + cnt = 0 + _meta = meta + #for d in data: + for d in dd: + if isinstance(meta, list): + _meta = meta[cnt] + if cnt >= len(self.plots): + #self.plots += self.subplot.plot(x, d) + self.plots += self.subplot.plot(xx[cnt], d) + else: + #self.plots[cnt].set_data(x, d) + self.plots[cnt].set_data(xx[cnt], d) + cnt += 1 + + if autoscale: + # All three are necessary! + self.subplot.relim() + self.subplot.autoscale_view() + #self.plot.axes.set_xlim(self.x_range) + self._apply_axis_limits() + + if self.x_range is not None: + for line in self._horz_lines: # FIXME: Use line.get_data() + line_x, line_y = line.get_data() + value = line_y[0] + line.set_data(numpy.array([self.x_range[0], self.x_range[1]]), numpy.array([value, value])) + + if self.y_limits is not None: + for line in self._vert_lines: # FIXME: Use line.get_data() + line_x, line_y = line.get_data() + value = line_x[0] + line.set_data(numpy.array([value, value]), numpy.array([self.y_limits[0], self.y_limits[1]])) + + if redraw: + self._redraw() + + def update(self, data=None, title=None, sub_title=None, x=None, meta={}, auto_x_range=True, x_range=None, autoscale=True, points=None, clear_existing_points=True, redraw=True): + if title is not None: + self.set_title(title) + if sub_title is not None: + self.set_sub_title(sub_title) + if self.parent is None and self.figure is None: + self._create_figure(data=data, x=x, redraw=False) # FIXME: 'auto_x_range', 'x_range' + elif data is not None: + self.set_data(data=data, x=x, meta=meta, auto_x_range=auto_x_range, x_range=x_range, autoscale=autoscale) + if points is not None: + if clear_existing_points: + self.clear_points() + self.add_points(points) + if redraw: + self._redraw() + + def clear_points(self, redraw=False): + for line in self.points: + self.subplot.lines.remove(line) + self.points = [] + if redraw: + self._redraw() + + def add_points(self, points, marker='mo', redraw=False): + if len(points) == 0: + return + self.points += self.subplot.plot(numpy.array(map(lambda x: x[0], points)), numpy.array(map(lambda x: x[1], points)), marker) # FIXME: Better way to do this? + if redraw: + self._redraw() + + def redraw(self): + self._redraw() + + def _redraw(self, quick=False): + if self.parent is None: + try: + if self.figure is None: + self._create_figure(redraw=False) + self.figure.canvas.draw() + self.figure.canvas.flush_events() + if quick == False: + self.figure.canvas.start_event_loop(timeout=self._gui_timeout) + self.figure.canvas.flush_events() + except RuntimeError: + self._create_figure() + else: + self.parent._redraw(quick=quick) + + def run_event_loop(self, timeout=None): + if timeout is None: + timeout = self._gui_timeout + self.figure.canvas.start_event_loop(timeout=timeout) + + def go_modal(self): + if self.figure is None: + return False + return self.figure.canvas.start_event_loop() + + def set_title(self, title, redraw=False): + self.title_text = title + if self.title is not None: + self.title.set_text(title) + if redraw: + self._redraw() + + def set_sub_title(self, sub_title, redraw=False): + self.sub_title_text = sub_title + if self.subplot is not None: + self.subplot.set_title(sub_title) + #self.plot.axes.set_title(self.sub_title_text) # Same + if redraw: + self._redraw() + + def add_horz_line(self, value, color='red', linestyle='-', id=None, replace=True, redraw=False): + if id in self._horz_lines_map.keys(): + if not replace: + return + self.remove_horz_line(id) + line = matplotlib.lines.Line2D(numpy.array([self.x_range[0], self.x_range[1]]), numpy.array([value, value]), linestyle=linestyle, color=color) + self._horz_lines += [line] + if id is not None: + self._horz_lines_map[id] = line + self.subplot.add_line(line) + if redraw: + self._redraw() + + def remove_horz_line(self, id): + if not id in self._horz_lines_map.keys(): + return + line = self._horz_lines_map[id] + self._horz_lines.remove(line) + self.subplot.lines.remove(line) + del self._horz_lines_map[id] + + def add_vert_line(self, value, color='black', linestyle='-', id=None, replace=True, redraw=False): + if id in self._vert_lines_map.keys(): + if not replace: + return + self.remove_vert_line(id) + if self.y_limits is None: + return + line = matplotlib.lines.Line2D(numpy.array([value, value]), numpy.array([self.y_limits[0], self.y_limits[1]]), linestyle=linestyle, color=color) + self._vert_lines += [line] + if id is not None: + self._vert_lines_map[id] = line + self.subplot.add_line(line) + if redraw: + self._redraw() + + def remove_vert_line(self, id): + if not id in self._vert_lines_map.keys(): + return + line = self._vert_lines_map[id] + self._vert_lines.remove(line) + self.subplot.lines.remove(line) + del self._vert_lines_map[id] + + def save(self, output_name): + if self.parent is not None: + return self.parent.save(output_name) + self.figure.savefig(output_name) + return True + + def close(self): + pyplot.close(self.figure) + +def main(): + # FIXME: Plot something simple + return 0 + +if __name__ == '__main__': + main()