# Sounds from Swarm

Currently generating sound from MAG_LR (F)

Sound processing based on work by Nikolai Linden-Vørnle: https://gitlab.gbar.dtu.dk/s183730/sonification-ESA-Swarm/

References:

- https://resampy.readthedocs.io/
- https://pyrubberband.readthedocs.io/
- https://panel.holoviz.org/reference/index.html

In [None]:
from time import sleep
import datetime as dt
import numpy as np
from scipy import signal
from scipy.io import wavfile
import matplotlib.pyplot as plt
import pandas as pd

from hapiclient import hapi, hapitime2datetime
import resampy
import pyrubberband as pyrb

import panel as pn

pn.extension()

In [None]:
# Time inputs, to be made configurable
t0 = dt.datetime(2022, 1, 16, 0, 0, 0)
# t1 = t0 + dt.timedelta(minutes=1)
t1 = dt.datetime(2022, 1, 18, 0, 0, 0)

## Data access through VirES+HAPI

In [None]:
def fetch_data(t0, t1) -> pd.DataFrame:
    """Fetch data from VirES HAPI
    
    This needs to be done in chunks due to the limit, x_maxTimeSelection
    """
    # Generate time chunks
    times = pd.date_range(start=t0, end=t1, freq="D").to_pydatetime()
    start_times = times[:-1]
    end_times = times[1:]
    # Build dataframe in chunks of _df
    df = pd.DataFrame()
    for start_time, end_time in zip(start_times, end_times):
        # Fetch data
        data, meta = hapi(
            "https://vires.services/hapi/",
            "SW_OPER_MAGA_LR_1B",
            "Latitude,Longitude,Radius,F",  # ,B_NEC",
            start_time.isoformat(),
            end_time.isoformat(),
        )
        # Convert to dataframe
        #  To fix: this will not work with vector, e.g. B_NEC
        _df = pd.DataFrame(columns=data.dtype.names, data=data)
        _df = _df.set_index("Timestamp")
        _df.index = hapitime2datetime(_df.index.values.astype(str))
        _df.index = _df.index.tz_convert("UTC").tz_convert(None)
        df = pd.concat([df, _df])
    return df

In [None]:
df = fetch_data(t0, t1)

## Sound processing tools

In [None]:
def normalise(x):
    """Normalise signal to +1/-1 range"""
    return (x - np.average(x)) / (np.ptp(x))


def highpass_filter(x, filter_order=8, cutoff_freq=0.0001, fs=1):
    """Apply butterworth highpass filter to remove DC offset"""
    sos = signal.butter(filter_order, cutoff_freq, 'highpass', fs=fs, output='sos')
    # w, h = signal.sosfreqz(sos)
    return signal.sosfilt(sos, x)


def resample(x, fs=44100, resampling_factor=7):
    """Resample to new frequency"""
    sr_new = int(fs / resampling_factor)
    return resampy.resample(x, fs, sr_new)


def smooth_edges(x, fs=44100, t_fade=0.1):
    """Smoothing window to avoid clicks and pops at the start and end of the signal"""
    window = np.ones(len(x))
    L = int(t_fade * fs)
    fade = np.linspace(0, 1, L)
    for i in range(L):
        window[i] *= fade[i]
        window[len(window)-1-i] *= fade[i]
    return x * window


def time_stretch(x, fs=44100, target_length=10):
    """Stretch duration to target_length (seconds)"""
    input_length = len(x) / fs
    ts_ratio = input_length / target_length
    return pyrb.time_stretch(x, fs, ts_ratio)


def pitch_shift(x, fs=44100, octaves=1):
    """Shift the pitch by a given number of octaves"""
    return pyrb.pitch_shift(x, fs, 12*octaves)


def sound_format(x):
    """Convert to int16 for audio output"""
    return np.int16(x*32767)


def get_sound_pane(x, fs=44100):
    """Generate panel Audio pane"""
    return pn.pane.Audio(x, sample_rate=fs, sizing_mode="stretch_width")

In [None]:
def plot_spectrogram(x, fs=1, resampling_factor=1, ymax=0.00125, figsize=(5, 5)):
    nperseg = 2**(16 + (1 - (int(0.5 + resampling_factor/2))))
    f, t, Sxx = signal.spectrogram(x, fs, mode='magnitude', nperseg=nperseg)
    fig, ax = plt.subplots(1, 1, figsize=figsize)
    ax.pcolormesh(t, f, Sxx[:], shading='gouraud', cmap='hot')
    ax.set_ylabel('Frequency [Hz]')
    ax.set_xlabel('Time [sec]')
    ax.set_ylim(0, ymax)
    plt.close()
    return fig

### Take a look at the input...

In [None]:
# x_in = df["F"].values
# plot_spectrogram(x_in)

In [None]:
# # The raw input WARNING FOR YOUR EARS!
# get_sound_pane(sound_format(x_in))

### Now apply a pipeline created from the functions above...

In [None]:
def apply_audio_pipeline(x, target_length=10, shift_octaves=1):
    x = normalise(x)
    x = highpass_filter(x, filter_order=8, cutoff_freq=0.0001, fs=1)
    x = resample(x, fs=44100, resampling_factor=7)
    x = smooth_edges(x, fs=44100, t_fade=0.1)
    x = time_stretch(x, fs=44100, target_length=target_length)
    x = pitch_shift(x, fs=44100, octaves=shift_octaves)
    # plot_spectrogram(x, fs=44100, resampling_factor=7, ymax=800)
    return x

premade_audio_data = apply_audio_pipeline(df["F"].values, target_length=10, shift_octaves=1)

In [None]:
plot_spectrogram(premade_audio_data, fs=44100, resampling_factor=7, ymax=800)

In [None]:
get_sound_pane(
    sound_format(premade_audio_data),
    fs=44100
)

## Panel dashboard

In [None]:
class SoundDashboard:
    def __init__(self, df=df, t0=t0, t1=t1):
        """Initialise with the data from above, and create Panel objects"""
        self.default_t0t1 = t0, t1
        # self.df = fetch_data(t0, t1)
        self.df = df  # use the pre-fetched data from above for speed
        # self.audio_data = apply_audio_pipeline(self.df["F"].values)
        self.audio_data = premade_audio_data  # use the premade data from above for speed
        spectrogram_in = plot_spectrogram(self.df["F"].values)
        spectrogram_out = plot_spectrogram(self.audio_data, fs=44100, resampling_factor=7, ymax=800)
        wavfilename = self.write_file_for_download()
        self.widgets = {
            "time_range": pn.widgets.DatetimeRangeInput(
                start=dt.datetime(2021, 1, 1, 0, 0, 0), end=dt.datetime(2022, 3, 1, 0, 0, 0),
                value=(t0, t1)
            ),
            "button1": pn.widgets.Button(
                name="Fetch data", button_type="primary"
            ),
            "loading1": pn.indicators.Progress(active=False, sizing_mode="stretch_width"),
            "target_length": pn.widgets.IntSlider(
                name="Output length (seconds)",
                start=1, end=60, step=1, value=10
            ),
            "shift_octaves": pn.widgets.IntSlider(
                name="Shift by number of octaves",
                start=0, end=6, value=1
            ),
            "button2": pn.widgets.Button(
                name="Regenerate sound ➡️", button_type="primary"
            ),
            "loading2": pn.indicators.Progress(active=False, sizing_mode="stretch_width"),
            "file_download": pn.widgets.FileDownload(file=wavfilename)
        }
        self.panes = {
            "audio": get_sound_pane(sound_format(self.audio_data)),
            "spectrogram_in": pn.pane.Matplotlib(spectrogram_in),
            "spectrogram_out": pn.pane.Matplotlib(spectrogram_out)
        }
        self.widgets["button1"].on_click(self.update_data)
        self.widgets["button2"].on_click(self.update_audio)
        
    def update_data(self, event):
        """Fetch the data from VirES and reset the dashboard"""
        t0, t1 = self.widgets["time_range"].value
        if (t1 - t0) > dt.timedelta(days=7):
            self.widgets["button1"].name = "Time > 7 days not allowed !"
            self.widgets["time_range"].value = self.default_t0t1
            sleep(3)
            self.widgets["button1"].name = "Fetch data"
            return None
        self.widgets["button1"].name = "Busy..."
        self.widgets["loading1"].active = True
        self.df = fetch_data(t0, t1)
        spectrogram_in = plot_spectrogram(self.df["F"].values)
        self.panes["spectrogram_in"].object = spectrogram_in
        self.widgets["button1"].name = "Fetch data"
        self.widgets["loading1"].active = False
        self.update_audio(None)

    def update_audio(self, event):
        """Update the output spectrogram and audio"""
        self.panes["audio"].paused = True
        self.widgets["button2"].name = "Busy..."
        # Change contents of audio and spectrogram
        self.widgets["loading2"].active = True
        x = apply_audio_pipeline(
            self.df["F"].values,
            target_length=self.widgets["target_length"].value,
            shift_octaves=self.widgets["shift_octaves"].value
        )
        self.audio_data = x
        self.panes["audio"].object = sound_format(x)
        spectrogram_out = plot_spectrogram(x, fs=44100, resampling_factor=7, ymax=800)
        self.panes["spectrogram_out"].object = spectrogram_out
        self.write_file_for_download()
        # Reset button & loading widget
        self.widgets["button2"].name = "Regenerate sound ➡️"
        self.widgets["loading2"].active = False
        
    def write_file_for_download(self, filename="sonification.wav"):
        wavfile.write(filename, 44100, sound_format(self.audio_data))
        return filename

    def display(self):
        """GridSpec-based layout of all the widgets and panes"""
        gspec = pn.GridSpec(sizing_mode="stretch_both", max_height=800)
        gspec[:, 0] = pn.Column(
            pn.pane.Markdown("## Input data"),
            pn.pane.Markdown("`SW_OPER_MAGA_LR_1B: F`"),
            self.widgets["time_range"],
            self.widgets["button1"],
            self.widgets["loading1"],
            pn.pane.Markdown("## Input spectrogram"),
            self.panes["spectrogram_in"]
        )
        gspec[:, 1] = pn.Column(
            self.widgets["target_length"],
            self.widgets["shift_octaves"],
            self.widgets["button2"],
            background="WhiteSmoke"
        )
        gspec[:, 2] = pn.Column(
            pn.pane.Markdown("## Output spectrogram & audio"),
            self.widgets["loading2"],
            self.panes["spectrogram_out"],
            self.panes["audio"],
            self.widgets["file_download"]
        )
        return gspec
        

SoundDashboard().display().servable("Sounds from Swarm")