In [None]:
!pip install typing_extensions==4.7.1 --upgrade

In [None]:
!pip install peft datasets transformers bitsandbytes

In [None]:
!python -m pip install --upgrade pip
# 내부적으로 사용하는 툴의 오류를 해결하기 위해

In [None]:
!pip install langchain

In [None]:
!pip install langchain-community

In [None]:
!pip install chromadb

In [None]:
!pip install langchain_openai

In [None]:
!pip install --upgrade transformers peft

In [None]:
!pip install --upgrade torch transformers peft


In [None]:
!pip uninstall numpy -y
!pip install "numpy<2"
!pip install faiss-gpu

In [None]:
from huggingface_hub import login

# 허깅 페이스 로그인

In [None]:
import torch
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForSeq2Seq
# AutroModelForCausalLM: GPT 기반의 생성형 모델 로드하는 클래스 / DataCollatorForSeq2Seq : 데이터 배치, 데이터 로더와 함꼐 사용, 함수 제공공
from datasets import load_dataset
from peft import LoraConfig, get_peft_model, PeftModel
#PeftModel: 모델 로드드
import bitsandbytes as bnb
# bnb: 양자화에 도움을 줌/8bit의 연산을 지원해주는 라이브러리/모델을 좀 더 최적화하는데 도움
import torch.nn.functional as F
# 신경망에 적용하는 함수들

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

In [None]:
from langchain.schema import Document
import json

# 파일 경로
file_paths = {
    "term": "./metadata/term.json",
    "law": "./metadata/load_traffic_law.json",
    "modifier": "./metadata/modiflier.json",
    "car_case": "./metadata/car_to_car.json",
    "law_meta": "./metadata/precedent.json"
}

# 교통사고 케이스용 필드 상수
CASE_ID = "사건 ID"
CASE_TITLE = "사건 제목"
CASE_SITUATION = "사고상황"
BASE_RATIO = "기본 과실비율"
MODIFIERS = "케이스별 과실비율 조정예시"
LAW_REFERENCES = "관련 법규"
LEGAL_NOTES = "참고 판례"
REASON = "기본 과실비율 해설"

# JSON 로드 함수
def load_json(path):
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

# 리스트형 JSON 변환 (term, modifier, law_meta)
def convert_list_to_documents(data_list, doc_type):
    return [
        Document(page_content=json.dumps(item, ensure_ascii=False), metadata={"type": doc_type})
        for item in data_list
    ]

def convert_car_case_documents(data_list):
    documents = []

    def safe_value(value):
        if isinstance(value, list):
            return ", ".join(map(str, value))
        elif isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        elif value is None:
            return ""  # null도 허용 안 되므로 빈 문자열로 처리
        else:
            return str(value)

    for item in data_list:
        if not isinstance(item, dict):
            continue

        # page_content는 원본 전체 JSON 문자열
        content = json.dumps(item, ensure_ascii=False)

        # 기본 과실비율 해설이 리스트일 수 있음 → 문자열로 병합
        reason = item.get(REASON)
        if isinstance(reason, list):
            reason = "\n".join(map(str, reason))

        metadata = {
            "type": "car_case",
            "id": safe_value(item.get(CASE_ID)),
            "title": safe_value(item.get(CASE_TITLE)),
            "situation": safe_value(item.get(CASE_SITUATION)),
            "base_ratio": safe_value(item.get(BASE_RATIO)),
            "modifiers": safe_value(item.get(MODIFIERS)),
            "law": safe_value(item.get(LAW_REFERENCES)),
            "legal_notes": safe_value(item.get(LEGAL_NOTES)),
            "reason": safe_value(reason)
        }

        documents.append(Document(page_content=content, metadata=metadata))
    return documents

# 도로교통법 law JSON → 문서화
def convert_law_json_to_documents(data_dict):
    documents = []

    def normalize(item):
        return json.dumps(item, ensure_ascii=False) if isinstance(item, dict) else str(item)

    for law_name, content in data_dict.items():
        if isinstance(content, dict):
            for clause, text in content.items():
                lines = [normalize(x) for x in (text if isinstance(text, list) else [text])]
                full_text = f"{law_name} {clause}\n" + "\n".join(lines)
                documents.append(Document(page_content=full_text, metadata={"type": "law"}))
        else:
            lines = [normalize(x) for x in (content if isinstance(content, list) else [content])]
            full_text = f"{law_name}\n" + "\n".join(lines)
            documents.append(Document(page_content=full_text, metadata={"type": "law"}))

    return documents


# 문서화 실행
term_docs       = convert_list_to_documents(load_json(file_paths["term"]), "term")
modifier_docs   = convert_list_to_documents(load_json(file_paths["modifier"]), "modifier")
law_meta_docs   = convert_list_to_documents(load_json(file_paths["law_meta"]), "law_metadata")
car_case_docs   = convert_car_case_documents(load_json(file_paths["car_case"]))
law_docs        = convert_law_json_to_documents(load_json(file_paths["law"]))


# 전체 문서 리스트
all_docs = term_docs + modifier_docs + car_case_docs + law_meta_docs + law_docs


In [None]:
from typing_extensions import TypeIs

##### 모델 설정

In [None]:
model_name = 'saltlux/Ko-Llama3-Luxia-8B'

In [None]:
# 4-bit 양자화된 모델 로드를 위한 설정
bnb_config={
    'load_in_4bit':True,                        # 4비트 양자화 적용할 것인지
    'bnb_4bit_compute_dtype':torch.float16,     # 4비트의 연산을 수행할 때 어떤 데이터 타입을 쓸 것인지-torch.float16: 속도 최적화
    'bnb_4bit_quant_type':'nf4',                # 양자화 방식에 대한 type: nf4: 4-bit의 normal float(교안 참고): 성능 개선을 위해: 정규화화
    'device_map':'auto'                         # GPU가 여러 대일때 모두 사용할 수 있게끔 자동 설정
}

In [None]:
# 토크나이저 및 모델 로드 (모델 로드 시 4-bit 양자화 설정)
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForCausalLM.from_pretrained(model_name, **bnb_config)

In [None]:
# from langchain.schema import Document
import json

# 교통사고 케이스용 필드 상수
PRECEDENT = "참고 판례"

# 데이터 전처리 함수
def preprocess_data(examples):  # input: dictionary

    # input 데이터의 질문과 문맥
    inputs = [court + caseid for court, caseid in zip(examples['court'], examples['case_id'])]

    # 정답 데이터 추출
    answer_texts = [content for content in examples['content']]

    # 토큰화
    model_inputs = tokenizer(
        inputs,
        truncation=True,
        padding='max_length',
        max_length=4096,
        return_tensors='pt'
    )
    labels = tokenizer(
        answer_texts,
        truncation=True,
        padding='max_length',
        max_length=4096,
        return_tensors='pt'
    )['input_ids']  # 입력해주는 토큰의 id -> 정답 데이터

    # # input_ids 기분으로 Labels 길이 맞춤
    # max_length = model_inputs['input_ids'].shape[1]
    # labels = labels[:, :max_length] # 라벨의 길이 맞춰줌

    # input_ids 기분으로 Labels 길이 맞춤
    max_length = model_inputs['input_ids'].shape[1]
    labels = labels[:, :max_length] # 라벨의 길이 맞춰줌
    
    # 패딩된 부분을 -100으로 설정 : loss 계산에서 무시될 수 있게끔
    labels[labels == tokenizer.pad_token_id] = -100

    model_inputs['label'] = labels

    return model_inputs

from datasets import load_dataset

dataset = load_dataset("json", data_files="./metadata/precedent.json")


In [None]:
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
precedent_doc = dataset['train'].map(preprocess_data, batched=True, remove_columns=dataset['train'].column_names)

##### 파인 튜닝을 위한 LoRA 설정

In [None]:
# 파인튜닝을 위한 LoRA 설정
lora_config = LoraConfig(
    r=32,
    lora_alpha=64,
    lora_dropout=0.1,
    bias='none',# 가중치 행렬만, 편향은 없음
    task_type='CAUSAL_LM'# lora가 적용될 대상 모델 타입과 맞추어준다.
)

In [None]:
#LoRA 적용

model = get_peft_model(base_model, lora_config)
model.enable_input_require_grads()          # 모델에 대해 입력값으로 받은 값에 대해 gradient를 사용하게끔하는 설정
model.gradient_checkpointing_enable()       # 체크포인트로 중간 저장: 모델이 커지게 되면 중간에 수행해준 내용들을 저장하는것이 효율적
model.print_trainable_parameters()          # 학습가능한 파라미터의 수를 출력

##### Training 설정

In [None]:
# 설정
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)  # 동적 데이터를 저장하면서 자동으로 다음 데이터를 불러옴: model의 타입에 맞추어 tokenizer을 사용함/

training_args = TrainingArguments(
    output_dir = './q_lora_korqa',      # 
    eval_strategy='no',              # 검증: 하지 않음
    save_strategy='steps',              #
    per_device_train_batch_size=4,      # 배치 사이즈: 트레인
    per_device_eval_batch_size=4,       # 배치 사이즈: eval
    gradient_accumulation_steps=8,      # 가중치를 누산하여 한번에 계산 : 8번만큼의 가중치를 누적해 놓았다가 한번에 계산
    learning_rate=2e-4,                 # 학습률
    weight_decay=0.01,                  # L2 정규화 적용 비율
    num_train_epochs=3,                 # 몇번의 epoch을 진행할 것인지
    logging_dir='./logs',               # log 남길 dir
    logging_steps=100,                  # log 남길 빈도수
    save_total_limit=2,                 # 체크포인트 최대 개수: 가장 최근의 2개 저장
    fp16=True,                          # 
    push_to_hub=False,                  # hub: hugginface에서의 허브 - true: 자동으로 hub에 저장
    report_to='none'                    # 학습결과를 표현할 수 있는 툴에 전달달
)

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=precedent_doc,
    # eval_dataset=all_docs,
    tokenizer=tokenizer,
    data_collator=data_collator
)

In [None]:
# 모델 학습
trainer.train()

In [None]:
# 모델 저장
trainer.save_model("workspace/q_lora_korqa/checkpoint")

In [None]:
from transformers import AutoConfig

trained_model_path='./q_lora_korqa/checkpoint-5661'

config = AutoConfig.from_pretrained(model_name) # 원본 모델에 대한 설정 로드
config.save_pretrained(trained_model_path)      # 체크포인트 경로에 설정 저장: 재사용하기 위해해

In [None]:
# 학습시킨 adaptor 이어붙이기
adapter_model_path = './q_lora_korqa/checkpoint-15'

base_model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype='auto', device_map='auto')

model = PeftModel.from_pretrained(base_model, adapter_model_path)



In [None]:
all_docs = []
with open('all_docs', 'r') as f:
    for line in f:
        all_docs.append(line.strip())

In [None]:
OPENAI_API_KEY= # openAPI 넣기

##### API 제한 -> 순서대로 넣기

In [None]:
from langchain.vectorstores import Chroma  # persist 지원
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 1. 청크 크기 조정 (500~1000 권장)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
    is_separator_regex=False,
)

# 2. 문서 분할
all_splits = text_splitter.split_documents(law_meta_docs)

embedding_model = OpenAIEmbeddings(model='text-embedding-3-large', openai_api_key=OPENAI_API_KEY)

# 4. Chroma DB에 배치 처리로 저장
batch_size = 100  # 한 번에 처리할 청크 수
vectorstore = Chroma.from_documents(
    documents=all_splits[:batch_size],  # 첫 배치
    embedding=embedding_model,
    persist_directory="./vectordb1"
)

# 남은 청크를 순차적으로 추가
for i in range(batch_size, len(all_splits), batch_size):
    batch = all_splits[i:i+batch_size]
    vectorstore.add_documents(
        documents=batch,
        embedding=embedding_model
    )

vectorstore.persist()  



In [None]:
from langchain.vectorstores import Chroma # persist 지원
from langchain_openai import OpenAIEmbeddings



embedding_model = OpenAIEmbeddings(model='text-embedding-3-large', openai_api_key=OPENAI_API_KEY)
# embedding_model = HuggingFaceEmbeddings(model_name="upskyy/bge-m3-korean", encode_kwargs={'normalize_embeddings': True})


vectorstore = Chroma.from_documents(
    documents=law_meta_docs,
    embedding=embedding_model,
    persist_directory="./vectordb3",
    collection_metadata={"hnsw:space": "cosine"}
)
vectorstore.persist()  

In [None]:
!pip install --upgrade typing_extensions


In [None]:
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
from transformers import pipeline
from langchain.llms import HuggingFacePipeline


# 2. 프롬프트 템플릿
prompt = PromptTemplate(
    template="""
아래 문서 내용을 바탕으로 사용자가 물어본 용어나 법률 조항, 판례에 대해 정확하고 간결하게 설명해 주세요.

질문: {question}

문서: {context}

답변 형식:
- 용어/조항 정의: [정확한 설명]
- 출처가 명시된 경우: 관련 법률/조문 번호/판례명을 반드시 포함

답변:
""",
    input_variables=["question", "context"]
)

# 3. 리트리버 설정
retriever_chroma = vectorstore.as_retriever(
    search_kwargs={
        "k": 5
        }
)
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=512,
    temperature=0.7,
)

# 1.4 LangChain 호환 래퍼 적용
llm = HuggingFacePipeline(pipeline=pipe)



In [None]:
# 4. QA 체인 구성
qa_chain_chroma = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever_chroma,
    chain_type="stuff",
    chain_type_kwargs={"prompt": prompt}
)

# 3. 직접 generate() 호출
def generate_text(querry, max_length=512):
    inputs = tokenizer(querry, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_length=max_length,
        temperature=0.7,
        top_p=0.9
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# 실행 예시
print(generate_text("서울지방법원91가합8733"))
# # 5. 실제 질의 실행
# query = "서울지방법원91가합8733에 대해 알려줘"
# res_chroma = qa_chain_chroma.invoke({"query": query})

# # 6. 출력
# print("✅ 필터 적용 후 >> Chroma 답변:\n", res_chroma["result"])



# res = qa_chain.invoke({"query": query})

# # 6. 출력
# print("✅ 답변:\n", res["result"])

In [None]:
# 4. QA 체인 구성
qa_chain_chroma = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever_chroma,
    chain_type="stuff",
    chain_type_kwargs={"prompt": prompt}
)

# 3. 직접 generate() 호출
def generate_text(querry, max_length=512):
    inputs = tokenizer(querry, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_length=max_length,
        temperature=0.7,
        top_p=0.9
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# 실행 예시
print(generate_text("서울지방법원 91가합8733에 대해 알려줘"))
# # 5. 실제 질의 실행
# query = "서울지방법원91가합8733에 대해 알려줘"
# res_chroma = qa_chain_chroma.invoke({"query": query})

# # 6. 출력
# print("✅ 필터 적용 후 >> Chroma 답변:\n", res_chroma["result"])



# res = qa_chain.invoke({"query": query})

# # 6. 출력
# print("✅ 답변:\n", res["result"])

---

In [None]:
prompt = PromptTemplate(
    template="""
다음 '사고 상황 설명'에 대해 '문서'의 내용만 참고하여 과실 비율 및 법적 판단을 생성해 주세요.

사고 상황: {question}

문서 내용: {context}

답변 형식:
1. 과실비율: A차량 xx% vs B차량 xx%
2. 사고 원인 및 판단 근거:
   - [사고의 전개, 각 차량의 행위, 과실 요소 등을 구체적으로 설명]
3. 관련 법률 조항:
   - [예: 도로교통법 제10조 제2항, 제27조 제1항 등]
4. 참고 판례:
   - [예: 대법원 2023다12345, 서울중앙지법 2022가단123456 등]

조건:
- 반드시 문서 내용만 참고해 판단하세요.
- 법률 조항과 판례가 명시되어 있지 않으면 유사하거나 추정 가능한 근거를 제시해도 됩니다.
- 전체 답변은 포맷에 맞게 간결하고 전문적으로 작성하세요.
""",
    input_variables=["question", "context"]
)

# 3. 리트리버 설정 : vector store 설정
retriever_chroma = chroma_vector_store.as_retriever(
    search_kwargs={
        "k": 5,
        "filter": {
            "case_id": "2009가단22343"
        }
    }
)

# 4. QA 체인 구성
qa_chain_chroma = RetrievalQA.from_chain_type(
    llm=model,
    retriever=retriever_chroma,
    chain_type="stuff",
    chain_type_kwargs={"prompt": prompt}
)


# 5. 실제 질의 실행
query = "서울지방법원91가합8733에 대해 알려줘"
res_chroma = qa_chain_chroma.invoke({"query": query})

# 6. 출력
print("✅ 필터 적용 후 >> Chroma 답변:\n", res_chroma["result"])


# 사용 chain 넣기
res = qa_chain.invoke({"query": query})

# 6. 출력
print("✅ 답변:\n", res["result"])