In [1]:
import alsaaudio
import time
from multiprocessing import Process
from threading import Thread
from pathlib import Path
from threading import Thread, Lock
import wave

In [2]:
alsaaudio.pcms()

['null',
 'jack',
 'pulse',
 'custom',
 'default',
 'sysdefault:CARD=excelsiorcard',
 'dmix:CARD=excelsiorcard,DEV=0',
 'dmix:CARD=excelsiorcard,DEV=3',
 'dsnoop:CARD=excelsiorcard,DEV=0',
 'dsnoop:CARD=excelsiorcard,DEV=3',
 'hw:CARD=excelsiorcard,DEV=0',
 'hw:CARD=excelsiorcard,DEV=3',
 'plughw:CARD=excelsiorcard,DEV=0',
 'plughw:CARD=excelsiorcard,DEV=3',
 'usbstream:CARD=excelsiorcard']

In [3]:
class Dummy:
    def __getattr__(self, attr):
        return lambda *args, **kwargs: None

In [4]:
class SleepSpindleRealTimeStimulator():
    def __init__(self, soundname=None, lsl_streamer=Dummy(), stimulation_delay=0.0, inter_stim_delay=0.0):
        """
        params: 
            stimulation_delay (float): simple delay between a detection and a stimulation
            inter_stim_delay (float): time to wait between a stimulation and the next detection 
        """
        if soundname is None:
            self.soundname = 'stimulus.wav' # CHANGE HERE TO THE SOUND THAT YOU WANT. ONLY ADD THE FILE NAME, NOT THE ENTIRE PATH
        else:
            self.soundname = soundname
#         self._sound = Path(".").parent.parent / 'sounds' / self.soundname
        self._sound = f"../sounds/{self.soundname}"
        self._thread = None
        self._lock = Lock()
        self.last_detected_ts = time.time()
        self.wait_t = 0.4  # 400 ms
        self.delayer = None
        self.lsl_streamer = lsl_streamer

        # Initialize Alsa stuff
        # Open WAV file and set PCM device
        with wave.open(str(self._sound), 'rb') as f: 
            device = 'custom'
            
            self.duration = f.getnframes() / float(f.getframerate())
            
            format = None

            # 8bit is unsigned in wav files
            if f.getsampwidth() == 1:
                format = alsaaudio.PCM_FORMAT_U8
            # Otherwise we assume signed data, little endian
            elif f.getsampwidth() == 2:
                format = alsaaudio.PCM_FORMAT_S16_LE
            elif f.getsampwidth() == 3:
                format = alsaaudio.PCM_FORMAT_S24_3LE
            elif f.getsampwidth() == 4:
                format = alsaaudio.PCM_FORMAT_S32_LE
            else:
                raise ValueError('Unsupported format')

            self.periodsize = f.getframerate() // 8

            try:
                self.pcm = alsaaudio.PCM(channels=f.getnchannels(), rate=f.getframerate(), format=format, periodsize=self.periodsize, device=device)
            except alsaaudio.ALSAAudioError as e:
#                 print("WARNING: Could not open ALSA device as it is already playing a sound. To test stimulation, stop recording and try again.")
                self.pcm = Dummy()

                raise e
                
            # Store data in list to avoid reopening the file
            self.wav_list = []
            while True:
                data = f.readframes(self.periodsize)  
                if data:
                    self.wav_list.append(data)
                else: 
                    break
                    
        print(f"DEBUG: Stimulator will play sound {self.soundname}, duration: {self.duration:.3f} seconds")


    def play_sound(self):
        '''
        Open the wav file and play a sound
        '''
        print(len(self.wav_list[0]))
        for data in self.wav_list:
            self.pcm.write(data) 
            
        # Added this to make sure the thread does not stop before the sound is done playing
        time.sleep(self.duration)
    
    def stimulate(self, detection_signal):
        for sig in detection_signal:
            # We detect a stimulation
            if sig:
                # Record time of stimulation
                ts = time.time()
                
                # Check if time since last stimulation is long enough
                if ts - self.last_detected_ts > self.wait_t:
                    if not isinstance(self.delayer, Dummy):
                        # If we have a delayer, notify it
                        self.delayer.detected()
                        # Send the LSL marer for the fast stimulation 
                        self.send_stimulation("FAST_STIM", False)
                    else:
                        self.send_stimulation("STIM", True)

                self.last_detected_ts = ts

    def send_stimulation(self, lsl_text, sound):
        # Send lsl stimulation
        self.lsl_streamer.push_marker(lsl_text)
        # Send sound to patient
        if sound:
            with self._lock:
                if self._thread is None: 
                    self._thread = Thread(target=self._t_sound, daemon=True)
                    self._thread.start()

                
    def _t_sound(self):
        self.play_sound()
        with self._lock:
            self._thread = None
    
    def test_stimulus(self):
        with self._lock:
            if self._thread is None:
                self._thread = Thread(target=self._t_sound, daemon=True)
                self._thread.start()

    def add_delayer(self, delayer):
        self.delayer = delayer
        self.delayer.stimulate = lambda: self.send_stimulation("DELAY_STIM", True)

    def __del__(self):
        print("DEBUG: releasing PCM")
        del self.pcm

In [5]:
sound = 'stimulus.wav'

In [6]:
def sound_process():

    stimulator = SleepSpindleRealTimeStimulator(soundname=sound)
    stimulator.play_sound()
    time.sleep(1)
    stimulator.play_sound()
    del stimulator
    print("Done")

In [7]:
sound_process()

DEBUG: Stimulator will play sound stimulus.wav, duration: 1.588 seconds
22048
22048
DEBUG: releasing PCM
Done


In [8]:
sound_proc = Process(target=sound_process)
sound_proc.start()
sound_proc.join()

DEBUG: Stimulator will play sound stimulus.wav, duration: 1.588 seconds
22048
22048
DEBUG: releasing PCM
Done


In [9]:
sound_proc.join()