# DFT mittels Korrelation

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal
import scipy.io.wavfile

import ipywidgets
import IPython.display as ipd

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

### Hilfsfunktionen

- `create_sine_wave(frequency, sampling_rate, length_in_sec)`: Erzeugung eines Sinussignals
- `create_cosine_wave(frequency, sampling_rate, length_in_sec)`: Erzeugung eines Kosinussignals
- `plot_signal(signal, sampling_rate, title)`: Plot eines Zeitsignals

In [None]:
def create_sine_wave(frequency, sampling_rate, length_in_sec, phase=0):
# Generierung eines Sinus-Signals

    # create time vector based on length_in_sec and sampling_rate
    time = np.arange(length_in_sec * sampling_rate) / sampling_rate
    # create sine wave
    sine_wave = np.sin(2 * np.pi * frequency * time + phase)
    
    return sine_wave

def create_cosine_wave(frequency, sampling_rate, length_in_sec, phase=0):
# Generierung eines Kosinus-Signals

    # create time vector based on length_in_sec and sampling_rate
    time = np.arange(length_in_sec * sampling_rate) / sampling_rate
    # create cosine wave
    cosine_wave = np.cos(2 * np.pi * frequency * time + phase)
    
    return cosine_wave
    

def plot_signal(signal, sampling_rate, title=None):
# Plot eines Zeitsignals

    fig, ax = plt.subplots(1, 1, figsize=(12,6))

    plt.plot(signal);
    
    ax.set(xlabel="Abtastwerte")
    ax.set(ylabel="Amplitude")
    ax.grid()
    
    # second axis
    ax2 = ax.twiny();
    ax2.set_xlim(ax.get_xlim())
    
    ax_ticks = ax.get_xticks()[1:-1]
    ax2_tick_labels = ax.get_xticks()[1:-1] / sampling_rate
    
    num_samples_shown = ax.get_xlim()[1] - ax.get_xlim()[0]
    time_shown = num_samples_shown / sampling_rate
    if time_shown < 1:
        ax2.set_xlabel("Zeit (ms)")
        ax2_tick_labels = [f"{x * 1000:.1f}" for x in ax2_tick_labels]
    else:
        ax2.set_xlabel("Zeit (Sek)")
        ax2_tick_labels = ['{:.2f}'.format(x) for x in ax2_tick_labels]

    ax2.set_xticks(ax_ticks)
    ax2.set_xticklabels(ax2_tick_labels)
    
    plot_title = title
    if plot_title is None:
        plot_title = f"Abtastrate: {sampling_rate} Hz"
    
     # we use y=1.14 to make room for the secondary x-axis
    # see: https://stackoverflow.com/questions/12750355/python-matplotlib-figure-title-overlaps-axes-label-when-using-twiny
    if plot_title is not None:
        ax.set_title(plot_title, y=1.108)
    
    fig.tight_layout()
    #return fig;

def plot_signal_spectrum(signal, freqs, correlation_results, title=None):
    
    fig, axes = plt.subplots(2, 1, figsize=(15,8))
    
    axes[0].set_title(title);
    
    axes[0].plot(signal)
    axes[0].grid()
    axes[0].set_xlabel("Abtastwerte");
    axes[0].set_ylabel("Amplitude");
    
    # second axis
    ax2 = axes[0].twiny();
    ax2.set_xlim(axes[0].get_xlim())
    
    ax_ticks = axes[0].get_xticks()[1:-1]
    ax2_tick_labels = axes[0].get_xticks()[1:-1] / sampling_rate
    
    num_samples_shown = axes[0].get_xlim()[1] - axes[0].get_xlim()[0]
    time_shown = num_samples_shown / sampling_rate
    if time_shown < 1:
        ax2.set_xlabel("Zeit (ms)")
        ax2_tick_labels = [f"{x * 1000:.1f}" for x in ax2_tick_labels]
    else:
        ax2.set_xlabel("Zeit (Sek)")
        ax2_tick_labels = ['{:.2f}'.format(x) for x in ax2_tick_labels]

    ax2.set_xticks(ax_ticks)
    ax2.set_xticklabels(ax2_tick_labels)

    axes[1].plot(freqs, correlation_results, color="orange");
    axes[1].grid()
    axes[1].set_xlabel("Frequenz [Hz]");
    axes[1].set_ylabel("Kreuzkorrelation");
    
    return axes

## Ein einfaches 18Hz-Signal

Wir starten mit einem einfachen 18Hz-Signal. Die Frage lautet: Können wir dieses Signals mittels Kreuzkorrelation detektieren?

In [None]:
sampling_rate = 800
length_in_sec = 0.5

signal_18Hz = create_cosine_wave(18, sampling_rate, length_in_sec)
plot_signal(signal_18Hz, sampling_rate, title="18Hz-Signal")

In [None]:
# Wir erzeugen uns nun Sinussignale mit der Frequenz 0...400Hz (wir müssen nur bis zur halben Abtastrate untersuchen)
nyquist_limit = sampling_rate // 2;

# Im Vektor correlation legen wir die Ergebnisse der Korrelation als Tupel (freq, max correlation) ab
correlation = []

for test_freq in range(1, nyquist_limit):
    
    test_signal = create_cosine_wave(test_freq, sampling_rate, length_in_sec);
    xcorr = np.correlate(signal_18Hz, test_signal, 'full')
    
    correlation.append((test_freq, np.max(xcorr)))

# Speichere Tupel-Elemente in 'freqs' und 'correlation_result'
freqs, xcorr_results = list(zip(*correlation))

# Grafische Darstellung der Ergebnisse
plot_signal_spectrum(signal_18Hz, freqs, xcorr_results, title="DFT mittels Kreuzkorrelation berechnet");

**Variante mit Anzeige der Frequenzen**

In [None]:
# Wir erzeugen uns nun Sinussignale mit der Frequenz 0...400Hz (wir müssen nur bis zur halben Abtastrate untersuchen)
nyquist_limit = sampling_rate // 2;

# Im Vektor correlation legen wir die Ergebnisse der Korrelation als Tupel (freq, max correlation) ab
correlation = []

for test_freq in range(1, nyquist_limit):
    
    test_signal = create_cosine_wave(test_freq, sampling_rate, length_in_sec);
    xcorr = np.correlate(signal_18Hz, test_signal, 'full')
    
    correlation.append((test_freq, np.max(xcorr)))

# Speichere Tupel-Elemente in 'freqs' und 'correlation_result'
freqs, xcorr_results = list(zip(*correlation))

# Grafische Darstellung der Ergebnisse
axes = plot_signal_spectrum(signal_18Hz, freqs, xcorr_results, title="DFT mittels Kreuzkorrelation berechnet");

# Identifikation der Frequenzen
xcorr_results           = np.array(xcorr_results)  # Konvertierung in NumPy-Vektor
xcorr_detected_freq_idx = np.argmax(xcorr_results) # An welchem Index befindet sich der höchster Korrelationswert?
xcorr_detected_freq     = freqs[xcorr_detected_freq_idx] # Wie lautet die dazugehörige Frequenz?

axes[1].plot(xcorr_detected_freq, xcorr_results[xcorr_detected_freq_idx], marker="x", color="red", alpha=0.8);
axes[1].text(xcorr_detected_freq + 3, xcorr_results[xcorr_detected_freq_idx] - 8, f"{xcorr_detected_freq} Hz", color="red");

## Vergleich mit 'echter' DFT

In [None]:
# Plot der Kreuzkorrelation und DFT
fig, axes = plt.subplots(2, 1, figsize=(15,8))

correlation_results_norm = np.array(xcorr_results) / (nyquist_limit - 1) 

axes[0].plot(freqs, correlation_results_norm, color="orange");
axes[0].grid()
axes[0].set_xlabel("Frequenz [Hz]");
axes[0].set_ylabel("Kreuzkorrelation");

axes[0].plot(xcorr_detected_freq, correlation_results_norm[xcorr_detected_freq_idx], marker="x", color="red", alpha=0.8);
axes[0].text(xcorr_detected_freq + 3, correlation_results_norm[xcorr_detected_freq_idx], f"{xcorr_detected_freq} Hz", color="red");

# Eingebaute DFT
spectrum, freqs_of_spectrum, line = axes[1].magnitude_spectrum(signal_18Hz, Fs= sampling_rate, color='r')
axes[1].grid()
axes[1].set_xlabel("Frequenz [Hz]");
axes[1].set_ylabel("Magnitude");

# Zustatzinformationen anzeigen
highest_mag_spectrum_idx = np.argmax(spectrum)
highest_mag = spectrum[highest_mag_spectrum_idx]
freq_at_highest_mag = freqs_of_spectrum[highest_mag_spectrum_idx]

axes[1].plot(freq_at_highest_mag, highest_mag, marker="x", color="black", alpha=0.8)
axes[1].text(freq_at_highest_mag + 3, highest_mag, f"{xcorr_detected_freq} Hz", color="black")

**Weitere Hilfsfunktionen**

In [None]:
def get_freq_and_amplitude_csv(freqs, amplitudes):
    '''Convenience function to convert a freq and amplitude array to a csv string'''
    freq_peaks_with_amplitudes_csv = ""
    for i in range(len(freqs)):
        freq_peaks_with_amplitudes_csv += f"{freqs[i]} Hz ({amplitudes[i]:0.2f})"
        if i + 1 is not len(freqs):
            freq_peaks_with_amplitudes_csv += ", "
    return freq_peaks_with_amplitudes_csv

def calc_and_plot_xcorr_dft(s, sampling_rate, 
                            time_domain_graph_title = None,
                            xcorr_freq_step_size = None,
                            xcorr_comparison_signal_length_in_secs = 0.5,
                            normalize_xcorr_result = True,
                            include_annotations = True,
                            minimum_freq_amplitude = 0.08,
                            y_axis_amplitude = True,
                            fft_pad_to = None,
                            title=None):
    
    total_time_in_secs = len(s) / sampling_rate
    
    # Enumerate through frequencies from 1 to the nyquist limit
    # and run a cross-correlation comparing each freq to the signal
    freq_and_correlation_results = [] # tuple of (freq, max correlation result)
    nyquist_limit = sampling_rate // 2
    
    freq_step_size = 1
    freq_seq_iter = range(1, nyquist_limit)
    if xcorr_freq_step_size is not None:
        freq_seq_iter = np.arange(1, nyquist_limit, xcorr_freq_step_size)
        freq_step_size = xcorr_freq_step_size
    
    progress_bar = ipywidgets.FloatProgress(value=1, min=1, max=nyquist_limit)
    ipd.display(progress_bar)
    num_comparisons = 0
    
    for test_freq in freq_seq_iter:
        signal_to_test = create_cosine_wave(test_freq, sampling_rate, xcorr_comparison_signal_length_in_secs)
        correlate_result = np.correlate(s, signal_to_test, 'full')

        # Add in the tuple of test_freq, max correlation result value
        freq_and_correlation_results.append((test_freq, np.max(correlate_result)))
        
        num_comparisons += 1
        progress_bar.value += freq_step_size

    # The `freq_and_correlation_results` is a list of tuple results with (freq, correlation result)
    # Unpack this tuple list into two separate lists freqs, and correlation_results
    correlation_freqs, correlation_results = list(zip(*freq_and_correlation_results))
    
    correlation_freqs = np.array(correlation_freqs)
    correlation_results = np.array(correlation_results)
    correlation_results_original = correlation_results
    
    cross_correlation_ylabel = "Cross-correlation result";
    if normalize_xcorr_result:
        correlation_results = correlation_results / (nyquist_limit - 1)
        cross_correlation_ylabel += " (normalized)";
        
    if y_axis_amplitude:
        correlation_results = (correlation_results_original / (nyquist_limit - 1)) * 2
        cross_correlation_ylabel = "Magnitude"

    # Plotting
    fig, axes = plt.subplots(3, 1, figsize=(15, 12));

    # Plot the signal in the time domain
    #makelab.signal.plot_signal_to_axes(axes[0], s, sampling_rate, title = time_domain_graph_title)
    axes[0].set_title(title);
    
    axes[0].plot(s)
    axes[0].grid()
    axes[0].set_xlabel("Abtastwerte");
    axes[0].set_ylabel("Amplitude");
    
    # second axis
    ax2 = axes[0].twiny();
    ax2.set_xlim(axes[0].get_xlim())
    
    ax_ticks = axes[0].get_xticks()[1:-1]
    ax2_tick_labels = axes[0].get_xticks()[1:-1] / sampling_rate
    
    num_samples_shown = axes[0].get_xlim()[1] - axes[0].get_xlim()[0]
    time_shown = num_samples_shown / sampling_rate
    if time_shown < 1:
        ax2.set_xlabel("Zeit (ms)")
        # format with 'g' causes insignificant trailing zeroes to be removed
        # https://stackoverflow.com/a/2440708 but also uses scientific notation, oh well!
        ax2_tick_labels = [f"{x * 1000:.1f}" for x in ax2_tick_labels]
    else:
        ax2.set_xlabel("Zeit (Sek)")
        ax2_tick_labels = ['{:.2f}'.format(x) for x in ax2_tick_labels]

    ax2.set_xticks(ax_ticks)
    ax2.set_xticklabels(ax2_tick_labels)
    
    axes[0].set_title(axes[0].get_title(), y=1.2);

    # Plot the signal correlations (our brute force approach)
    axes[1].plot(correlation_freqs, correlation_results, color="orange");
    axes[1].set_title("DFT mittels Kreuzkorrelation");
    axes[1].set_xlabel("Frequenz");
    axes[1].set_ylabel(cross_correlation_ylabel);
    axes[1].grid()

    # Plot the "ground truth" via an FFT
    # https://matplotlib.org/3.1.1/api/_as_gen/matplotlib.pyplot.magnitude_spectrum.html
    # Use zero padding to obtain a more accurate estimate of the sinusoidal amplitudes
    # See: https://www.mathworks.com/help/signal/ug/amplitude-estimation-and-zero-padding.html
    if fft_pad_to is None:
        fft_pad_to = len(s) * 4
    elif fft_pad_to == 0:
        fft_pad_to = None
    
    fft_spectrum, fft_freqs, line = axes[2].magnitude_spectrum(s, Fs = sampling_rate, color='r', pad_to = fft_pad_to);
    
    if y_axis_amplitude:
        # By default, the magnitude_spectrum plots half amplitudes (perhaps because it's
        # showing only one-half of the full FFT (the positive frequencies). But there is not
        # way to control this to show full amplitudes by passing in a normalization parameter
        # So, instead, we'll do it by hand here (delete the old line and plot the normalized spectrum)
        line.remove()
        fft_spectrum = np.array(fft_spectrum) * 2
        axes[2].plot(fft_freqs, fft_spectrum, color = 'r')
        axes[2].set_ylabel("Magnitude")
    
    axes[2].set_title("FFT via matplotlib.pyplot.magnitude_spectrum");
    axes[2].grid();

    # Find the number of peaks
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks_cwt.html
    # Du et al., Improved Peak Detection, Bioinformatics: https://academic.oup.com/bioinformatics/article/22/17/2059/274284
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html
    xcorr_dft_peaks_indices, xcorr_dft_peaks_props = scipy.signal.find_peaks(correlation_results, height = minimum_freq_amplitude)
    xcorr_freq_peaks = correlation_freqs[xcorr_dft_peaks_indices]
    xcorr_freq_peak_amplitudes = correlation_results[xcorr_dft_peaks_indices]
    xcorr_freq_peaks_csv = ", ".join(map(str, xcorr_freq_peaks))
    xcorr_freq_peaks_with_amplitudes_csv = get_freq_and_amplitude_csv(xcorr_freq_peaks, xcorr_freq_peak_amplitudes)
    xcorr_freq_peaks_array_str = [f"{freq} Hz" for freq in xcorr_freq_peaks]
    
    fft_peaks_indices, fft_peaks_props = scipy.signal.find_peaks(fft_spectrum, height = minimum_freq_amplitude)
    fft_freq_peaks = fft_freqs[fft_peaks_indices]
    fft_freq_peaks_amplitudes = fft_spectrum[fft_peaks_indices]
    fft_freq_peaks_csv = ", ".join(map(str, fft_freq_peaks))
    fft_freq_peaks_with_amplitudes_csv = get_freq_and_amplitude_csv(fft_freq_peaks, fft_freq_peaks_amplitudes)
    fft_freq_peaks_array_str = [f"{freq} Hz" for freq in fft_freq_peaks]
    
    # Print out frequency analysis info and annotate plots
    if include_annotations:
        axes[1].plot(xcorr_freq_peaks, xcorr_freq_peak_amplitudes, linestyle='None', marker="x", color="red", alpha=0.8);
        for i in range(len(xcorr_freq_peaks)):
            axes[1].text(xcorr_freq_peaks[i] + 2, xcorr_freq_peak_amplitudes[i], f"{xcorr_freq_peaks[i]} Hz", color="red");
            
        axes[2].plot(fft_freq_peaks, fft_freq_peaks_amplitudes, linestyle='None', marker="x", color="black", alpha=0.8);
        for i in range(len(fft_freq_peaks)):
            axes[2].text(fft_freq_peaks[i] + 2, fft_freq_peaks_amplitudes[i], f"{fft_freq_peaks[i]} Hz", color="black");
    
    print("**DFT mittels Kreuzkorrelation**")
    print(f"Anzahl Berechnungen: {num_comparisons}")
    print(f"Auflösung: {freq_step_size} Hz")
    #print(f"Found {len(xcorr_dft_peaks_indices)} freq peak(s) at: {xcorr_freq_peaks_csv} Hz")
    print(f"Schwellwert für minimale Amplitude: {minimum_freq_amplitude}")
    print(f"Gefundene Frequenzen und Amplituden: {xcorr_freq_peaks_with_amplitudes_csv} Hz")#

    print("")
    print("**FFT**")
    print(f"Anzahl Spektrallinien: {len(fft_freqs)}")
    print(f"Auflösung: {fft_freqs[1] - fft_freqs[0]} Hz")
    #print(f"Found {len(fft_peaks_indices)} freq peak(s) at: {fft_freq_peaks_csv} Hz")
    print(f"Schwellwert für minimale Amplitude: {minimum_freq_amplitude}")
    print(f"Gefundene Frequenzen und Amplituden: {fft_freq_peaks_with_amplitudes_csv} Hz")
    #print(f"Freq: {fft_freqs[fft_peaks_indices]} Hz")
    
    fig.tight_layout(pad=2)
    return (fig, axes, correlation_freqs, correlation_results, fft_freqs, fft_spectrum)

### Test mit neuer Frequenz (und erweiterter Analyse-Funktion)

In [None]:
sampling_rate = 800
length_in_sec = 0.5
frequency     = 23

signal = create_cosine_wave(frequency, sampling_rate, length_in_sec)

# Analyse und Plot
calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signal mit $f={frequency}Hz$ und $f_s={sampling_rate}Hz$ abgetastet (Länge: {length_in_sec}s)");

## Analyse von Signalgemischen

Bisher haben wir nur einzelne Frequenzen analysiert. Im nachfolgenden Beispiel erzeugen wir nun ein komplexeres Signal mit drei unterschiedlichen Frequenzen.

- Frequenz 1: 10 Hz
- Frequenz 2: 42 Hz
- Frequenz 3: 151 Hz

In [None]:
sampling_rate = 800
total_time_in_secs = 0.5

freq1 = 10
freq2 = 42
freq3 = 151

# Erstellung der einzelnen Signalanteile
signal1 = create_cosine_wave(freq1, sampling_rate, total_time_in_secs)
signal2 = create_cosine_wave(freq2, sampling_rate, total_time_in_secs)
signal3 = create_cosine_wave(freq3, sampling_rate, total_time_in_secs)

# Kombination in einem einzelnen Signal
signal = signal1 + signal2 + signal3

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signalgemisch (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

## Detektion unterschiedlicher Signalamplituden

Bisher hatten alle Signale die gleiche Amplitude. Lässt sich mit der Kreuzkorrelation neben der Frequenz auch die Signalamplitude bestimmen?

Dazu erzeugen wir uns aus den vorherigen Einzelsignalen ein Signalgemisch mit unterschiedlichen Amplituden:
- Signal 1: Amplitude = 0.8
- Signal 2: Amplitude = 0.1
- Signal 3: Amplitude = 0.1

Anschließend analysieren wir das Signalgemisch mit der Kreuzkorrelation und im Vergleich mit der FFT.

In [None]:
signal = 0.8 * signal1 + 0.1 * signal2 + 0.1 * signal3

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signalgemisch mit unterschiedlichen Amplituden (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

Das Ganze können wir auch noch mal mit anderen Amplituden ausprobieren:

- Signal 1: Amplitude = 0.2
- Signal 2: Amplitude = 0.2
- Signal 3: Amplitude = 0.6

In [None]:
# Versuch mit anderen Amplituden

signal = 0.2 * signal1 + 0.2 * signal2 + 0.6 * signal3

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signalgemisch mit unterschiedlichen Amplituden (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

Jetzt fügen wir noch weitere Signalanteile hinzu:

Signal 4: Frequenz = 222 Hz, Amplitude = 1.0
Signal 5: Frequenz = 347 Hz, Amplitude = 0.8

In [None]:
freq4 = 222
freq5 = 347

signal4 = create_cosine_wave(freq4, sampling_rate, total_time_in_secs)
signal5 = create_cosine_wave(freq5, sampling_rate, total_time_in_secs)

signal = 0.5*signal1 + 0.3*signal2 + 0.2*signal3 + 1*signal4 + 0.8*signal5

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signalgemisch mit unterschiedlichen Amplituden (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

## Untersuchungen zur Präzision der Auflösung

Bisher wurden nur ganzzahlige Frequenzkomponenten untersucht. Was passiert, wenn man nun Frequenzen auswählt, die zwischen zwei ganzzahligen Werten liegen?

Für den Versuch erzeugen wir nun ein Signal mit folgenden Eigenschaften:

- Signal: Frequenz = 49,5 Hz, Amplitde = 1

Die Frage lautet, ob mittels Kreuzkorrelation die richtige Amplitude detektiert werden kann?

In [None]:
sampling_rate      = 800
total_time_in_secs = 0.5
freq               = 49.2

signal = create_cosine_wave(freq, sampling_rate, total_time_in_secs)

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Signalgemisch (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

Mit dem implementieren Verfahren der Korrelation lassen sich nur ganzzahlige Frequenzen detektieren, weil die Analysefrequenzen auch ganzzahlige Werte enthalten.

Möchte man die Frequenzauflösung erhöhen, muss die Korrelation auch mit feiner aufgelösten Frequenzen durchgeführt werden, bspw. mit der Schrittweite von 0.1 Hz.

In [None]:
calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        xcorr_freq_step_size = .1,
                        minimum_freq_amplitude = 0.4,
                        title=f"Signalgemisch (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

Genauso ist es möglich, die Auflösung zu verringern. Damit erhöht man die Verarbeitungsgeschwindigkeit auf Kosten der Präzision.

Setzt man die Auflösung auf 10Hz, erzielt man einen Geschwindigkeitsvorteil von 100 gegenüber der Auflösung von 0.1Hz.

In [None]:
calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        xcorr_freq_step_size=10,
                        minimum_freq_amplitude=0.3,
                        title=f"Signalgemisch (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

## Eigener Frequenzmix

In [None]:
sampling_rate  = 800
length_in_secs = 0.5

freqs      = [15, 80, 147, 223.5, 320]      # ändere oder füge weitere Frequenzen hinzu
amplitudes = [0.3, 1.5, 1.3, 0.5, 0.7]

signal_composite = create_cosine_wave(0, sampling_rate, length_in_sec);

for i, freq in enumerate(freqs):
    signal = amplitudes[i] * create_cosine_wave(freqs[i], sampling_rate, length_in_sec);
    signal_composite = signal_composite + signal

# Analyse und Plot
calc_and_plot_xcorr_dft(signal_composite,
                        sampling_rate,
                        xcorr_freq_step_size=.5,
                        minimum_freq_amplitude = 0.2,
                        title=f"Signalgemisch (Länge: {length_in_sec}s, $f_s={sampling_rate}Hz$)");

## Analyse von Audiodaten

Die Datei `SineWaveChord_MinFreq261_MaxFreq4186_1sec_11025Hz_mono.wav` enthält folgende Frequenzen:

- C4: $A=0.2, f_s=261.6256 Hz$
- D4: $A=0.2, f_s=293.665 Hz$
- G4: $A=0.2, f_s=391.995 Hz$
- C7: $A=0.2, f_s=2093 Hz$
- C8: $A=0.2, f_s=4186.01 Hz$

In [None]:
sampling_rate, signal = scipy.io.wavfile.read('../data/audio/SineWaveChord_MinFreq261_MaxFreq4186_1sec_11025Hz_mono.wav')

# Normalisieren des Signals
signal = signal/np.max(signal);

# Plot der Signale
calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        xcorr_freq_step_size=1,
                        minimum_freq_amplitude=0.2,
                        title=f"Audiodatei ($f_s={sampling_rate}Hz$)");

# Audiodaten abspielen
ipd.Audio(signal, rate=sampling_rate)

# Analyse der menschlichen Stimme

In [None]:
import librosa

# Import mittels librosa und Downsampling auf fs=8000Hz
signal, sampling_rate = librosa.load('../data/audio/HumanVoice-Hello_16bit_44.1kHz_mono.wav', sr=8000)

# Normalisieren des Signals
signal = signal/np.max(signal);

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        xcorr_freq_step_size=1, 
                        minimum_freq_amplitude=0.2,
                        title=f"Audiodatei ($f_s={sampling_rate}Hz$)");

ipd.Audio(signal, rate=sampling_rate)

- mittels Kreuzkorrelation lassen sich unbekannte Signale hinsichtlich ihres Spektrums analysieren
- mit Hilfe von Analysesignalen (Sinusschwingungen) lassen sich sowohl die enthaltenen Frequenzen sowie deren Amplituden aus einem unbekannten Signalgemisch ermitteln
- Was ist mit der 3. Komponente, der Phase?

## Phasenanalyse mittels Kreuzkorrelation?

In [None]:
sampling_rate = 800
length_in_sec = 0.5

signal1 = create_cosine_wave(10, sampling_rate, length_in_sec, phase=0)
signal2 = create_cosine_wave(55, sampling_rate, length_in_sec, phase=np.pi/2)

#plot_signal(signal1, sampling_rate)
#plot_signal(signal2, sampling_rate)

#signal = signal1;
#signal = signal2;
signal = signal1 + signal2;

calc_and_plot_xcorr_dft(signal,
                        sampling_rate,
                        title=f"Sinus mit $\phi=0$ ($f_s={sampling_rate}Hz$)");

Eine Signalanalyse mittels Kreuzkorrelation besitzt einen Nachteil:
- keine Aussage zur Phase des Signals

Lösung: Kreuzkorrelation mit Sinus und Cosinus

$$ X[k] = \sum_{n=0}^{N-1} x[n] \cdot \left[ \cos{\left(2\pi\frac{kn}{N} \right)} -j \cdot \sin{\left(2\pi \frac{kn}{N} \right)} \right ] \tag{1}$$

bzw.

$$ X[k] = \sum_{n=0}^{N-1} x[n] \cdot e^{-j 2\pi\frac{kn}{N}} \tag{2}$$

mit Signallänge $N$ und Frequenzanteil $k$.

Eine Kreuzkorrelation mit Sinus- und Kosinus-Signal ist nichts anderes als die Diskrete Fourier-Transformation.

# Die diskrete Fourier-Transformation (DFT)

In [None]:
sampling_rate = 32
length_in_sec = 1

signal1 = 1.0 * create_sine_wave(2, sampling_rate, length_in_sec, phase=0)
signal2 = 0.5 * create_sine_wave(4, sampling_rate, length_in_sec, phase=np.pi/4)

signal  = signal1 + signal2;

fig = plot_signal(signal, sampling_rate, title="Komplexes Signal")
#fig.savefig("dfttest1.png", dpi=300);

### Berechnung der DFT

In [None]:
def dft(signal, freq, output=1):
    
    N = len(signal);
    n = np.arange(N);
    
    real = signal * np.cos(2*np.pi*freq*n/N)
    imag = signal * np.sin(2*np.pi*freq*n/N)
    
    real_sum = np.sum(real);
    imag_sum = np.sum(imag);
    
    
    betrag = np.sqrt(real_sum*real_sum + imag_sum*imag_sum);
    phase  = np.arctan(imag_sum/real_sum)*180.0/np.pi;
    
    if (output):
        np.set_printoptions(precision=4)
        print("Summe Realteil:     %7.3f" % real_sum)
        print("Summe Imaginärteil: %7.3f" % imag_sum)
        print("Betrag:             %7.3f" % betrag)
        print("Phase :             %7.3f" % phase)
        
    return betrag, phase;

In [None]:
# freq = 0...8
dft(signal, freq=4);

In [None]:
betrag = np.zeros(len(signal))
phase  = np.zeros(len(signal))

# Berechnung des Betrags und der Phase für n=0...N-1
for n in np.arange(len(signal)):
    betrag[n], phase[n] = dft(signal, n, output=0)

# Grafische Darstellung der Frequenzen
fig = plt.figure(figsize=(15,5))
plt.stem(betrag)
plt.grid();
plt.ylabel('Betrag');
plt.xlabel('n');
plt.title('Betragsspektrum');

# ToDo: Spektrum halbieren, Amplituden verdoppeln, Amplituden Normalisieren
# n => sind diskrete Werte, die noch auf die entsprechenden Frequenzen gemappt werden müssen (hier im Beispiel nicht relevant)

### Normalisierung der Werte

In [None]:
N = len(signal)

# Grafische Darstellung der Frequenzen
fig = plt.figure(figsize=(15,8))
plt.subplot(2,1,1)
plt.stem(2*betrag[0:N//2]/N)
plt.grid();
plt.ylabel('Betrag');
plt.xlabel('Frequenz [Hz]');
plt.title('Betragsspektrum');

# Grafische Darstellung der Phase
plt.subplot(2,1,2)
plt.stem(phase[0:N//2])
plt.ylabel('Phase (deg)');
plt.xlabel('Frequenz [Hz]');
plt.title('Phasenspektrum');
plt.grid();

fig.tight_layout(pad=2)
#fig.savefig("dfttest3.png", dpi=300)

### Berechnungsaufwand der DFT

In [None]:
from timeit import default_timer as timer

sampling_rate = 8000
length_in_sec = 2

signal1 = create_cosine_wave(10, sampling_rate, length_in_sec, phase=0)
signal2 = create_cosine_wave(55, sampling_rate, length_in_sec, phase=np.pi/2)
signal  = signal1 + signal2;

betrag = np.zeros(len(signal))
phase  = np.zeros(len(signal))

start = timer()
for i in np.arange(len(signal)):
    betrag[i], phase[i] = dft(signal, i, output=0)

dauer_dft = timer()-start;
print("Dauer %.3f Sek." % (dauer_dft))

N = len(signal)

# Grafische Darstellung der Frequenzen
plt.figure(figsize=(15,5))
plt.stem(2*betrag[0:N//2]/N)
plt.grid();
plt.ylabel('Betrag');
plt.xlabel('n');
plt.title('Betragsspektrum');

Mit Hilfe der diskreten Fourier-Transformation (DFT) lassen sich komplexe Signale analysieren. Mittels DFT erhält man komplexe Fourier-Koeffizienten, aus denen man sowohl die Frequenz, die Amplitude und die Phase der im Signal befindlichen Spektralanteile bestimmen kann.

Einziger Nachteil der DFT: der sehr hohe Berechnungsaufwand (doppelt so hoch wie Kreuzkorrelation)

Lösung: Optimierung der Berechnungen ==> Schnelle Fouriertransformation