In [None]:
!pip install -q transformers accelerate bitsandbytes
!pip -q install langchain pypdf chromadb sentence-transformers faiss-gpu rank_bm25
!pip install datasets
!pip install jq

In [None]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): no cell visible in this notebook reads from /content/drive —
# confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig,AutoConfig, pipeline
from langchain.llms import HuggingFacePipeline

## 양자화 하기 위한 설정 ##
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

## Llama2 불러오고 토크나이저 불러오고 양자화 시키는 코드 ##
model_id = "tilyupo/llama-2-7b-hf-trivia-ca2q"
model = AutoModelForCausalLM.from_pretrained(model_id,quantization_config=bnb_config,device_map="auto")
!huggingface-cli login
model_tokenizer="meta-llama/Llama-2-7b-chat-hf"
tokenizer = AutoTokenizer.from_pretrained(model_tokenizer)

In [None]:
# Wrap the quantized model and its tokenizer in a transformers text-generation
# pipeline, then expose that pipeline to LangChain as an LLM object.
# NOTE(review): `temperature` only influences sampling-based decoding and no
# `do_sample` flag is passed here — confirm whether sampling is intended.
text_generation_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=300,
    temperature=0.2,
    return_full_text=True,
)
llm_pipeline = HuggingFacePipeline(pipeline=text_generation_pipeline)

In [None]:
from langchain.prompts import PromptTemplate
## Prompt setup. (Author's note: would have preferred to leave this out, but the
## chain does not run without a prompt, so it was added.)
## The template exposes two placeholders, {context} and {question}; the template
## text is whitespace-sensitive runtime data, so it is left exactly as written.
## NOTE(review): the template contains no "### ANSWER ###" header, yet
## find_answer_pred searches the model output for that tag — presumably the
## fine-tuned model emits the header itself; verify.
prompt_template = """
### CONTEXT ###
{context}

### QUESTION ###
{question}
 """
# Build the LangChain prompt object from the template and its two input variables.
prompt = PromptTemplate(
  input_variables=["context", "question"],
  template=prompt_template
)

In [None]:
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
## Load the sentence-embedding model on the GPU. The evaluation cells further
## down pass `embeddings` to FAISS.from_documents to build one index per question.
embed_model_id = "sentence-transformers/all-MiniLM-L6-v2"
embeddings = HuggingFaceEmbeddings(
    model_name=embed_model_id,
    model_kwargs=dict(device="cuda"),
)

In [None]:
import re
import string
import collections
## EM Score / F1 Score 계산하기 위한 코드
def normalize_answer(s):
  """Canonicalise an answer string for EM / F1 comparison.

  Steps, applied in this order: lowercase the text, delete every ASCII
  punctuation character, replace the standalone words "a", "an" and "the"
  with a space, and collapse all runs of whitespace to single spaces
  (which also trims both ends).

  Args:
    s: the answer string to normalise.

  Returns:
    The normalised string.
  """
  lowered = s.lower()
  # Deleting via a translation table is equivalent to filtering char by char.
  without_punct = lowered.translate(str.maketrans('', '', string.punctuation))
  without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punct, flags=re.UNICODE)
  return ' '.join(without_articles.split())
def get_tokens(s):
  """Split an answer into normalised whitespace-separated tokens.

  Args:
    s: the answer string; any falsy value (empty string, None) is accepted.

  Returns:
    A list of tokens taken from normalize_answer(s), or an empty list when
    `s` is falsy.
  """
  return normalize_answer(s).split() if s else []

def compute_exact(a_gold, a_pred):
  """Exact-match score between a gold answer and a predicted answer.

  Both answers are normalised with normalize_answer before comparison.

  Args:
    a_gold: the reference answer string.
    a_pred: the predicted answer. find_answer_pred returns the integer -1
      when no answer section was found, so any non-string value (None, -1)
      is treated as "no prediction".

  Returns:
    1 if the normalised strings are identical, otherwise 0. A missing
    (non-string) prediction always scores 0.
  """
  # Previously only None was guarded, so the -1 sentinel reached
  # normalize_answer and raised AttributeError on `.lower()`.
  if not isinstance(a_pred, str):
    return 0
  return int(normalize_answer(a_gold) == normalize_answer(a_pred))

def compute_f1(a_gold, a_pred):
  """Token-level F1 between a gold answer and a predicted answer.

  Args:
    a_gold: the reference answer string.
    a_pred: the predicted answer string, or the sentinel -1 meaning that no
      answer could be extracted.

  Returns:
    -1 when `a_pred` is the -1 sentinel; otherwise the F1 score over the
    normalised tokens. If either side has no tokens the score is 1 when both
    are empty and 0 otherwise.
  """
  if a_pred == -1:
    return -1
  reference = get_tokens(a_gold)
  candidate = get_tokens(a_pred)
  if not reference or not candidate:
    # A no-answer on either side only counts as correct if both sides agree.
    return int(reference == candidate)
  # Multiset intersection: a token counts as often as it occurs on both sides.
  overlap = sum((collections.Counter(reference) & collections.Counter(candidate)).values())
  if overlap == 0:
    return 0
  precision = overlap / len(candidate)
  recall = overlap / len(reference)
  return (2 * precision * recall) / (precision + recall)
def find_answer_pred(result):
  """Extract the predicted answer from generated text.

  Looks for the first "### ANSWER ###" header, takes the text that follows it
  up to the next "###" marker (or to the end of the string), strips
  surrounding whitespace and keeps only the first line.

  Args:
    result: the generated text to search.

  Returns:
    The first line of the answer section, or the integer -1 (after printing
    a notice) when the header is not present.
  """
  answer_tag = "### ANSWER ###"
  _, found_tag, after_tag = result.partition(answer_tag)
  if not found_tag:
    print("ANSWER section not found.")
    return -1
  # Everything up to the next section marker belongs to the answer.
  answer_section = after_tag.split("###", 1)[0].strip()
  return answer_section.split('\n')[0]

In [None]:
from langchain.schema.runnable import RunnablePassthrough
from datasets import load_dataset
import numpy as np
from langchain.chains import RetrievalQA
from langchain.docstore.document import Document
# Load the "unfiltered" configuration of TriviaQA and keep its validation split
# as the evaluation set.
dataset = load_dataset("trivia_qa", name="unfiltered")
test_data = dataset["validation"]

In [None]:
import sys
import re
from langchain_community.retrievers import BM25Retriever
# Evaluate the RAG pipeline on TriviaQA.
# For every example: chunk the first search context into 4-line passages, index
# them in FAISS, retrieve the top 3 for the question, generate an answer and
# score it (best F1 over all gold aliases).
exact_scores=[]
f1_scores=0   # running sum of per-example best F1
i=0           # number of examples that produced a scorable prediction
for item in test_data:
    question = item["question"]
    try:
      context = item['search_results']['search_context'][0]
    except IndexError:
      # This example has no search context to retrieve from; skip it.
      continue
    answer_list = item["answer"]['aliases']
    lines = context.split('\n')
    # Group the context into passages of four consecutive lines each.
    text_groups = ['. '.join(lines[j:j+4]) for j in range(0,len(lines),4)]
    doc = [Document(page_content=text, metadata={"source": "local"}) for text in text_groups]
    db = FAISS.from_documents(doc, embeddings)
    faiss_retriever = db.as_retriever(
                            search_type="similarity",
                            search_kwargs={'k': 3}
                            )

    # The chain is rebuilt per example because the retriever is per example.
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm_pipeline,
        chain_type="stuff",
        retriever=faiss_retriever,
        return_source_documents=True,
        chain_type_kwargs={"prompt": prompt}
    )
    result = qa_chain.invoke(question)
    answer_pred = find_answer_pred(result['result'])

    # Best F1 over all aliases; stays -1 when no answer could be extracted.
    f1_score=-1
    for answer in answer_list:
      score = compute_f1(answer,answer_pred)
      if score>f1_score:
        print(f"{answer} = {answer_pred} : {score}")
        f1_score=score

    # Only scorable examples contribute to the mean. The previous counter
    # started at 1 and was decremented on failures, which skewed the average.
    if f1_score!=-1:
      i+=1
      f1_scores+=f1_score
    print(i)
    print(f"정답 : {answer_list}")
    print(f"예측 : {answer_pred}")
    print(f1_score)
    # Running mean F1 over scored examples (0.0 until the first one is scored).
    print(f1_scores/i if i else 0.0)


In [None]:
from langchain.schema.runnable import RunnablePassthrough
from datasets import load_dataset
import numpy as np
from langchain.chains import RetrievalQA
from langchain.docstore.document import Document
# Evaluate the RAG pipeline on the SQuAD validation split with a hybrid
# (BM25 + FAISS) retriever over the sentences of each example's context.
dataset = load_dataset('squad',split='validation')
exact_scores=[]   # EM score of every scored example
f1_scores=0       # running sum of per-example F1
i=0               # number of examples that produced a scorable prediction

## Loop over every SQuAD example and generate an answer for its question
for item in dataset:

    question = item["question"]
    # NOTE(review): only the first reference answer is scored; SQuAD examples
    # can carry several.
    answer = item["answers"]["text"][0]
    # Sentence-level passages; empty fragments carry no retrievable content.
    texts = [text for text in item['context'].split('.') if text.strip()]
    if not texts:
      continue
    doc =  [Document(page_content=text, metadata={"source": "local"}) for text in texts]
    db = FAISS.from_documents(doc, embeddings)
    faiss_retriever = db.as_retriever(
                            search_type="similarity",
                            search_kwargs={'k': 2}
                            )
    # BM25 must index the same passages as FAISS. Passing the raw context
    # string made from_texts iterate over it character by character.
    bm25_retriever = BM25Retriever.from_texts(
        texts, metadatas=[{'source': 'local'} for _ in texts]
    )
    bm25_retriever.k = 2
    # `weights` was previously misspelled, so the intended weighting was never passed.
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, faiss_retriever],weights=[0.5,0.5]
    )

    # Use the hybrid retriever that was just built (it was previously constructed
    # but the chain was still given the FAISS-only retriever).
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm_pipeline,
        chain_type="stuff",
        retriever=ensemble_retriever,
        return_source_documents=True,
        chain_type_kwargs={"prompt": prompt}
    )

    result = qa_chain.invoke(question)
    ## Keep only the extracted answer; to inspect the full generation use print(result)
    answer_pred = find_answer_pred(result['result'])

    ## EM score / F1 score
    f1_score = compute_f1(answer,answer_pred)
    exact_score = 0
    if f1_score!=-1:
      # Score EM only when an answer was extracted: answer_pred is the int -1
      # otherwise, which is not a valid input for string normalisation.
      exact_score = compute_exact(answer,answer_pred)
      exact_scores.append(exact_score)
      f1_scores+=f1_score
      i+=1

    print(i)
    print(f"정답 : {answer}")
    print(f"예측 : {answer_pred}")
    print(f1_score)
    # Running mean F1 over scored examples (0.0 until the first one is scored).
    print(f1_scores/i if i else 0.0)

