### 라이브러리 import

In [None]:
import os
import glob
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from datetime import datetime
from typing import List
from operator import itemgetter

from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

from sentence_transformers import SentenceTransformer
from ollama import ChatOllama  # 사용한 LLM이 Ollama API로 추정됨

### 메타데이터

In [None]:
# GPU 설정
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 메타데이터 분류 (전역 변수)
metadata_categories = {
    "construction_type": {
        '건축': ['건축물', '건설공사', '건설현장', '건설기계', '건설업체'],
        '토목': ['교량', '터널', '도로', '철도', '항만', '하천'],
        '조경': ['조경', '수목', '식재'],
        '설비': ['설비', '플랜트', '시스템', '기계'],
        '기타': []
    },
    "work_type": {
        '기초공사': ['기초', '말뚝', '지하', '굴착'],
        '구조공사': ['철골', '콘크리트', '거푸집', '동바리'],
        '설비공사': ['설비', '배관', '전기', '용접'],
        '마감공사': ['미장', '타일', '도배', '페인트'],
        '기타': []
    },
    "accident_type": {
        '추락': ['추락', '고소', '비계', '발판'],
        '낙하': ['낙하', '물체', '중량물'],
        '굴착': ['굴착', '터널', '지하'],
        '감전': ['전기', '감전'],
        '기타': []
    },
    "accident_object": {
        '건설기계': ['크레인', '펌프', '굴착기', '타워크레인'],
        '건설자재': ['철근', '콘크리트', '거푸집', '동바리'],
        '설비': ['전기설비', '배관', '용접기'],
        '기타': []
    }
}

# 파일명에서 메타데이터 추출
def extract_metadata_from_filename(filename):
    filename = os.path.splitext(filename)[0]
    metadata = {key: "기타" for key in metadata_categories}

    for meta_key, category_dict in metadata_categories.items():
        for category, keywords in category_dict.items():
            if any(keyword in filename for keyword in keywords):
                metadata[meta_key] = category
                break  

    return metadata

# 질문에서 키워드 추출하여 필터링 적용
def get_dynamic_filters(question):
    filters = {}

    for filter_key, category_dict in metadata_categories.items():
        for category, keywords in category_dict.items():
            if any(keyword in question for keyword in keywords):
                filters[filter_key] = category
                break  

    return filters

### 검색 및 필터링

In [None]:
# 검색 함수 (검색 후 메타데이터 필터링 적용)
def search_similar_sections(query, vectorstore, filters=None, k=5):
    results = vectorstore.similarity_search_with_score(query, k=k)
    
    if filters:
        filtered_results = []
        for doc, score in results:
            match = all(doc.metadata.get(key, "기타") == value for key, value in filters.items())
            if match:
                filtered_results.append((doc, score))
        results = filtered_results

    return [{
        'section': doc.page_content,
        'similarity': 1 - score,
        'metadata': doc.metadata
    } for doc, score in results]


### Vector DB 구축

In [None]:
# 벡터 데이터베이스 생성
def create_vector_database(documents, model_name='jhgan/ko-sbert-sts', batch_size=32):
    print(f"총 {len(documents)}개의 문서를 처리하여 벡터 데이터베이스를 생성합니다.")

    embeddings = HuggingFaceEmbeddings(model_name=model_name, model_kwargs={'device': device})
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1300, chunk_overlap=100)

    texts, metadatas = [], []
    for i in tqdm(range(0, len(documents), batch_size)):
        batch_docs = documents[i:i + batch_size]
        for doc in batch_docs:
            splits = text_splitter.split_text(doc["content"])
            if len(splits) == 0:
                print(f"경고: {doc['metadata']['filename']} 문서에서 텍스트를 추출하지 못함!")
                continue
            texts.extend(splits)
            metadatas.extend([doc["metadata"]] * len(splits))

    print(f"총 {len(texts)}개의 텍스트 조각이 생성되었습니다.")
    
    if len(texts) == 0:
        raise ValueError("오류: 벡터화할 텍스트가 없습니다. 문서 로드를 확인하세요!")

    return FAISS.from_texts(texts, embedding=embeddings, metadatas=metadatas)

def create_embeddings_batch(texts: List[str], model_name: str = "jhgan/ko-sbert-sts", batch_size: int = 32):
    """배치 처리로 임베딩 생성 (GPU 가속 적용)"""
    embedding = SentenceTransformer(model_name)
    embedding.to(device)  # GPU로 모델 이동
    embeddings = []
    
    for i in tqdm(range(0, len(texts), batch_size), desc="임베딩 생성 중"):
        batch = texts[i:i + batch_size]
        batch_embeddings = embedding.encode(
            batch,
            show_progress_bar=False,
            device=device,
            convert_to_numpy=True
        )
        embeddings.append(batch_embeddings)
    
    return np.vstack(embeddings)


In [None]:

def process_rag_batch(rag_chain, questions: List[str], batch_size: int = 8):
    """RAG 처리 배치 (GPU 가속 적용)"""
    results = []
    for i in tqdm(range(0, len(questions), batch_size), desc="RAG 처리 중"):
        batch_questions = questions[i:i + batch_size]
        batch_results = []
        for question in batch_questions:
            # GPU 메모리 정리
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            result = rag_chain.invoke({"question": question})
            
            batch_results.append(result)
        results.extend(batch_results)
        if i % 50 == 0:
            print(f"답변 : {result}")
    return results

In [None]:

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
vector_db_path = f"./db/{timestamp}"
txt_path = "/home/wanted-1/potenup-workspace/Project/dacon/DACON-construction-accident-prevention/data/pdf_txt"

os.makedirs(vector_db_path, exist_ok=True)
txt_files = glob.glob(os.path.join(txt_path, '*.txt'))
documents = [{'file_path': f, 'filename': os.path.basename(f)} for f in txt_files]

processed_documents = []
for doc in tqdm(documents, desc="문서 처리 중"):
    try:
        with open(doc['file_path'], 'r', encoding='utf-8') as f:
            content = f.read()
            metadata = extract_metadata_from_filename(doc['filename'])
            processed_documents.append({"metadata": metadata, "content": content})
    except Exception as e:
        print(f"Error processing {doc['filename']}: {str(e)}")

vectorstore = create_vector_database(processed_documents, batch_size=32)
vectorstore.save_local(vector_db_path)

template = '''
{context}

### 질문:
{question}

### 지침: 당신은 건설 안전 전문가입니다.
테스트 데이터에 주어진 사고 상황에 대해, 검색된 문맥을 참고하여 답변을 한 문장으로 작성해 주세요.
- 서론, 부연 설명 없이 핵심 단어와 문구만 포함합니다.
- 최대 100 토큰 이하로만 간결하게 작성하세요.
- 대책에 대한 상세 설명을 적지 말고 대책만 예시처럼 작성하세요.
- 특수문자 를 추가하지말고 나열하세요.
- ","(comma) 사용을 최대한 줄여주세요 
- 예시: "안전관리 시스템 강화 및 사고 예방 프로토콜 개선"

[/INST]
'''

prompt = ChatPromptTemplate.from_template(template)

llm = ChatOllama(model='gemma3:27b', temperature=0.0)

rag_chain = (
    {
        'context': lambda inputs: "\n\n".join([res['section'] for res in search_similar_sections(
            inputs['question'], vectorstore, filters=get_dynamic_filters(inputs['question']), k=3
        )]),
        'question': itemgetter("question")
    }
    | prompt 
    | llm
    | StrOutputParser()
)

# 테스트 실행 및 결과 저장
print("테스트 실행 시작... 총 테스트 샘플 수:", len(combined_test_data))

questions = combined_test_data['question'].tolist()
test_results = process_rag_batch(rag_chain, questions, batch_size=8)
pred_embeddings = create_embeddings_batch(test_results, batch_size=32)

# 결과 저장
submission = pd.read_csv('/home/wanted-1/potenup-workspace/Project/dacon/DACON-construction-accident-prevention/sample_submission.csv', encoding='utf-8-sig')
submission.iloc[:,1] = test_results
submission.iloc[:,2:] = pred_embeddings

# 최종 결과를 CSV로 저장
submission.to_csv(f'./{timestamp}_submission.csv', index=False, encoding='utf-8-sig')

if torch.cuda.is_available():
    torch.cuda.empty_cache()