In [1]:
# <----- ToDo ----->
# 1. ASR + TTS Integration
# 2. Multilingual feature 

In [2]:
!pip install PyMuPDF -q
!pip install openai -q
!pip install gradio -q
!pip install gTTS -q

[0m

In [3]:
import urllib.request
import fitz
import re
import numpy as np
import tensorflow_hub as hub
import openai
import gradio as gr
import os
from tqdm.auto import tqdm
from sklearn.neighbors import NearestNeighbors
import random

from gtts import gTTS

caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6StatusC1EN10tensorflow5error4CodeESt17basic_string_viewIcSt11char_traitsIcEENS_14SourceLocationE']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZTVN10tensorflow13GcsFileSystemE']


In [4]:
import sklearn
sklearn.__version__

'1.2.2'

## Extraction and Preprocessing

In [5]:
def download_pdf(url, output_path):
    '''
    Download the file at `url` and save it to `output_path`.

    The URL is typed in by the end user, so only `http` and `https` URLs are
    accepted. `urllib.request.urlretrieve` would otherwise also follow other
    schemes such as `file://`, letting a user copy files from the machine
    that hosts the app.

    Args:
        url: address of the document to download.
        output_path: local path the downloaded bytes are written to.

    Raises:
        ValueError: if `url` does not use the http or https scheme.
        Any error raised by `urllib.request.urlretrieve` is propagated.
    '''
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlparse

    cleaned_url = url.strip()
    scheme = urlparse(cleaned_url).scheme.lower()
    if scheme not in ('http', 'https'):
        raise ValueError(
            f"Unsupported URL scheme {scheme!r}: only http and https URLs can be downloaded."
        )
    urllib.request.urlretrieve(cleaned_url, output_path)


def preprocess(text):
    '''
    Normalise the whitespace of a piece of extracted text.

    Every run of whitespace characters (spaces, tabs, line breaks, ...) is
    collapsed into a single space. Leading and trailing whitespace is
    reduced to one space, not removed.

    Args:
        text: the string to clean.

    Returns:
        The cleaned string.
    '''
    # A raw string is required here: "\s" inside a normal string literal is an
    # invalid escape sequence and triggers a warning on current Python versions.
    # "\s" already matches line breaks, so no separate newline replacement is
    # needed before the substitution.
    return re.sub(r'\s+', ' ', text)


def pdf_to_text(path, start_page=1, end_page=None):
    '''
    Extract the text of a PDF document, one cleaned string per page.

    Args:
        path: location of the PDF file, passed to `fitz.open`.
        start_page: 1-based number of the first page to read.
        end_page: 1-based number of the last page to read (inclusive).
            `None`, or a value past the end of the document, means
            "up to the last page".

    Returns:
        A list with the `preprocess`-ed text of each page in the range.
    '''
    doc = fitz.open(path)
    try:
        total_pages = doc.page_count

        # Clamp instead of failing when the caller asks for more pages than exist.
        if end_page is None or end_page > total_pages:
            end_page = total_pages

        text_list = []
        for i in tqdm(range(start_page - 1, end_page)):
            page_text = doc.load_page(i).get_text("text")
            text_list.append(preprocess(page_text))
        return text_list
    finally:
        # Release the document even when a page fails to load or parse.
        doc.close()


def text_to_chunks(texts, word_length=120, start_page=1):
    '''
    Split page texts into labelled chunks of at most `word_length` words.

    Each page is split on single spaces. A trailing piece shorter than
    `word_length` is not emitted for its own page; it is prepended to the
    next page's words instead, unless the page is the last one. Every
    emitted chunk has the form `(page) "words ..."`, where the page number
    is the page index plus `start_page`.
    '''
    pages = [page.split(' ') for page in texts]
    last_page = len(pages) - 1
    chunks = []

    for page_idx in range(len(pages)):
        # Read the page here, not up front: it may have been extended by the
        # leftover of the previous page.
        tokens = pages[page_idx]
        for offset in range(0, len(tokens), word_length):
            piece = tokens[offset:offset + word_length]
            is_short_tail = len(piece) < word_length
            if is_short_tail and page_idx != last_page:
                # Carry the leftover words over to the following page.
                pages[page_idx + 1] = piece + pages[page_idx + 1]
                continue
            body = ' '.join(piece).strip()
            chunks.append(f'({page_idx + start_page}) "{body}"')
    return chunks

## Semantic Search

In [6]:
class CorpusNotLoadedError(AttributeError):
    '''
    Raised when a search is attempted before `SemanticSearch.fit` succeeded.

    It derives from AttributeError because that is what an unfitted instance
    raised before this check existed (the missing `nn` attribute), so code
    that already catches AttributeError keeps working.
    '''


class SemanticSearch:
    '''
    Nearest-neighbour search over text chunks embedded with the Universal
    Sentence Encoder.

    Usage: create an instance, call `fit` with a list of text chunks, then
    call the instance with a query string to get the closest chunks.
    '''

    def __init__(self):
        # Loads the encoder from TF Hub.
        self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4')
        self.fitted = False


    def fit(self, data, batch=1000, n_neighbors=5):
        '''
        Embed `data` and build the nearest-neighbour index over it.

        Args:
            data: sequence of text chunks to index.
            batch: number of chunks embedded per encoder call.
            n_neighbors: how many results a query returns; capped at the
                number of chunks.

        Raises:
            ValueError: if `data` is empty.
        '''
        if len(data) == 0:
            raise ValueError('Cannot fit SemanticSearch on an empty corpus.')

        # Build everything in locals first so that a failure part-way through
        # a re-fit cannot leave `data` and `nn` describing different corpora.
        embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(embeddings))
        index = NearestNeighbors(n_neighbors=n_neighbors)
        index.fit(embeddings)

        self.data = data
        self.embeddings = embeddings
        self.nn = index
        self.fitted = True


    def __call__(self, text, return_data=True):
        '''
        Return the indexed chunks closest to `text`.

        Args:
            text: the query string.
            return_data: when True return the chunks themselves, otherwise
                return their indices into the fitted data.

        Raises:
            CorpusNotLoadedError: if `fit` has not completed successfully.
        '''
        if not getattr(self, 'fitted', False):
            raise CorpusNotLoadedError(
                'SemanticSearch has not been fitted yet; call fit() with a corpus first.'
            )

        inp_emb = self.use([text])
        neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0]

        if return_data:
            return [self.data[i] for i in neighbors]
        return neighbors


    def get_text_embedding(self, texts, batch=1000):
        '''
        Embed `texts` in slices of `batch` items and stack the results into
        one array with a row per text.
        '''
        embeddings = []
        for i in tqdm(range(0, len(texts), batch)):
            text_batch = texts[i:(i + batch)]
            embeddings.append(self.use(text_batch))
        return np.vstack(embeddings)

## GPT-3

In [21]:
# Client for the secrets attached to this notebook (kaggle_secrets package).
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()


# Read the OpenAI key from the secret named "openai-key" instead of
# hard-coding it in the notebook.
openai.api_key = user_secrets.get_secret("openai-key")

# Module-level search index shared by load_sem_search() and generate_answer().
# It holds no corpus until load_sem_search() calls fit() on it.
sem_search = SemanticSearch()


def load_sem_search(path, start_page=1):
    '''
    Index the PDF at `path` in the shared `sem_search` object.

    The document is read from `start_page` onwards, cut into chunks labelled
    with their page numbers, and the chunks are fitted into `sem_search`.

    Returns:
        The status message shown to the user once the corpus is loaded.
    '''
    page_texts = pdf_to_text(path, start_page=start_page)
    sem_search.fit(text_to_chunks(page_texts, start_page=start_page))
    return '✅ Corpus Loaded! ✅'


def generate_text(prompt, engine="text-davinci-003"):
    '''
    Send `prompt` to the OpenAI completion endpoint and return the reply text.

    Args:
        prompt: the full prompt string.
        engine: name of the completion engine to use.
            NOTE(review): confirm this engine is still served by the API.

    Returns:
        The text of the first completion. If the request fails with an
        APIError, APIConnectionError or RateLimitError, the error is printed
        and a short apology string is returned, so callers always receive a
        string (they concatenate the result with other text).
    '''
    failure_reply = (
        "Sorry, I could not get a response from the language model. "
        "Please try again in a moment."
    )
    try:
        completions = openai.Completion.create(
            engine=engine,
            prompt=prompt,
            max_tokens=512,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return completions.choices[0].text
    except openai.error.APIError as e:
        print(f"OpenAI API returned an API Error: {e}")
    except openai.error.APIConnectionError as e:
        print(f"Failed to connect to OpenAI API: {e}")
    except openai.error.RateLimitError as e:
        # A retry with exponential backoff would be the next improvement here.
        print(f"OpenAI API request exceeded rate limit: {e}")
    # Only reached after one of the handled errors above.
    return failure_reply


# Instructions used when search results from the loaded material are available.
# NOTE(review): the text asks for "{number}" citations while text_to_chunks
# labels chunks as "(number)" - confirm which notation is intended.
_GROUNDED_INSTRUCTIONS = (
    "Instructions: You are Dook3, an AI book assistant that enhances the reading experience. Follow these guidelines:\n"
    "- Refer to the 'Chat History' for previous conversations with the user.\n"
    "- Use the 'Search Results' for better details related to the user's 'Query'.\n"
    "- If the 'Search Results' are not relevant or helpful, politely ask the user for more specific information.\n"
    "- Compose comprehensive replies based on the 'Search Results', do not forget to cite reference page numbers using {number} notation provided in the 'Search Results'.\n"
    "- Create separate answers for multiple subjects with the same name in 'Search Results'.\n"
    "- Only include information from the 'Search Results' (can be from 'Chat History' in some rare conversations).\n"
    "- Ensure the answer you generate is correct and refrain from providing false content.\n"
    "- Start your answer directly without using the term 'Answer:' or 'Dook3:'.\n\n"
)

# Instructions used when no reading material can be searched.
_UNGROUNDED_INSTRUCTIONS = (
    "Instructions: You are Dook3, an AI book assistant to enhance the reading experience of the users. Follow these guidelines:\n"
    "- Refer to the 'Chat History' for previous conversations with the user.\n"
    "- As the user has not provided any reading material, exercise caution while generating information based solely on your own knowledge and expertise.\n"
    "- Inform the user that without specific reading material, providing accurate answers may be challenging and could lead to potential misinformation.\n"
    "- Suggest the user provide relevant reading material for better assistance.\n"
    "- Rely on the 'Chat History' and your general knowledge to provide helpful insights, but be aware of the risk of generating false information.\n"
    "- Start your answer directly without using the terms 'Answer:' or 'Dook3:'.\n\n"
)


def _retrieve_chunks(question):
    '''Return the search results for `question`, or None when no corpus can be searched.'''
    if not getattr(sem_search, 'fitted', False):
        return None
    try:
        return sem_search(question)
    except Exception as e:
        # Best effort: a retrieval failure downgrades to the ungrounded prompt.
        print(f"Semantic search failed, answering without reading material: {e}")
        return None


def _build_grounded_prompt(question, history, chunks):
    '''Assemble the prompt that includes the retrieved search results.'''
    prompt = _GROUNDED_INSTRUCTIONS
    prompt += f"Chat History:\n\n{history}\n\n\n"
    prompt += 'Search Results:\n\n'
    for c in chunks:
        prompt += c + '\n\n'
    prompt += f"\nQuery: {question}\n\n"
    return prompt


def _build_ungrounded_prompt(question, history):
    '''Assemble the prompt used when there are no search results.'''
    prompt = _UNGROUNDED_INSTRUCTIONS
    prompt += f"Chat History:\n\n{history}\n\n\n"
    prompt += f"Query: {question}\n\n"
    return prompt


def generate_answer(question, history):
    '''
    Answer `question`, grounding the reply in the loaded reading material
    when there is one.

    If the shared `sem_search` index has been fitted, its results for the
    question are placed in the prompt. Otherwise, or if the search itself
    fails, a prompt is used that tells the model no material was provided.
    Only the retrieval step is guarded: errors raised by `generate_text` are
    no longer swallowed and misreported as "no reading material".

    Args:
        question: the user's message.
        history: the chat history; it is interpolated into the prompt as-is.

    Returns:
        Whatever `generate_text` returns for the assembled prompt.
    '''
    topn_chunks = _retrieve_chunks(question)
    if topn_chunks is None:
        prompt = _build_ungrounded_prompt(question, history)
    else:
        prompt = _build_grounded_prompt(question, history, topn_chunks)
    return generate_text(prompt)

### Loading Material

In [8]:
def mat_loader(dropdown, url, file):
    '''
    Load reading material from exactly one of three sources and index it.

    Args:
        dropdown: name of a bundled book, or None / blank when none is chosen.
        url: URL of a PDF to download, or None / blank.
        file: uploaded file object exposing a `.name` path, or None.

    Returns:
        The status string from `load_sem_search`, or an `[ERROR]` message
        when no source or more than one source was provided.
    '''
    # Components with no selection deliver None, so normalise before stripping.
    book = (dropdown or '').strip()
    link = (url or '').strip()
    has_file = file is not None

    provided = sum([bool(book), bool(link), has_file])
    if provided == 0:
        return '[ERROR]: Please provide either a URL, a PDF or choose a book from the Dropdown.'
    if provided > 1:
        return '[ERROR]: Please provide only one (either URL or PDF or Select from the Dropdown).'

    if link:
        download_pdf(link, 'corpus.pdf')
        return load_sem_search('corpus.pdf')

    if book:
        return load_sem_search(f"/kaggle/input/dook-books/{book}.pdf")

    # Uploaded file: drop the 8 characters that precede the 4-character
    # extension - presumably the random suffix Gradio adds to uploaded temp
    # files; TODO confirm against the installed Gradio version.
    old_file_name = file.name
    file_name = old_file_name[:-12] + old_file_name[-4:]
    # After a first submit the file has already been renamed; renaming again
    # would fail because the original path no longer exists.
    if os.path.exists(old_file_name) or not os.path.exists(file_name):
        os.rename(old_file_name, file_name)
    return load_sem_search(file_name)

### ASR

In [9]:
!mkdir whisper-sppech2txt
%cd whisper-sppech2txt
!git clone https://github.com/innovatorved/whisper-openai-gradio-implementation.git .
!pip install -r requirements.txt

/kaggle/working/whisper-sppech2txt
Cloning into '.'...
remote: Enumerating objects: 36, done.[K
remote: Counting objects: 100% (36/36), done.[K
remote: Compressing objects: 100% (26/26), done.[K
remote: Total 36 (delta 15), reused 27 (delta 8), pack-reused 0[K
Unpacking objects: 100% (36/36), 135.77 KiB | 3.99 MiB/s, done.
Collecting whisper@ git+https://github.com/openai/whisper.git@0b1ba3d46ebf7fe6f953acfd8cad62a4f851b49f (from -r requirements.txt (line 79))
  Cloning https://github.com/openai/whisper.git (to revision 0b1ba3d46ebf7fe6f953acfd8cad62a4f851b49f) to /tmp/pip-install-_cozlvd8/whisper_e685c5e21d594783a5d4df3b7c2b2087
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-install-_cozlvd8/whisper_e685c5e21d594783a5d4df3b7c2b2087
  Running command git rev-parse -q --verify 'sha^0b1ba3d46ebf7fe6f953acfd8cad62a4f851b49f'
  Running command git fetch -q https://github.com/openai/whisper.git 0b1ba3d46ebf7fe6f953acfd8cad62a4f851b4

In [10]:
import whisper
import time

# Name of the Whisper model to load. See the README of the repository cloned
# above for the available model names and change `modelname` to use another.
modelname = "base"
model = whisper.load_model(modelname)

def asr(audio):
    '''
    Transcribe a recorded audio file to text with the loaded Whisper model.

    Args:
        audio: path of the audio file, or None / empty when nothing was
            recorded.

    Returns:
        The decoded text, or an empty string when there is no audio.
    '''
    # Covers both None (the case handled before) and an empty path, which
    # would otherwise fail inside whisper.load_audio.
    if not audio:
        return ""
    # NOTE(review): presumably gives the recorder time to finish writing the
    # file - confirm this delay is still needed.
    time.sleep(1)

    waveform = whisper.load_audio(audio)
    waveform = whisper.pad_or_trim(waveform)

    # Make the log-Mel spectrogram and move it to the same device as the model.
    mel = whisper.log_mel_spectrogram(waveform).to(model.device)

    # Decode the audio to text with fp16 disabled.
    options = whisper.DecodingOptions(fp16=False)
    result = whisper.decode(model, mel, options)
    return result.text

100%|███████████████████████████████████████| 139M/139M [00:01<00:00, 95.3MiB/s]


### TTS

In [23]:
from io import BytesIO
import base64


def preprocess_tts_text(text):
    '''
    Turn the latest bot reply of a chat history into text suitable for speech.

    Args:
        text: the chat history, a sequence of (user message, bot reply)
            pairs. Despite its name it is not a plain string.

    Returns:
        The last bot reply with "(123)"-style page citations removed,
        hyphens between words replaced by spaces and whitespace collapsed.
        An empty string is returned when the history is empty or the last
        reply is missing.
    '''
    if not text:
        return ''
    bot_response = text[-1][1]
    if not bot_response:
        return ''

    # Drop page citations such as "(12)".
    cleaned_text = re.sub(r'\(\d+\)', '', bot_response)
    # Split hyphenated words into separate words.
    cleaned_text = re.sub(r'(\w+)-(\w+)', r'\1 \2', cleaned_text)
    # Raw string avoids the invalid "\s" escape; the pattern also covers line
    # breaks, so no separate newline replacement is needed.
    return re.sub(r'\s+', ' ', cleaned_text)


# Maps each selectable voice to the Google Translate top-level domain that
# gTTS uses for it.
_VOICE_TLDS = {
    'Author 1': 'us',
    'Author 2': 'co.in',
    'Author 3': 'com.au',
    'Author 4': 'co.uk',
    'Author 5': 'ie',
}


def text_to_speech(voice, text):
    '''
    Speak the latest bot reply with the selected voice.

    Args:
        voice: one of the keys of `_VOICE_TLDS`; None, an empty string or an
            unknown value yields the "select a voice" error message.
        text: the chat history (sequence of (user, bot) pairs); an empty or
            missing history yields the "ask a query first" error message.

    Returns:
        An HTML `<audio>` element with the speech embedded as base64 data,
        or an `[ERROR]` message string.

    Side effects:
        Writes the generated audio to `auth.wav` in the working directory,
        as before. The bytes are MP3 data despite the file extension.
    '''
    tld = _VOICE_TLDS.get(voice)
    if tld is None:
        return '[ERROR]:   Please select your prefered voice.'

    if not text:
        return '[ERROR]:   Please first ask a query to Dook3.'

    answer = preprocess_tts_text(text)
    if not answer.strip():
        # Nothing left to speak once the reply has been cleaned.
        return '[ERROR]:   Please first ask a query to Dook3.'

    tts = gTTS(answer, lang='en', tld=tld)

    # Synthesise once into memory and reuse the bytes for the file and the
    # player, instead of requesting the audio separately for each.
    audio_bytes = BytesIO()
    tts.write_to_fp(audio_bytes)
    payload = audio_bytes.getvalue()

    with open('auth.wav', 'wb') as audio_file:
        audio_file.write(payload)

    audio = base64.b64encode(payload).decode("utf-8")
    return f'<audio src="data:audio/mpeg;base64,{audio}" controls autoplay></audio>'

### Conversation Functions

In [12]:
def bot(user_input, history):
    '''
    Handle one chat turn: answer `user_input` and append the turn to `history`.

    Args:
        user_input: the message typed (or dictated) by the user.
        history: list of (user message, bot reply) pairs so far, or None for
            a fresh conversation.

    Returns:
        The updated history twice, once for the chatbot display and once for
        the state that is passed back in on the next turn.
    '''
    history = history or []
    user_input = user_input or ''

    # Flat transcript used only for the log lines below. A comprehension
    # accepts turns stored as lists as well as tuples.
    my_hist = [part for turn in history for part in turn]
    my_hist.append("User: " + user_input)

    if user_input.strip() == '':
        output = "Please enter your text."
    else:
        output = generate_answer(user_input, history)
        if output is None:
            # Guard against a missing answer so the concatenation below
            # cannot fail.
            output = "Sorry, I could not generate a response. Please try again."

    my_hist.append("Dook3: " + output)
    history.append((user_input, output))
    print(my_hist)
    print(history)
    return history, history


# def retry(user_input, history):
#     if len(history) > 0:
#         history.pop()  # Remove the last entry (bot response) from history
#         my_hist = [f"User: {user_input}"]
#         my_hist.extend(history)  # Add the remaining history entries
#         my_inp = ' '.join(my_hist)
#         output = generate_answer(user_input, history)
#         my_hist.append(f"Dook3: {output}")
#         history.append((user_input, output))
#         print(my_hist)
#         return history, history
#     else:
#         return "No user input."    


def reset_state(history):
    '''
    Start a new conversation.

    Returns two fresh empty lists followed by the `history` that was passed
    in, unchanged.
    '''
    cleared_chat = []
    cleared_state = []
    return cleared_chat, cleared_state, history

def reset_textbox():
    '''Return a Gradio update that sets the target component's value to an empty string.'''
    empty_value = gr.update(value="")
    return empty_value

## Gradio UI

In [26]:
# Heading rendered at the top of the interface.
title = 'Dook3 - Your book assistant!'

# Choices of the voice selector; text_to_speech picks the accent from these.
voice_options = [f"Author {number}" for number in range(1, 6)]

# Choices of the book dropdown. The leading blank entry lets the user clear
# the selection (mat_loader strips it to an empty string); the other entries
# are used by mat_loader to build the path of a bundled PDF.
_bundled_books = [
    "Electric Charges and Fields",
    "Probability",
    "Sapiens-A-Brief-History-of-Humankind",
    "Srimad-Bhagavad-Gita",
]
book_options = [" "] + _bundled_books


# Build the Gradio interface and launch it. The layout has two top-level
# tabs: "Dook3" (material loader, chatbot with microphone input, text to
# speech) and "Conversations" (a second chatbot that continues the history
# saved when "New Conversation" is pressed).
with gr.Blocks() as demo:
    
    gr.Markdown(f'<center><h1>{title}</h1></center>')
    
#     with gr.Row():
#         key = gr.Textbox(label='OpenAI API Key', placeholder='sk-', type='password')
    
    with gr.Tab(label="Dook3"):
        # <---------- Loading the Material ---------->
        # Three alternative sources: bundled book, PDF URL or uploaded PDF.
        with gr.Accordion(label="Load your Reading Material", open=False):
            with gr.Row().style(equal_height=True):
                with gr.Column():
                    books = gr.Dropdown(book_options, label="Please select your book!")
                    gr.Markdown("<center><h6>OR<h6></center>")
                    url = gr.Textbox(label='Enter PDF URL')
                with gr.Column():
                    file = gr.File(label='Upload PDF', file_types=['.pdf'])
            with gr.Row().style(equal_height=True):
                with gr.Column():
                    submit = gr.Button("Submit")
                    submit.style(size="sm")
                with gr.Column(): 
                    # NOTE: no handler is attached to this button (see the
                    # commented-out clear.click() below).
                    clear = gr.Button("Clear")
                    clear.style(size="sm")
            with gr.Row():
                load_status = gr.Textbox(label="Load Status", placeholder="❌ No Material loaded! ❌")
                load_status.style()

        # mat_loader validates the three inputs, indexes the chosen material
        # and returns the status string shown in `load_status`.
        submit.click(mat_loader, inputs=[books,url,file], outputs=[load_status])
    #     clear.click()


        # <---------- Chatbot Interface ---------->
        with gr.Tab(label="Chatbot"):
            chatbot = gr.Chatbot([], elem_id="chatbot", label="Dook3").style(height=350)
            # Chat history handed to bot() and replaced by its second return value.
            state = gr.State()
            with gr.Row():
                with gr.Column(scale=1):
                    msg = gr.Textbox(show_label=False, placeholder="Send a message.").style(container=False)
                with gr.Column(scale=0.1, min_width=20):
                    send = gr.Button("Send")
                    send.style(full_width=True, size="sm")
            with gr.Row().style():
                audio = gr.Microphone(source="microphone", type="filepath", label="Just say it!", interactive=True, streaming=False)
                
            with gr.Row().style(equal_height=True):
                reset = gr.Button("New Conversation")
                reset.style(full_width=True, size="sm")
                # NOTE: no handler is attached to this button; the
                # regen.click(...) call below is commented out.
                regen = gr.Button("🔁Regenerate Response")
                regen.style(full_width=True, size="sm")
                
        # <---------- TTS ---------->
        with gr.Accordion(label="Text to Speech", open=False):
            with gr.Row().style(equal_height=True):
                with gr.Column(scale=5):
                    voices = gr.Radio(voice_options, label="Please select Author voice")
                with gr.Column(scale=1):
                    tts_btn = gr.Button(value="🎧")
                    tts_btn.style(full_width=True)
            with gr.Row():
                # Receives the <audio> snippet (or error text) from text_to_speech.
                html = gr.HTML()
#                 html.visible = False
        
        
        # ASR input: the recording is transcribed by asr() and the text is
        # placed in the message box.
        audio.change(asr, inputs=[audio], outputs=[msg])
        
        # User input + Press 'Enter':
        # Two handlers on the same event: the first runs the bot, the second
        # clears the message box.
        msg.submit(bot, inputs=[msg, state], outputs=[chatbot, state], scroll_to_output=True)
        msg.submit(lambda x: gr.update(value=''), [state],[msg], queue=False)

        # User input + Click 'Send': 
        send.click(bot, inputs=[msg, state], outputs=[chatbot, state], scroll_to_output=True)
        send.click(lambda x: gr.update(value=''), [state],[msg], queue=False)
        
        # TTS: the chatbot's current value (the chat history) is passed as
        # the `text` argument of text_to_speech.
        tts_btn.click(text_to_speech, inputs=[voices, chatbot], outputs=[html], scroll_to_output=True)

        # Regenerate Response
    #     regen.click(retry, inputs=[msg, state], outputs=[chatbot, state], scroll_to_output=True)

        # Receives the history returned by reset_state; it is the history
        # used by the "Conversations" tab below.
        prev = gr.State()

        # Clear Conversation: reset_state returns ([], [], history), which
        # empties the chatbot and `state` and stores the old history in `prev`.
        reset.click(
            reset_state,
            inputs=[state],
            outputs=[chatbot, state, prev],
            show_progress=True,
        )
        reset.click(fn=reset_textbox, inputs=[], outputs=[msg], queue=False)

    #     reset.click(fn=None, _js="window.open('https://b615d2b10330d5ff5b.gradio.live')")
    
    with gr.Tab(label="Conversations"):
        # <---------- Previous Conversations ---------->
        # NOTE: the names chatbot, msg, send, reset and regen are rebound to
        # new components from here on. Handlers registered above keep
        # pointing at the first tab's components. Both Chatbot components
        # use elem_id="chatbot".
        with gr.Accordion(label="Previous Conversations", open=False):
            chatbot = gr.Chatbot([], elem_id="chatbot", label="Dook3").style(height=400)

            with gr.Row():
                with gr.Column(scale=1):
                    msg = gr.Textbox(show_label=False, placeholder="Send a message.").style(container=False)
                with gr.Column(scale=0.1, min_width=20):
                    send = gr.Button("Send")
                    send.style(full_width=True, size="sm")

            with gr.Row().style(equal_height=True):
                # NOTE: neither of these two buttons has a handler attached
                # in this cell.
                reset = gr.Button("New Conversation")
                reset.style(full_width=True, size="sm")
                regen = gr.Button("🔁Regenerate Response")
                regen.style(full_width=True, size="sm")

        # User input + Press 'Enter':
        # Same wiring as the first tab, but using `prev` as the history.
        msg.submit(bot, inputs=[msg, prev], outputs=[chatbot, prev], scroll_to_output=True)
        msg.submit(lambda x: gr.update(value=''), [prev],[msg], queue=False)

        # User input + Click 'Send': 
        send.click(bot, inputs=[msg, prev], outputs=[chatbot, prev], scroll_to_output=True)
        send.click(lambda x: gr.update(value=''), [prev],[msg], queue=False)
        

# Enable request queuing, then start the app.
demo.queue()    
demo.launch()
# demo.launch(share=True, auth=("username", "password"), auth_message="Please enter the username and password provided to you by the owner of this Space.")

Kaggle notebooks require sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Running on local URL:  http://127.0.0.1:7868
Running on public URL: https://4893a1eba641720f11.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




['User: hi', "Dook3: \nHi there! I'm Dook3, an AI book assistant here to enhance your reading experience. I'm here to help you with any queries you may have about books and reading. However, without specific reading material, providing accurate answers may be challenging and could lead to potential misinformation. It would be great if you could provide relevant reading material for better assistance."]
[('hi', "\nHi there! I'm Dook3, an AI book assistant here to enhance your reading experience. I'm here to help you with any queries you may have about books and reading. However, without specific reading material, providing accurate answers may be challenging and could lead to potential misinformation. It would be great if you could provide relevant reading material for better assistance.")]


In [14]:
# with gr.Blocks() as prev_conv:
    
#     # <---------- Previous Conversations ---------->
#     with gr.Accordion(label="Previous Conversations", open=False):
#         chatbot = gr.Chatbot([], elem_id="chatbot", label="Dook3").style(height=400)
        
#         with gr.Row():
#             with gr.Column(scale=1):
#                 msg = gr.Textbox(show_label=False, placeholder="Send a message.").style(container=False)
#             with gr.Column(scale=0.1, min_width=20):
#                 send = gr.Button("Send")
#                 send.style(full_width=True, size="sm")
            
#         with gr.Row().style(equal_height=True):
#             reset = gr.Button("New Conversation")
#             reset.style(full_width=True, size="sm")
#             regen = gr.Button("🔁Regenerate Response")
#             regen.style(full_width=True, size="sm")
            
#     # User input + Press 'Enter':
#     msg.submit(bot, inputs=[msg, prev, key], outputs=[chatbot, prev], scroll_to_output=True)
#     msg.submit(lambda x: gr.update(value=''), [prev],[msg], queue=False)
    
#     # User input + Click 'Send': 
#     send.click(bot, inputs=[msg, prev, key], outputs=[chatbot, prev], scroll_to_output=True)
#     send.click(lambda x: gr.update(value=''), [prev],[msg], queue=False)
    
# prev_conv.queue()
# prev_conv.launch()

In [15]:
# hf_tok = os.environ['HF_TOKEN']  # never paste access tokens into the notebook; the token previously committed here must be revoked

# demo = gr.load(name='aneesh-b/dookgpt', src='spaces', hf_token=hf_tok)
    
# demo.launch(auth=('username', 'password'),)