Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

variant filter #6

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
137 changes: 137 additions & 0 deletions mosqito/functions/variant_filter/variant_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Dec 12 12:28:15 2020

@author: josem
"""

import scipy.signal as sig
from mosqito.functions.shared.load import load

# FIR filter definition
#Parameters:
# Required input definitions are as follows;
# data: The data to be filtered
# fs: Sampling frecuency
# fc: The centerline frequency to be filtered
# band: The bandwidth around the centerline frequency that you wish to filter
#Returns
# Filtered data

def fir_notch_filter(data, fs, fc, band):
#Order of the FIR filter
numtaps = 999
nyq = fs / 2

#Lower and upper filter cut-off frequencies are determined
if fc < band / 2:
band = fc / 2
low = (fc - band / 2) / nyq
high = (fc + band / 2) / nyq
else:
low = (fc - band / 2) / nyq
high = (fc + band / 2) / nyq

#The coefficients of a lowpass and a highpass filters are obtained, using Hamming window
h_l = sig.firwin(
numtaps,low, window = 'blackman', pass_zero='lowpass'
)
h_h = sig.firwin(
numtaps,high, window = 'blackman', pass_zero='highpass'
)

#Filters the data applying the obtained coefficients
filtered_data = sig.filtfilt(h_l, 1, data, padlen=0) + sig.filtfilt(h_h, 1, data, padlen=0)

return filtered_data

# IIR filter definition
#Parameters:
# data: The data to be filtered
# fs: Sampling frecuency
# fc: The centerline frequency to be filtered
# band: The bandwidth around the centerline frequency that you wish to filter
#Return:
# Filtered data

def iir_notch_filter(data, fs, fc, band):
#Determines the quality factor
Q = fc / band

#The coefficients of a band remover filter are obtained
b, a = sig.iirnotch(fc, Q, fs = fs)

#Filters the data applying the obtained coefficients
filtered_data = sig.filtfilt(b, a, data, padlen=0)

return filtered_data

# Variant filter definition
#Parameters:
# signal: Signal to be filtered
# track: Signal to track the harmonics
# ftype: Type of filter. 'FIR' or 'IIR'
# harmonic_order: Order of the harmonic to filter
#Returns:
# Original signal
# Filtered signal
# Sampling frequency

def variant_filter(original_signal, signal_tracking, ftype, harmonic_order, att):
#Load original and tracking data
if(isinstance(signal_tracking, str)):
signal_tracking, fs = load(False, signal_tracking, calib = 1)

if(isinstance(original_signal, str)):
original_signal, fs = load(False, original_signal, calib = 2 * 2**0.5 )

if not(isinstance(signal_tracking, str) or isinstance(original_signal, str)):
fs = 48000

#Band selector
if(ftype == 'fir'):
if(att == 3):
band = 25 #Atenuation = 3 dB (FIR)
if (att == 6):
band = 130 #Atenuation = 6 dB (FIR and IIR)
if(att == 9):
band = 330 #Atenuation = 9 dB (FIR and IIR)
if(att == 12):
band = 600 #Atenuation = 12 dB (FIR)
if(ftype == 'iir'):
if(att == 3):
band = 55 #Atenuation = 3 dB (IIR)
if(att == 6):
band = 150 #Atenuation = 6 dB (FIR and IIR)
if(att == 9):
band = 330 #Atenuation = 9 dB (FIR and IIR)
if(att == 12):
band = 500 #Atenuation = 12 dB (IIR)

i = int(fs / 16) #Segment size
j = 0
h = int(fs / 32) #Segment size to overlap
filtered_signal = []

while i <= len(signal_tracking):

#Value of RPM
rpm = signal_tracking[j]

#The center frequency is calculated
if(rpm == 0):
fc = 1
else:
fc = (harmonic_order * rpm) / 6

#The filter is applied
if ftype == 'fir':
filtered_signal[j:i] = fir_notch_filter(original_signal[j:i], fs, fc, band)

if ftype == 'iir':
filtered_signal[j:i] = iir_notch_filter(original_signal[j:i], fs, fc, band)

j = i - h
i = int(j + fs / 16)

return original_signal, filtered_signal, fs
Empty file.
184 changes: 184 additions & 0 deletions mosqito/tests/variant_filter/final_test_variant_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Dec 12 12:28:15 2020

@author: josem
"""

import numpy as np
import matplotlib as mpl
import scipy.signal as sig
import pylab as plt
from mosqito.functions.shared.load import load
from matplotlib import gridspec
from scipy.io.wavfile import write
from mosqito.functions.variant_filter.variant_filter import variant_filter

#Filter parameters
harmonic_order = 1 #harmonics 1, 2, 3, ...
att = 12

signal = "mosqito/tests/variant_filter/signals/RUN5_RunUp_60s_Track1_Rec0.UFF"
track = "mosqito/tests/variant_filter/signals/RUN5_RunUp_60s_RPM-profile_Rec0.UFF"

#Aplication of the variant_filter function
original_signal, fir_filtered_signal, Fs = variant_filter(
signal, track,'fir', harmonic_order, att
)
original_signal, iir_filtered_signal, Fs = variant_filter(
signal, track,'iir', harmonic_order, att
)

#Scale signals to integers to write a wav file
scale = np.max([np.max(np.abs(original_signal)), np.max(np.abs(iir_filtered_signal))])
scaled_o = np.int16(original_signal / scale * 32767)
scaled_fir = np.int16(fir_filtered_signal / scale * 32767)
scaled_iir = np.int16(iir_filtered_signal / scale * 32767)

write('motor_signal.wav', Fs, scaled_o)
write('FIR_filtered_motor_signal_'+str(harmonic_order)+' harmonic.wav', Fs, scaled_fir)
write('IIR_filtered_motor_signal_'+str(harmonic_order)+' harmonic.wav', Fs, scaled_iir)

#RPM signal
rpm_signal, fs = load(False, track, calib = 1)
l_rpm=len(rpm_signal)
t_rpm = np.arange(0, l_rpm)/ fs # Time interval in seconds

#Plotting results

#Average spectrum of 24000 samples
plt.figure()

f, Pxx_spec = sig.welch(
original_signal[1200000:1224000], fs, 'flattop', 1024, scaling='spectrum'
)

plt.semilogy(f, np.sqrt(Pxx_spec), color = 'darkred')
plt.xlabel('frequency [Hz]')
plt.ylabel('Linear spectrum [V RMS]')

f, Pxx_spec = sig.welch(
fir_filtered_signal[1200000:1224000], fs,'flattop', 1024, scaling='spectrum'
)
plt.semilogy(f, np.sqrt(Pxx_spec), color = 'darkblue', linestyle='--')
plt.xlabel('frequency [Hz]')
plt.ylabel('Linear spectrum [V RMS]')

f, Pxx_spec = sig.welch(
iir_filtered_signal[1200000:1224000], fs,'flattop', 1024, scaling='spectrum'
)
axes = plt.axes()
axes.set_xlim([0,3000])
plt.semilogy(f, np.sqrt(Pxx_spec), color='darkgreen', linestyle='--')
plt.xlabel('frequency [Hz]')
plt.ylabel('Linear spectrum [V RMS]')
plt.title('Power Spectrum')
plt.show()

#Plot Spectrogram original signal
NFFT = 4096
noverlap = NFFT / 2
vmin = 20*np.log10(np.max(original_signal)) - 70

#Set height ratios for subplots
gs = gridspec.GridSpec(2, 1, height_ratios=[2, 1])

ax1 = plt.subplot(gs[0])
pxx, freq, t, cax = plt.specgram(
original_signal,
NFFT=NFFT,
Fs=Fs,
vmin=vmin,
noverlap=noverlap,
cmap = plt.cm.jet,
window=plt.window_none
)
plt.ylabel('Frequency [Hz]')
plt.title('Original Signal')
plt.axis([0,37,0,3000])
ax1.tick_params(axis='x', direction='in')

ax2 = plt.subplot(gs[1], sharex=ax1)
plt.setp(ax1.get_xticklabels(), visible=False)
plt.plot(t_rpm, rpm_signal)
plt.xlabel('Time [s]')
plt.ylabel('RPM')
plt.subplots_adjust(bottom=0.1, right=0.94, top=0.9, hspace=0.05)

norm = mpl.colors.Normalize(vmin=-25, vmax=30)
cax = plt.axes([0.95, 0.38, 0.025, 0.52])
plt.colorbar(
mpl.cm.ScalarMappable(cmap = plt.cm.jet, norm=norm),
cax=cax).set_label('PSD [dB/Hz]', labelpad=-20, y=1.1, rotation=0)

plt.show()

#Plot Spectrogram filtered signal FIR
ax1 = plt.subplot(gs[0])
pxx, freq, t, cax = plt.specgram(
fir_filtered_signal,
NFFT=NFFT,
Fs=Fs,
vmin=vmin,
noverlap=noverlap,
cmap = plt.cm.jet,
window=plt.window_none
)
plt.ylabel('Frequency [Hz]')
plt.title('FIR Filtered signal')
plt.axis([0,37 ,0,3000])
ax1.tick_params(axis='x', direction='in')

ax2 = plt.subplot(gs[1], sharex=ax1)
plt.setp(ax1.get_xticklabels(), visible=False)
plt.plot(t_rpm, rpm_signal)
plt.xlabel('Time [s]')
plt.ylabel('RPM')

plt.subplots_adjust(bottom=0.1, right=0.94, top=0.9, hspace=0.05)

cax = plt.axes([0.95, 0.38, 0.025, 0.52])
plt.colorbar(
mpl.cm.ScalarMappable(
cmap = plt.cm.jet,
norm=norm
),
cax=cax
).set_label('PSD [dB/Hz]', labelpad=-20, y=1.1, rotation=0)

plt.show()

#Plot Spectrogram filtered signal IIR
ax1 = plt.subplot(gs[0])
pxx, freq, t, cax = plt.specgram(
iir_filtered_signal,
NFFT=NFFT,
Fs=Fs,
vmin=vmin,
noverlap=noverlap,
cmap = plt.cm.jet,
window=plt.window_none
)
plt.ylabel('Frequency [Hz]')
plt.title('IIR Filtered signal')
plt.axis([0,37 ,0,3000])
ax1.tick_params(axis='x', direction='in')

ax2 = plt.subplot(gs[1], sharex=ax1)
plt.setp(ax1.get_xticklabels(), visible=False)
plt.plot(t_rpm, rpm_signal)
plt.xlabel('Time [s]')
plt.ylabel('RPM')

plt.subplots_adjust(bottom=0.1, right=0.94, top=0.9, hspace=0.05)

cax = plt.axes([0.95, 0.38, 0.025, 0.52])
plt.colorbar(
mpl.cm.ScalarMappable(
cmap = plt.cm.jet,
norm=norm
),
cax=cax
).set_label('PSD [dB/Hz]', labelpad=-20, y=1.1, rotation=0)

plt.show()
Loading