In [None]:
import io, ast, re, hashlib, tokenize, builtins, pandas as pd, warnings, black
from typing import Dict, List, Set

# Suppress SyntaxWarning (e.g. "invalid escape sequence") for the whole session.
warnings.filterwarnings("ignore", category=SyntaxWarning)

In [None]:
# -----------------------------
# 1) 주석 제거
# -----------------------------
def _strip_line_comment(line: str) -> str:
    """Cut a trailing ``#`` comment from one physical line.

    The line is scanned left to right while tracking whether the cursor is
    inside a single- or double-quoted string, so a ``#`` inside a string
    literal is kept and the first ``#`` outside any string starts the comment.
    String state is not carried across lines, so multi-line strings are only
    handled on a per-line, best-effort basis.
    """
    quote = None  # the quote character of the string we are inside, if any
    i = 0
    while i < len(line):
        ch = line[i]
        if quote is not None:
            if ch == "\\":
                i += 2  # skip the escaped character (e.g. \" or \\)
                continue
            if ch == quote:
                quote = None
        elif ch in "\"'":
            quote = ch
        elif ch == "#":
            return line[:i].rstrip()
        i += 1
    return line


def remove_comments(code: str) -> str:
    """Return *code* with ``#`` comments removed.

    The primary path tokenizes the source and re-emits every token except
    COMMENT tokens. ``untokenize`` is fed (type, string) pairs, so the
    original spacing is not preserved.

    If tokenizing fails for any reason (for example on unbalanced brackets),
    a line-based fallback strips comments with a quote-aware scan. The
    fallback joins lines with ``"\\n"`` and therefore drops a trailing newline.
    """
    try:
        tokens = tokenize.generate_tokens(io.StringIO(code).readline)
        kept = [
            (tok_type, tok_str)
            for tok_type, tok_str, *_ in tokens
            if tok_type != tokenize.COMMENT
        ]
        return tokenize.untokenize(kept)
    except Exception:
        # Deliberately broad: this runs over arbitrary snippets and must
        # never abort the whole pipeline because one of them is malformed.
        return "\n".join(_strip_line_comment(ln) for ln in code.splitlines())

In [None]:
# -----------------------------
# 2) AST 기반 스코프 일관 치환
# -----------------------------
# Names provided by the interpreter; left untouched unless the code rebinds them.
_BUILTINS: Set[str] = set(dir(builtins))


class Scope:
    """Bookkeeping for one lexical scope during renaming.

    Attributes are documented inline; ``kind`` is a free-form label such as
    ``"module"``, ``"function"`` or ``"class"``.
    """

    def __init__(self, kind: str):
        self.kind = kind
        # original identifier -> alias assigned in this scope
        self.map: Dict[str, str] = {}
        # number of ``v<N>`` / ``g<N>`` aliases handed out so far
        self.counter_var = 0
        self.counter_global = 0
        # names bound by imports in this scope; they keep their spelling
        self.protected: Set[str] = set()

    def new_var(self):
        """Return the next local alias (``v1``, ``v2``, ...)."""
        self.counter_var += 1
        return f"v{self.counter_var}"

    def new_global(self):
        """Return the next module-level alias (``g1``, ``g2``, ...)."""
        self.counter_global += 1
        return f"g{self.counter_global}"


class AlphaRenamer(ast.NodeTransformer):
    """Rewrite identifiers to canonical aliases in one top-down pass.

    Module-level variables become ``g<N>``, variables in any other scope
    become ``v<N>``, functions become ``func<N>`` and classes ``Cls<N>``.
    Builtins and imported names keep their spelling unless the code rebinds
    them, in which case both the binding and later reads are renamed.

    Known limitations (single pass, no symbol table): a name that is read
    before the statement that binds it has been visited is left unchanged,
    and attribute names, keyword-argument names and ``match`` capture names
    are never renamed.
    """

    def __init__(self):
        super().__init__()
        self.scopes: List[Scope] = [Scope("module")]
        self.func_counter = 0
        self.cls_counter = 0

    # ------------------------------------------------------------------
    # scope stack helpers
    # ------------------------------------------------------------------
    @property
    def scope(self):
        """The innermost scope currently being visited."""
        return self.scopes[-1]

    def push(self, kind):
        """Enter a new scope of the given *kind*.

        The child continues the parent's ``v<N>`` numbering so that a nested
        function never reuses an alias that is still visible from an
        enclosing scope (which would merge distinct closure variables).
        """
        child = Scope(kind)
        child.counter_var = self.scope.counter_var
        self.scopes.append(child)

    def pop(self):
        """Leave the innermost scope."""
        self.scopes.pop()

    def _protect_name(self, name):
        """Mark *name* as import-bound in the current scope.

        Any alias previously assigned to the same name in this scope is
        dropped, because the import rebinds it.
        """
        self.scope.protected.add(name)
        self.scope.map.pop(name, None)

    def _is_protected(self, name):
        """Return True for builtins and for names imported in any open scope."""
        if name in _BUILTINS:
            return True
        return any(name in sc.protected for sc in reversed(self.scopes))

    def _visible_scopes(self):
        """Yield scopes from innermost to outermost, as name lookup sees them.

        Enclosing class bodies are skipped: in Python, names bound in a class
        body are not visible from functions nested inside it.
        """
        innermost = self.scopes[-1]
        for sc in reversed(self.scopes):
            if sc.kind == "class" and sc is not innermost:
                continue
            yield sc

    def _lookup(self, name):
        """Resolve a read of *name* to its alias, or return it unchanged."""
        for sc in self._visible_scopes():
            if name in sc.map:
                return sc.map[name]
            if name in sc.protected:
                # An import in this scope shadows any outer alias.
                return name
        return name

    def _ensure_binding(self, name, is_module_level=False, allow_protected=False):
        """Return the alias for *name* in the current scope, creating it if needed.

        With ``allow_protected=False`` builtins/imported names are returned
        unchanged. ``is_module_level`` selects the ``g<N>`` series, but only
        when the current scope really is the module scope.
        """
        if not allow_protected and self._is_protected(name):
            return name
        if name in self.scope.map:
            return self.scope.map[name]
        alias = (
            self.scope.new_global()
            if (self.scope.kind == "module" and is_module_level)
            else self.scope.new_var()
        )
        self.scope.map[name] = alias
        return alias

    def _visit_list(self, nodes):
        """Visit a list of nodes, honouring NodeTransformer return conventions."""
        result = []
        for child in nodes:
            visited = self.visit(child)
            if visited is None:
                continue
            if isinstance(visited, list):
                result.extend(visited)
            else:
                result.append(visited)
        return result

    # ------------------------------------------------------------------
    # names and imports
    # ------------------------------------------------------------------
    def visit_Name(self, node):
        if isinstance(node.ctx, (ast.Store, ast.Del)):
            alias = self._ensure_binding(
                node.id,
                is_module_level=(self.scope.kind == "module"),
                allow_protected=True,
            )
        else:
            # A recorded alias wins over builtin/import protection, so reads
            # of a shadowed builtin or parameter follow the renamed binding.
            alias = self._lookup(node.id)
        return ast.copy_location(ast.Name(id=alias, ctx=node.ctx), node)

    def visit_Import(self, node):
        for alias in node.names:
            name = alias.asname or alias.name.split(".")[0]
            self._protect_name(name)
        return node

    def visit_ImportFrom(self, node):
        for alias in node.names:
            name = alias.asname or alias.name
            self._protect_name(name)
        return node

    def visit_Global(self, node):
        """Rename ``global`` names to the module-level alias they refer to."""
        module = self.scopes[0]
        renamed = []
        for name in node.names:
            if name in module.map:
                alias = module.map[name]
            elif name in module.protected:
                alias = name
            else:
                alias = module.new_global()
                module.map[name] = alias
            # Later stores in this scope must reuse the module alias.
            self.scope.map[name] = alias
            renamed.append(alias)
        node.names = renamed
        return node

    def visit_Nonlocal(self, node):
        """Rename ``nonlocal`` names to the alias of the enclosing binding."""
        renamed = []
        for name in node.names:
            alias = name
            # Enclosing scopes only: skip the module scope and the current one.
            for sc in reversed(self.scopes[1:-1]):
                if sc.kind == "class":
                    continue
                if name in sc.map:
                    alias = sc.map[name]
                    break
            self.scope.map[name] = alias
            renamed.append(alias)
        node.names = renamed
        return node

    # ------------------------------------------------------------------
    # functions, lambdas and classes
    # ------------------------------------------------------------------
    def _visit_signature(self, args):
        """Visit defaults and annotations, which belong to the enclosing scope."""
        args.defaults = self._visit_list(args.defaults)
        args.kw_defaults = [
            d if d is None else self.visit(d) for d in args.kw_defaults
        ]
        params = args.posonlyargs + args.args + args.kwonlyargs
        params += [p for p in (args.vararg, args.kwarg) if p is not None]
        for param in params:
            if param.annotation is not None:
                param.annotation = self.visit(param.annotation)

    def _bind_params(self, args):
        """Give every parameter an alias in the (already pushed) function scope."""
        for arg in args.posonlyargs + args.args + args.kwonlyargs:
            arg.arg = self._ensure_binding(arg.arg, allow_protected=True)
        for extra in (args.vararg, args.kwarg):
            if extra is not None:
                extra.arg = self._ensure_binding(extra.arg, allow_protected=True)

    def visit_FunctionDef(self, node):
        # Decorators, defaults and annotations are evaluated outside the body.
        node.decorator_list = self._visit_list(node.decorator_list)
        self._visit_signature(node.args)
        if node.returns is not None:
            node.returns = self.visit(node.returns)
        if getattr(node, "type_params", None):
            node.type_params = self._visit_list(node.type_params)

        self.func_counter += 1
        alias = f"func{self.func_counter}"
        # Record the alias before visiting the body so that recursive calls
        # and later references are renamed consistently with the definition.
        self.scope.map[node.name] = alias
        node.name = alias

        self.push("function")
        self._bind_params(node.args)
        node.body = self._visit_list(node.body)
        self.pop()
        return node

    # ``async def`` has the same fields and scoping rules as ``def``.
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Lambda(self, node):
        self._visit_signature(node.args)
        self.push("function")
        self._bind_params(node.args)
        node.body = self.visit(node.body)
        self.pop()
        return node

    def visit_ClassDef(self, node):
        # Decorators, bases and keywords are evaluated in the enclosing scope.
        node.decorator_list = self._visit_list(node.decorator_list)
        node.bases = self._visit_list(node.bases)
        node.keywords = self._visit_list(node.keywords)
        if getattr(node, "type_params", None):
            node.type_params = self._visit_list(node.type_params)

        self.cls_counter += 1
        alias = f"Cls{self.cls_counter}"
        self.scope.map[node.name] = alias
        node.name = alias

        self.push("class")
        node.body = self._visit_list(node.body)
        self.pop()
        return node

    # ------------------------------------------------------------------
    # statements / expressions needing a specific visiting order
    # ------------------------------------------------------------------
    def _bind_target(self, target, is_module_level=False):
        """Pre-register aliases for every plain name inside an assignment target."""
        if isinstance(target, ast.Name):
            self._ensure_binding(
                target.id, is_module_level=is_module_level, allow_protected=True
            )
        elif isinstance(target, (ast.Tuple, ast.List)):
            for elt in target.elts:
                self._bind_target(elt, is_module_level=is_module_level)
        elif isinstance(target, ast.Starred):
            self._bind_target(target.value, is_module_level=is_module_level)

    def visit_Assign(self, node):
        # The right-hand side is evaluated before the targets are bound.
        node.value = self.visit(node.value)
        at_module = self.scope.kind == "module"
        new_targets = []
        for target in node.targets:
            self._bind_target(target, is_module_level=at_module)
            # visit_Name returns a *new* node, so the result must be stored.
            new_targets.append(self.visit(target))
        node.targets = new_targets
        return node

    def visit_Attribute(self, node):
        # Rename the object expression only; the attribute name is kept.
        node.value = self.visit(node.value)
        return node

    def visit_ExceptHandler(self, node):
        if node.type is not None:
            node.type = self.visit(node.type)
        if node.name:
            node.name = self._ensure_binding(
                node.name,
                is_module_level=(self.scope.kind == "module"),
                allow_protected=True,
            )
        node.body = self._visit_list(node.body)
        return node

    def _visit_comprehension_expr(self, node):
        """Visit generators first so loop targets are bound before they are read."""
        node.generators = self._visit_list(node.generators)
        if isinstance(node, ast.DictComp):
            node.key = self.visit(node.key)
            node.value = self.visit(node.value)
        else:
            node.elt = self.visit(node.elt)
        return node

    visit_ListComp = _visit_comprehension_expr
    visit_SetComp = _visit_comprehension_expr
    visit_GeneratorExp = _visit_comprehension_expr
    visit_DictComp = _visit_comprehension_expr


def alpha_rename(code: str) -> str:
    """Return *code* with identifiers canonicalised by ``AlphaRenamer``.

    The source is parsed, transformed and unparsed again. If any step raises
    (for example because the snippet is not valid Python), the input string
    is returned unchanged so callers can keep processing it.
    """
    try:
        renamed_tree = AlphaRenamer().visit(ast.parse(code))
        ast.fix_missing_locations(renamed_tree)
        rendered = ast.unparse(renamed_tree)
    except Exception:
        # Best effort: leave snippets we cannot handle as they are.
        return code
    return rendered

In [None]:
# -----------------------------
# 3) 전처리 파이프라인 (black 마지막에 1회)
# -----------------------------
def normalize_identifiers(code: str) -> str:
    """Run the full normalisation pipeline on one source snippet.

    Steps, in order: strip comments, alpha-rename identifiers, collapse each
    run of blank lines into a single blank line, then format once with black
    (line length 88). If black raises, the unformatted text is returned.
    """
    without_comments = remove_comments(code)
    renamed = alpha_rename(without_comments)
    # A newline, optional whitespace, then more newlines -> one blank line.
    collapsed = re.sub(r"\n\s*\n+", "\n\n", renamed)
    try:
        return black.format_str(collapsed, mode=black.Mode(line_length=88))
    except Exception:
        # Formatting is best effort; keep whatever we have so far.
        return collapsed


In [None]:
# -----------------------------
# 4) DataFrame 단위 처리 + 중복 제거
# -----------------------------
def process_corpus(df: pd.DataFrame, text_col: str = "text") -> pd.DataFrame:
    """Normalise every snippet in *df* and drop rows that normalise identically.

    Works on a copy of *df*. Values in ``text_col`` are converted with
    ``astype(str)`` before normalisation. Four columns are added:
    ``text_norm`` (normalised source), ``text_norm_sha1`` (hex SHA-1 of the
    UTF-8 encoded normalised source), ``n_chars_norm`` and ``n_lines_norm``.
    Rows are then de-duplicated on ``text_norm_sha1`` (first occurrence kept,
    the ``drop_duplicates`` default), the index is reset, and a summary line
    is printed.
    """

    def _sha1_hex(text):
        return hashlib.sha1(text.encode("utf-8")).hexdigest()

    def _line_count(text):
        # An empty string counts as zero lines rather than one.
        if not text:
            return 0
        return text.count("\n") + 1

    out = df.copy()
    out["text_norm"] = out[text_col].astype(str).apply(normalize_identifiers)
    normalized = out["text_norm"]
    out["text_norm_sha1"] = normalized.apply(_sha1_hex)
    out["n_chars_norm"] = normalized.str.len()
    out["n_lines_norm"] = normalized.apply(_line_count)

    # Remove rows whose normalised code is identical.
    before = len(out)
    deduped = out.drop_duplicates(subset=["text_norm_sha1"])
    out = deduped.reset_index(drop=True)
    after = len(out)
    print(f"[중복 제거 완료] {before - after}개 중복행 제거 ({after}/{before} 남음)")

    return out

In [None]:
# Load the raw corpus; the processing step below reads its "text" column.
corpus = pd.read_parquet("code_corpus.parquet")

In [None]:
# Normalise every snippet and drop rows whose normalised code is identical.
corpus_processed = process_corpus(corpus, text_col='text')

[중복 제거 완료] 604124개 중복행 제거 (2639300/3243424 남음)


In [None]:
# Persist the processed corpus; the DataFrame index is not written.
corpus_processed.to_parquet("code_corpus_processed.parquet" , index=False)