In [1]:
import json
import os
import re
from collections import defaultdict

import sys

# Add the required project paths to the module search path.
# These appends run before the project imports below; presumably that is what
# makes the `jobs` and `prep` packages importable -- TODO confirm.
base_path = '/home/seonghee_hong/legal_llm/dps'
sys.path.append(f'{base_path}/dps/spark')
sys.path.append(f'{base_path}/dps/spark/jobs')

# NOTE(review): wildcard imports. Helpers called later in this notebook
# (e.g. make_compat, bad_words_filter, read_line) are never imported by name,
# so they presumably come from one of these modules -- verify and import
# them explicitly.
from jobs.korean_job import *
from jobs.dedup_job import *
from prep.dedup_prep import *

In [2]:
### Load Sample Data
import json
import random

# Fix Random Seed
random.seed(42)

# Upper bound on the number of documents drawn from each corpus.
SAMPLE_SIZE = 20000


def _load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale of the machine running the notebook. Blank lines (for example a
    trailing newline at the end of the file) are skipped instead of being
    handed to ``json.loads``, which would raise on them.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


case_data_og = _load_jsonl('/data/llmlaw/FIRST_DEDUPLICATION/Domain/CASE-LAW/deduplication.jsonl')
non_case_data_og = _load_jsonl('/data/llmlaw/FIRST_DEDUPLICATION/Domain/NON-CASE-LAW/deduplication.jsonl')

print("Total CASE data after first deduplication:", len(case_data_og))
print("Total NON-CASE data after first deduplication:", len(non_case_data_og))

# Random Sample
# random.sample raises ValueError when asked for more items than the population
# holds, so the request is clamped to the corpus size. The two calls stay in
# their original order so the seeded draws are unchanged for large corpora.
case_sample = random.sample(case_data_og, min(SAMPLE_SIZE, len(case_data_og)))
non_case_sample = random.sample(non_case_data_og, min(SAMPLE_SIZE, len(non_case_data_og)))

Total CASE data after first deduplication: 192730
Total NON-CASE data after first deduplication: 231179


In [3]:
# Keep only the 'text' field of every sampled record
case_text = [record['text'] for record in case_sample]
non_case_text = [record['text'] for record in non_case_sample]

In [3]:
# Smoke-test each helper on one hand-written sentence. The make_compat() output
# is what every later call receives, and each result is printed right after it
# is computed.
# Sample text used for the test
sample_text = "이것은 테스트용 샘플 텍스트입니다. 1234와 같은 숫자 혹은 특수기호 *!@# 등이 포함될 수 있습니다."

# Test make_compat()
compat_text = make_compat(sample_text)
print("After make_compat:", compat_text)

# Test bad_words_filter()
bad_words_result = bad_words_filter(compat_text)
print("Bad words filter result:", bad_words_result)

# Test doc_len_filter()
doc_len_result = doc_len_filter(compat_text, 10, 500)  # example: minimum length 10, maximum length 500
print("Document length filter result:", doc_len_result)

# Test mean_word_len_filter()
mean_word_len_result = mean_word_len_filter(compat_text, 2, 10)  # example: minimum mean word length 2, maximum 10
print("Mean word length filter result:", mean_word_len_result)

# Test symbol_to_word_ratio_filter()
symbol_to_word_ratio_result = symbol_to_word_ratio_filter(compat_text, 0.1)  # example: ratio set to 0.1
print("Symbol to word ratio filter result:", symbol_to_word_ratio_result)

# Test bullet_ellipsis_filter()
bullet_ellipsis_result = bullet_ellipsis_filter(compat_text, 0.05, 0.05)  # example: both ratios set to 0.05
print("Bullet and ellipsis filter result:", bullet_ellipsis_result)

# Test korean_word_ratio_filter()
korean_word_ratio_result = korean_word_ratio_filter(compat_text, 0.5)  # example: Korean ratio set to 0.5
print("Korean word ratio filter result:", korean_word_ratio_result)

# Test preprocess_text()
preprocessed_text = preprocess_text(compat_text)
print("Preprocessed text:", preprocessed_text)

After make_compat: 이것은 테스트용 샘플 텍스트입니다. 1234와 같은 숫자 혹은 특수기호 *!@# 등이 포함될 수 있습니다.
Bad words filter result: True
Document length filter result: True
Mean word length filter result: True
Symbol to word ratio filter result: True
Bullet and ellipsis filter result: True
Korean word ratio filter result: True
Preprocessed text: 이것은 테스트용 샘플 텍스트입니다. 1234와 같은 숫자 혹은 특수기호 *!@# 등이 포함될 수 있습니다.


### DPS 상에서 코드 전처리 파이프라인과 동일하게 Chaining 방식으로 테스트

In [None]:
import yaml
from pyspark.sql import SparkSession

config_path = '/home/seonghee_hong/legal_llm/dps/configs/dedup_job_case_law.yaml'

# NOTE(review): FullLoader can build more than plain data. That is acceptable
# for a trusted local config, but prefer yaml.safe_load if this path can ever
# point at a file you do not control.
with open(config_path, encoding='utf-8') as f:
    conf = yaml.load(f, Loader=yaml.FullLoader)

if conf['targets'] == ['all']:
    input_paths = f'{conf["base_dir"]}/*/*.jsonl'
else:
    # The per-target path list is disabled; this notebook reads one fixed
    # sampled file instead.
    # input_paths = ','.join([f'{conf["base_dir"]}/{t}/*.jsonl' for t in conf["targets"]])
    input_paths = '/data/llmlaw/FIRST_DEDUPLICATION/Domain/CASE-LAW/sampled.jsonl'


def _report_stage(label, rdd, parent=None):
    """Cache ``rdd``, print ``"<label>: <count>"`` and return ``rdd``.

    Each ``count()`` is a Spark action. Without caching, every action would
    re-run the whole lineage (file read, repartition and all earlier stages).
    Caching the stage lets the next stage start from it. Once this stage has
    been materialised, the cached ``parent`` is released to limit memory use.
    """
    rdd.cache()
    print(f"{label}: {rdd.count()}")
    if parent is not None:
        parent.unpersist()
    return rdd


# Configure the Spark session
spark = SparkSession.builder \
    .appName("korean text processing job") \
    .config("spark.driver.memory", "15g") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "/home/seonghee_hong/spark-log") \
    .getOrCreate()

# try/finally guarantees the session is stopped even when a stage raises.
try:
    sc = spark.sparkContext

    proc_rdd = sc.textFile(input_paths).repartition(conf["n_dist"]).flatMap(read_line)

    make_compat_rdd = _report_stage(
        "Count after make_compat",
        proc_rdd.map(lambda x: dict(text=make_compat(x["text"]))),
    )

    bad_words_filter_rdd = _report_stage(
        "Count after bad_words_filter",
        make_compat_rdd.filter(lambda x: bad_words_filter(x["text"])),
        parent=make_compat_rdd,
    )

    doc_len_filter_rdd = _report_stage(
        "Count after doc_len_filter",
        bad_words_filter_rdd.filter(lambda x: doc_len_filter(x["text"], conf["min_doc_len"], conf["max_doc_len"])),
        parent=bad_words_filter_rdd,
    )

    mean_word_len_filter_rdd = _report_stage(
        "Count after mean_word_len_filter",
        doc_len_filter_rdd.filter(lambda x: mean_word_len_filter(x["text"], conf["min_mean_word_len"], conf["max_mean_word_len"])),
        parent=doc_len_filter_rdd,
    )

    symbol_to_word_ratio_filter_rdd = _report_stage(
        "Count after symbol_to_word_ratio_filter",
        mean_word_len_filter_rdd.filter(lambda x: symbol_to_word_ratio_filter(x["text"], conf["symbol_to_word_ratio"])),
        parent=mean_word_len_filter_rdd,
    )

    bullet_ellipsis_filter_rdd = _report_stage(
        "Count after bullet_ellipsis_filter",
        symbol_to_word_ratio_filter_rdd.filter(lambda x: bullet_ellipsis_filter(x["text"], conf["bullet_point_ratio"], conf["ellipsis_ratio"])),
        parent=symbol_to_word_ratio_filter_rdd,
    )

    korean_word_ratio_filter_rdd = _report_stage(
        "Count after korean_word_ratio_filter",
        bullet_ellipsis_filter_rdd.filter(lambda x: korean_word_ratio_filter(x["text"], conf["korean_word_ratio"])),
        parent=bullet_ellipsis_filter_rdd,
    )

    # The last stage is counted once and never reused, so it is not cached.
    # The document-length filter is applied again after preprocess_text().
    final_rdd = korean_word_ratio_filter_rdd \
        .map(lambda x: dict(text=preprocess_text(x["text"]))) \
        .filter(lambda x: doc_len_filter(x["text"], conf["min_doc_len"], conf["max_doc_len"]))
    print(f"Final count after all filters and preprocessing: {final_rdd.count()}")
finally:
    # Stop the Spark session
    spark.stop()