In [6]:
#retrieve the question and comments in FDA meeting videos

import configparser, os, re
# Parse ./keys.ini; the lookups below require [GOOGLE] and [OPENAI] sections.
# NOTE(review): ConfigParser.read() does not raise when the file is missing, so a
# missing keys.ini would surface as a KeyError on the first lookup — confirm.
config = configparser.ConfigParser()
config.read('./keys.ini')
# Export the Google credentials as process environment variables.
os.environ['GOOGLE_API_KEY'] = config['GOOGLE']['GOOGLE_API_KEY']
os.environ['GOOGLE_CSE_ID'] = config['GOOGLE']['GOOGLE_CSE_ID']
# Keep the OpenAI key in a variable and export it as an environment variable too.
openai_api_key = config['OPENAI']['OPENAI_API_KEY']
os.environ['OPENAI_API_KEY'] = openai_api_key

In [7]:
from typing import Any, List, Mapping, Optional
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.chat_models import ChatOpenAI
#llm = ChatOpenAI(temperature=0, model_name="gpt-4-1106-preview")
from langchain.docstore.document import Document
import requests
import re

class CustomLLM2(LLM):
    """LLM wrapper that posts prompts to a local OpenAI-style chat endpoint.

    Every prompt is sent as a single user message to
    ``http://localhost:5000/v1/chat/completions`` in "instruct" mode with the
    "Alpaca" instruction template; the content of the first returned choice is
    the completion.
    """

    @property
    def _llm_type(self) -> str:
        """Return the type tag of this LLM implementation."""
        return "custom"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Send ``prompt`` to the local endpoint and return the generated text.

        Args:
            prompt: Text sent as the single user message.
            stop: Optional stop sequences. When a list is given, the extra
                markers below are appended and the combined list is both
                forwarded to the server and enforced on the returned text.
            run_manager: Accepted so the signature matches the one LangChain
                calls ``_call`` with; not used here.
            **kwargs: Accepted for the same reason; ignored.

        Returns:
            The first choice's message content, cut at the earliest stop
            sequence when stop sequences are active.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # Previously the extended list was built and then dropped, so stop
        # sequences never had any effect on the output.
        stop_sequences: List[str] = []
        if isinstance(stop, list):
            stop_sequences = stop + ["\n###","\nObservation:","\n问题","\nQuestion:"]
        HOST = 'localhost:5000'
        URI = f'http://{HOST}/v1/chat/completions'

        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "mode": "instruct",
            "instruction_template": "Alpaca",
        }
        if stop_sequences:
            # `stop` is the standard chat-completions field for stop sequences.
            payload["stop"] = stop_sequences

        # NOTE: no timeout is set, so this call blocks until the server answers.
        response = requests.post(URI, json=payload)
        response.raise_for_status()
        text = response.json()['choices'][0]['message']['content']

        # Enforce the stop sequences client-side too, in case the server
        # ignores the field or returns the stop text itself.
        cut_points = [
            pos
            for pos in (text.find(seq) for seq in stop_sequences if seq)
            if pos != -1
        ]
        if cut_points:
            text = text[:min(cut_points)]
        return text

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {}

#run one "stuff" chain over subtitle docs: the model answers "No" or summarizes the question
def story_summary_stuff(docs, prompt_template = ""):
    """Run the documents through a single LLM prompt and return the model's answer.

    Args:
        docs: Documents whose content is substituted into the prompt's
            ``{subtitle}`` variable.
        prompt_template: Prompt text containing ``{subtitle}``. When left as the
            empty string, a built-in prompt is used that asks whether the
            subtitle is a specific question about a clinical trial or patient.

    Returns:
        Whatever ``StuffDocumentsChain.run`` returns for ``docs``.

    Note:
        The model is taken from the module-level name ``llm``, which must be
        defined before this function is called.
    """
    from langchain.chains.llm import LLMChain
    from langchain.prompts import PromptTemplate
    from langchain.chains.combine_documents.stuff import StuffDocumentsChain

    if prompt_template == "":
        # Built-in prompt, spelled out piece by piece so its whitespace is visible.
        prompt_template = (
            'in the following subtitle, is it a specific question about clinical trial or patient?  \n'
            '        If "No",  only answers "No". Otherwise, summarize the question.\n'
            '\n'
            '        "{subtitle}"\n'
            '        \n'
            '        Output:'
        )

    question_chain = LLMChain(
        llm=llm,
        prompt=PromptTemplate.from_template(prompt_template),
    )
    combine_chain = StuffDocumentsChain(
        llm_chain=question_chain,
        document_variable_name="subtitle",
    )
    return combine_chain.run(docs)

def get_sec(time_str):
    """Convert an ``H:M:S`` string into a total number of seconds.

    Args:
        time_str: Three colon-separated integer fields (hours, minutes, seconds).

    Returns:
        The total as an int.

    Raises:
        ValueError: If there are not exactly three fields or a field is not an integer.
    """
    hours, minutes, seconds = (int(field) for field in time_str.split(':'))
    return hours * 3600 + minutes * 60 + seconds

def save_snapshot(vid, time, path = "./cache/FDA/"):
    """Save the video frame at ``time`` seconds as a JPEG and return its path.

    Args:
        vid: Video id; the video is read from ``<path>/video/<vid>.mp4``.
        time: Position in seconds (converted with ``int``) to seek to.
        path: Base directory holding the ``video`` and ``image`` sub-folders.

    Returns:
        The path ``<path>/image/<vid>/<time>.jpg``. It is returned whether or
        not a frame could be read, so the file may not exist.
    """
    import os
    import cv2

    image_path = path+"/image/"+vid+"/"+str(time)+".jpg"
    vidcap = cv2.VideoCapture(path+"/video/"+vid+".mp4")
    try:
        vidcap.set(cv2.CAP_PROP_POS_MSEC,int(time)*1000)      # cue to the <time> sec. position
        success,image = vidcap.read()
    finally:
        # Release the capture handle; it was previously left open.
        vidcap.release()
    if success:
        # cv2.imwrite does not create missing folders (it just reports failure),
        # so make sure the per-video image directory exists first.
        os.makedirs(os.path.dirname(image_path), exist_ok=True)
        cv2.imwrite(image_path, image)     # save frame as JPEG file
    return image_path

#search keyword and summarize the text that follows each match (the next `next` characters)
def search_keyword(srt, keyword = "question", next = 1000, prefix = "", limit = 3):
    """Find keyword matches in an SRT transcript and summarize what follows them.

    For every match of ``keyword`` the following ``next`` characters are taken,
    timestamp/index lines are stripped, and the remaining text is passed to
    ``story_summary_stuff``. Matches the model answers "No" to are dropped.

    Args:
        srt: Transcript text in SRT format.
        keyword: Regular expression searched for with ``re.finditer``.
        next: Number of characters after the match start to analyze.
        prefix: String prepended to the timestamp (in seconds) to build a link.
        limit: Maximum number of summaries to collect.

    Returns:
        A tuple ``(time_stamp, link, res)`` of equally long lists: the timestamp
        in seconds as a string, ``prefix`` + seconds, and the model's summary.
    """
    time_stamp = []; link = []; res = []
    for m in re.finditer(keyword, srt):
        # Stop once `limit` summaries are collected (the old check let one extra through).
        if len(res) >= limit:
            break
        text = srt[m.start():(m.start()+next)]
        # First time stamp inside the window; skip the match when there is none
        # (e.g. the keyword sits in the last subtitle) instead of raising IndexError.
        stamp = re.search(r'\d+:\d+:\d+', text)
        if stamp is None:
            continue
        # Drop short lines (cue numbers) and every line containing ":" (time stamps).
        spoken = "".join(
            line + "\n"
            for line in text.splitlines()
            if len(line) > 5 and ":" not in line
        )
        output_srt = Document(page_content=spoken, metadata={})
        answer = story_summary_stuff([output_srt])
        # Treat the reply as negative only when it starts with the word "No";
        # a substring test would also discard summaries containing e.g. "Not".
        if re.match(r'\W*no\b', answer, flags=re.IGNORECASE):
            continue
        seconds = str(get_sec(stamp.group(0)))
        time_stamp.append(seconds)
        link.append(prefix + seconds)
        res.append(answer)
    return time_stamp, link, res

#youtube loader: summarize the questions found in a video's transcript and store them in the vector DB
def loader(link:str, language=["zh"], db_loc = "./cache/YTDBT", overwrite = False, path = "./cache/FDA/"):
    """Extract question summaries from a YouTube video and add them to a Chroma DB.

    Args:
        link: URL of the YouTube video.
        language: Transcript language codes passed to the YouTube loader and
            the transcript API.
        db_loc: Persist directory of the Chroma database.
        overwrite: When False, a video that already has entries in the DB is skipped.
        path: Base directory holding the ``video`` and ``image`` sub-folders.

    Returns:
        ``None`` when the video info could not be loaded (the error is printed),
        a message string when the video is skipped, otherwise the list of
        ``Document`` objects that were created (possibly empty).
    """
    #load video info from link
    from langchain.document_loaders import YoutubeLoader
    try:
        yt_loader = YoutubeLoader.from_youtube_url(
            link, add_video_info=True, language=language
        )
        docs = yt_loader.load()
    except Exception as e:
        print(e)
        return(None)

    #check whether the vid is already in the DB
    this_id = docs[0].metadata['source']
    from langchain.vectorstores import Chroma
    from langchain.embeddings import OpenAIEmbeddings
    db = Chroma(persist_directory=db_loc, embedding_function=OpenAIEmbeddings())
    # Entries are stored under "<video id>_<seconds>" (see the ids built below),
    # so compare against that prefix; comparing the bare id never matched.
    already_stored = any(
        doc_id == this_id or doc_id.startswith(this_id + "_")
        for doc_id in db.get()['ids']
    )
    if already_stored and not overwrite:
        return(link+" link already in the db, skip")
    print("adding link "+link)

    #check whether the video has been downloaded
    from pathlib import Path
    # A Path object is always truthy; .exists() is what tests for the file.
    if not Path(path + "/video/" + this_id + ".mp4").exists():
        downloadYouTube(this_id, path = path + "/video/")

    #format the transcript into SRT
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api.formatters import SRTFormatter
    transcript = YouTubeTranscriptApi.get_transcript(this_id,languages=language)
    formatter = SRTFormatter()
    srt_formatted = formatter.format_transcript(transcript)
    with open("./cache/Output.srt", "w", encoding="utf-8") as text_file:
        text_file.write(srt_formatted)

    #get the summary of questions
    # Separate names for the results, so the `link` argument is not rebound to a list.
    time_stamp, vlinks, summaries = search_keyword(srt_formatted, prefix = "https://youtu.be/"+this_id+"?t=")
    from langchain.docstore.document import Document
    output_doc = []; ids = []
    for seconds, vlink, summary in zip(time_stamp, vlinks, summaries):
        snapshot_path = save_snapshot(this_id, seconds, path)
        #write the summary into DB
        meta = dict(docs[0].metadata)   # per-document copy; the video metadata stays untouched
        meta['time'] = seconds
        meta['vlink'] = vlink
        meta['snapshot'] = snapshot_path
        output_doc.append(Document(page_content=summary, metadata=meta))
        ids.append(meta['source']+"_"+str(meta['time']))
    # Nothing to store when no question was found.
    # NOTE(review): Chroma presumably rejects an empty batch — confirm.
    if output_doc:
        db.add_documents(output_doc, ids = ids)
    return(output_doc)

def downloadYouTube(vid, path = "./cache/FDA/video/"):
    """Download a YouTube video as ``<path>/<vid>.mp4``.

    The highest-resolution progressive mp4 stream is selected.

    Args:
        vid: YouTube video id.
        path: Output directory; created when missing.

    Raises:
        RuntimeError: If the video offers no progressive mp4 stream.
    """
    from pytube import YouTube
    import os
    videourl = "https://www.youtube.com/watch?v="+vid
    stream = (
        YouTube(videourl)
        .streams
        .filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    # first() yields None when the filter matches nothing; fail with a clear
    # message instead of an AttributeError on the download call.
    if stream is None:
        raise RuntimeError("no progressive mp4 stream available for video "+vid)
    os.makedirs(path, exist_ok=True)
    stream.download(output_path = path, filename = vid+".mp4")


In [8]:
from langchain.chat_models import ChatOpenAI
#llm = ChatOpenAI(temperature=0, model_name="gpt-4-1106-preview")
# Bind the module-level `llm` that story_summary_stuff reads to the local CustomLLM2 endpoint.
llm = CustomLLM2()
# Process one video: accept English or Traditional Chinese transcripts, store into
# the ./cache/test DB, and process it even if the DB already has entries (overwrite=True).
res = loader("https://www.youtube.com/watch?v=-iHseGn2LhQ&list=PLweTl9OQEsJhOM8W6ZAigP7eJxBU_yQM6&index=1&ab_channel=U.S.FoodandDrugAdministration",language=["en","zh-Hant"], db_loc = "./cache/test", overwrite = True)

adding link https://www.youtube.com/watch?v=-iHseGn2LhQ&list=PLweTl9OQEsJhOM8W6ZAigP7eJxBU_yQM6&index=1&ab_channel=U.S.FoodandDrugAdministration


In [38]:
import chromadb
from json2html import *

#write the extracted questions to an HTML table (video link, question, snapshot)
def run(doc:list, output = "output.html", path = "./cache/FDA/"):
    """Write one HTML table row per document and save the page to ``path``/``output``.

    The table is built directly with escaped values instead of patching a
    generated table through string replacement, which corrupted any question
    containing "http", dropped a closing ``</td>`` and produced image sources
    starting with ``http.``.

    Args:
        doc: Documents exposing ``page_content`` plus ``metadata["vlink"]``
            (video link) and ``metadata["snapshot"]`` (image file path).
        output: File name of the HTML page.
        path: Directory the page is written to; created when missing. Image
            sources are written relative to this directory so they resolve
            when the page is opened from there.

    Returns:
        A success message naming the written file.
    """
    import os
    from html import escape

    def _image_src(snapshot):
        """Return the snapshot path relative to the page's directory, with '/' separators."""
        if not snapshot:
            return ""
        try:
            relative = os.path.relpath(snapshot, path or os.curdir)
        except ValueError:
            # e.g. different drives on Windows: fall back to the path as given
            relative = snapshot
        return relative.replace(os.sep, "/")

    rows = []
    for entry in doc:
        href = escape(entry.metadata["vlink"], quote=True)
        src = escape(_image_src(entry.metadata["snapshot"]), quote=True)
        rows.append(
            "<tr>"
            + '<td><a href="' + href + '" target="_blank">vlink</a></td>'
            + "<td>" + escape(entry.page_content) + "</td>"
            + '<td><img src="' + src + '"></td>'
            + "</tr>"
        )

    page = (
        '<table border="1"><thead><tr>'
        "<th>link</th><th>question</th><th>snapshot</th>"
        "</tr></thead><tbody>"
        + "".join(rows)
        + "</tbody></table>"
    )

    if path:
        os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, output), 'w', encoding="utf-8") as f:
        f.write(page)

    return("success: write query to " + output)

In [39]:
# Write the documents collected in `res` to the default HTML file.
run(res)

'success: write query to output.html'

In [14]:
#collect the output rows as plain dicts
def run(doc:list, output = "output.html", path = "./cache/FDA/"):
    """Build one dict per document with its tagged link, question and snapshot.

    Args:
        doc: Documents exposing ``page_content`` plus ``metadata["vlink"]`` and
            ``metadata["snapshot"]``.
        output: Unused here.
        path: Unused here.

    Returns:
        A list of dicts with the keys ``link`` (video link + ".vlink"),
        ``question`` (page content) and ``snapshot`` (".image" + snapshot path).
    """
    return [
        {
            "link": entry.metadata["vlink"]+".vlink",
            "question": entry.page_content,
            "snapshot": ".image"+entry.metadata["snapshot"],
        }
        for entry in doc
    ]
# Keep the row dicts built from `res` in `tmp`.
tmp = run(res)

In [28]:
from IPython.display import display, Markdown, Latex
# Markdown image reference to one saved snapshot. It has its own name so that
# `tmp` (the row list from `tmp = run(res)`) is not overwritten with a string,
# which would break the later `pd.DataFrame(tmp)`.
snapshot_md = "![alt text](./cache/FDA/image/-iHseGn2LhQ/3616.jpg \"Title\")"
# Raw strings: "\p" is not a valid escape sequence in a normal string literal.
display(Markdown(r'*some markdown* $\phi$'))
# If you particularly want to display maths, this is more direct:
display(Latex(r'\phi'))
display(Markdown(snapshot_md))
from markdown_pdf import MarkdownPdf
pdf = MarkdownPdf(toc_level=2)

*some markdown* $\phi$

<IPython.core.display.Latex object>

![alt text](./cache/FDA/image/-iHseGn2LhQ/3616.jpg "Title")

In [None]:
import pandas as pd
# Build a DataFrame from `tmp` and render it as a text table in the
# 'fancy_grid' format, leaving out the index column.
df = pd.DataFrame(tmp)
mk = df.to_markdown(index=False,tablefmt='fancy_grid')