# Cochlear Implant Simulator

This program uses vocoder principles to simulate the signal processing of an Advanced Bionics Harmony Speech Processor and produces an audio output that can be played over headphones/loudspeakers for listening. 

## Library imports

In [9]:
# Import necessary functions
import numpy as np
#import pyaudio as pa
from IPython.display import Audio
from scipy.signal import resample
from scipy.io.wavfile import read as wavread
from scipy.io.wavfile import write as wavwrite

# Import the rest of the GpyT subpackage functions for the demo here
from GpyT.Frontend.readWav import readWavFunc
from GpyT.Frontend.tdFilter import tdFilterFunc
from GpyT.Agc.dualLoopTdAgc import dualLoopTdAgcFunc
from GpyT.WinBuf.winBuf import winBufFunc
from GpyT.Filterbank.fftFilterbank import fftFilterbankFunc
from GpyT.Filterbank.hilbertEnvelope import hilbertEnvelopeFunc
from GpyT.Filterbank.channelEnergy import channelEnergyFunc
from GpyT.NoiseReduction.noiseReduction import noiseReductionFunc
from GpyT.PostFilterbank.specPeakLocator import specPeakLocatorFunc
from GpyT.PostFilterbank.currentSteeringWeights import currentSteeringWeightsFunc
from GpyT.PostFilterbank.carrierSynthesis import carrierSynthesisFunc
from GpyT.Mapping.f120Mapping import f120MappingFunc
from GpyT.Electrodogram.f120Electrodogram import f120ElectrodogramFunc
from GpyT.Validation.validateOutput import validateOutputFunc
from GpyT.Vocoder.vocoder import vocoderFunc

#%load_ext line_profiler

## Setup parameters
These parameters dictate how the signal processing blocks work. Do not change these.

In [10]:
stratWindow = 0.5*(np.blackman(256)+np.hanning(256))
stratWindow = stratWindow.reshape(1,stratWindow.size)  

parStrat = {
    'wavFile' : 'AzBio_3sent_65dBSPL.wav',  # this should be a complete absolute path to your sound file of choice
    'fs' : 17400, # this value matches implant internal audio rate. incoming wav files resampled to match
    'nFft' : 256,
    'nHop' : 20,
    'nChan' : 15, # do not change
    'startBin' : 6,
    'nBinLims' : np.array([2,2,1,2,2,2,3,4,4,5,6,7,8,10,56]),
    'window' : stratWindow,
    'pulseWidth' : 18, # DO NOT CHANGE
    'verbose' : 0
    }

parReadWav = {
    'parent' : parStrat,
    'tStartEnd' : [],
    'iChannel' : 1,
    }

parPre = {
    'parent' : parStrat,
    'coeffNum' : np.array([.7688, -1.5376, .7688]),
    'coeffDenom' : np.array([1, -1.5299, .5453]),
    }

envCoefs = np.array([-19,55,153,277,426,596,784,983,
                   1189,1393,1587,1763,1915,2035,2118,2160,
                   2160,2118,2035,1915,1763,1587,1393,1189,
                   983,784,596,426,277,153,55,-19])/(2**16) 

parAgc = {
    'parent' : parStrat,
    'kneePt' : 4.476,
    'compRatio' : 12,
    'tauRelFast' : -8/(17400*np.log(.9901))*1000,
    'tauAttFast' : -8/(17400*np.log(.25))*1000,
    'tauRelSlow' : -8/(17400*np.log(.9988))*1000,
    'tauAttSlow' : -8/(17400*np.log(.9967))*1000,
    'maxHold' : 1305,
    'g0' : 6.908,
    'fastThreshRel' : 8,
    'cSlowInit' : 0.5e-3,
    'cFastInit' : 0.5e-3,
    'controlMode' : 'naida',
    'clipMode' : 'limit',
    'decFact' : 8,
    'envBufLen' : 32,
    'gainBufLen' : 16,
    'envCoefs' : envCoefs
}

parWinBuf = {
    'parent' : parStrat,
    'bufOpt' : []
    }

parFft = {
    'parent' : parStrat,
    'combineDcNy' : False,
    'compensateFftLength' : False,
    'includeNyquistBin' : False
    }

parHilbert = {
    'parent' : parStrat,
    'outputOffset' : 0,
    'outputLowerBound' : 0,
    'outputUpperBound' : np.inf
    }

parEnergy = {
    'parent' : parStrat,
    'gainDomain' : 'linear'
    }

parNoiseReduction = {
    'parent' : parStrat,
    'gainDomain' : 'log2',
    'tau_speech' : .0258,
    'tau_noise' : .219,
    'threshHold' : 3,
    'durHold' : 1.6,
    'maxAtt' : -12,
    'snrFloor' : -2,
    'snrCeil' : 45,
    'snrSlope' : 6.5,
    'slopeFact' : 0.2,
    'noiseEstDecimation': 1,
    'enableContinuous' : False,
    'initState' : {'V_s' : -30*np.ones((15,1)),'V_n' : -30*np.ones((15,1))},
    }

parPeak = {
    'parent' : parStrat,
    'binToLocMap' : np.concatenate((np.zeros(6,),np.array([256, 640, 896, 1280, 1664, 1920, 2176,       # 1 x nBin vector of nominal cochlear locations for the center frequencies of each STFT bin
                                    2432, 2688, 2944, 3157, 3328, 3499, 3648, 3776, 3904, 4032,         # values from 0 .. 15 in Q9 format
                                    4160, 4288, 4416, 4544, 4659, 4762, 4864, 4966, 5069, 5163,         # corresponding to the nominal steering location for each 
                                    5248, 5333, 5419, 5504, 5589, 5669, 5742, 5815, 5888, 5961,         # FFT bin
                                    6034, 6107, 6176, 6240, 6304, 6368, 6432, 6496, 6560, 6624, 
                                    6682, 6733, 6784, 6835, 6886, 6938, 6989, 7040, 7091, 7142, 
                                    7189, 7232, 7275, 7317, 7360, 7403, 7445, 7488, 7531, 7573, 
                                    7616, 7659]),7679*np.ones((53,))))/512
}

parSteer = {
    'parent' : parStrat,
    'nDiscreteSteps' : 9,
    'steeringRange' : 1.0
    }

parCarrierSynth = {
    'parent' : parStrat,
    'fModOn' : .5,
    'fModOff': 1.0,
    'maxModDepth' : 1.0,
    'deltaPhaseMax' : 0.5
    }

parMapper = {
    'parent' : parStrat,
    'mapM' : 500*np.ones(16),
    'mapT' : 50*np.ones(16),
    'mapIdr' : 60*np.ones(16),
    'mapGain' : 0*np.ones(16),
    'mapClip' : 2048*np.ones(16),
    'chanToElecPair' : np.arange(16),   
    'carrierMode' : 1
    }

parElectrodogram = {
    'parent' : parStrat,
    'cathodicFirst' : True,
    'channelOrder' : np.array([1,5,9,13,2,6,10,14,3,7,11,15,4,8,12]), # DO NOT CHANGE (different order of pulses will have no effect in vocoder output)
    'enablePlot' : True,
    'outputFs' : [], # DO NOT CHANGE (validation depends on matched output rate, vocoder would not produce different results at higher or lower Fs when parameters match accordingly) [default: [],(uses pulse width ~ 55555.55)]
    }

parValidate = {
    'parent' : parStrat,
    'lengthTolerance' : 50, 
    'saveIfSimilar' : True,  # save even if the are too similar to default strategy
    'differenceThreshold' : 1,
    'maxSimilarChannels' : 8,
    'elGramFs' : parElectrodogram['outputFs'],  # this is linked to the previous electrodogram generation step, it should always match [55556 Hz]
    'outFile' : None           # This should be the full path including filename to a location where electrode matrix output will be saved after validation
    }

## Strategy
You may choose to convert one or more of these code blocks (1-9) to HLS

In [11]:
def harmony(sig_smp_wavIn): 
    
    results = {} #initialize return structure
    
## 1. Pre-emphasis
# The input signal is high-pass filtered, with filter characteristics matching the pre-emphasis filter applied to the 
# audio inputs (microphones and auxiliary) of Advanced Bionic's Harmony speech processor.
    sig_smp_wavPre = tdFilterFunc(parPre, sig_smp_wavIn)
    
## 2. Automatic gain control
    agc = dualLoopTdAgcFunc(parAgc, sig_smp_wavPre) 
    
## 3. Window and filter into channels
    sig_frm_audBuffers = winBufFunc(parWinBuf, agc['wavOut'])                        # buffering
    sig_frm_fft = fftFilterbankFunc(parFft, sig_frm_audBuffers)                      # stft
    sig_frm_hilbert = hilbertEnvelopeFunc(parHilbert, sig_frm_fft)                   # get hilbert envelopes   
    sig_frm_energy = channelEnergyFunc(parEnergy, sig_frm_fft, agc['smpGain'])       # estimate channel energy   
    
## 4. Apply noise reduction to channel envelopes
    sig_frm_gainNr = noiseReductionFunc(parNoiseReduction,sig_frm_energy)[0]         # estimate noise reduction
    sig_frm_hilbertMod = sig_frm_hilbert + sig_frm_gainNr                            # apply noise reduction gains to envelope
    
## 5. Find peaks in spectrum
    sig_3frm_fft = sig_frm_fft[:,2::3]                                               # subsample every third FFT input frame
    sig_3frm_peakFreq, sig_3frm_peakLoc = specPeakLocatorFunc(parPeak, sig_3frm_fft) # find spectral peaks

    # upsample back to full framerate (and add padding)
    sig_frm_peakFreq = np.repeat(np.repeat(sig_3frm_peakFreq, 1, axis=0), 3, axis=1)
    sig_frm_peakFreq = np.concatenate((np.zeros((sig_frm_peakFreq.shape[0], 2)), sig_frm_peakFreq),axis=1)
    sig_frm_peakFreq = sig_frm_peakFreq[:,:sig_frm_fft.shape[1]]
    sig_frm_peakLoc = np.repeat(np.repeat(sig_3frm_peakLoc, 1, axis=0), 3, axis=1)
    sig_frm_peakLoc = np.concatenate((np.zeros((sig_frm_peakLoc.shape[0], 2)), sig_frm_peakLoc), axis=1)
    sig_frm_peakLoc = sig_frm_peakLoc[:,:sig_frm_fft.shape[1]]
    
## 6. Calculate current steering weights and synthesize the carrier signals
    # steer current based on peak location 
    sig_frm_steerWeights = currentSteeringWeightsFunc(parSteer, sig_frm_peakLoc) 

    # carrier synthesis based on peak frequencies
    sig_ft_carrier, sig_ft_idxFtToFrm = carrierSynthesisFunc(parCarrierSynth, sig_frm_peakFreq) 
    
## 7. Map to Fidelity 120 stimulation strategy
    # Combine envelopes, carrier, current steering weights and compute outputs     
    sig_ft_ampWords = f120MappingFunc(parMapper, sig_ft_carrier, sig_frm_hilbertMod, sig_frm_steerWeights, sig_ft_idxFtToFrm)   
    
## 8. Create electrodogram for vocoder imput
    elGram = f120ElectrodogramFunc(parElectrodogram, sig_ft_ampWords)  
    np.savetxt("elGram.csv", elGram, delimiter=",")
    
## 9. Create simulator output
    results['audioOut'],results['audioFs'] = vocoderFunc(elGram, saveOutput=False)
    
    ## return output
    return results

In [12]:
np.set_printoptions(threshold=np.inf)
csvData= open("elGram.csv")
elGram = np.loadtxt(csvData, delimiter=",")
#print(elGram)
vocoderFunc(elGram, saveOutput=False)

PermissionError: [Errno 13] Permission denied: 'timeIdx_[10842-11119].csv'

## Run vocoder and play back audio
Do not convert to HLS

In [None]:
# read audio
sig_smp_wavIn, sourceName = readWavFunc(parReadWav)    

In [None]:
# import numpy
# a = numpy.asarray([ [1,2,3], [4,5,6], [7,8,9] ])
# print(a)
# numpy.savetxt("foo.csv", a, delimiter=",")

In [None]:
# run simulator
# %prun results = harmony(sig_smp_wavI
# results = harmony(sig_smp_wavIn)
# print(results)


In [None]:
# simulator output playback
# output1 = np.float32(np.concatenate((np.zeros(results['audioFs']),results['audioOut'])))
# wavwrite("output.wav",results['audioFs'],output1)
# Audio("output.wav")

In [None]:
# audio input playback
# Audio(sourceName)