In [None]:
import requests
from colorama import Fore,Back
import csv
from pyarabic.araby import TASHKEEL 
import time
from IPython.display import display, clear_output

from IPython.display import display


# Please run all cells in order. Some cells depend on previous ones.


## Helper functions


In [None]:
from typing import Tuple
from termcolor import colored
import numpy as np



def extract_arabic_alphabits(text: str) -> list[str]:
    """Split text into letters, keeping each diacritic attached to its letter.

    The text is first split on whitespace, so whitespace never appears in the
    result. Every character that is not in ``TASHKEEL`` starts a new element;
    every character that is in ``TASHKEEL`` is appended to the element that
    precedes it.

    Args:
        text (str): text we need to split

    Returns:
        list[str]: list of letters, each followed by its diacritics if found
    """
    res = []
    for word in text.split():
        for char in word:
            # A diacritic with nothing before it (the text starts with a
            # diacritic) has no letter to attach to. Keep it as its own
            # element instead of raising IndexError on ``res[-1]``.
            if char not in TASHKEEL or not res:
                res.append(char)
            else:
                res[-1] += char

    return res
            
            
def calculate_cer_and_display(reference: str, hypothesis: str) -> Tuple[float, str]:
    """Calculate the character error rate between reference and hypothesis.

    Both strings are split with ``extract_arabic_alphabits`` so that a letter
    and its diacritics count as one unit. The edit distance between the two
    unit sequences is computed with dynamic programming, then the table is
    walked backwards to build a coloured alignment:

    * green  - unit present in both strings
    * red    - unit only in the reference (deleted) or only in the
               hypothesis (inserted)
    * red ``ref/hyp`` - reference unit replaced by a different hypothesis unit

    Args:
        reference (str): string to calculate the CER against
        hypothesis (str): string to evaluate (the reference is treated as
            the constant ground truth)

    Returns:
        Tuple[float, str]: the CER (edit distance divided by the number of
        reference units) and the coloured feedback string. For an empty
        reference the CER is ``0.0`` when the hypothesis is empty too and
        ``inf`` otherwise.
    """
    reference = extract_arabic_alphabits(reference)
    hypothesis = extract_arabic_alphabits(hypothesis)

    m, n = len(reference), len(hypothesis)
    dp = np.zeros((m + 1, n + 1), dtype=int)

    # Border cells: turning a prefix into the empty sequence costs its length.
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if reference[i - 1] == hypothesis[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = min(dp[i - 1][j] + 1,       # deletion
                               dp[i][j - 1] + 1,       # insertion
                               dp[i - 1][j - 1] + 1)   # substitution

    # Walk back from the bottom-right corner to recover one optimal alignment.
    i, j = m, n
    result = []

    while i > 0 or j > 0:
        if i > 0 and j > 0 and reference[i - 1] == hypothesis[j - 1]:
            result.append(colored(reference[i - 1], 'green'))
            i -= 1
            j -= 1
        elif i > 0 and (j == 0 or dp[i][j] == dp[i - 1][j] + 1):
            result.append(colored(reference[i - 1], 'red'))
            i -= 1
        elif j > 0 and (i == 0 or dp[i][j] == dp[i][j - 1] + 1):
            result.append(colored(hypothesis[j - 1], 'red'))
            j -= 1
        else:
            result.append(colored(reference[i - 1], 'red') + '/' + colored(hypothesis[j - 1], 'red'))
            i -= 1
            j -= 1

    # The alignment was collected back to front.
    result.reverse()

    if m == 0:
        # Nothing to normalise by: avoid the division by zero (which numpy
        # turns into a RuntimeWarning plus nan/inf).
        cer = 0.0 if n == 0 else float('inf')
    else:
        cer = float(dp[m][n]) / m
    return cer, ''.join(result)



## Syllables recitation

In [None]:
# Load Al-Fatiha. This surah was created from Al-Hossary's recitation of Al-Fatiha passed through the 'base' ASR model.
# Each non-empty CSV row becomes one entry of `moshaf_syl` (indexed by verse in `syl_recitation`).
moshaf_syl = []
with open('./Moshaf/SYL_AlFaiha_Hosary.csv', 'r', encoding='utf-8', newline='') as fp:
    for row in csv.reader(fp):
        if not row:
            # Blank line: `append(*row)` would raise TypeError here.
            continue
        # A row split into several fields (unquoted comma) is glued back
        # together instead of crashing on `append(*row)`.
        moshaf_syl.append(','.join(row))
moshaf_syl

In [None]:

def syl_request(audiopth):
    """Send an audio file to the 'SYL' endpoint to get the ASR_syl service result.

    Args:
        audiopth (str , path like): path to audio file

    Returns:
        requests.Response: the server response (call ``.json()`` to decode it)
    """
    url = 'http://127.0.0.1:8000/SYL'
    # Context manager so the file handle is closed once the upload is done.
    with open(audiopth, 'rb') as audio_file:
        resp = requests.post(url=url, files={'file': audio_file})
    return resp
def syl_recitation(audiopth,verse_number):
    """Perform syllable-based recitation checking for one verse.

    Sends the audio file through ``syl_request``, compares the returned
    'message' with entry ``verse_number - 1`` of ``moshaf_syl`` using
    ``calculate_cer_and_display``, and prints the CER and the coloured
    feedback.

    Args:
        audiopth (str , path like): path to audio file
        verse_number (int): 1-based index of the verse

    Returns:
        The 'message' entry of the server response (the hypothesis).

    Raises:
        IndexError: if ``verse_number`` is not between 1 and
            ``len(moshaf_syl)``.
    """
    idx = verse_number - 1
    # Validate before uploading: 0 or a negative number would otherwise wrap
    # around and silently score against a verse at the end of the list.
    if not 0 <= idx < len(moshaf_syl):
        raise IndexError(
            f"verse_number must be between 1 and {len(moshaf_syl)}, got {verse_number}"
        )

    resp = syl_request(audiopth)
    hypothesis = resp.json()['message']
    reference = moshaf_syl[idx]

    cer, result_str = calculate_cer_and_display(reference, hypothesis)
    print(f"CER: {cer:.2f}")
    print(result_str)
    return hypothesis

In [None]:
# Run syllable recitation on a sample file against verse 2.
# The returned hypothesis is the last expression, so it is shown as the cell output.
audiopth = r"./testcases/syllables_wrong_testcases/002_1.wav"
verse_number = 2
syl_recitation(audiopth=audiopth, verse_number=verse_number)

In [None]:
# This cell uses the previous functions to perform recitation (focusing on syllables) with the mic instead of an audio file.

import pyaudio
from collections import deque
import soundfile as sf
import os

# Capture settings; later cells reuse these names.
FORMAT = pyaudio.paFloat32
CHANNELS = 1
RATE = 16000
CHUNK = 1600
p = pyaudio.PyAudio()
input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)

# every 10 chunks equals 1 second , 35 chunks equals 3.5 seconds
MAX_LEN=35
que = deque(maxlen=MAX_LEN)
audio_output_path = './syl_audio_output_sample.wav'

try:
    print(Fore.YELLOW,Back.GREEN , 'Recording...' , Fore.RESET,Back.RESET)
    # Record until the queue holds MAX_LEN chunks.
    while len(que) != MAX_LEN:
        data = input_stream.read(CHUNK)
        que.append(data)

    arr = np.frombuffer(b''.join(que),dtype=np.float32)
    # Write with the same rate the stream was opened with.
    sf.write(audio_output_path,arr,RATE)
    print(Fore.BLUE,Back.GREEN , 'Uploading...' , Fore.RESET,Back.RESET)
    syl_recitation(audio_output_path,2)
finally:
    # Always release the temp file and the audio device, even when recording
    # or the request fails; otherwise the microphone stays open.
    if os.path.exists(audio_output_path):
        os.remove(audio_output_path)
    input_stream.stop_stream()
    input_stream.close()
    p.terminate()

## ASR recitation

In [None]:
def ASR_request(pth , modelType='tiny'):
    """Send an audio file to the 'ASR' endpoint, using the 'tiny' or 'base' model.

    Args:
        pth (str , path like): path to audio file
        modelType (str, optional): value sent as the ``ASR_type`` query
            parameter. Defaults to 'tiny'.

    Returns:
        requests.Response: the server response (call ``.json()`` to decode it)
    """

    url = f'http://127.0.0.1:8000/ASR?ASR_type={modelType}'   # choose base or tiny. server default is tiny.
    # Context manager so the file handle is closed once the upload is done.
    with open(pth, 'rb') as audio_file:
        resp = requests.post(url=url, files={'file': audio_file})
    return resp

# Try the 'base' model on a sample file; the decoded JSON is shown as the cell output.
ASR_request(pth='./testcases/X2/11.wav', modelType='base').json()


In [None]:
# Prepare requirements (loading the surah; the feedback function is defined below).
# Each non-empty CSV row becomes one entry of `surah_que_main`
# (presumably one word per row - later cells compare entries with single recognised words).
surah_que_main = deque()
with open('./Moshaf/ASR_AlFaiha_Hosary.csv', 'r', encoding='utf-8', newline='') as fp:
    for row in csv.reader(fp):
        if not row:
            # Blank line: `append(*row)` would raise TypeError here.
            continue
        # Re-join rows that were split into several fields instead of crashing.
        surah_que_main.append(','.join(row))
        


def print_surah(word_limit=10, words=None):
    """Print feedback words in the terminal, ``word_limit`` words per line.

    Args:
        word_limit (int): number of words in each line . Defaults to 10.
        words (iterable of str, optional): words to print. Defaults to the
            global ``feedBack`` list, which must then already be defined.
    """
    if words is None:
        # Backward-compatible default: fall back to the notebook-level list.
        words = feedBack

    current_line = []
    for word in words:
        current_line.append(word)
        if len(current_line) == word_limit:
            print(' '.join(current_line))
            current_line = []

    # Flush the last, shorter line.
    if current_line:
        print(' '.join(current_line))
            


In [None]:
# Gives feedback in the terminal: red words are not pronounced yet, green words are pronounced.
# After each recording it prints the recognised words (green = matched the expected word, red = did not)
# and shows the next expected word as a hint. Recording / uploading indicators are printed while it works.
# Stop early with a keyboard interrupt.

surah_que=surah_que_main.copy()
feedBack = [colored(i,'red')for i in surah_que_main]


p = pyaudio.PyAudio()
input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
MAX_LEN=30
audio_surah_que = deque(maxlen=MAX_LEN)
audio_output_path = './surah_audio_output_sample.wav'

print_surah()
try:
    feedBack_IDX= 0
    # The loop ends by itself once every word of the surah has been matched.
    while len(surah_que)!=0:

        print(Fore.GREEN, 'Recording...' , Fore.RESET,Back.RESET , end='\r')
        while len(audio_surah_que) != MAX_LEN:
            data = input_stream.read(CHUNK)
            audio_surah_que.append(data)

        arr = np.frombuffer(b''.join(audio_surah_que),dtype=np.float32)
        sf.write(audio_output_path,arr,RATE)
        print(Fore.RED, 'Uploading...' , Fore.RESET,Back.RESET,end='\r')
        try:
            response = ASR_request(audio_output_path,'base')
        finally:
            # Remove the temp file even when the request fails.
            if os.path.exists(audio_output_path):
                os.remove(audio_output_path)
        audio_surah_que.clear()

        resp = response.json()['message']
        resp= resp.split(' ')
        result = []

        for word in resp:
            # `surah_que` can run empty in the middle of a response (words
            # recognised after the last surah word); guard the lookup so
            # `surah_que[0]` does not raise IndexError.
            if surah_que and word == surah_que[0]:
                feedBack[feedBack_IDX] = colored(surah_que[0],'green')
                feedBack_IDX+=1
                result.append(colored((word),'green'))
                surah_que.popleft()
            else:
                result.append(colored((word),'red'))

        clear_output(wait=True)
        print_surah()
        print('\n',(' '.join(result)))
        if surah_que:
            # Hint: next word to say.
            display(surah_que[0])
except KeyboardInterrupt:
    # Manual stop requested by the user.
    pass
finally:
    # Release the audio device on every exit path, not only on interrupt.
    input_stream.stop_stream()
    input_stream.close()
    p.terminate()
        

## KWS


In [None]:

def KWS_request(pth , modelType='CNN'):
    """Perform the KWS service by sending a request to the KWS endpoint. It's either 'CNN' or 'LSTM'.

    Args:
        pth (str , path like): path to audio file to perform KWS on
        modelType (str, optional): KWS type. Defaults to 'CNN'.

    Returns:
        The decoded JSON body of the server response.
    """
    url = f'http://127.0.0.1:8000/KWS?KWS_type={modelType}'
    # Context manager so the file handle is closed once the upload is done.
    with open(pth, 'rb') as audio_file:
        resp = requests.post(url=url, files={'file': audio_file})
    return resp.json()

## CNN

In [None]:
# This cell records the user's recitation and gives feedback in the terminal using the 'CNN' KWS model.
# Words 4..7 of the surah are printed in red and turn green once recognised in order;
# the top score, its index and the matching word are shown after every recording.
# Audio length is 2.5 seconds. Stop with a keyboard interrupt.

KWS_Surah = [colored(surah_que_main[i],'red') for i in range (4,8)]
print(' '.join(KWS_Surah))

p = pyaudio.PyAudio()
input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
MAX_LEN=25
audio_surah_que = deque(maxlen=MAX_LEN)
audio_output_path = './surah_audio_output_sample.wav'

try:
    recitated_word_counter=4
    while True:
        print(Fore.GREEN, 'Recording...' , Fore.RESET,Back.RESET )
        while len(audio_surah_que) != MAX_LEN:
            data = input_stream.read(CHUNK)
            audio_surah_que.append(data)

        arr = np.frombuffer(b''.join(audio_surah_que),dtype=np.float32)
        sf.write(audio_output_path,arr,RATE)
        try:
            response = KWS_request(audio_output_path,'CNN')
        finally:
            # Remove the temp file even when the request fails.
            if os.path.exists(audio_output_path):
                os.remove(audio_output_path)
        audio_surah_que.clear()

        # assumes response['message'] is a flat list of per-class scores - TODO confirm
        scores = response['message']
        idx = int(np.array(scores).argmax())

        # Position of the expected word inside KWS_Surah. Once all displayed
        # words are green the counter points past the list, so the bound
        # check prevents an IndexError on the assignment below.
        slot = recitated_word_counter - 4
        if slot < len(KWS_Surah) and idx == recitated_word_counter and scores[idx] > 0.7:
            KWS_Surah[slot] = colored(surah_que_main[idx],'green')
            recitated_word_counter+=1

        clear_output(wait=True)
        print(' '.join(KWS_Surah))
        display((f"{scores[idx]:.2}   ",idx , surah_que_main[idx] ))
        time.sleep(0.1)

except KeyboardInterrupt:
    # Manual stop requested by the user.
    pass
finally:
    # Release the audio device on every exit path, not only on interrupt.
    input_stream.stop_stream()
    input_stream.close()
    p.terminate()
        

## LSTM

In [None]:
# Same interactive check as the CNN cell, but using the 'LSTM' KWS model and 3-second recordings.
# Words 4..7 of the surah are printed in red and turn green once recognised in order.
# Stop with a keyboard interrupt.

KWS_Surah = [colored(surah_que_main[i],'red') for i in range (4,8)]
print(' '.join(KWS_Surah))

p = pyaudio.PyAudio()
input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
MAX_LEN=30
audio_surah_que = deque(maxlen=MAX_LEN)
audio_output_path = './surah_audio_output_sample.wav'

try:
    recitated_word_counter=4
    while True:
        print(Fore.GREEN, 'Recording...' , Fore.RESET,Back.RESET )
        while len(audio_surah_que) != MAX_LEN:
            data = input_stream.read(CHUNK)
            audio_surah_que.append(data)

        arr = np.frombuffer(b''.join(audio_surah_que),dtype=np.float32)
        sf.write(audio_output_path,arr,RATE)
        try:
            response = KWS_request(audio_output_path,'LSTM')
        finally:
            # Remove the temp file even when the request fails.
            if os.path.exists(audio_output_path):
                os.remove(audio_output_path)
        audio_surah_que.clear()

        # assumes response['message'] is a flat list of per-class scores - TODO confirm
        scores = response['message']
        idx = int(np.array(scores).argmax())

        # Position of the expected word inside KWS_Surah. Once all displayed
        # words are green the counter points past the list, so the bound
        # check prevents an IndexError on the assignment below.
        slot = recitated_word_counter - 4
        if slot < len(KWS_Surah) and idx == recitated_word_counter and scores[idx] > 0.7:
            KWS_Surah[slot] = colored(surah_que_main[idx],'green')
            recitated_word_counter+=1

        clear_output(wait=True)
        print(' '.join(KWS_Surah))
        display((f"{scores[idx]:.2}   ",idx , surah_que_main[idx] ))
        time.sleep(0.1)

except KeyboardInterrupt:
    # Manual stop requested by the user.
    pass
finally:
    # Release the audio device on every exit path, not only on interrupt.
    input_stream.stop_stream()
    input_stream.close()
    p.terminate()
        

## streaming and websockets

In [None]:
# This cell tests that the mic and the server are running correctly.

import websocket
import _thread
import pyaudio
from colorama import Fore , Back

import pyaudio
# Microphone capture settings: float32 samples, one channel,
# 16000 frames per second, read 1600 frames at a time.
FORMAT, CHANNELS = pyaudio.paFloat32, 1
RATE, CHUNK = 16000, 1600

# PyAudio instance shared by the websocket callbacks below.
p = pyaudio.PyAudio()

# Chunking parameters (read by on_open_chunking).
MAX_LEN, OVERLAP = 30, 25
# Counter updated by the chunking thread through `global chunks_counter`.
chunks_counter = 0


def on_message(ws, message):
    """Websocket callback: print the received message on a green background."""
    highlighted = (Back.GREEN, message, Back.RESET)
    print(*highlighted)
def on_error(ws, error):
    """Websocket callback: print the reported error on a red background."""
    highlighted = (Back.RED, 'Error: ', error, Back.RESET)
    print(*highlighted)
def on_close(ws, close_status_code, close_msg):
    """Websocket callback: print a 'CLOSED' banner (yellow text on red)."""
    banner = (Back.RED, Fore.YELLOW, 'CLOSED', Back.RESET, Fore.RESET)
    print(*banner)
def on_open_streaming(ws):
    """Websocket 'on_open' callback for streaming mode.

    Starts a background thread that opens the microphone and sends every
    ``CHUNK``-frame read to the server as one binary websocket message.

    Args:
        ws: the websocket connection to send the audio on
    """

    def run(*args):
        input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            print('recording...')
            while True:
                data = input_stream.read(CHUNK)
                ws.send(data ,  websocket.ABNF.OPCODE_BINARY)
        except KeyboardInterrupt:
            pass
        finally:
            # This runs in a worker thread, where KeyboardInterrupt is not
            # delivered, so cleanup placed only in the `except` branch would
            # never run. `finally` releases the microphone whenever the loop
            # ends (for example when sending fails after the socket closed).
            input_stream.stop_stream()
            input_stream.close()
            # The shared PyAudio instance `p` is deliberately left alive so
            # the other application cells can still open a stream with it.
            ws.close()

    _thread.start_new_thread(run, ())

def on_open_chunking(ws):
    """
    Websocket 'on_open' callback for chunking mode.

    Starts a background thread that keeps the most recent ``MAX_LEN`` audio
    chunks in a queue and sends the whole queue to the server as one binary
    message each time ``OVERLAP`` new chunks have arrived and the queue is full.

    Args:
        ws: the websocket connection to send the audio on

    Depends on global variables:
        MAX_LEN (int) : The maximum number of audio chunks to consider
        OVERLAP (int) : number of chunks to shift i.e. the number of new audio chunks to add to queue
        chunks_counter (int) : number of chunks read since the last send
    """
    def run(*args):
        # Local import: this cell does not import deque itself and would
        # otherwise only work if an earlier cell had been run.
        from collections import deque

        global chunks_counter

        input_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            print('recording...')
            que = deque(maxlen=MAX_LEN)
            chunks_counter=0
            while True:
                data = input_stream.read(CHUNK)
                que.append(data)
                chunks_counter+=1

                # Send only on a multiple of OVERLAP new chunks AND once the
                # window is full; the counter restarts after each send.
                if  chunks_counter % OVERLAP ==0 and len(que) == MAX_LEN:
                    chunks_counter=0
                    desired_amount_of_audio = b''.join(que)

                    ws.send(desired_amount_of_audio,  websocket.ABNF.OPCODE_BINARY)

        except KeyboardInterrupt:
            pass
        finally:
            # This runs in a worker thread, where KeyboardInterrupt is not
            # delivered, so cleanup placed only in the `except` branch would
            # never run. `finally` releases the microphone whenever the loop
            # ends (for example when sending fails after the socket closed).
            input_stream.stop_stream()
            input_stream.close()
            # The shared PyAudio instance `p` is deliberately left alive so
            # the other application cells can still open a stream with it.
            ws.close()

    _thread.start_new_thread(run, ())
    


def init_websocket_connection(endpoint:str , streaming:bool = True):
    """Open a websocket connection to the local server and run it.

    Args:
        endpoint (str): endpoint name appended to ``ws://127.0.0.1:8000/``
        streaming (bool): True plugs in ``on_open_streaming`` as the
            'on_open' callback, False plugs in ``on_open_chunking``
    """
    websocket.enableTrace(False)

    # Select the 'on_open' callback for the requested mode.
    if streaming:
        open_handler = on_open_streaming
    else:
        open_handler = on_open_chunking

    app = websocket.WebSocketApp(
        f'ws://127.0.0.1:8000/{endpoint}',
        on_open=open_handler,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever()
    
    
def broadcast_application():
    """Connect to the 'broadcast' endpoint in streaming mode."""
    init_websocket_connection(endpoint='broadcast', streaming=True)
def streaming_application():
    """Connect to the 'audio_stream' endpoint in streaming mode."""
    init_websocket_connection(endpoint='audio_stream', streaming=True)
def chunking_application():
    """Connect to the 'audio_chunks' endpoint in chunking mode."""
    init_websocket_connection(endpoint='audio_chunks', streaming=False)
    



In [None]:
# Chunking mode against the 'audio_chunks' endpoint. The call stays inside
# `ws.run_forever()` (see init_websocket_connection) while the connection runs.
chunking_application()

In [None]:
# Streaming mode against the 'audio_stream' endpoint. The call stays inside
# `ws.run_forever()` (see init_websocket_connection) while the connection runs.
streaming_application()
