<a href="https://colab.research.google.com/github/Kimhansav/everynocode_search_engine/blob/main/BP_judge_answer_zeroshot_pongjin_roberta_with_kornli.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#한국 버블 커뮤니티 오픈톡방 대화의 질문에 대한 답변을 선별하는 코드(zero-shot text classification)(pongjin/roberta_with_kornli)(로컬)

In [None]:
!pip install datasets
import random
import pandas as pd
import numpy as np
import re
import os
import tensorflow as tf
import urllib.request
import torch
from abc import ABC, abstractmethod
from IPython.display import display, HTML
from tqdm import tqdm
from transformers import shape_list, BertTokenizer, TFBertModel
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.sequence import pad_sequences
from transformers import RobertaTokenizerFast, RobertaForSequenceClassification, TextClassificationPipeline, pipeline, AutoTokenizer, Trainer, TrainingArguments
from datasets import Dataset, load_dataset, ClassLabel
from sklearn.model_selection import StratifiedKFold, train_test_split
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#모델 불러오기
HUGGINGFACE_MODEL_PATH = "pongjin/roberta_with_kornli"
tokenizer = AutoTokenizer.from_pretrained("Huffon/klue-roberta-base-nli")

In [None]:
#RoBERTa와 같이 token_type_ids를 사용하지 않는 모델의 경우, zero-shot pipeline을 바로 적용할 수 없습니다(transformers==4.7.0 기준)
#따라서 다음과 같이 변환하는 코드를 넣어줘야 합니다. 해당 코드 또한 위 깃허브의 코드를 수정하였습니다.
class ArgumentHandler(ABC):
    """
    Base interface for handling arguments for each :class:`~transformers.pipelines.Pipeline`.
    """

    @abstractmethod
    def __call__(self, *args, **kwargs):
        raise NotImplementedError()


class CustomZeroShotClassificationArgumentHandler(ArgumentHandler):
    """
    Handles arguments for zero-shot for text classification by turning each possible label into an NLI
    premise/hypothesis pair.
    """

    def _parse_labels(self, labels):
        if isinstance(labels, str):
            labels = [label.strip() for label in labels.split(",")]
        return labels

    def __call__(self, sequences, labels, hypothesis_template):
        if len(labels) == 0 or len(sequences) == 0:
            raise ValueError("You must include at least one label and at least one sequence.")
        if hypothesis_template.format(labels[0]) == hypothesis_template:
            raise ValueError(
                (
                    'The provided hypothesis_template "{}" was not able to be formatted with the target labels. '
                    "Make sure the passed template includes formatting syntax such as {{}} where the label should go."
                ).format(hypothesis_template)
            )

        if isinstance(sequences, str):
            sequences = [sequences]
        labels = self._parse_labels(labels)

        sequence_pairs = []
        for label in labels:
            # 수정부: 두 문장을 페어로 입력했을 때, `token_type_ids`가 자동으로 붙는 문제를 방지하기 위해 미리 두 문장을 `sep_token` 기준으로 이어주도록 함
            sequence_pairs.append(f"{sequences} {tokenizer.sep_token} {hypothesis_template.format(label)}")

        return sequence_pairs, sequences


In [None]:
#카카오톡 데이터 불러오기
#trained 버전으로 수정 필요
file_path = '/content/drive/My Drive/judge_question_result_short.xlsx'

In [None]:
#카카오톡 대화내용을 데이터프레임으로 받기
df = pd.read_excel(file_path)

#질문 딕셔너리, 답변 목록 리스트(이중 리스트) 생성
questions, answer_lists = {}, []

#질문으로 판별된 텍스트를 새 데이터프레임으로 생성
df_question = df[df['label'] == 'question']

print(df_question)
#새 데이터프레임의 index, text를 question 딕셔너리에 저장
questions = {text : index for (index, text) in zip(df_question.index, df_question['text'])}
print(questions)

In [None]:
# Ensure the classifier is using GPU
device = 0 if torch.cuda.is_available() else -1  # Use GPU if available, otherwise CPU
print(device)

In [None]:
#Classifier 정의
classifier = pipeline(
    "zero-shot-classification",
    args_parser=CustomZeroShotClassificationArgumentHandler(),
    model=HUGGINGFACE_MODEL_PATH,
    device = device,
    batch_size = 128
)

답변으로 쓰였던 질문이 다음에 또 질문으로써 등장하는 경우가 많이 존재함 -> 기존에 계획했던 질문 속 질문 문제에 해당한다.

질문 속 질문에 대해서 judge_question 단계에서는 높은 성능을 기대할 수가 없음. 질문 사이에 어떤 대화를 주고받았는지 모델이 알 수 없기 때문이다.

그렇기 때문에 judge_answer 이후에 한번 더 데이터 가공을 해야 한다.
A라는 질문에 대해서 답변을 선별할 때, B라는 질문이 답변으로 쓰였으면 다음 질문들 중에서 B가 질문으로 존재하는 질문-답변 쌍을 삭제해야 함.

zero-shot text classification에서 A, B라는 질문이 있다. 그리고 a, b, c, d라는 답변이 있다.
A에 어울리는 답변은 a이고, B에 어울리는 답변은 b, c, d이다.
근데 zero-shot 모델을 실행한 결과 A에 a, b, c, d가 연결되고 B에 b, c, d가 연결되었다.

이와 같은 경우 답변 목록을 딕셔너리로 만든 뒤 질문에 대한 답변으로 포함된 경우 True로 저장되고, 이후에는 답변 후보로 고려하지 않도록 한다면
A에서 이미 b, c, d가 True로 저장되고 B의 답변을 선별할 때 b, c, d가 답변 후보로 고려되지 않는 문제가 발생한다.
이는 어떻게 해결해야 하는가?

-> 답변으로 포함되는 기준값을 정하면 된다. 긍정을 의미하는 레이블 값이 0.7 이상이어야 레이블에 추가한다든가 등등
근데 이건 공식 데이터로 직접 실험해봐야 알 것 같다.

In [None]:
# #기존 알고리즘
# #전체 질문에 대해서 각 질문 이후 30개의 텍스트에 대해서 판별, 이때 특정 텍스트가 답변이 맞으면 답변 리스트에 추가한 뒤 다음 텍스트 판별

# #실험을 위한 하이퍼파라미터 설정
# #후보군으로 삼을 텍스트 개수 범위
# text_range = 20
# #답변 목록에 추가할지 기준이 되는 레이블값
# #질문-질문 비교기준
# qstandard = 0.63
# #질문-답변 비교기준
# astandard = 0.6

# # Pre-fetch the necessary DataFrame data to minimize access within loops
# question_texts = {question: df.iloc[questions[question] + 1 : questions[question] + range + 1]['text'].tolist() for question in questions.keys()}

# #질문 속 질문(중복 질문)을 제거하기 위해 사용할 딕셔너리와 리스트 초기화
# used_question_list = []
# #질문 속 질문인지 판별할 때 사용할 불리언
# in_question_texts = False
# #답변이 사용된 횟수를 셀 때 사용할 딕셔너리
# answer_used_count = {}


# answer_lists = []
# i = 0
# for question, texts in tqdm(question_texts.items(), desc="Processing questions"):
#     answer = []
#     for text in texts: #나중에 숫자 커지면 texts 대신 tqdm(texts, desc="Classifying texts", leave=False)
#         #불리언 설정
#         in_question_texts = True if text in question_texts else False

#         sequence_to_classify = question + ' '.join(answer) + '[END]' + text if in_question_texts == False else question + '[END]' + text

#         #동적 레이블
#         candidate_labels = ["질문-답변 쌍이다", "질문-답변 쌍이 아니다"] if in_question_texts == False else ["거의 같은 주제의 질문이다", "거의 같은 주제의 질문이 아니다"]

#         # Process the classification in one step
#         #동적 가설 적용
#         output = classifier(sequence_to_classify, candidate_labels, hypothesis_template = '[END]를 기준으로 두 글을 구분했을 때 {}.' if in_question_texts == False else '[END]를 기준으로 두 질문을 구분했을 때 {}.', multi_label=False)
#         expected_label = "질문-답변 쌍이다" if in_question_texts == False else "거의 같은 주제의 질문이다"
#         #경우에 따른 기준값 변화 적용
#         if (in_question_texts == True and output['labels'][0] == expected_label and output['scores'][0] > qstandard) or (in_question_texts == False and output['labels'][0] == expected_label and output['scores'][0] > astandard):
#           answer.append(text)
#           #답변이 사용된 횟수 카운트
#           if not text in answer_used_count:
#             answer_used_count[text] = 1
#           else:
#             answer_used_count[text] += 1
#           #질문 속 질문인지 기록하기 위한 조건문
#           if in_question_texts == True:
#               used_question_list.append(text)

#     answer_lists.append(answer) if len(answer) != 0 else answer_lists.append(['No answer'])

In [None]:
# #답변 사용된 개수 딕셔너리 사용
# for answer in answer_used_count.keys():
#   if answer_used_count[answer] > 1:


In [None]:
# # #데이터프레임 가공
# # df_question.rename(columns = {'text' : 'question'})
# # df_question['answer'] = answer_lists

# data = {question : answer for question, answer in zip(questions.keys(), answer_lists)}
# print(data)
# data_result= [{'question' : key, 'answer' : value} for key, value in data.items()]
# df_result = pd.DataFrame(data = data_result, columns = ['question', 'answer'])
# print(df_result)

In [None]:
# #질문 속 질문(중복 질문)인 행은 삭제
# #제거할 텍스트 목록
# used_question_set = set(used_question_list)

# # 데이터프레임에서 목록에 있는 텍스트를 포함하는 행 제거
# df_result_filtered = df_result[~df_result['question'].isin(used_question_set)]

# print(df_result_filtered)

In [None]:
#중복 질문 제거를 동시에 수행하기 위한 새로운 알고리즘

#실험을 위한 하이퍼파라미터 설정
#후보군으로 삼을 텍스트 개수 범위
text_range = 20
#답변 목록에 추가할지 기준이 되는 레이블값
#질문-질문 비교기준
qstandard = 0.6
#질문-답변 비교기준
astandard = 0.6

#질문-답변 쌍 딕셔너리 생성
qa_pair_dictionary = {index : {'질문' : question, '답변' : []} for index, question in zip(df_question.index, df_question['text'])}

#질문 속 질문인지 판별할 때 사용할 불리언
in_question_texts = False

all_texts = {index : item for index, item in zip(df.index, df['text'])}

for index, item in tqdm(all_texts.items(), desc = 'Processing Answer to Question'):

  candidate_qa_list = [] #현재 텍스트의 소속을 판정할 (질문-답변 딕셔너리) 리스트
  candidate_qa_index_list = []
  in_question_texts = True if df.iloc[index]['label'] == 'question' else False #판별할 텍스트가 질문인지 검사

  start = 0 if index < 20 else index - text_range - 1 #인덱스가 20 미만일 경우 검사 범위 조정
  for i in range(start, index): #현재 텍스트가 소속될 질문의 범위
    candidate = qa_pair_dictionary.get(i, None) #qa_pair_dictionary에서 i 인덱스에 해당하는 질답 딕셔너리 가져오기
    if candidate != None:
      candidate_qa_list.append(candidate) #결과 리스트에서 최대 확률인 질문을 인덱싱하기 위해 인덱스를 포함한 딕셔너리를 append
      candidate_qa_index_list.append(i)

  if len(candidate_qa_list) == 0:
    continue

  #데이터 배치처리
  sequence_to_classify = [qa_dict['질문'] + ' '.join(qa_dict['답변']) + '[END]' + item for qa_dict in candidate_qa_list] #[qa_dict['질문'] + '[END]' + item for qa_dict in candidate_qa_list] if in_question_texts == True else [qa_dict['질문'] + ' '.join(qa_dict['답변']) + '[END]' + item for qa_dict in candidate_qa_list]

  #동적 레이블
  candidate_labels = ["뒤의 질문은 앞의 글과 거의 같은 주제이다", "뒤의 질문은 앞의 글과 거의 같은 주제가 아니다"] if in_question_texts == True else ["질문-답변 쌍이다", "질문-답변 쌍이 아니다"] #"거의 같은 주제의 질문이다", "거의 같은 주제의 질문이 아니다"

  output = classifier(sequence_to_classify, candidate_labels, hypothesis_template = '[END]를 기준으로 두 질문을 구분했을 때 {}.' if in_question_texts == True else '[END]를 기준으로 두 글을 구분했을 때 {}.', multi_label=False)
  expected_label = "질문-답변 쌍이다" if in_question_texts == False else "뒤의 질문은 앞의 글과 거의 같은 주제이다"

  #output 딕셔너리에 질문 인덱스를 추가
  for i in range(len(output)):
    output[i]['index'] = candidate_qa_index_list[i]

  # 'scores'의 첫 번째 값(expected label일 확률)에 따라 내림차순으로 정렬
  sorted_output = sorted(output, key = lambda x : x['scores'][0], reverse=True)

  output = sorted_output[0] #텍스트가 소속될 질문

  #경우에 따른 기준값 변화 적용
  if (in_question_texts == True and output['labels'][0] == expected_label and output['scores'][0] > qstandard) or (in_question_texts == False and output['labels'][0] == expected_label and output['scores'][0] > astandard):
    #모순이 발생하지 않기 위해서 질문 속 질문으로 판별된 경우 즉시 qa_pair_dictionary에서 해당 질문을 삭제해야 함.
    if in_question_texts == True:
        del qa_pair_dictionary[index]

    qa_pair_dictionary[output['index']]['답변'].append(item)


  # print('\n',qa_pair_dictionary)
  # print('\n',used_question_index)


Processing Answer to Question: 100%|██████████| 7857/7857 [25:36<00:00,  5.11it/s]


In [None]:
# print(used_question_index)
# for index in used_question_index:
#   print(qa_pair_dictionary[index])

In [None]:
# print(qa_pair_dictionary.values())

In [None]:
data_result= [item for item in qa_pair_dictionary.values()]

#결과 데이터를 데이터프레임으로 변환
df_result = pd.DataFrame(data = data_result, columns = ['질문', '답변'])

In [None]:
df_result = df_result.rename(columns = {'질문' : 'question', '답변' : 'answer'})

In [None]:
# print(data_result)
# print(df_result)

In [None]:
# #score 테스트용 코드

# #실험을 위한 하이퍼파라미터 설정
# #후보군으로 삼을 텍스트 개수 범위
# range = 10
# #답변 목록에 추가할지 기준이 되는 레이블값
# #질문-질문 비교기준
# qstandard = 0.63
# #질문-답변 비교기준
# astandard = 0.6

# # Pre-fetch the necessary DataFrame data to minimize access within loops
# question_texts = {question: df.iloc[questions[question] + 1 : questions[question] + range + 1]['text'].tolist() for question in questions.keys()}


# question = 'deltaPercent의 절대값이 큰 값부터 데이터를 정렬하고 싶은데.deltaPercent가 양수, 음수 모두 존재할 때 절대값으로 바꾸는 방법 아시는 분 있으실까요? 예를 들어 deltaPercent 값이 -1, -3, 2, 5 이렇게 존재할 때, 이를 모두 절대값으로 바꾼 후 5, 3, 2, 1 순서로 정렬하려고 합니다!익스프레션 내에서 조건식을 사용할 수가 없어서 condition을 사용하는 수밖에 없겠는데요.. 쩝\n condition으로 0보다 작으면 -1 곱하면 되겠습니다 유저의 input url에 따른 rss 피드 긁어오기를 하고 싶은데 헷갈리는 부분이 있어 질문드립니다!'
# text =  '혹시 버블 워크로드 최적화하는 팁이 있을까요? 아직 레거시 요금제 쓰고 있긴 한데, 곧 요금제를 업그레이드 안 하면 감당이 안 될 거 같은데요. 계산해보니 예상 요금이 월 400만원이 나오게 생겼네요..ㅠㅠ'
# sequence_to_classify = question + '[END]' + text
# in_question_texts = True if text in question_texts else False
# candidate_labels = ["질문-답변 쌍이다", "질문-답변 쌍이 아니다"] if in_question_texts == False else ["거의 같은 주제의 질문이다", "거의 같은 주제의 질문이 아니다"]
# output = classifier(sequence_to_classify, candidate_labels, hypothesis_template = '[END]를 기준으로 두 글을 구분했을 때 {}.' if in_question_texts == False else '[END]를 기준으로 두 질문을 구분했을 때 {}.', multi_label=False)
# print(output)

In [None]:
#.csv 파일로 google drive에 저장
save_path = f'/content/drive/My Drive/judge_answer_result_short_pongjin_roberta_with_kornli_range{text_range}_qstandard{qstandard}_astandard{astandard}_같은주제질문.xlsx'

df_result.to_excel(save_path)