1. 환경설정


In [1]:
# 필요한 모듈 임포트
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
from datasets import load_dataset, Dataset
import torch



2. Teacher 모델 로드

사전 학습된 모델(T5)을 Teacher 모델로 사용합니다.

In [2]:
# T5 모델과 토크나이저 로드
teacher_model_name = "t5-small"
teacher_tokenizer = AutoTokenizer.from_pretrained(teacher_model_name)
teacher_model = AutoModelForSeq2SeqLM.from_pretrained(teacher_model_name)

# Teacher 모델을 활용한 질답 파이프라인 생성
qa_pipeline = pipeline("text2text-generation", model=teacher_model, tokenizer=teacher_tokenizer)


Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.


PDF에서 text로 변환하는 함수

In [21]:
import PyPDF2
import re

def pdf_to_text(pdf_path, skip_start_pages=0, skip_last_pages=0, header_lines=1, footer_lines=1):
    with open(pdf_path, 'rb') as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        text = ""
        num_pages = len(pdf_reader.pages)
        
        print(f"Total pages in PDF: {num_pages}")

        # Adjust the range to process the correct pages
        start_page = skip_start_pages
        end_page = num_pages - skip_last_pages

        for page_num in range(start_page, end_page):
            page = pdf_reader.pages[page_num]
            page_text = page.extract_text()
            
            if page_text:
                print(f"Page {page_num + 1}: {len(page_text)} characters extracted")
                lines = page_text.splitlines(True)[header_lines:-footer_lines]
                text += "".join(lines)
            else:
                print(f"Page {page_num + 1} is empty or could not be read")
        
        return text


# Define the input and output file paths
pdf_file_path = "/home/kkwon/AHN/paper_ft/datas/3362743.3362963.pdf"
output_file_path = "/home/kkwon/AHN/paper_ft/datas/paper1.txt"

# Extract text from the PDF file
# Adjust skip_start_pages and skip_last_pages as needed
raw_text = pdf_to_text(pdf_file_path, skip_start_pages=0, skip_last_pages=0, header_lines=2, footer_lines=1)

# Save the extracted text to a text file
with open(output_file_path, 'w', encoding='utf-8') as f:
    f.write(raw_text)

print(f"Text extraction complete. Total characters extracted: {len(raw_text)}")


In [3]:
#We use the Bi-Encoder to encode all passages, so that we can use it with semantic search
bi_encoder = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')
bi_encoder.max_seq_length = 256     #Truncate long passages to 256 tokens
top_k = 32                          #Number of passages we want to retrieve with the bi-encoder

#The bi-encoder will retrieve 100 documents. We use a cross-encoder, to re-rank the results list to improve the quality
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# As dataset, we use Simple English Wikipedia. Compared to the full English wikipedia, it has only
# about 170k articles. We split these articles into paragraphs and encode them with the bi-encoder

data_filepath = "/home/kkwon/AHN/paper_ft/datas/structured_paper_with_paragraphs.jsonl.gz"

# if not os.path.exists(wikipedia_filepath):
#     util.http_get('http://sbert.net/datasets/simplewiki-2020-11-01.jsonl.gz', wikipedia_filepath)

passages = []
with gzip.open(data_filepath, 'rt', encoding='utf8') as fIn:
    for line in fIn:
        data = json.loads(line.strip())

        #Add all paragraphs
        #passages.extend(data['paragraphs'])

        #Only add the first paragraph
        passages.append(data['paragraphs'][0])

print("Passages:", len(passages))

# We encode all passages into our vector space. This takes about 5 minutes (depends on your GPU speed)
corpus_embeddings = bi_encoder.encode(passages, convert_to_tensor=True, show_progress_bar=True)

Passages: 6


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

3. Unlabeled 데이터 준비

라벨이 없는 텍스트 데이터를 불러옵니다.

In [7]:
# 예시: Hugging Face에서 논문 데이터셋 로드 (PubMed 데이터)
dataset = load_dataset(
    "csv",
    data_files="/home/kkwon/AHN/paper_ft/cleaned_paper.csv",
    split="train[:100]",
    encoding="utf-8",  # 파일 인코딩 지정
    delimiter=","      # CSV 구분자 지정
) # 100개 샘플
# 컬럼 이름이 'Text'인 경우
unlabeled_texts = dataset["Text"]  # 기존 'abstract' 대신 'Text' 사용



4. Pseudo-Label 생성

Teacher 모델을 사용해 질문-답변 쌍을 생성합니다.

In [8]:
def generate_qa_pairs(text):
    # 중요한 문장을 기반으로 질문 생성
    question_prompt = f"Generate a question from this text: {text}"
    question = qa_pipeline(question_prompt, max_length=50, num_return_sequences=1)[0]["generated_text"]

    # 질문에 대한 답변 생성
    answer_prompt = f"Answer this question based on the text: {question} Text: {text}"
    answer = qa_pipeline(answer_prompt, max_length=50, num_return_sequences=1)[0]["generated_text"]

    return {"context": text, "question": question, "answer": answer}

# Unlabeled 데이터에서 Pseudo-label 생성
qa_dataset = [generate_qa_pairs(text) for text in unlabeled_texts[:10]]  # 10개 샘플 처리


5. Student 모델 학습

생성된 Pseudo-label 데이터셋으로 Student 모델 학습을 진행합니다.

In [9]:
# Pseudo-label 데이터셋을 Hugging Face Dataset 객체로 변환
qa_dataset = Dataset.from_list(qa_dataset)

# Student 모델과 토크나이저 준비
student_model_name = "t5-small"
student_tokenizer = AutoTokenizer.from_pretrained(student_model_name)
student_model = AutoModelForSeq2SeqLM.from_pretrained(student_model_name)

# 데이터 전처리: 입력(질문+문맥), 출력(답변)
def preprocess_function(examples):
    inputs = [f"question: {q}  context: {c}" for q, c in zip(examples["question"], examples["context"])]
    targets = examples["answer"]
    model_inputs = student_tokenizer(inputs, max_length=512, truncation=True)
    with student_tokenizer.as_target_tokenizer():
        labels = student_tokenizer(targets, max_length=128, truncation=True)
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# 데이터셋 변환
tokenized_dataset = qa_dataset.map(preprocess_function, batched=True)

# PyTorch DataLoader 생성
from torch.utils.data import DataLoader
train_loader = DataLoader(tokenized_dataset, batch_size=4, shuffle=True)


Map:   0%|          | 0/10 [00:00<?, ? examples/s]



6. 모델 학습

Student 모델 학습 코드를 작성합니다.

In [10]:
from transformers import AdamW
from torch.nn import CrossEntropyLoss

# Optimizer 설정
optimizer = AdamW(student_model.parameters(), lr=5e-5)

# 학습 루프
student_model.train()
for epoch in range(3):  # 3번 반복
    for batch in train_loader:
        inputs = {key: val.to("cuda") for key, val in batch.items() if key != "labels"}
        labels = batch["labels"].to("cuda")

        # 모델 예측과 손실 계산
        outputs = student_model(**inputs, labels=labels)
        loss = outputs.loss

        # 역전파 및 가중치 업데이트
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    print(f"Epoch {epoch + 1} completed with loss: {loss.item()}")




RuntimeError: each element in list of batch should be of equal size

7. 학습된 Student 모델 평가

SQuAD 또는 KorQuAD 같은 QA 데이터셋을 사용하여 성능 평가를 진행합니다.

In [None]:
from datasets import load_metric

# 평가 데이터셋 로드
squad_dataset = load_dataset("squad", split="validation[:100]")  # 100개 샘플 평가

# 평가 루프
student_model.eval()
metric = load_metric("squad")
for sample in squad_dataset:
    input_text = f"question: {sample['question']}  context: {sample['context']}"
    inputs = student_tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True).to("cuda")

    outputs = student_model.generate(**inputs, max_length=50)
    predicted_answer = student_tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Metric 업데이트
    metric.add(prediction=predicted_answer, reference=sample["answers"]["text"][0])

# 결과 출력
print(metric.compute())


8. Self-Distillation 반복

학습된 Student 모델을 Teacher 모델로 사용하여 위 과정을 반복합니다.

In [None]:
# 학습된 Student 모델을 새로운 Teacher로 설정
teacher_model = student_model
qa_pipeline = pipeline("text2text-generation", model=teacher_model, tokenizer=student_tokenizer)
