# 내부 문서 수집

In [None]:
# 모듈 설치
!pip install pymupdf pdfplumber

### 데이터 불러오기

In [None]:
# 추후 git hub 진행 후, github 에서 다운 받는 것으로 수정 요망
import gdown
file_id = '1CVlJFI-FGAT4cFhPy5KoZO2aav3ZRD1l'
download_url = f'https://drive.google.com/uc?id={file_id}'
gdown.download(download_url, 'data.zip', quiet=False)

In [None]:
!unzip data.zip

## PDF 텍스트 변환

In [None]:
import fitz  # PyMuPDF
import glob
import os


def merge_pdfs_to_txt(file_paths, output_file):
    with open(output_file, "w", encoding="utf-8") as out_file:
        for file_path in file_paths:
            print(f"Processing {file_path}...")
            try:
                with fitz.open(file_path) as pdf:
                    for page_num in range(len(pdf)):
                        text = pdf[page_num].get_text()
                        if text.strip():  # 빈 텍스트가 아닐 때만 추가
                            out_file.write(text)
                            out_file.write("\n\n")  # 페이지 구분
            except Exception as e:
                print(f"Error processing {file_path}: {e}")

    print(f"텍스트 파일 저장 완료: {output_file}")

In [None]:
# 디렉토리 내 모든 PDF 파일 경로
file_paths = glob.glob(os.path.join('./raw', "*.pdf"))

# 출력 TXT 파일 경로
output_txt_file = "merged_texts.txt"

# 함수 실행
merge_pdfs_to_txt(file_paths, output_txt_file)

## 특수문자 제거

In [None]:
import re

def preprocess_text(text):
    # 1. HTML 태그 제거
    text = re.sub(r"<[^>]+>", "", text)

    # 2. 의미 없는 문양 제거 (###, ***, --- 등)
    text = re.sub(r"(#+|[-*]{2,})", "", text)

    # 3. URL 또는 이메일 주소 제거
    text = re.sub(r"http[s]?://\S+|www\.\S+|[\w\.-]+@[\w\.-]+", "", text)

    # 4. 연속된 공백, 탭, 줄바꿈 정리
    text = re.sub(r"\s+", " ", text)

    # 5. 구문에 중요한 특수문자와 수식 관련 기호는 그대로 유지
    # 미제거 특수문자 : 숫자와 관련된 특수문자, 쉼표, 마침표

    return text.strip()

# 병합된 텍스트 파일 읽기
with open(output_txt_file, "r", encoding="utf-8") as f:
    merged_text = f.read()

# 전처리 실행
cleaned_text = preprocess_text(merged_text)
print(cleaned_text)

# 전처리된 텍스트를 파일로 저장
preprocessed_txt_file = "./preprocessed_texts.txt"
with open(preprocessed_txt_file, "w", encoding="utf-8") as f:
    f.write(cleaned_text)

print(f"전처리된 텍스트 파일 저장 완료: {preprocessed_txt_file}")

# 외부 문서 수집

In [None]:
!pip install jq

## 데이터 불러오기

In [None]:
import os
import zipfile
import json
from collections import defaultdict

# 압축 파일 해제
# 검사할 폴더 경로
folder_path = './'  # 폴더 경로로 수정
zip_file_path = './Other.zip'  # 압축 파일 경로

# 압축 파일 해제
try:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        extract_path = os.path.splitext(zip_file_path)[0]  # 압축 해제 경로
        zip_ref.extractall(extract_path)
        print(f"Extracted: {zip_file_path} to {extract_path}")
    # 압축 파일 삭제 (원한다면)
except Exception as e:
    print(f"Error extracting {zip_file_path}: {e}")

## 필요 없는 키워드 파일 삭제

### 1차 소거

In [None]:
# # 제거할 키워드
# keywords = ['가사', '형사', '특허', '저작권', '상법']

# # 검사할 폴더 경로
# folder_path = './Other/QA데이터'


# # 폴더 내의 모든 파일을 순회
# for root, dirs, files in os.walk(folder_path):
#     for filename in files:
#         file_path = os.path.join(root, filename)
#         if filename.endswith('.json'):
#             try:
#                 # JSON 파일 읽기
#                 with open(file_path, 'r', encoding='utf-8') as f:
#                     data = json.load(f)

#                 # JSON 내용에서 텍스트 추출
#                 content = []

#                 def extract_text(data):
#                     if isinstance(data, dict):
#                         for value in data.values():
#                             extract_text(value)
#                     elif isinstance(data, list):
#                         for item in data:
#                             extract_text(item)
#                     else:
#                         content.append(str(data))

#                 extract_text(data)

#                 content_str = ' '.join(content)

#                 # 키워드가 내용에 포함되어 있는지 확인
#                 if any(keyword in content_str for keyword in keywords):
#                     # 파일 삭제
#                     os.remove(file_path)
#                     print(f"Removed: {file_path}")
#             except Exception as e:
#                 print(f"Error processing {file_path}: {e}")

In [None]:
# # 삭제 후 총 파일 개수 검토

# # 폴더 내의 모든 파일 개수 계산 (하위 디렉토리 포함)
# file_count = 0

# for root, dirs, files in os.walk(folder_path):
#     file_count += len(files)

# print(f"총 파일 개수: {file_count}개")

### 2차 소거

In [None]:
# #3. 빈출 법을 추출해 지워도 될만한 내용들은 지우기 - 14753개


# # 검사할 폴더 경로 (실제 경로로 변경하세요)
# folder_path = './Other/QA데이터'

# # 단어 빈도를 저장할 딕셔너리
# word_counts = defaultdict(int)

# # 정규식을 사용하여 '**~~법**'에 해당하는 단어를 찾기 위한 패턴
# pattern = re.compile(r'\b(\S*법)\b')

# # 폴더 내의 모든 JSON 파일을 순회
# for root, dirs, files in os.walk(folder_path):
#     for filename in files:
#         if filename.endswith('.json'):
#             file_path = os.path.join(root, filename)
#             try:
#                 # JSON 파일 읽기
#                 with open(file_path, 'r', encoding='utf-8') as f:
#                     data = json.load(f)

#                 # JSON 내용에서 텍스트 추출
#                 content_list = []

#                 def extract_text(data):
#                     if isinstance(data, dict):
#                         for value in data.values():
#                             extract_text(value)
#                     elif isinstance(data, list):
#                         for item in data:
#                             extract_text(item)
#                     else:
#                         content_list.append(str(data))

#                 extract_text(data)

#                 # 리스트를 문자열로 합치기
#                 content = ' '.join(content_list)

#                 # 텍스트에서 '**~~법**'에 해당하는 단어를 찾기
#                 matches = pattern.findall(content)
#                 for word in matches:
#                     word_counts[word] += 1

#             except Exception as e:
#                 print(f"Error processing {file_path}: {e}")

# # 빈도수가 100번 이상인 단어들만 필터링하여 정렬
# frequent_words = {word: count for word, count in word_counts.items() if count >= 100}
# sorted_words = sorted(frequent_words.items(), key=lambda x: x[1], reverse=True)

# # 결과 출력
# print("100번 이상 출현한 '**~~법' 단어들:")
# for word, count in sorted_words:
#     print(f"{word}: {count}번")

In [None]:
# # 2차 소거 '건축법', '학교보건법', '초·중등교육법', '주택임대차보호법' ,'유아교육법', '도로교통법', '사립학교법', '고등교육법'

# # 제거할 키워드 리스트
# keywords = ['건축법', '학교보건법', '초·중등교육법', '주택임대차보호법' ,'유아교육법', '도로교통법', '사립학교법', '고등교육법']

# # 검사할 폴더 경로 (실제 경로로 변경하세요)
# folder_path = './Other/QA데이터/' # 폴더 경로로 수정


# # 폴더 내의 모든 파일을 순회
# for root, dirs, files in os.walk(folder_path):
#     for filename in files:
#         file_path = os.path.join(root, filename)
#         if filename.endswith('.json'):
#             try:
#                 # JSON 파일 읽기
#                 with open(file_path, 'r', encoding='utf-8') as f:
#                     data = json.load(f)

#                 # JSON 내용에서 텍스트 추출
#                 content = []

#                 def extract_text(data):
#                     if isinstance(data, dict):
#                         for value in data.values():
#                             extract_text(value)
#                     elif isinstance(data, list):
#                         for item in data:
#                             extract_text(item)
#                     else:
#                         content.append(str(data))

#                 extract_text(data)

#                 content_str = ' '.join(content)

#                 # 키워드가 내용에 포함되어 있는지 확인
#                 if any(keyword in content_str for keyword in keywords):
#                     # 파일 삭제
#                     os.remove(file_path)
#                     print(f"Removed: {file_path}")
#             except Exception as e:
#                 pass # Or handle the exception appropriately

In [None]:
# # 폴더 내의 모든 파일 개수 계산 (하위 디렉토리 포함)
# file_count = 0

# for root, dirs, files in os.walk(folder_path):
#     file_count += len(files)

# print(f"총 파일 개수: {file_count}개")

## json 파일 병합

In [None]:
# 전체 파일 폴더
folder_path = './Other/QA데이터'

# 합칠 파일의 경로
output_file = './Other/combined_data.json'

# 초기 데이터셋 구조
dataset = {
    "system": "규정에 대해 자세하게 설명해줄 수 있는 챗봇입니다.",
    "train": []
}

# 폴더 내의 모든 JSON 파일을 순회하여 train list 에 저장
for filename in os.listdir(folder_path):
    if filename.endswith('.json'):
        file_path = os.path.join(folder_path, filename)
        try:
            # JSON 파일 읽기
            with open(file_path, 'r', encoding='utf-8') as infile:
                data = json.load(infile)

                # 데이터가 리스트인 경우 (여러 개의 Q&A 포함)
                if isinstance(data, list):
                    dataset["train"].extend(data)
                # 데이터가 딕셔너리인 경우 (단일 Q&A)
                elif isinstance(data, dict):
                    dataset["train"].append(data)
                else:
                    print(f"Unsupported data format in {file_path}")
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

# 결과를 JSON 파일로 저장
with open(output_file, 'w', encoding='utf-8') as outfile:
    json.dump(dataset, outfile, ensure_ascii=False, indent=4)

print(f"Combined data has been saved to {output_file}")

In [None]:
# JSON 파일 로드 함수
def load_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)  # JSON 데이터를 Python 객체로 변환
    return data

# 사용 예시
file_path = './Other/QAData_all.json'
dataset = load_json(file_path)

# 로드된 데이터 출력
print(dataset)

## 파인 튜닝 실행을 위한 데이터 전처리(QA 변환)

In [None]:
#output1.jsonl 파일 생성
list_message = []
num_data = len(dataset["train"])

for i in range(num_data):
    question = dataset["train"][i]["question"]
    answer = dataset["train"][i]["answer"]
    message = [
        {"role": "system", "content": dataset["system"]},
        {"role": "user", "content": question},
        {"role": "assistant", "content": answer},
    ]
    list_message.append(message)

with open("output1.jsonl", "w", encoding='utf-8') as file:
    for messages in list_message:
        json_line = json.dumps({"messages": messages}, ensure_ascii=False)
        file.write(json_line + '\n')