In [3]:
# @Author: Gebremichael
# @File: dialogic_ADPCM.py
import wave
import numpy as np

# table of index
IndexTable = [-1, -1, -1, -1, 2, 4, 6, 8, -1, -1, -1, -1, 2, 4, 6, 8]

# table of  quantizer step size
StepSizeTable = [7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 19, 21, 23, 25, 28, 31, 34, 37, 41, 45, 50, 55, 60, 66, 73, 80, 88, 97, 107, 118, 130, 143, 157, 173, 190, 209, 230, 253, 279, 307, 337, 371, 408, 449, 494, 544, 598, 658, 724, 796, 876, 963, 1060, 1166, 1282, 1411, 1552, 1707, 1878, 2066, 2272, 2499, 2749, 3024, 3327, 3660, 4026, 4428, 4871, 5358, 5894, 6484, 7132, 7845, 8630, 9493, 10442, 11487, 12635, 13899, 15289, 16818, 18500, 20350, 22385, 24623, 27086, 29794, 32767]

# ADPCM_Encode.
# sample: a 16-bit PCM sample
# retval : a 4-bit ADPCM sample
predsample = 0
index = 0

# This method is the ADPCM encoder
def ADPCM_Encode(sample):
    global index
    global predsample
    global diffq
    global ore_diff
    global error_signal
    
    code = 0
    
    step_size = StepSizeTable[index]

    # compute diff and record sign and absolut value
          
    diff = sample - predsample
    ore_diff=diff
    #print('diff sample:' ,ore_diff)
    if diff < 0:
        code = 8
        diff = -diff

    # quantize the diff into ADPCM code
    # inverse quantize the code into a predicted diff
    tmpstep = step_size
    diffq = step_size >> 3
    if diff >= tmpstep:
        code = code | 0x04
        diff -= tmpstep
        diffq = diffq + step_size

    tmpstep = tmpstep >> 1
    if diff >= tmpstep:
        code = code | 0x02
        diff = diff - tmpstep
        diffq = diffq + (step_size >> 1)

    tmpstep = tmpstep >> 1
    if diff >= tmpstep:
        code = code | 0x01
        diffq = diffq + (step_size >> 2)
    
    # fixed predictor to get new predicted sample
    #predsample=predictor(code,predsample,diffq)
    
    #print("predsample",predsample)
    # find new stepsize index
    index += IndexTable[code]

    # check for overflow
    if index < 0:
        index = 0

    if index > 88:
        index = 88
    error_signal = sample - predsample
    
    # return new ADPCM code code & 0x0f == code
    return code & 0x0f



def predictor(code,predsample,diffq):
    # fixed predictor to get new predicted sample
    if code & 8:
        predsample = predsample - diffq
    else:
        predsample = predsample + diffq
         
    # check for overflow
    if predsample > 32767:
        predsample = 32767
    elif predsample < -32768:
        predsample = -32768
    #print("code:",code)
    #print("predsample:",predsample)
    #print("diffq:",diffq)
    return predsample


In [None]:
# ADPCM_Decode.
# code: a byte containing a 4-bit ADPCM sample.
# retval: 16-bit ADPCM sample

de_index = 0
de_predsample = 0

# # This method is the ADPCM Decoder 
def ADPCM_Decode(code):
    global de_index
    global de_predsample

    step_size = StepSizeTable[de_index]

    # inverse code into diff    
    diffq = step_size >> 3  # == step/8
    if code & 4:
        diffq += step_size

    if code & 2:
        diffq += step_size >> 1

    if code & 1:
        diffq += step_size >> 2

    #print('diffq:' ,diffq)
    
    # add diff to predicted sample
    if code & 8:
        diffq = -diffq

    de_predsample += diffq

    # check for overflow  clip the values to +/- 2^11 (supposed to be 16 bits)
    if de_predsample > 32767:
        de_predsample = 32767
    elif de_predsample < -32768:
        de_predsample = -32768

    # find new quantizer step size
    de_index += IndexTable[code]

    # check for overflow
    if de_index < 0:
        de_index = 0

    if de_index > 88:
        de_index = 88

    # save predict sample and de_index for next iteration
    # return new decoded sample
    # The original algorithm turned out to be 12bit, need to convert to 16bit
    return de_predsample

In [4]:
def train_model(X,Y,U,W,V):
    for epoch in range(nepoch):
        # Step 2.1: Check the loss on training data ............
        loss = 0.0

        # do a forward pass to get prediction
        for i in range(Y.shape[0]):
            x, y = X[i], Y[i]                    # get input, output values of each record
            prev_s = np.zeros((hidden_dim, 1))   # here, prev-s is the value of0 the previous activation of hidden layer; which is initialized as all zeroes
            for t in range(T):
                new_input = np.zeros(x.shape)    # we then do a forward pass for every timestep in the sequence
                new_input[t] = x[t]              # for this, we define a single input for that timestep
                #new_input[t]=ADPCM_Encode(x[t])
                mulu = np.dot(U, new_input)
                mulw = np.dot(W, prev_s)
                add = mulw + mulu
                s = sigmoid(add)
                mulv = np.dot(V, s)
                prev_s = s
            #print(mulu.shape)
            #calculate error 
            loss_per_record = (y - mulv)**2 / 2
            loss += loss_per_record
        loss = loss / float(y.shape[0])

        
        # Step 2.2: Check the loss on validation data................
        val_loss = 0.0
        for i in range(Y_val.shape[0]):
            x, y = X_val[i], Y_val[i]
            prev_s = np.zeros((hidden_dim, 1))
            for t in range(T):
                new_input = np.zeros(x.shape)
                new_input[t] = x[t]
                mulu = np.dot(U, new_input)
                mulw = np.dot(W, prev_s)
                add = mulw + mulu
                s = sigmoid(add)
                mulv = np.dot(V, s)
                prev_s = s

            loss_per_record = (y - mulv)**2 / 2
            val_loss += loss_per_record
        val_loss = val_loss / float(y.shape[0])
    
        print('Epoch: ', epoch + 1, ', Loss: ', loss, ', Val Loss: ', val_loss)
        #print('Epoch: ', epoch + 1, ', Loss: ', loss)

        # Step 2.3: Start actual training .......
        # Step 2.3.1: Forward Pass
        for i in range(Y.shape[0]):
            x, y = X[i], Y[i]

            layers = []
            prev_s = np.zeros((hidden_dim, 1))
            dU = np.zeros(U.shape)
            dV = np.zeros(V.shape)
            dW = np.zeros(W.shape)

            dU_t = np.zeros(U.shape)
            dV_t = np.zeros(V.shape)
            dW_t = np.zeros(W.shape)

            dU_i = np.zeros(U.shape)
            dW_i = np.zeros(W.shape)

            # forward pass
            for t in range(T):
                new_input = np.zeros(x.shape)
                new_input[t] = x[t]
                
                mulu = np.dot(U, new_input)
                mulw = np.dot(W, prev_s)
                add = mulw + mulu
                s = sigmoid(add)
                mulv = np.dot(V, s)
                layers.append({'s':s, 'prev_s':prev_s})
                prev_s = s


            # Step 2.3.2 : Backpropagate Error ...................    
            # derivative of pred
            dmulv = (mulv - y)


            # backward pass
            for t in range(T):
                dV_t = np.dot(dmulv, np.transpose(layers[t]['s']))
                dsv = np.dot(np.transpose(V), dmulv)

                ds = dsv
                dadd = add * (1 - add) * ds

                dmulw = dadd * np.ones_like(mulw)

                dprev_s = np.dot(np.transpose(W), dmulw)


                for i in range(t-1, max(-1, t-bptt_truncate-1), -1):
                    ds = dsv + dprev_s
                    dadd = add * (1 - add) * ds

                    dmulw = dadd * np.ones_like(mulw)
                    dmulu = dadd * np.ones_like(mulu)

                    dW_i = np.dot(W, layers[t]['prev_s'])
                    dprev_s = np.dot(np.transpose(W), dmulw)

                    new_input = np.zeros(x.shape)
                    new_input[t] = x[t]
                    dU_i = np.dot(U, new_input)
                    dx = np.dot(np.transpose(U), dmulu)

                    dU_t += dU_i
                    dW_t += dW_i

                dV += dV_t
                dU += dU_t
                dW += dW_t


                # Step 2.3.3 : Update weights ...................
                if dU.max() > max_clip_value:
                    dU[dU > max_clip_value] = max_clip_value
                if dV.max() > max_clip_value:
                    dV[dV > max_clip_value] = max_clip_value
                if dW.max() > max_clip_value:
                    dW[dW > max_clip_value] = max_clip_value


                if dU.min() < min_clip_value:
                    dU[dU < min_clip_value] = min_clip_value
                if dV.min() < min_clip_value:
                    dV[dV < min_clip_value] = min_clip_value
                if dW.min() < min_clip_value:
                    dW[dW < min_clip_value] = min_clip_value

            # update
            U -= learning_rate * dU
            V -= learning_rate * dV
            W -= learning_rate * dW  


def model(X,Y,U,W,V):   
    preds = []
    for i in range(Y.shape[0]):
        x, y = X[i], Y[i]
        prev_s = np.zeros((hidden_dim, 1))
        # Forward pass
        for t in range(T):
            mulu = np.dot(U, x)
            mulw = np.dot(W, prev_s)
            add = mulw + mulu
            s = sigmoid(add)
            mulv = np.dot(V, s)
            prev_s = s

        preds.append(mulv)

    preds = np.array(preds)
    return preds

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import IPython.display as ipd
import librosa
import librosa.display
import numpy as np

import wave
from scipy.io import wavfile

np.set_printoptions(threshold=np.inf)
audio_file = 'C:/Users/GM/Desktop/audio_data/myspeech.wav'
fs, wavdata = wavfile.read(audio_file)
wavdata=wavdata[::1]

audio, fs = librosa.load(audio_file, sr = 44100)
#wavdata=audio[15000:20000]


# ===============Step 0: Data Preparation==============
X = []
Y = []
seq_len = 10
num_records = len(wavdata) - seq_len

for i in range(num_records - 10):
    X.append(wavdata[i:i+seq_len]) # the first 50 siquencial samples 
    Y.append(wavdata[i+seq_len])   # the 51st predicted single sample for the next it used to predictthe next 52nd sample
X = np.array(X)
X = np.expand_dims(X, axis=2)
Y = np.array(Y)
Y = np.expand_dims(Y, axis=1)

# ===============validation data preparation==============
X_val = []
Y_val = []
for i in range(num_records - 10, num_records):    
    X_val.append(wavdata[i:i+seq_len])
    Y_val.append(wavdata[i+seq_len])
X_val = np.array(X_val)
X_val = np.expand_dims(X_val, axis=2)
Y_val = np.array(Y_val)
Y_val = np.expand_dims(Y_val, axis=1)


# =================RNN Architectur========================
learning_rate = 0.0001    
nepoch = 25             
T = 10                   # length of sequence
hidden_dim = 100         
output_dim = 1

bptt_truncate = 5
min_clip_value = -10
max_clip_value = 10
    
U = np.random.uniform(0, 1, (hidden_dim, T))
W = np.random.uniform(0, 1, (hidden_dim, hidden_dim))
V = np.random.uniform(0, 1, (output_dim, hidden_dim))

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

#===============Fucnction Calling train_model==============
train_model(X,Y,U,W,V)

In [None]:
preds=model(X,Y,U,W,V)

plt.plot(preds[:, 0], 'b')
plt.show()

plt.plot(Y[:, 0], 'r')
plt.show()

In [1]:
#Y[:, 0]

In [2]:
#preds[:, 0]