## 개발환경
- Google Colab L4 GPU
- numpy version: 2.0.2
- pandas version: 2.2.2
- PyPDF2 version: 3.0.1
- torch version: 2.6.0+cu124
- markdown version: 3.7

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import sys
print(sys.version)

3.11.11 (main, Dec  4 2024, 08:55:07) [GCC 11.4.0]


In [None]:
%cd /content/drive/MyDrive/dacon/건설공사

/content/drive/MyDrive/dacon/건설공사


In [None]:
!pip install PyPDF2
!pip install rank_bm25
!pip install faiss-cpu

In [None]:
import numpy as np
import pandas as pd
import os
import PyPDF2
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, AutoModelForSequenceClassification
from rank_bm25 import BM25Okapi
import faiss
from sklearn.preprocessing import normalize
import torch
import markdown
import random
from tqdm import tqdm
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from torch.cuda.amp import autocast

In [None]:
print(f"numpy version: {np.__version__}")
print(f"pandas version: {pd.__version__}")
print(f"PyPDF2 version: {PyPDF2.__version__}")
print(f"torch version: {torch.__version__}")
print(f"markdown version: {markdown.__version__}")
# print(f"sentence_transformers version: {SentenceTransformer.__version__}")
# print(f"transformers version: {transformers.__version__}")
# print(f"rank_bm25 version: {BM25Okapi.__module__}")
# print(f"faiss version: {faiss.__version__}")
# print(f"scikit-learn version: {normalize.__module__}")
# print(f"TfidfVectorizer version: {TfidfVectorizer.__module__}")

numpy version: 2.0.2
pandas version: 2.2.2
PyPDF2 version: 3.0.1
torch version: 2.6.0+cu124
markdown version: 3.7


In [None]:
def set_random_seeds(seed):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
set_random_seeds(42)

In [None]:
df_train = pd.read_csv("DAT/train.csv")  # 학습 데이터 로드
df_test = pd.read_csv("DAT/test.csv")  # 테스트 데이터 로드

In [None]:
pdf_folder = 'DAT/건설안전지침'
pdf_texts = []
for file in os.listdir(pdf_folder):
    if file.endswith(".pdf"):
        with open(os.path.join(pdf_folder, file), "rb") as f:
            reader = PyPDF2.PdfReader(f)
            text = "\n".join([page.extract_text() for page in reader.pages if page.extract_text()])
            markdown_text = markdown.markdown(text)
            pdf_texts.append(markdown_text)


In [None]:
pdf_texts

['<p>KOSHA GUIDE\nC - 62 - 2012\n곤돌라(Gondola)안전보건작업지침\n2012.8\n한 국 산 업 안 전 보 건 공 단\n안전보건기술지침의개요\no 제정자:김찬희\no 제정경과\n-2008년9월건설안전분야제정위원회심의\n-2008년11월총괄제정위원회심의\n-2012년7월건설안전분야제정위원회심의(개정)\no 관련규격\n-한국산업안전보건공단:곤돌라작업안전지침서\n-한국산업안전보건공단:건설공사특수작업대편람\n-한국산업안전보건공단:곤돌라안전작업에관한연구\no 관련법규\n규칙\n고시등\n-산업안전보건법제23조(안전상의조치)\n-산업안전보건기준에관한규칙제2편제1장제9절(양중기)\n-곤도라구조․규격에관한기술상의지침(노동부고시제2001-30호)\no기술지침의적용및문의\n이기술지침에대한의견또는문의는한국산업안전보건공단홈페이지안전보건\n기술지침소관분야별문의처안내를참고하시기바랍니다.\n  공표일자 : 2012년  8월 27일\n제정자:한국산업안전보건공단이사장\nKOSHA GUIDE\nC - 62 - 2012\n- 1 -곤돌라(Gondola)안전보건작업지침\n1.목적\n이지침은「산업안전보건기준에관한규칙」(이하“안전보건규칙”이라한다)제2편제1장\n제9절(양중기)의규정에따라산업현장에서사용하고있는곤돌라작업과정에서준수하\n여야할안전보건작업지침을정함을목적으로한다.\n2.적용범위\n이지침은커튼월,복합판넬,유리,석재,도장,청소,견출작업등가설식곤돌라를이\n용한건물외부작업시에적용한다.\n3.정의\n(1)이지침에서사용하는용어의정의는다음과같다.\n(가)“곤돌라(Gondola)”라함은달기발판또는운반구,승강장치,기타의장치및이\n들에부속된기계부품에의하여구성되고,와이어로프또는달기강선에의하여\n달기발판또는운반구가전용의승강장치에의하여상승또는하강하는설비를\n말한다.\n(나)“적재하중”이라함은운반구(Cage),훅(Hook)등의자체중량을뺀운반구에사\n람또는화물을싣고상승시킬수있는최대하중을말하며,하강전용곤돌라에\n서는그구조상운반구에사람또는화물을적재할수있는최대하중을말한\n다.\n(다)“허용상승속

BM25 + FAISS Hybrid Retrieval 구축

In [None]:
bm25_corpus = df_train["사고원인"].astype(str).tolist() + pdf_texts
tokenized_corpus = [text.split() for text in bm25_corpus]
bm25 = BM25Okapi(tokenized_corpus)


FAISS 벡터 인덱싱

In [None]:
embedding_model = SentenceTransformer("jhgan/ko-sbert-sts")
embeddings = embedding_model.encode(bm25_corpus, convert_to_numpy=True)
embeddings = normalize(embeddings)
faiss_index = faiss.IndexFlatL2(768)
faiss_index.add(embeddings)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/123 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/4.44k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/620 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/443M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/538 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/248k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/442M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/495k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

언어 모델 학습 또는 RAG 기반 추론 설정

In [None]:
model_name = "beomi/Llama-3-Open-Ko-8B"
# model_name = "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16,trust_remote_code=True, device_map="auto")

tokenizer_config.json:   0%|          | 0.00/51.0k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/301 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/698 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/23.9k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/6 [00:00<?, ?it/s]

model-00001-of-00006.safetensors:   0%|          | 0.00/3.00G [00:00<?, ?B/s]

model-00002-of-00006.safetensors:   0%|          | 0.00/2.94G [00:00<?, ?B/s]

model-00003-of-00006.safetensors:   0%|          | 0.00/2.97G [00:00<?, ?B/s]

model-00004-of-00006.safetensors:   0%|          | 0.00/2.94G [00:00<?, ?B/s]

model-00005-of-00006.safetensors:   0%|          | 0.00/2.94G [00:00<?, ?B/s]

model-00006-of-00006.safetensors:   0%|          | 0.00/1.29G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

In [None]:
def clean_response(text):
    """반복 문장 제거 및 불필요한 기호, 시간 표현, 공백 정리"""
    if not isinstance(text, str) or not text.strip():
        return ""  # 빈 값 처리

    # 1️⃣ "년 월 일 시 분 초 경" 같은 시간 관련 표현 삭제 (공백 고려하여 정리)
    text = re.sub(r"\b\d{4}년|\b\d{1,2}월|\b\d{1,2}일|\b\d{1,2}시|\b\d{1,2}분|\b\d{1,2}초|\b경", "", text)

    # 2️⃣ 불필요한 개행 문자 및 특수문자 삭제 (공백 정리)
    text = re.sub(r"\s*\n+\s*", " ", text)  # 연속 개행 -> 공백 변환
    text = re.sub(r"[•●▶◆▸▹➤#*]", "", text)   # 추가 특수문자 포함

    # 3️⃣ 중복 문장 제거 (완전 일치하는 문장만 제거)
    sentences = re.split(r"(?<=[.!?])\s+", text)  # 문장 단위 분할
    unique_sentences = list(dict.fromkeys(sentences))  # 중복 제거 후 순서 유지
    text = " ".join(unique_sentences)
    unwanted_phrases = [
        "이번에 새로 나온 신제품을 광고하고 싶어서요.",
        "Ich möchte für ein neues Produkt werben.",
        "이번에 새로 나온 신제품을 수입하고 싶어서요.",
        "Ich möchte dieses Mal ein neues Produkt importieren.",
        "Ich möchte dieses Mal ein neues Produkt import",
    ]

    for phrase in unwanted_phrases:
        text = text.replace(phrase, "")  # 불필요한 문구 제거

    # 4️⃣ 양쪽 공백 정리 후 반환
    return text.strip()


In [None]:
reranker_model_name = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
reranker_tokenizer = AutoTokenizer.from_pretrained(reranker_model_name)
reranker_model = AutoModelForSequenceClassification.from_pretrained(reranker_model_name)

tokenizer_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

In [None]:
def rerank_documents(query, documents, reranker_model, reranker_tokenizer):
    # 쿼리와 문서 간의 유사도를 재평가
    inputs = []
    for doc in documents:
        inputs.append(reranker_tokenizer(query, doc, return_tensors="pt", padding=True, truncation=True))

    with torch.no_grad():
        reranker_scores = []
        for input_ids in inputs:
            outputs = reranker_model(**input_ids)
            score = outputs.logits.item()  # 모델의 예측 점수 추출
            reranker_scores.append(score)

    # 점수를 기준으로 문서 재정렬
    ranked_docs = [doc for _, doc in sorted(zip(reranker_scores, documents), reverse=True)]

    return ranked_docs

In [None]:
def generate_response(query, faiss_index, embedding_model, df_train, pdf_texts, train_dense_embeddings, tokenizer, model, bm25_corpus, reranker_model, reranker_tokenizer):
    if "작업자 부주의" in query:
        return "작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획."

    # 1️⃣ FAISS 검색
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    query_embedding = query_embedding / np.linalg.norm(query_embedding)  # 정규화
    _, faiss_top_idx = faiss_index.search(query_embedding, 3)

    # 2️⃣ Dense Retriever 검색
    dense_query_embedding = embedding_model.encode([query])
    dense_scores = np.dot(train_dense_embeddings, dense_query_embedding.T).flatten()
    dense_top_idx = np.argsort(dense_scores)[-3:][::-1]
    # 3️⃣ BM25 검색
    tokenized_query = query.split()
    bm25 = BM25Okapi([text.split() for text in bm25_corpus])
    bm25_scores = bm25.get_scores(tokenized_query)
    bm25_top_idx = np.argsort(bm25_scores)[-3:][::-1]

    # 4️⃣ 검색 결과 통합
    combined_top_idx = set(faiss_top_idx.flatten()).union(set(dense_top_idx)).union(set(bm25_top_idx))

    # 5️⃣ 검색된 문서 추출
    retrieved_texts = []
    for idx in combined_top_idx:
        if idx < len(df_train):
            text = df_train.iloc[idx]["재발방지대책 및 향후조치계획"]
        elif idx - len(df_train) < len(pdf_texts):
            text = pdf_texts[idx - len(df_train)]
        else:
            continue

        retrieved_texts.append(text)

    # 6️⃣ Reranker 적용 (문서 재평가)
    reranked_texts = rerank_documents(query, retrieved_texts, reranker_model, reranker_tokenizer)

    # 7️⃣ 가장 대표적인 재발 방지 대책 선택 (기본 문장 포함)
    default_plan = "작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획."

    if reranked_texts:
        response_plan = f"작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. {reranked_texts[0]}"
    else:
        response_plan = "작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획."

    # 8️⃣ 모델을 통한 응답 생성
    rag_prompt = f"{response_plan}"

    with torch.no_grad():
        inputs = tokenizer(rag_prompt, return_tensors="pt", truncation=True, max_length=1024).to("cuda")
        with autocast():
            outputs = model.generate(**inputs, max_new_tokens=30, temperature=None)

    response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

    cleaned_response = clean_response(response)

    return cleaned_response

테스트 데이터에 적용 및 벡터 변환

In [None]:
# 훈련 데이터에서 "사고 원인" 또는 적절한 텍스트를 추출하여 임베딩 계산
train_texts = df_train["사고원인"].fillna("없음").astype(str).tolist()
# 각 사고 원인에 대한 임베딩 계산
train_dense_embeddings = embedding_model.encode(train_texts, convert_to_numpy=True)

In [None]:
test_responses = []  # 응답을 저장할 리스트
test_embeddings = []  # 임베딩 벡터를 저장할 리스트

for _, row in tqdm(df_test.iterrows(), desc="Generating responses", total=len(df_test)):
    accident = str(row["사고원인"])  # 사고 원인
    generated_text = generate_response(accident, faiss_index, embedding_model, df_train, pdf_texts, train_dense_embeddings, tokenizer, model, bm25_corpus, reranker_model, reranker_tokenizer)

    # 텍스트의 길이가 길어지면 임베딩 처리 시 성능 저하 가능성 있음
    # generated_embedding을 계산할 때 한 번에 많은 텍스트를 처리하지 않도록 배치처리를 고려
    generated_embedding = embedding_model.encode(generated_text, convert_to_numpy=True)

    # 리스트에 응답 및 임베딩 추가
    test_responses.append(generated_text)
    test_embeddings.append(generated_embedding)

# 만약, 배치 처리가 필요하다면, 임베딩을 batch로 처리하도록 조정할 수 있음


  with autocast():
Generating responses: 100%|██████████| 964/964 [38:59<00:00,  2.43s/it]


In [None]:
test_responses

['작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. 관리적 대책으로 차량계 건설기계 작업계획서 작성 및 검토와 펌프카 안전점검 체크리스트에 의한 사전 점검, 기술적 대책으로 펌프카 아웃트리거 설치 시 지반상태 사전 점검 및 받침대 추가 설치 확인, 유도자 또는 신호수 배치, 작업구역 설정, 타 근로자 출입금지 조치, 교육적 대책으로 펌프카 작업 전 안전교육 실시 및 확인, 당 작업 전 TBM 시 작업내용 및 위험포인트 내용 공유, 향후 조치 계획으로 재발 방지 대책 수립 및 현장 관리 철저.',
 '작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. 거푸집 설치작업 시 보안경 착용, 사전 위험성 평가 및 안전교육 실시, 지속적 관리감독, 현장 특별안전점검 및 안전교육 실시를 통한 동종 및 유사재해 방지.',
 '작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. 지속적인 통행로 청소와 계단실 내 보행주의 표지판 부착을 통한 전도 방지 대책.',
 '작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. 규격에 맞는 시스템비계 발판 및 안전 표지판 설치, 모든 작업발판의 설치상태 재확인 실시와 작업자에 대한 현장 내 작업절차 및 안전교육 강화, 작업 전 발판 상태 확인 지시.',
 '작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획.',
 '작업 전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획. 사고사례 전파 및 정기안전보건교육 실시, 사전 작업계획서 작성 및 관리감독자 이행 여부 확인 철저, 신호수 배치 및 작업지휘자 상주, 건축자재 운반 시 안전사고 예방을 위한 작업자 교육 실시, 지게차로 자재 운반 시

In [None]:
df_test["재발방지대책 및 향후조치계획"] = test_responses
test_embeddings = np.array(test_embeddings)

In [None]:
test_embeddings_1 = test_embeddings.copy()
df_test_1 =df_test.copy()

In [None]:
df_submission = pd.read_csv("DAT/sample_submission.csv")

In [None]:
embedding_df = pd.DataFrame(test_embeddings, columns=[f"vec_{i}" for i in range(768)])

# df_test와 embedding_df를 합치기
df_test["재발방지대책 및 향후조치계획"] = test_responses  # 응답 채우기/
df_submission_1 = pd.concat([df_test[["ID", "재발방지대책 및 향후조치계획"]], embedding_df], axis=1)

In [None]:
df_submission_1.to_csv('OUT/reranker-고정1생성1-검증.csv', index=False, encoding="utf-8-sig")

In [None]:
def save_model_weights(model, filename="model_weights.pth"):
    torch.save(model.state_dict(), filename)

save_model_weights(model, filename="last.pth")