In [None]:
!pip install -q -U datasets
!pip install -q -U bitsandbytes
!pip install -q -U accelerate
!pip install -q -U peft
!pip install -q -U trl
!pip install -U transformers

In [None]:
!pip install faiss-gpu-cu12
!pip install -U langchain-community
!pip install -U langchain-huggingface

In [None]:
import os
import torch
import transformers
from datasets import load_from_disk
from transformers import (
    BitsAndBytesConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TextStreamer,
    pipeline
)
from peft import (
    LoraConfig,
    prepare_model_for_kbit_training,
    get_peft_model,
    get_peft_model_state_dict,
    set_peft_model_state_dict,
    TaskType,
    PeftModel
)
from trl import SFTTrainer
import pandas as pd
from datasets import load_dataset, Dataset, concatenate_datasets

from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from tqdm import tqdm

In [None]:
# 모델 수정 필요

from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

model_id = "Qwen/Qwen2.5-7B-Instruct"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,  # 추가적인 양자화 적용
    bnb_4bit_quant_type="nf4",  # NormalFloat4 양자화 방식 사용
    bnb_4bit_compute_dtype=torch.float16  # FP16으로 계산하여 속도 최적화
)  # 4비트 양자화 활성화

tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    model_id, quantization_config=bnb_config, device_map="auto", trust_remote_code=True
)

In [None]:
model.eval()

In [None]:
print("모델 타입:", type(model))
print("토크나이저 타입:", type(tokenizer))

In [None]:
import pandas as pd

# 경로 확인
df = pd.read_csv('/content/drive/MyDrive/데이콘_팀공유/똑바로 다시 시작/test_df.csv', encoding = 'utf-8-sig')

df['공사종류(대분류)'] = df['공사종류'].str.split(' / ').str[0]
df['공사종류(중분류)'] = df['공사종류'].str.split(' / ').str[1]
df['공종(대분류)'] = df['공종'].str.split(' > ').str[0]
df['공종(중분류)'] = df['공종'].str.split(' > ').str[1]
df['사고객체(대분류)'] = df['사고객체'].str.split(' > ').str[0]
df['사고객체(중분류)'] = df['사고객체'].str.split(' > ').str[1]
df['인적사고'] = df['인적사고'].str.replace(r'\(.*?\)', '', regex=True)

# best_query
# query = """
# 철근콘크리트공사 작업 중 떨어짐 사고 발생.
# 키워드: 안전난간 미설치, 안전고리 미착용.
# 떨어짐 방지를 위한 안전대책 및 재발 방지 조치는?
# """


# 테스트 데이터 통합 생성
combined_df_data = df.apply(
    lambda row: {
        "question": (
            f" '{row['공종(중분류)']}' 작업 중 '{row['인적사고']}' 발생. \n"
            f"키워드: '{row['사고원인']}' \n"
            f"'{row['인적사고']}' 방지를 위한 조치는?"
        )
    },
    axis=1
)
# DataFrame으로 변환
combined_test_data = pd.DataFrame(list(combined_df_data))
combined_test_data.head()

In [None]:
######### 다시 벡터스토어 복원 방법: 연결 끊겼을 때 / 경로 수정 필수
import faiss
import pickle
import numpy as np
from langchain.vectorstores import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings

# FAISS 인덱스 로드
faiss_index_path = "/content/drive/MyDrive/faiss_index.bin"
index = faiss.read_index(faiss_index_path)

# 원본 텍스트 로드
faiss_texts_path = "/content/drive/MyDrive/faiss_texts.pkl"
with open(faiss_texts_path, "rb") as f:
    all_splits = pickle.load(f)

# 원본 임베딩 벡터 로드 (새로 추가)
faiss_embeddings_path = "/content/drive/MyDrive/faiss_embeddings.npy"
embeddings_list = np.load(faiss_embeddings_path)

print("FAISS 벡터 스토어 로드 완료!")

# 임베딩 모델 로드 (이전에 사용했던 모델과 동일해야 함)
embedding_model_name = "jhgan/ko-sbert-nli"
embedding = HuggingFaceEmbeddings(model_name=embedding_model_name)

# (텍스트, 임베딩) 쌍을 생성하여 복원
text_embedding_pairs = list(zip(all_splits, embeddings_list))

vector_store = FAISS.from_embeddings(
    text_embeddings=text_embedding_pairs,
    embedding=embedding  # 임베딩 모델 객체 전달
)

# FAISS 인덱스를 다시 적용
vector_store.index = index

print("FAISS 벡터 스토어가 정상적으로 복원되었습니다!")

In [None]:
!pip install kiwipiepy
!pip install rank_bm25

In [None]:
from kiwipiepy import Kiwi
from langchain.vectorstores import FAISS
from langchain.schema import Document
from langchain.retrievers import BM25Retriever, EnsembleRetriever

# Kiwi 형태소 분석기 초기화 (BM25 토크나이저 용도)
kiwi = Kiwi()

def kiwi_tokenizer(text):
    return [token.form for token in kiwi.tokenize(text)]  # 문장을 형태소 단위로 나눔

# 벡터 검색을 위한 FAISS Vector Store의 데이터 가져오기
docs = [Document(page_content=text) for text in all_splits]  # FAISS의 text 데이터를 Document 형식으로 변환

# FAISS 기반 Retriever 생성
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})

# BM25 기반 Retriever 생성 (형태소 분석기 이용)
bm25_retriever = BM25Retriever.from_documents(docs, tokenizer=kiwi_tokenizer)

# BM25 + FAISS 결합 (앙상블 검색)
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, retriever],  # 두 개의 검색 시스템 결합
    weights=[0.8, 0.2]  # BM25와 FAISS를 동일한 가중치( 8:2)로 설정
)

In [None]:
text_generation_pipeline = pipeline(
    model=model,
    tokenizer=tokenizer,
    task="text-generation",
    do_sample=False, # False 로 하면 같은 입력에 같은 출력
    #temperature=0.1, # 0~1 까지 0에 가까울 수록 보수적인 답변 즉 창의성이 떨어짐 , do_sample = False 일떄 사용 불가
    return_full_text=False,
    max_new_tokens=128, # 문장 최대 길이 조정
    batch_size=8  # 배치 크기 지정
)

In [None]:
print("모델 타입:", type(model))
print("토크나이저 타입:", type(tokenizer))

In [None]:
from langchain_huggingface import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from transformers import pipeline


prompt_template = """
역할: 당신은 건설 안전 전문가입니다. 검색 결과를 바탕으로 질문에 대한 조치 사항을 간결하게 작성하세요.

질문에 대한 재발 방지 대책 및 향후 조치 계획만 간결하게 답변하세요.
검색된 내용에 기반하여 조치를 작성하세요.
목차, 번호, 특수기호 없이 핵심 내용만 서술하세요.
불필요한 부연 설명, 연결어, 주어 제거
반드시 마침표로 끝나는 문장으로 작성

{context}

질문:
{question}

답변:
"""

llm = HuggingFacePipeline(pipeline=text_generation_pipeline)

# 커스텀 프롬프트 생성
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template=prompt_template,
)

# RAG 체인 생성
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff", # stuff -> 그냥 합쳐서 llm에 전달 / map_reduce -> 합친거 요약해서 llm 전달 / refine -> 순차적으로 문서 추가해서 답변 보완
    retriever=retriever,
    return_source_documents=False, # 생성된 답변과 함께 사용된 소스 문서들도 반환할 수 있습니다. 출처 알려주는 개념
    chain_type_kwargs={"prompt": prompt}
)

In [None]:
print(type(qa_chain))  # 정상적으로 정의된 객체인지 확인

In [None]:
# 상위 5개 질문 가져오기
test_queries = combined_test_data["question"][:5].tolist()  #

# 답변 생성
responses = []
for idx, query in enumerate(test_queries):

    # 답변 생성
    response = qa_chain.invoke(query)

    # 결과 저장
    responses.append({"질문": query, "답변": response["result"]})

# 결과 출력
df_responses = pd.DataFrame(responses)
df_responses