In [1]:
import numpy as np
import scipy as sp
from scipy.io import wavfile
import matplotlib.pyplot as plt
import soundfile as sf
from skimage.measure import block_reduce
from scipy.signal import find_peaks
from scipy.fft import fft

%matplotlib widget

In [4]:
fs, lpg25 = wavfile.read("28_(18;41).wav")
lpg25 = lpg25[:, 1]

fs, lpg50 = wavfile.read("28_(18;31)-001.wav")
lpg50 = lpg50[:, 1]

fs, lpg100 = wavfile.read("28_(18;36).wav")
lpg100 = lpg100[:, 1]

plt.close('all')
plt.figure()
plt.plot(lpg50)
plt.show()

  """Entry point for launching an IPython kernel.
  after removing the cwd from sys.path.
  import sys


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [9]:
# Data extraction
data = lpg100[start:start+length];

# Extract multiple spikes post noise bursts
def extract_trains(data, ds_ratio):
    ds = block_reduce(np.abs(data), (ds_ratio,), func=np.mean)
    peaks = find_peaks(ds, width=2000*30/ds_ratio)[0]
    chunks = [ds[int(a) : int(b)] for a, b in zip(peaks[:-1], peaks[1:])]

    ranges = []
    for a, b in zip(peaks[:-1], peaks[1:]):
        chunk = ds[int(a) : int(b)]
        mn = np.min(chunk) + .3 * (np.max(chunk) - np.min(chunk))
        mask = chunk < mn
        i1 = np.argwhere(mask)[0][0]
        i2 = np.argwhere(np.flip(mask))[0][0]
        ranges.append([a + i1 + 1, b - i2 - 3])

    timestamps = [np.arange(int(a*ds_ratio), int(b*ds_ratio)) for a, b in ranges]
    sliced = [data[int(a*ds_ratio) : int(b*ds_ratio)] for a, b in ranges]
    
    return timestamps, sliced
    
# Extract individual impulses
def extract_impulses(data, window = [100, 1000], thresh=3e6, max_impulses=30):
    data_copy = np.copy(data)
    last_impulse = np.inf
    impulses = []
    n_impulses = 0
    while last_impulse > thresh:
        i = np.argmax(np.abs(data_copy))
        last_impulse = np.abs(data_copy[i])
        impulses.append(data[i-window[0]:i+window[1]])
        data_copy[max([0, i-window[0]]):min([len(data), i+window[1]])] = 0
        n_impulses += 1
        if n_impulses > max_impulses:
            break
        
    return impulses[1:]

In [10]:
# Spectrum calculation and plotting
def calc_spectrum(y):
    N = len(y)
    yf = fft(y, N)
    return 20.0 * np.log10((2.0 / N) * (np.abs(yf[0:N//2])))

def plot_spectrum(y, fs, N=None, c=None, omit=None, linestyle='-'):
    T = 1.0/fs
    N = len(y) if N is None else N
    if N > 0:
        yf = fft(y, N)
        xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
        color = np.array([1.0-c, 0.0, c]) if c is not None else None
        color = color * .5 if linestyle == '--' else color
        plt.plot(xf[:omit], 20 * np.log10((2.0 / N) * (np.abs(yf[0:N//2])))[:omit], linestyle=linestyle, color=color)
        plt.xscale('log')
        
plot_spectrum(np.array([1,2,3,4,5,6]), 100)

In [11]:
# Code to simulate and fit filter models

from numba import njit
from scipy.optimize import least_squares

class tpt():
    def __init__(self, cutoff):
        g = np.tan(np.pi * cutoff);
        self.coeff = g / (1.0 + g)
        self.s = 0
    
    def lp(self, x):
        x, self.s = tpt.eval_lp(x, self.coeff, self.s)
        return x
    
    def hp(self, x):
        lp = np.copy(x)
        lp, self.s = tpt.eval_lp(lp, self.coeff, self.s)
        return x - lp
    
    @njit
    def eval_lp(x, coeff, s):
        for i in range(len(x)):
            v = coeff*(x[i] - s)
            y = v + s
            s = y + v
            x[i] = y
        
        return x, s

class svf():
    def __init__(self, cutoff, res):
        g = np.tan(np.pi * cutoff)
        self.k = 2.0 - 2.0 * res
        
        self.ic2eq = 0.0
        self.ic1eq = 0.0
        self.a1 = 1.0 / (1.0 + g * (g + self.k))
        self.a2 = g * self.a1
        self.a3 = g * self.a2
    
    @njit
    def eval_lp(x, a1, a2, a3, k, ic1eq, ic2eq):
        for i in range(len(x)):
            v3 = x[i] - ic2eq
            v1 = a1 * ic1eq + a2 * v3
            v2 = ic2eq + a2 * ic1eq + a3 * v3
            ic1eq = 2.0 * v1 - ic1eq
            ic2eq = 2.0 * v2 - ic2eq
            x[i] = v2
    
        return x, ic1eq, ic2eq
    
    @njit
    def eval_hp(x, a1, a2, a3, k, ic1eq, ic2eq):
        for i in range(len(x)):
            v3 = x[i] - ic2eq
            v1 = a1 * ic1eq + a2 * v3
            v2 = ic2eq + a2 * ic1eq + a3 * v3
            ic1eq = 2.0 * v1 - ic1eq
            ic2eq = 2.0 * v2 - ic2eq
            x[i] = x[i] - k * v1 - v2
   
        return x, ic1eq, ic2eq
    
    def lp(self, x):
        result, ic1eq, ic2eq = svf.eval_lp(x, self.a1, self.a2, self.a3, self.k, self.ic1eq, self.ic2eq)
        self.ic1eq = ic1eq
        self.ic2eq = ic2eq
        return result
        
    def hp(self, x):
        result, ic1eq, ic2eq = svf.eval_hp(x, self.a1, self.a2, self.a3, self.k, self.ic1eq, self.ic2eq)
        self.ic1eq = ic1eq
        self.ic2eq = ic2eq
        return result

def fit_model(current_data):
    init = [440/fs, 5000.0/fs, 1.0]
    init[1] = .03 

    def model(x, p):
        k = np.zeros(len(x), dtype=np.float)
        k[0] = p[2]
        return tpt(p[0]).hp(svf(p[1], 0.0).lp(k))

    current_data = current_data / 1500000000.0
    data = calc_spectrum(current_data)
    valid = data > -90
    valid[0:2] = False
    valid[-90:] = False
    def residual(x, p):
        return (data - calc_spectrum(model(x, p)))[valid]

    res = least_squares(lambda x: residual(current_data, x), init, loss='soft_l1', method='trf', bounds=([0, 0, 0], [.9999, .9999, 10]), verbose=0, xtol=2.3e-16, ftol=1e-12) # , 
    
    return res.x, model(current_data, res.x)*1500000000.0

In [13]:
plt.close('all')

pulse = 2 * fs
start = int(3.50e6)
length = int(3e6)
ds_ratio = 800

timestamps, sliced = extract_trains(lpg50[start:], ds_ratio)
#lpg = 25; max_impulses = 5;
#lpg = 50; max_impulses = 8;
lpg = 100; max_impulses = 8;

gains = np.arange(1, 0, -0.05)

plt.figure(figsize=(12, 6))
for k in np.arange(1, np.min([len(sliced), 7])):
    print(k)
    
    plt.subplot(2, 3, k)
    plt.title(f"LPG: {lpg}%, Input gain: {20*np.log10(gains[k]):.3} dB")
    impulses = extract_impulses(sliced[k], max_impulses=max_impulses)
    
    for c, impulse in enumerate(impulses[:max_impulses]):
        if len(impulse) > 0:
            pars, sim = fit_model(impulse)
            plot_spectrum(impulse, fs, c=c/8)
            plot_spectrum(sim, fs, c=c/8, linestyle='--')
    
    plt.ylim([60, 130])
    plt.grid()
    
plt.tight_layout()

plt.savefig(f"lpg_{lpg}.png")

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

1


  """
  """
  """


2
3


  """
  """
  """
  """


4


  """


5


  """


6


  """


In [None]:
plt.close('all')


        
plt.figure()
impulses = extract_impulses(sliced[0], max_impulses=max_impulses)
#plot_spectrum(impulses[0] / 1500000000.0, fs=fs)

plt.plot(calc_spectrum(impulses[0] / 1500000000.0))

In [None]:



current_data = impulses[0]
pars, simulation = fit_model(current_data)

#result = residual(current_data, res.x)
#plt.close('all')
#plt.figure()
#plt.plot(result)

plt.close('all')
plt.figure()
plot_spectrum(current_data, fs=fs)
plot_spectrum(simulation, fs=fs, omit=-130)
plt.ylim([40, 130])

In [None]:
print(len())

In [None]:
ds = block_reduce(np.abs(data), (25000,), func=np.mean)
plt.plot(np.abs(np.diff(ds)))

In [None]:
from scipy.fft import fft


#data.shape

In [None]:
plt.figure()
impulses = extract_impulses(chunks[5][pulse:])
[plot_spectrum(impulse, fs) for impulse in impulses]


In [None]:
plt.figure()
plt.plot(impulses[0])

In [None]:
def extract_chunks(data, start, chunksize, chunks, ):
    return [data[start + i * chunksize : start + (i+1) * chunksize] for i in np.arange(chunks, dtype=np.int)], start + chunks * chunksize
        
# Slice up the data
pulse = 2 * fs
steps = int(1.0 / 0.05)
dataset = lpg25
start = 3527230 + 300
chunk = 3681529 - start + 400

chunks, ptr = extract_chunks(dataset, start, chunk, steps)

plt.figure()
plt.subplot(2, 1, 1)
plt.plot(np.abs(chunks[0][pulse:]))
plt.subplot(2, 1, 2)
#impulses = extract_impulses(chunks[0][pulse:])
#[plt.plot(impulse) for impulse in impulses]
plt.show()