In [None]:
!pip install gradio

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import gradio as gr
import os
import numpy as np
# Install the third-party packages imported by the following cell
# (pdfminer, rank_bm25, torch, transformers) by shelling out to pip.
# os.system returns the command's exit status; 0 means pip succeeded.
# NOTE(review): versions are unpinned and pip's own output is not shown in
# the notebook — consider a dedicated `%pip install` cell with pinned versions.
os.system("pip install pdfminer.six rank_bm25 torch transformers")

0

In [None]:
from gradio.mix import Series
#import re
from rank_bm25 import BM25Okapi
import string 
import torch
from transformers import pipeline
import pdfminer
from pdfminer.high_level import extract_text


In [None]:
# Chunking: every chunk spans `len_doc` characters and consecutive chunks
# share `overlap` characters.
len_doc, overlap = 500, 15
# Pipeline depth: how many BM25 hits are kept, then how many QA answers are kept.
param_top_k_retriver, param_top_k_ranker = 15, 3


In [None]:
# Hugging Face "question-answering" pipeline shared by the whole notebook.
# Checkpoints that were previously tried and left commented out:
#   deepset/minilm-uncased-squad2, vblagoje/bart_lfqa
qa_model = pipeline(
    "question-answering",
    model="deepset/roberta-base-squad2",
)

Downloading:   0%|          | 0.00/571 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/473M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/79.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/878k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/772 [00:00<?, ?B/s]

In [None]:
def read_pdf(file):
  """Extract the text of a PDF and split it into overlapping chunks.

  Args:
    file: Either a filesystem path (``str`` / ``os.PathLike``) or an object
      whose ``.name`` attribute holds the path, which is what the original
      code expected from the Gradio file upload.
      NOTE(review): which of the two Gradio passes depends on the installed
      Gradio version — confirm against the deployment.

  Returns:
    list[str]: the chunks produced by ``read_text`` for the extracted text.
  """
  # A pathlib.Path also has a ``.name`` (the last path component only), so
  # real paths must be recognised before falling back to ``file.name``.
  if isinstance(file, (str, os.PathLike)):
    pdf_path = file
  else:
    pdf_path = file.name

  text = extract_text(pdf_path)
  # Chunking is shared with the plain-text entry point instead of being
  # duplicated here.
  return read_text(text)

In [None]:
def read_text(text, chunk_len=None, chunk_overlap=None):
  """Split ``text`` into fixed-size character chunks that overlap.

  Args:
    text: The string to split. ``None`` or ``""`` yields no chunks.
    chunk_len: Number of characters per chunk. Defaults to the module-level
      ``len_doc``.
    chunk_overlap: Number of characters shared by consecutive chunks.
      Defaults to the module-level ``overlap``.

  Returns:
    list[str]: chunks starting every ``chunk_len - chunk_overlap`` characters;
    the last chunk may be shorter than ``chunk_len``.

  Raises:
    ValueError: if ``chunk_len`` is not positive or the overlap is not
      smaller than ``chunk_len`` (the window would never advance).
  """
  if chunk_len is None:
    chunk_len = len_doc
  if chunk_overlap is None:
    chunk_overlap = overlap

  stride = chunk_len - chunk_overlap
  if chunk_len <= 0 or stride <= 0:
    raise ValueError(
        "chunk_len must be positive and larger than chunk_overlap "
        "(got chunk_len={}, chunk_overlap={})".format(chunk_len, chunk_overlap))

  if not text:
    return []

  return [text[start:start + chunk_len] for start in range(0, len(text), stride)]

In [None]:
def bm25_tokenizer(text):
    """Lower-case ``text``, split it on whitespace and clean each token.

    Leading/trailing punctuation is stripped from every token; tokens that end
    up empty or that appear in the small stop-word list are dropped.

    Returns:
        list[str]: the remaining tokens, in their original order.
    """
    ignored = ['a', 'the', 'am', 'is', 'are', 'who', 'how', 'where', 'when', 'why', 'what']
    cleaned = (word.strip(string.punctuation) for word in text.lower().split())
    return [word for word in cleaned if word and word not in ignored]

In [None]:
def retrieval(query, top_k_retriver, docs, bm25_):
    """Return the best BM25 hits for ``query`` among ``docs``.

    Args:
        query: The question string; tokenised with ``bm25_tokenizer``.
        top_k_retriver: Maximum number of candidate documents to consider.
        docs: The documents, indexable by the positions ``bm25_`` scores.
        bm25_: An object exposing ``get_scores(tokens)``.

    Returns:
        list[dict]: one ``{'corpus_id', 'score', 'docs'}`` entry per candidate
        whose score is strictly positive, ordered by descending score.
    """
    scores = bm25_.get_scores(bm25_tokenizer(query))
    best_first = np.argsort(scores)[::-1][:top_k_retriver]

    hits = []
    for doc_idx in best_first:
        if scores[doc_idx] > 0:
            hits.append({'corpus_id': doc_idx,
                         'score': scores[doc_idx],
                         'docs': docs[doc_idx]})

    hits.sort(key=lambda hit: hit['score'], reverse=True)
    return hits

In [None]:
def qa_ranker(query, docs_, top_k_ranker):
    """Run the QA model on every passage and keep the best-scoring answers.

    Args:
        query: The question passed to ``qa_model``.
        docs_: Iterable of passages, each used as the model's context.
        top_k_ranker: Number of answers to keep.

    Returns:
        list: the ``qa_model`` results, each extended with a ``'doc'`` key
        holding its source passage, sorted by descending ``'score'`` and
        truncated to ``top_k_ranker`` entries.
    """
    candidates = []
    for passage in docs_:
        prediction = qa_model(question=query, context=passage)
        # Remember which passage the answer came from.
        prediction['doc'] = passage
        candidates.append(prediction)

    candidates.sort(key=lambda pred: pred['score'], reverse=True)
    return candidates[:top_k_ranker]

In [None]:
def cstr(s, color='black'):
    """Wrap ``s`` in a ``<text>`` tag whose inline style sets ``color``."""
    return f"<text style=color:{color}>{s}</text>"
def cstr_bold(s, color='black'):
    """Like a coloured ``<text>`` wrapper, but with ``s`` rendered in bold."""
    return f"<text style=color:{color}><b>{s}</b></text>"
def cstr_break(s, color='black'):
    """Like a coloured ``<text>`` wrapper, but with a line break before ``s``."""
    return f"<text style=color:{color}><br>{s}</text>"


In [None]:
def print_colored(text, start_idx, end_idx, confidence):
    """Render a passage as HTML with the answer span highlighted.

    Args:
        text: The passage the answer was extracted from (plain text).
        start_idx: Index in ``text`` where the answer starts.
        end_idx: Index in ``text`` just past the end of the answer.
        confidence: Already-formatted confidence string (e.g. ``"87.3%"``).

    Returns:
        str: HTML containing the text before the answer, the answer in bold
        blue, the text after it, and a grey confidence line after a break.
    """
    # Local import: keeps this cell runnable without touching the import cell.
    from html import escape

    # The passage comes from user-supplied documents, so it must be escaped
    # before being embedded in markup; otherwise characters such as '<' or
    # '&' would be interpreted as HTML instead of being displayed.
    before = escape(text[:start_idx])
    answer = escape(text[start_idx:end_idx])
    after = escape(text[end_idx:])
    conf_str = escape('- Confidence: ' + confidence)

    pieces = [before,
              cstr_bold(answer, color='blue'),
              after,
              cstr_break(conf_str, color='grey')]
    return cstr(' '.join(pieces), color='black')

In [None]:
def final_qa_pipeline(file, query):
    """Answer ``query`` from an uploaded PDF.

    The PDF is chunked with ``read_pdf``, candidate chunks are retrieved with
    BM25, and the QA model re-ranks them.

    Args:
        file: The uploaded PDF as accepted by ``read_pdf``; ``None`` when
            nothing was uploaded.
        query: The question string.

    Returns:
        tuple[str, str]: HTML for the best and second-best answers. The second
        element is ``"None"`` when only one answer is available, and both are
        ``"No match"`` when there is nothing to search or nothing matched.
    """
    no_match = ("No match", "No match")

    if file is None:
        return no_match

    docs = read_pdf(file)
    if not docs:
        # BM25Okapi computes an average document length and fails with a
        # ZeroDivisionError on an empty corpus (e.g. a PDF without text).
        return no_match

    bm25 = BM25Okapi([bm25_tokenizer(doc) for doc in docs])
    lvl1 = retrieval(query, param_top_k_retriver, docs, bm25)
    if not lvl1:
        return no_match

    fnl_rank = qa_ranker(query, [hit["docs"] for hit in lvl1], param_top_k_ranker)
    if not fnl_rank:
        return no_match

    def render(answer):
        # Confidence is shown as a percentage with one decimal.
        confidence = str(np.round(100 * answer["score"], 1)) + "%"
        return print_colored(answer['doc'], answer['start'], answer['end'], confidence)

    top1 = render(fnl_rank[0])
    # Guard on the ranked list itself: it can be shorter than the retrieved
    # list when the ranker is configured to keep fewer than two answers.
    top2 = render(fnl_rank[1]) if len(fnl_rank) > 1 else "None"
    return (top1, top2)

In [None]:
def final_qa_pipeline2(context, query):
    """Answer ``query`` from a free-text context.

    The context is chunked with ``read_text``, candidate chunks are retrieved
    with BM25, and the QA model re-ranks them.

    Args:
        context: The text to search; may be empty or ``None``.
        query: The question string.

    Returns:
        tuple[str, str]: HTML for the best and second-best answers. The second
        element is ``"None"`` when only one answer is available, and both are
        ``"No match"`` when there is nothing to search or nothing matched.
    """
    no_match = ("No match", "No match")

    if not context:
        return no_match

    docs = read_text(context)
    if not docs:
        # BM25Okapi computes an average document length and fails with a
        # ZeroDivisionError on an empty corpus.
        return no_match

    bm25 = BM25Okapi([bm25_tokenizer(doc) for doc in docs])
    lvl1 = retrieval(query, param_top_k_retriver, docs, bm25)
    if not lvl1:
        return no_match

    fnl_rank = qa_ranker(query, [hit["docs"] for hit in lvl1], param_top_k_ranker)
    if not fnl_rank:
        return no_match

    def render(answer):
        # Confidence is shown as a percentage with one decimal.
        confidence = str(np.round(100 * answer["score"], 1)) + "%"
        return print_colored(answer['doc'], answer['start'], answer['end'], confidence)

    top1 = render(fnl_rank[0])
    # Guard on the ranked list itself: it can be shorter than the retrieved
    # list when the ranker is configured to keep fewer than two answers.
    top2 = render(fnl_rank[1]) if len(fnl_rank) > 1 else "None"
    return (top1, top2)

In [None]:
# Two-tab Gradio UI: one tab answers questions about an uploaded PDF, the
# other about a pasted context. Components are created with the top-level
# gr.Textbox / gr.HTML classes; the gr.inputs / gr.outputs namespaces are
# deprecated legacy aliases.
demo = gr.Blocks()

with demo:
    gr.Markdown("Question Answering System: Enter a PDF File or the relevant Context")
    with gr.Tabs():
        with gr.TabItem("PDF Input"):
            file_input = gr.File(label="Input PDF File")
            text_input = gr.Textbox(label="Question:")
            text_output = [gr.HTML(label="Top 1 answer"), gr.HTML(label="Top 2 answer")]
            pdf_button = gr.Button("Answer")

        with gr.TabItem("Text Input"):
            text_input1 = gr.Textbox(label="Context:")
            text_input2 = gr.Textbox(label="Question:")
            text_output2 = [gr.HTML(label="Top 1 answer"), gr.HTML(label="Top 2 answer")]
            text_button = gr.Button("Answer")

    # Each button feeds its tab's inputs to the matching pipeline and writes
    # the two returned HTML strings into that tab's output components.
    pdf_button.click(final_qa_pipeline, inputs=[file_input, text_input], outputs=text_output)
    text_button.click(final_qa_pipeline2, inputs=[text_input1, text_input2], outputs=text_output2)



In [None]:
# Start the Gradio server. share=True requests a temporary public URL and
# debug=True keeps this cell running so errors and logs stay visible
# (set debug=False to return control to the notebook).
demo.launch(debug=True,share=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://23956.gradio.app

This share link expires in 72 hours. For free permanent hosting, check out Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.


(<gradio.routes.App at 0x7f66ec006050>,
 'http://127.0.0.1:7860/',
 'https://23956.gradio.app')