<a target="_blank" href="https://colab.research.google.com/github/UpstageAI/cookbook/blob/main/cookbooks/upstage/Solar-Full-Stack LLM-101/05_3_OracleDB.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [1]:
import os
from langchain.embeddings import OpenAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from openai import OpenAI # openai==1.52.2

api_key = "up_zsOzpjQ8Ow7NFmiWQPTh2x7P4Y4MQ"
data_path = "/Users/susie/Desktop/Temp_Laptop2/Python_Files/G/24-2/NLP/Team/baseline/"

client = OpenAI(
    api_key="up_zsOzpjQ8Ow7NFmiWQPTh2x7P4Y4MQ",
    base_url="https://api.upstage.ai/v1/solar"
)

In [2]:
PDF_PATH = os.path.join(data_path, 'ewha.pdf')
CSV_PATH = os.path.join(data_path, 'testset.csv')
problems = os.path.join(data_path, "problems.csv") # 추가할 KB
# TABLE_PAGES 필요없음

In [3]:
# STEP 1. KB 구축
# step 1.1 ewha.pdf에서 pdf parsing
import requests
from langchain.schema import Document
from bs4 import BeautifulSoup

def extract_text_or_table(pdf_path):
    api_key = "up_zsOzpjQ8Ow7NFmiWQPTh2x7P4Y4MQ"
    url = "https://api.upstage.ai/v1/document-ai/document-parse"
    headers = {"Authorization": f"Bearer {api_key}"}
    documents = []

    with open(pdf_path, "rb") as file:
        response = requests.post(url, headers=headers, files={"document": file})

    if response.status_code == 200:
        data = response.json()
        html_content = data.get("content", {}).get("html", "")
        if not html_content:
            print("Error: No HTML content found in API response.")
            return []

        soup = BeautifulSoup(html_content, "html.parser")

        categories = {
            "table": "table",
            "figure": "figure",
            "chart": "img[data-category='chart']",
            "heading1": "h1",
            "header": "header",
            "footer": "footer",
            "caption": "caption",
            "paragraph": "p[data-category='paragraph']",
            "equation": "p[data-category='equation']",
            "list": "p[data-category='list']",
            "index": "p[data-category='index']",
            "footnote": "p[data-category='footnote']"
        }

        for category, selector in categories.items():
            elements = soup.select(selector)
            for element in elements:
                content = element.get_text(strip=True)
                metadata = {"category": category, "html": str(element)}
                documents.append(Document(page_content=content, metadata=metadata))

        if not documents:
            print("No sections were extracted.")
        return documents
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return []

# step 1.2 pdf parsing한 것을 cleaning text 
import re

def clean_extracted_text(text):
    # 문장 중간의 줄바꿈 제거
    cleaned_text = re.sub(r'(?<=[a-z,])\n(?=[a-z])', ' ', text)
    # 문장 끝 줄바꿈 유지
    cleaned_text = re.sub(r'(?<=[.?!])\s*\n', '\n', cleaned_text)
    
    return cleaned_text

# step 1.3 추가 KB, KB 통합
import csv

def load_problem_data(csv_path):
    problems = []
    with open(csv_path, "r", encoding="utf-8") as file:
        reader = csv.reader(file)
        for row in reader:
            # 모든 데이터를 하나의 context 필드로 통합
            context = " ".join(row)
            problems.append({"type": "problem", "content": context})
    return problems

def combine_kb(documents, problems):
    combined_kb = [{"type": "pdf", "content": doc} for doc in documents]
    combined_kb += [{"type": "problem", "content": problem["content"]} for problem in problems]
    return combined_kb

def ensure_text_format(kb):
    if isinstance(kb, list):
        processed_kb = []
        for item in kb:
            if isinstance(item, dict):
                if "content" in item:
                    content = item["content"]
                    if isinstance(content, dict) and "page_content" in content:
                        processed_kb.append([content["page_content"]])
                    elif isinstance(content, str):
                        processed_kb.append([content])
            elif isinstance(item, str):
                processed_kb.append([item])
        return processed_kb
    else:
        raise ValueError("KB should be a list.")

# step 1.4 전체 KB 임베딩하고 <Embedding 모델> 검색할 수 있게 <VectorStore 모델> 함 - 토큰 너무 많아지는 것을 방지하기 위해 split
import numpy as np
from openai import OpenAI
from langchain.vectorstores import FAISS
from langchain.schema import Document

def split_text(text, max_length=1000):
    return [text[i:i + max_length] for i in range(0, len(text), max_length)]

def preprocess_texts(texts):
    cleaned_texts = []
    for text in texts:
        if not isinstance(text, str):
            text = str(text)
        if len(text.strip()) > 0:
            cleaned_texts.append(text.strip())
    return cleaned_texts

class UpstageEmbeddings:
    def __init__(self, client, model="embedding-query"):
        self.client = client
        self.model = model

    def embed_documents(self, texts):
        embeddings = []
        for text in texts:
            try:
                response = self.client.embeddings.create(
                    model=self.model,
                    input=text
                )
                embeddings.append(response.data[0].embedding)
            except Exception as e:
                print(f"Error generating embedding for text: {text[:50]}... \n{e}")
        return embeddings

    def embed_query(self, text):
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Error generating embedding for query: {text[:50]}... \n{e}")
            return None

def create_vector_store(kb_list):
    documents = []
    for entry in kb_list:
        for text in entry:
            processed_texts = preprocess_texts([text])
            for processed_text in processed_texts:
                chunks = split_text(processed_text, max_length=1000)
                documents.extend([Document(page_content=chunk) for chunk in chunks])

    embeddings = UpstageEmbeddings(client=client)

    vector_store = FAISS.from_documents(documents, embeddings)
    return vector_store

# STEP 2. 모델링 <llm 모델> <QA 방식> - 언어 모델, 모델링 방식, 프롬프팅 준비
from openai import OpenAI

def create_qa_chain(vector_store, model, prompt_template):
    client = OpenAI(
        api_key="up_zsOzpjQ8Ow7NFmiWQPTh2x7P4Y4MQ",
        base_url="https://api.upstage.ai/v1/solar"
    )

    retriever = {"embedding": vector_store}
    
    def qa_model(prompt):
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content
    
    return {"llm": qa_model, "retriever": retriever, "prompt": prompt_template}

# STEP 3. samples에서 질문과 정답 가져오기
import pandas as pd

def read_data(csv_path):
    data = pd.read_csv(csv_path)
    return data['prompts'].tolist(), data['answers'].tolist()

import re

def extract_answer(text):
    match = re.search(r"\([A-Z]\)", text)
    return match.group(0) if match else "N/A"

# STEP 4. 답변 생성
import wikipediaapi
import re

wiki_wiki_ko = wikipediaapi.Wikipedia(language='ko', user_agent='NLP_Team/1.0 (ewhanthbeot@ewhain.net)')
wiki_wiki_en = wikipediaapi.Wikipedia(language='en', user_agent='NLP_Team/1.0 (ewhanthbeot@ewhain.net)')

# Retrieve에서 관련성 높은 것만 필터링.
def filter_context_by_relevance(question, context, threshold=0.5):
    question_embedding = client.embeddings.create(
        model="embedding-query",
        input=question
    ).data[0].embedding

    context_embedding = client.embeddings.create(
        model="embedding-query",
        input=context
    ).data[0].embedding

    # Compute similarity (cosine similarity)
    similarity = np.dot(question_embedding, context_embedding) / (
        np.linalg.norm(question_embedding) * np.linalg.norm(context_embedding)
    )

    if similarity >= threshold:
        return context
    else:
        return None

# KB에서 검색. 관련성 높은 것만 필터링. (검색 시 질문에서 필요없는 것을 삭제하면 오히려 성능이 낮아짐.)
def retrieve_context(question, vector_store):
    try:
        question_embedding = client.embeddings.create(
            model="embedding-query",
            input=question
        ).data[0].embedding

        results = vector_store.similarity_search_by_vector(question_embedding, k=3)
        return "\n\n".join([doc.page_content for doc in results])
    except Exception as e:
        print(f"Error during retrieval: {e}")
        return "No relevant context found."

def retrieve_context_with_filter(question, vector_store):
    raw_context = retrieve_context(question, vector_store)
    if not raw_context.strip():
        return "No relevant context found."

    filtered_context = filter_context_by_relevance(question, raw_context)
    if not filtered_context:
        print("검색된 문서가 질문과 관련이 없어 제외되었습니다.")
        return "No relevant context found."

    return filtered_context

# Wiki에서 검색. 관련성 높은 것만 필터링. 검색 시 질문에서 필요없는 건 삭제.
def extract_high_quality_keywords(query):
    keywords = query.split()
    stopwords = {"question", "which", "of", "the", "following", "describes", "a", "key", "in", "that", "is", "to", "as", "early", "as"}
    high_quality_keywords = [word.strip().lower() for word in keywords if word.lower() not in stopwords and len(word) > 3]
    return high_quality_keywords

def fetch_from_wikipedia(query, language, max_keywords=5):
    wiki_api = wiki_wiki_ko if language == 'ko' else wiki_wiki_en
    high_quality_keywords = extract_high_quality_keywords(query)
    documents = []

    for keyword in high_quality_keywords[:max_keywords]:
        print(f"Searching Wikipedia for keyword: {keyword}")
        try:
            search_results = wiki_api.search(keyword, results=1)
            if search_results:
                page = wiki_api.page(search_results[0])
                if page.exists():
                    summary = page.summary[:500]  # 요약을 500자로 제한
                    # `Document` 객체로 변환
                    documents.append(Document(page_content=summary, metadata={"source": "Wikipedia", "title": page.title}))
        except Exception as e:
            print(f"Error fetching Wikipedia page for keyword '{keyword}': {e}")
    
    if documents:
        return documents  # `Document` 객체 리스트 반환
    else:
        print("No relevant Wikipedia page found.")
        return []

def add_to_vector_store(documents, vector_store):
    for doc in documents:
        vector_store.add_documents([doc])

# llm 호출. 언어 감지 후 호출.
def detect_language(text):
    if re.search(r'[가-힣]', text):
        return 'ko'
    elif re.search(r'[a-zA-Z]', text):
        return 'en'
    else:
        return 'unknown'

def run_llm_with_retry(question, combined_context, qa_chain, max_retries=5):
    language = detect_language(question)
    keywords = extract_high_quality_keywords(question)  # Only high-quality keywords
    retries = 0

    while retries < max_retries:
        prompt = qa_chain["prompt"].format(context=combined_context, question=question)
        result = qa_chain["llm"](prompt)
        predicted_answer = extract_answer(result)

        if predicted_answer != "N/A":
            return predicted_answer

        print(f"답변이 N/A로 표시됨: 다시 확인 중...")

        if keywords:
            keyword = keywords.pop(0)
            wiki_context = fetch_from_wikipedia(keyword, language)
            combined_context = f"PDF Context:\n{combined_context}\n\nWikipedia Context:\n{wiki_context or 'N/A'}"
        else:
            break

        retries += 1

    print("최대 재시도 횟수에 도달했습니다.")
    return "N/A"

def normalize_answer(answer):
    if answer.startswith("(") and answer.endswith(")"):
        return answer[1:-1]
    return answer.strip()

# Main 실행

In [4]:
# STEP 1. KB 구축 (ewha.pdf와 문제 데이터 로드 및 VectorStore 생성)
documents = extract_text_or_table(PDF_PATH)
cleaned_documents = [
    {"page_content": clean_extracted_text(doc.page_content), "metadata": doc.metadata}
    for doc in documents
    if isinstance(doc.page_content, str)
]

In [5]:
problems = load_problem_data(problems)

kb = combine_kb(cleaned_documents, problems)
list_kb = ensure_text_format(kb)
vector_store = create_vector_store(list_kb)

`embedding_function` is expected to be an Embeddings object, support for passing in a function will soon be removed.


In [6]:
# STEP 2. 모델링 <llm 모델> <QA 방식> - 언어 모델, 모델링 방식, 프롬프팅 준비
model = "solar-pro"
custom_prompt = """
다음 문서에서 이유를 찾고 질문에 답하세요: \n
{context}\n
1. Tables in the document follow a hierarchical structure:
- **Example**: 
    ```
    [['대학', '학부/학과/전공', '입학정원'], 
    ['사범대학', '교육학과\n유아교육과...', '27\n29...']]
    ```
- This indicates:
    - A university (e.g., 사범대학) contains colleges (e.g., 교육학과).
    - A college can further have departments or majors (e.g., 특수교육과 includes 유아특수교육전공).
\n
질문: {question}\n
답변은 선택지에서 선택하세요."""
qa_chain = create_qa_chain(vector_store, model, custom_prompt)

In [7]:
# STEP 3. samples에서 질문과 정답 가져오기 - csv 파일에서 -> list로 변경
prompts, answers = read_data(CSV_PATH)

In [None]:
correct = 0
results = []

for i, question in enumerate(prompts):
    print(f"질문 {i + 1}: {question}")

    pdf_context = retrieve_context_with_filter(question, vector_store)

    if not pdf_context.strip() or pdf_context == "No relevant context found.":
        print("PDF에서 관련 정보를 찾을 수 없습니다. Wikipedia를 검색합니다.")
        language = detect_language(question)
        wiki_docs = fetch_from_wikipedia(question, language)
        
        if wiki_docs:
            add_to_vector_store(wiki_docs, vector_store)
            wiki_context = "\n\n".join([doc.page_content for doc in wiki_docs])
        else:
            wiki_context = "N/A"
    else:
        wiki_context = None

    combined_context = f"PDF Context:\n{pdf_context}\n\nWikipedia Context:\n{wiki_context or 'N/A'}"
    predicted_answer = run_llm_with_retry(question, combined_context, qa_chain)

    normalized_predicted = normalize_answer(predicted_answer)
    normalized_actual = normalize_answer(answers[i])

    result_data = {
        "문제": question,
        "선택지": prompts[i],
        "예상 정답": normalized_predicted,
        "실제 정답": normalized_actual,
        "PDF Context": pdf_context,
        "Wikipedia Context": wiki_context,
    }
    results.append(result_data)

    print(f"예상 답: {normalized_predicted}, 실제 답: {normalized_actual}\n")
    if normalized_predicted == normalized_actual:
        correct += 1

accuracy = correct / len(prompts) * 100
print(f"총 {len(prompts)}개의 질문 중 {correct}개 맞힘. 정확도: {accuracy:.2f}%")

results_df = pd.DataFrame(results)
results_df.to_csv("results.csv", index=False)

질문 1: QUESTION1) 재학 중인 학생이 휴학을 하려면 학기 개시일로부터 며칠 이내에 휴학을 신청하야하나요?
(A) 30일
(B) 45일 
(C) 60일
(D) 90일
답변이 N/A로 표시됨: 다시 확인 중...
Searching Wikipedia for keyword: question1)
Error fetching Wikipedia page for keyword 'question1)': 'Wikipedia' object has no attribute 'search'
No relevant Wikipedia page found.
예상 답: D, 실제 답: D

질문 2: QUESTION2) '재입학은 a회에 한하여 할 수 있다. 다만 제 28조제4호에 의하여 제적된 자는 제적된 날부터 b년이 경과한 후 재입학 할 수 있다.' a와 b가 상수일 때 a+b의 값을 구하면?
(A) 2
(B) 3
(C) 4
(D) A,B,C 중 답 없음
예상 답: B, 실제 답: A

질문 3: QUESTION3) 학생이 소속 학과 또는 전공 이외의 전공 교과목을 총장이 정하는 바에 따라 몇학점 이상 취득한 때에 부전공을 이수한것으로 인정받을 수 있는가?
(A) 15학점
(B) 18학점
(C) 21학점
(D) 25학점
예상 답: C, 실제 답: C

질문 4: QUESTION4) 다음 보기의 학생들 중 제적을 당하지 않는 사람을 고르면?
(A) 팜 : 징계에 의해 퇴학처분을 받았음 
(B) 엘모 : 성적이 평점평균 1.2 로 학사경고를 연속 3회 받았음
(C) 라마 : 수업료 기타 납입금을 소정 기일 내 납입하지 못하였음
(D) 로라 : 휴학기간 경과 후 3주가 지났으나 갑자기 큰 사고가 났다는 정당한 이유 때문에 복학하지 못하였음
예상 답: D, 실제 답: D

질문 5: QUESTION5) 2019학년도 휴먼기계바이오공학부의 입학 정원은 몇 명인가? 
(A) 90명 
(B) 100명 
(C) 110명 
(D) 120명
예상 답: B, 실제 답: C

질문 6: QUESTIO