In [4]:
import re, unicodedata

EMOJI_SYMBOL_RE = re.compile(
    "["                         # BMP(비트맵 이미지 포맷)
    "\u2190-\u21FF"             # arrows
    "\u2300-\u23FF"             # misc technical
    "\u2460-\u24FF"             # enclosed alphanumerics
    "\u25A0-\u25FF"             # geometric shapes (■◆▲○●…)
    "\u2600-\u26FF"             # misc symbols (☀☂☏…)
    "\u2700-\u27BF"             # dingbats (✔✖✈…)
    "\u2B00-\u2BFF"             # misc symbols & arrows
    "\u3000-\u303F"             # CJK symbols (、。・《》【】…)
    "\uFE0F"                    # variation selector-16
    "\u200d"                    # zero width joiner
    "]"
    "|"
    "["                         # 보충 평면(astral)
    "\U0001F000-\U0001FAFF"     # emoticons/pictographs/transport/etc.
    "\U0001FB00-\U0001FBFF"
    "]", flags=re.UNICODE)

BRACKET_REMOVE_WORDS = {
    "기고", "종합", "취재현장", "현장", "취재", "데스크", "시선집중",
    "기자수첩", "사설", "인사이드", "인사이트", "쇼", "스토리", "칼럼",
    "앵커","자막","자막뉴스","리포트","특보","중계","속보","단독","전문",
    "사진","자료사진","영상","그래픽","자료", "신문","탐사보도", "컨설팅",
    "KBS","MBC","SBS","YTN","JTBC","MBN","TV조선","채널A", "프리즘",
    "연합뉴스","뉴시스","뉴스", "주장", "파이낸셜뉴스", "뉴스투데이",
    "뉴스25", "판결 앤 이슈", "뉴스데스크", "서울경제", "뉴스와 시각", "the300",
}
BRACKET_REMOVE_RE = re.compile(
    r"(?P<open>[\[\(\uFF3B\uFF08])"
    r"(?P<inner>[^()\[\]\uFF3B\uFF3D\uFF08\uFF09]{1,120})" # 대괄호/일반괄호 내부 텍스트 데이터
    r"(?P<close>[\]\)\uFF3D\uFF09])"
)

# 중요한 대괄호 정보 보존
def IMPORTANT_INFO_SAVE(inner: str) -> bool:
    inner = inner.strip()
    if "법" in inner or "법률" in inner:
        return True
    if re.search(r"제\d+조", inner):
        return True
    if re.search(r"\d{4}[-./]\d{1,2}[-./]\d{1,2}", inner): # 날짜 정보
        return True
    return False

# 방송사/기자 사인과 제작 크레딧 제거 정규식
NEWS_AGENCIES = r"(?:KBS|MBC|SBS|YTN|JTBC|MBN|TV\s*조선|채널A|연합뉴스TV)"
CREDIT_ROLE   = r"(?:촬영기자|VJ|카메라기자|사진기자?|영상(?:편집|취재)?|편집|그래픽|CG|자료|연출|구성|제작|자막)"

# 기자 사인("~기자가 취재했습니다.","~뉴스 ~입니다.")
REPORTER_SIGN_RE = re.compile(rf"""
    (?:{NEWS_AGENCIES})\s*뉴스\s*[가-힣A-Za-z·]{{2,20}}\s*입니다[\.…!"]*
    |
    [가-힣A-Za-z·]{{2,20}}\s*기자(?:가|는)?\s*
        (?:취재|보도|전해드렸|전해드리|리포트)\w*
        [\.…!"]*
""", re.VERBOSE)

AGENCY_SIGN_RE = re.compile(rf"""
    (?:(?<=^)|(?<=[\.\?!]\s))
    (?:{NEWS_AGENCIES})\s*
    [가-힣A-Za-z·]{{2,20}}\s*
    (?:기자)?\s*입니다
    [\.…!")]*
""", re.VERBOSE)

REPORTER_INLINE_RE = re.compile(
    r"(?<!\S)[가-힣A-Za-z·]{2,20}\s+기자\s*[:=]\s*"
)

# 제작 크레딧 블록("촬영기자:~/그래픽:~")
PRODUCT_CREDITS_RE = re.compile(rf"""
    {CREDIT_ROLE}\s*[:=]\s*[가-힣A-Za-z·]{{2,20}}
    (?:\s*[/,]\s*{CREDIT_ROLE}\s*[:=]\s*[가-힣A-Za-z·]{{2,20}})*
""", re.VERBOSE)

def STRIP_NEWS_CREDITS(text: str) -> str:
    if not isinstance(text, str):
        return ''
    t = text
    t = PRODUCT_CREDITS_RE.sub(' ', t)
    t = REPORTER_INLINE_RE.sub(' ', t)
    t = REPORTER_SIGN_RE.sub(' ', t)
    t = AGENCY_SIGN_RE.sub(' ', t)
    return t

# 토크나이즈용 구분자(공백, 슬래시, 콜론, 쉼표, 바)
_SEP = re.compile(r"[\/:\|\s,]+")

def rm_inner_tag(inner: str) -> bool:
    # 주요 정보 여부 체크
    if IMPORTANT_INFO_SAVE(inner):
        return False
    tokens = [tok for tok in _SEP.split(inner.strip()) if tok]
    # inner 토큰 중 금지어 일치 시 제거
    if any(tok in BRACKET_REMOVE_WORDS for tok in tokens):
        return True
    # inner 데이터가 기자/앵커로 끝나는 경우 제거
    if any(tok.endswith(("기자", "앵커")) for tok in tokens):
        return True
    # inner 토큰 중 금지어 하나라도 포함 시 제거
    if any(any(bad in inner for bad in BRACKET_REMOVE_WORDS) for _ in [0]):
        return True
    # 뉴스 크레딧 형태 제거
    if re.search(r"(?:VJ|{CREDIT_ROLE})\s*=\s*(연합뉴스|뉴시스|뉴스1|{NEWS_AGENCIES})", inner):
        return True
    return False

# 미리 등록한 삭제 대상 체크 및 제거
def STRIP_NOISE_TAGS(text: str) -> str:
    def check_inner_tags(m: re.Match) -> str:
        inner = m.group('inner').strip()
        return " " if rm_inner_tag(inner) else m.group(0)
    return BRACKET_REMOVE_RE.sub(check_inner_tags, text)

In [5]:
# 한중일 통합 한자 범위
HANJA_RE = re.compile(r"[\u3400-\u4DBF\u4E00-\u9FFF\uF900-\uFAFF\U00020000-\U0002EBEF]")

def STRIP_HANJA_PAREN_DUP(text: str) -> str:
    def repl(m: re.Match) -> str:
        inner = m.group('inner').strip()
        # 중요 정보는 그대로 둠
        if IMPORTANT_INFO_SAVE(inner):
            return m.group(0)

        # 한자 비율 계산 -> 괄호 내 절반 이상 한자일 때 제거
        core = re.sub(r"[\W_]", "", inner)  # 기호/공백 제거
        if not core:
            return m.group(0)
        hanja_cnt = len(HANJA_RE.findall(core))
        ratio = hanja_cnt / len(core)

        # 괄호 바로 앞 한국어 -> '설명용 한자(데이터 중복)' 제거
        left_ctx = text[:m.start()].rstrip()
        last_char = left_ctx[-1] if left_ctx else ""
        left_is_korean = bool(re.search(r"[가-힣]", last_char))

        if hanja_cnt > 0 and (ratio >= 0.5 or left_is_korean):
            return ""
        return m.group(0)

    return BRACKET_REMOVE_RE.sub(repl, text)

try:
    import hanja as _hanja
    def HANJA_TO_HANGUL(text: str) -> str:
        # 'substitution': 가능한 한자만 한글로 치환
        return _hanja.translate(text, 'substitution')
except Exception:
    COMMON_HANJA_MAP = {
        "李":"이","金":"김","朴":"박","崔":"최","鄭":"정","趙":"조","姜":"강","韓":"한","尹":"윤",
        "張":"장","申":"신","林":"임","任":"임","文":"문","劉":"유","柳":"유","吳":"오","洪":"홍",
        "高":"고","權":"권","裵":"배","朱":"주","馬":"마","車":"차","宋":"송"
    }
    def HANJA_TO_HANGUL(text: str) -> str:
        return "".join(COMMON_HANJA_MAP.get(ch, ch) for ch in text)

In [6]:
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
PHONE_RE   = re.compile(r"(?:\+?82[-\s]?)?0?\d{1,2}[-\s]?\d{3,4}[-\s]?\d{4}")

# 카테고리 제거 정규식
CATEGORY_REMOVE = r"(?:제보하기|제보|연락처|문의|전화|이메일|메일|카카오톡|카톡|라인|사이트)"

# [카테고리]: 연락처
LABELED_CONTACT_RE = re.compile(
    r"({CATEGORY_REMOVE})\s*[:=]\s*[^.\n]*"
)

# [카테고리] 연락처(:x)
BRACKETED_LABEL_RE = re.compile(
    r"[\[\(]\s*({CATEGORY_REMOVE})\s*[\]\)]"
)

# SNS/플랫폼 구독/채널추가 CTA
CTA_RE = re.compile(
    r"(?:네이버|유튜브|YouTube|인스타그램|페이스북|트위터|X|카카오톡|카톡|라인|텔레그램|밴드|틱톡|카카오스토리)"
    r"\s*[^.\n]*?(?:구독|팔로우|친구\s*추가|채널\s*추가|구독해|검색해\s*채널\s*추가)[^.\n]*",
    re.IGNORECASE
)

# 제보 캠페인 문구
JEBO_TAGLINE_RE = re.compile(
    r"[※\s`'\"“”‘’]*당신의\s*제보가\s*뉴스가\s*됩니다[^\n]*"
)

# 기사 말미에 붙는 '기사문의, 제보하기 …' -> 문서 끝까지 제거
TRAILING_FROM_JEBO = re.compile(r"(?:제보하기|독자문의|기사문의|제보\s*문의)[\s\S]*$", re.UNICODE)

def STRIP_CONTACT_INFO(text: str) -> str:
    if not isinstance(text, str):
        return ''
    t = text
    t = LABELED_CONTACT_RE.sub(' ', t)
    t = BRACKETED_LABEL_RE.sub(' ', t)
    t = EMAIL_RE.sub(' ', t)
    t = PHONE_RE.sub(' ', t)
    t = CTA_RE.sub(' ', t)

    t = JEBO_TAGLINE_RE.sub(' ', t)
    t = TRAILING_FROM_JEBO.sub(' ', t)
    return t

In [7]:
def _tidy_punct(text: str) -> str:
    t = re.sub(r"\s*/\s*(?=[\s\W]|$)", " ", text)   # 불필요 슬래시 제거
    t = re.sub(r"\s*([,;:])\s*(?=[\.\?!])", "", t)
    t = re.sub(r"\s+([,;:])", r" \1", t)
    t = re.sub(r"([(\[])\s+", r"\1", t)
    t = re.sub(r"\s+([)\]])", r"\1", t)
    t = re.sub(r"\s{2,}", " ", t)
    return t.strip()

In [8]:
PUNCT_TRANSLATE = str.maketrans({
    "…": "...", "·": " ", "•": " ", "―": "-", "–": "-", "—": "-",
    "“": "\"", "”": "\"", "‘": "'", "’": "'",
    "◆":" ", "■":" ", "▲":" ", "△":" ", "▶":" ", "▷":" ", "▼":" ", "▽":" ", "※":" "
})

def normalize_korean(text:str) -> str:
    if not isinstance(text, str): return ''
    t = unicodedata.normalize('NFC', text) # 한국어 일관된 형태(초성/중성 분해)
    t = t.translate(PUNCT_TRANSLATE)

    t = STRIP_NOISE_TAGS(t)
    t = STRIP_NEWS_CREDITS(t)
    t = STRIP_HANJA_PAREN_DUP(t)
    t = HANJA_TO_HANGUL(t)
    t = STRIP_CONTACT_INFO(t)

    t = EMOJI_SYMBOL_RE.sub(' ', t)
    t = re.sub(r"\s+", " ", t).strip()

    t = _tidy_punct(t)
    return t

In [10]:
import pandas as pd

def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out['title_norm'] = out['title'].apply(normalize_korean)
    out['content_norm'] = out['content'].apply(normalize_korean)
    return out

In [12]:
def load_preprocess_data(path: str, sheet_name=0) -> pd.DataFrame:
    usecols = ['category', 'date', 'projectId', 'title', 'content', 'url']
    df = pd.read_excel(
        path,
        sheet_name=sheet_name,
        engine="openpyxl",
        dtype=str,
        usecols=usecols
    )

    out = preprocess_df(df)
    return out

out = load_preprocess_data("1. (NEWS)_(개인정보보호법__정보통신망법).xlsx", sheet_name="news")
out.to_excel("preprocess_news_part_1.xlsx", index=False)
out = load_preprocess_data("2. (NEWS)_(자본시장법__특정금융정보법__전자금융거래법__전자증권법__금융소비자보호법).xlsx", sheet_name="news")
out.to_excel("preprocess_news_part_2.xlsx", index=False)
out = load_preprocess_data("3. (NEWS)_(아동복지법).xlsx", sheet_name="news")
out.to_excel("preprocess_news_part_3.xlsx", index=False)
out = load_preprocess_data("4. (NEWS)_(중대재해처벌법).xlsx", sheet_name="news")
out.to_excel("preprocess_news_part_4.xlsx", index=False)

뉴스 데이터 추가 전처리 - 기사 좋아요 및 댓글 수 추출

In [None]:

####################################
# 해당 코드는 로컬 터미널 환경에서 작성 및 실행되었으며, jupyter 환경에서는 호환이 불가함을 확인했습니다.
# 해당 코드의 전처리 결과 파일은 data 폴더에 저장하였습니다.
####################################

import asyncio
import re
from pathlib import Path
from typing import Optional, Dict, Any

import pandas as pd
from playwright.async_api import async_playwright
from tqdm import tqdm   # 진행바

# ===== 사용자 설정 =====
INPUT_PATH   = r"preprocess_news_part_1.xlsx"
OUTPUT_PATH  = None
URL_COL      = "url"
HEADLESS     = True
NAV_WAIT     = "networkidle"
PAGE_TIMEOUT = 25_000
SCROLL_WAIT  = 250
# ======================

def to_int(s: Optional[str]) -> Optional[int]:
    if not s:
        return None
    digits = re.sub(r"[^\d]", "", str(s))
    return int(digits) if digits.isdigit() else None

def load_table(path: str) -> pd.DataFrame:
    p = Path(path)
    if p.suffix.lower() in [".xlsx", ".xls"]:
        return pd.read_excel(p)
    return pd.read_csv(p)

def save_table(df: pd.DataFrame, in_path: str, out_path: Optional[str]) -> str:
    in_p = Path(in_path)
    out_p = Path(out_path) if out_path else in_p.with_name(in_p.stem + "_with_metrics" + in_p.suffix)
    if out_p.suffix.lower() in [".xlsx", ".xls"]:
        df.to_excel(out_p, index=False)
    else:
        df.to_csv(out_p, index=False, encoding="utf-8-sig")
    return str(out_p)

async def get_comment_count(page) -> Optional[int]:
    selectors = [
        "a.u_cbox_btn_view > span.u_cbox_count",
        "span.u_cbox_count",
        "#comment_count",
        "a#comment_count > em",
        "span#comment_count",
    ]
    for sel in selectors:
        try:
            el = await page.query_selector(sel)
            if el:
                txt = (await el.text_content() or "").strip()
                n = to_int(txt)
                if n is not None:
                    return n
        except Exception:
            pass
    try:
        html = await page.content()
        m = re.search(r"(댓글)\D*?([\d,]{1,9})", html)
        if m:
            return to_int(m.group(2))
    except Exception:
        pass
    return None

async def get_like_count(page) -> Optional[int]:
    try:
        spans = await page.query_selector_all("span.u_likeit_text._count.num")
        vals = []
        for sp in spans:
            txt = (await sp.text_content() or "").strip()
            n = to_int(txt)
            if n is not None:
                vals.append(n)
        if vals:
            return min(vals)
    except Exception:
        pass
    try:
        el = await page.query_selector(
            'div.u_likeit_module a.u_likeit_list_btn._button[data-type="like"] span.u_likeit_list_count._count'
        )
        if el:
            txt = (await el.text_content() or "").strip()
            return to_int(txt)
    except Exception:
        pass
    try:
        html = await page.content()
        m = re.search(r"추천\D*?([\d,]{1,9})", html)
        if m:
            return to_int(m.group(1))
    except Exception:
        pass
    return None

async def scrape_one(page, url: str) -> Dict[str, Any]:
    r = {"comment_count": None, "like_count": None}
    if not isinstance(url, str) or not url.startswith("http"):
        return r
    try:
        await page.goto(url, wait_until=NAV_WAIT, timeout=PAGE_TIMEOUT)
        await page.wait_for_timeout(SCROLL_WAIT)
        await page.mouse.wheel(0, 800)
        await page.wait_for_timeout(SCROLL_WAIT)

        r["comment_count"] = await get_comment_count(page)
        r["like_count"]    = await get_like_count(page)

        if r["like_count"] is None:
            await page.evaluate("window.scrollTo(0, 0)")
            await page.wait_for_timeout(SCROLL_WAIT)
            r["like_count"] = await get_like_count(page)
    except Exception:
        pass
    return r

async def main():
    df = load_table(INPUT_PATH)
    if URL_COL not in df.columns:
        raise ValueError(f"'{URL_COL}' 컬럼이 입력 파일에 없습니다. 현재 컬럼: {list(df.columns)}")

    for col in ["comment_count", "like_count"]:
        if col not in df.columns:
            df[col] = None

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=HEADLESS)
        ctx = await browser.new_context(
            user_agent=("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/120.0.0.0 Safari/537.36"),
            locale="ko-KR",
        )
        page = await ctx.new_page()

        # tqdm으로 진행바 표시
        for idx, row in tqdm(df.iterrows(), total=len(df), desc="Processing articles"):
            url = str(row[URL_COL]) if pd.notna(row[URL_COL]) else ""
            metrics = await scrape_one(page, url)
            df.at[idx, "comment_count"] = metrics["comment_count"]
            df.at[idx, "like_count"]    = metrics["like_count"]

        await browser.close()

    out = save_table(df, INPUT_PATH, OUTPUT_PATH)
    print(f"[done] saved -> {out}")

if __name__ == "__main__":
    asyncio.run(main())
