In [1]:
import scipy.stats as sps
import numpy as np
import matplotlib.pyplot as plt 
import random
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks



Task1.

First, we can generate test data(GENERATE_TEST_DATA = True) (or can just put *.npy files)

Second, we solve task per se. By fft we get top frequencies for mixtures and pure signals and compare top frequencies.

In [2]:
SAMPLE_RATE = 44100  # Гц #todo check it's not important
DURATION = 5  # Секунды

GENERATE_TEST_DATA = True

MIXTURE_LENGTH = 10000
SIGNAL_LENGTH = 1024
MIXTURES_NUM = 1000
SIGNALS_NUM = 100

def get_wave(freq1, freq2, sample_rate, duration, a = 1, b = 0, c = 0, d = 0):
    x = np.linspace(0, duration, sample_rate*duration, endpoint=False)[:MIXTURE_LENGTH]
    frequency1 = (x * freq1)
    frequency2 = (x * freq2)
    y = a* np.sin((2 * np.pi) * frequency1) + b* np.sin((2*np.pi + np.pi*c) * frequency2)    
    return x, y

def generate_data():
    y = []
    params = []
    for i in range(SIGNALS_NUM):

        params.append(
            (100 + 50*i, 
             5000 +  50*i,
             random.randint(2, 6),
             random.randint(2, 6),
             0,#random.randint(2, 2),
             0))#random.random()))
    shifts = [random.randint(0, 9000)] * 100
    ratios = [0.3] * 1000
    signals = []
    for i in range(MIXTURES_NUM):
        p = params[i % 100]
        _,wave = get_wave(p[0], p[1], SAMPLE_RATE, DURATION, p[2], p[3], p[4], p[5])
        len(wave)
        if (i < SIGNALS_NUM):
            signals.append(wave[shifts[i]:(shifts[i] + SIGNAL_LENGTH)])
        _,noise = get_wave(random.randint(10000, 18000),
                           random.randint(10000, 18000),
                           SAMPLE_RATE, DURATION,
                           1, 
                           1,
                           0,
                           0)
        y.append( wave + ratios[i] * noise)

    with open('signals.npy', 'wb') as f:
        np.save(f,signals)
    with open('mixtures.npy', 'wb') as f:
        np.save(f,y)    

if GENERATE_TEST_DATA:
    generate_data()

The main method do_task1().

In [3]:
def get_fxy(signal):
    yf = rfft(signal)
    xf = rfftfreq(len(signal), 1 / SAMPLE_RATE)
    return xf,yf

def get_indexes(xf, yf, length):   
    return find_peaks(np.abs(yf)/length, height=0.01)[0]



# return fft results and peaks indexes
def get_data(signal):
    xf, yf = get_fxy(signal)
    indexes = get_indexes(xf, yf, len(signal))
    return xf, yf, indexes

#compare frequencies as L1
def get_distance(a, b, peaks1, peaks2):
    #TODO add peaks magnitude comparison
    return np.sum(np.abs(a-b))


def do_task1():
    with open('signals.npy', 'rb') as f:
        signals = np.load(f)
        
    with open('mixtures.npy', 'rb') as f:
        y = np.load(f)
      
    signal_data = []
    
    #gather info about signals ahead (results of fft and peak indexes)
    for signal in signals:
        signal_data.append(get_data(signal))
    
    result = []
    for i in range(len(y)):
        yi = y[i]
        #fft results and peak indexes
        xf_y, yf_y, ind_y = get_data(yi)
        #peak frequencies
        f_y = xf_y[ind_y]

        diff = 30000
        #this index will contain index of most appropriate pure signal
        index = 0
        for j in range(len(signals)):
            #get pure signal data (gathered before)
            xf_s, yf_s, ind_s = signal_data[j]        
            #top frequencies for pure signal
            f_s = xf_s[ind_s]        
            # if mixed signal contains less top freq than pure signal, give up with this 
            if (len(ind_y) <= len(ind_s)):
                break
            #compare top frequencies of mixture and pure signal
            new_diff = get_distance(f_y[0:len(ind_s)],f_s, None, None)  
            #select the most appropriate
            if (new_diff < diff):
                index = j
                diff=new_diff
                
        #this assert for my test data
        #assert  i%100 == index, f"Not correct for {i} and {index}"
         
        # So we know pure signal now, let's do rest of computations    
        xf_s, yf_s, ind_s = signal_data[index]
        freq_num = len(ind_s)    
        phase_shift = np.abs(np.angle(yf_s[ind_s[0]]) - np.angle(yf_y[ind_y][0]))
        signal_max_power = np.max(np.abs(yf_y[ind_y[0:freq_num]]))
        noise_max_power = np.max(np.abs(yf_y[ind_y[freq_num:]]))
        snr_in_dB = 10 * np.log10(signal_max_power/noise_max_power)                        

        result.append((index, phase_shift, snr_in_dB))
        
        
do_task1()

Task2.

We will use chi2 criterion.
<ol>
<li>For each distribution: we split axis into k intervals, in each interval area under density curve will be the same (point corresponds to percentiles); </li>
<li>For each distribution: we count how many values would contain each interval for given n (size of the array), so these are our expected values;</li> 
<li>In practice: results of the (1) - computed intervals I would add in dictinary to don't compute for each k; </li>
<li>Then we need compute how many values from our array (each of 1024 arrays) will be in each interval (k intervals for each distribution);</li> 
<li>we make chi2 for observed values (step4) and expected values(step2) and make argmax to find the best answer. As an improvement we could use some threshold how much we sure about answer (say pvalue > 0.8)...</li>
</ol>    

In [4]:
#our test array
dim0 = 1024
#how many distributions we have
distr_num = 4

np.random.seed(42)
random.seed(42)

norm_distr = sps.norm()
expon_distr = sps.expon()              
chi_distr = sps.chi2(df=1) 
gamma_distr = sps.gamma(scale=2,a=3) #scale==teta, k=?a (not sure)
distrs = [norm_distr, expon_distr, chi_distr, gamma_distr]
distr_num = len(distrs)

def get_test_data():    
    # min value for n is 10 as we need at least 2 interval with 5 elements
    n=1000 #length of one array
    #randomly choose distribution for test data
    distr_nums = np.random.randint(0, high = distr_num, size=dim0)
    test_data = []
    for i in range(dim0):
        test_data.append(distrs[distr_nums[i]].rvs(n))
    test_data = np.array(test_data)
    return distr_nums, test_data

In [5]:
distr_nums, test_data = get_test_data()

# values for k(number of intervals) were taken from there: https://ami.nstu.ru/~headrd/seminar/publik_html/Z_lab_8.htm
def get_k(n):
    assert n>=10
    if n >= 10 and n < 40:
        return n/5
    elif n >= 40 and n <100:
        return 8
    elif n >= 100 and n < 500:
        return 10
    elif n >= 500 and n <1000:
        return 12
    else:
        return 20


def define_distr(test_data):
    n = test_data.shape[1]
    k=get_k(n)   
    points = []
    step = 1/k # area under density curve in each interval
    
    #compute interval boundaries, as we don't know n ahead and consequenly k, we compute it each time 
    #(but could use hash for more frequently used )
    for i in range(1, k):
        points.append([distr.ppf(i * step) for distr in distrs])
    points = np.array(points).T
    #number of expected values
    expected = [n/k]*k

    sums =[]
    test_data = np.sort(test_data, axis=1) 
    #for each arry
    for i in range(dim0):    
        #find indexes corresponding to interval boundaries
        indexes = np.searchsorted(test_data[i],points.reshape(-1),side='right')
        indexes = indexes.reshape(-1, k-1) 
        #count how many values in each interval
        sums.append([[len(x) for x in np.split(test_data[i], ind)] for ind in indexes])
    sums = np.array(sums)      
    return sps.chisquare(sums, expected, axis=2).pvalue.argmax(axis=1)


#assert np.array_equal(define_distr(test_data),np.array(distr_nums)), "May be something with test data? :)"
result = define_distr(test_data)
num_of_correct_answers = np.where(define_distr(test_data)== np.array(distr_nums), 1, 0).sum()
print("Accuracy: ", num_of_correct_answers/dim0)

Accuracy:  1.0
