# Python code classifier Ensemble

# 1. Python 코드 특징 추출기

In [14]:
import ast, keyword, builtins, tokenize, csv, os
from pathlib import Path
from io import StringIO
from typing import Dict, List

import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from transformers import RobertaTokenizer, RobertaModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

from joblib import load #저장된 sklearn 파이프라인 로드용


DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
TOKENIZER = RobertaTokenizer.from_pretrained("microsoft/codebert-base")
CODEBERT = RobertaModel.from_pretrained("microsoft/codebert-base").to(DEVICE).eval()

class LoopVisitor(ast.NodeVisitor):
    """Track how deeply loops are nested inside an AST.

    Only ``for``, ``while`` and ``async for`` statements count as a nesting
    level.  After ``visit(tree)`` returns, ``max_depth`` holds the deepest
    nesting encountered (0 when the tree contains no loops).
    """

    def __init__(self):
        self.max_depth = 0   # deepest loop nesting seen so far
        self.cur_depth = 0   # nesting level of the node currently being visited

    def _visit_loop(self, node):
        """Enter one loop level, descend into its children, then leave it."""
        self.cur_depth += 1
        if self.cur_depth > self.max_depth:
            self.max_depth = self.cur_depth
        self.generic_visit(node)
        self.cur_depth -= 1

    # Every loop flavour shares the same handler.
    visit_For = _visit_loop
    visit_While = _visit_loop
    visit_AsyncFor = _visit_loop


class FeatureExtractor:
    """Compute static-analysis and CodeBERT features for one Python source string.

    Calling an instance returns a dict holding one entry per name in
    ``SCALAR_FEATURES`` followed by one entry per name in ``CODEBERT_COLS``.
    The AST- and tokenizer-based features fall back to zero when the source
    cannot be parsed or tokenized, so malformed input still yields a full row.
    """

    # Scalar features (each name is also the name of the method computing it)
    SCALAR_FEATURES = [
        "avg_identifier_length",
        "average_function_length",
        "token_count",
        "function_count",
        "blank_ratio",
        "identifier_count",
        "total_lines",
        "comment_ratio",
        "max_control_depth",
    ]
    # CODEBERT columns
    CODEBERT_COLS = [f"codebert_{i}" for i in range(768)]
    ALL_COLS = SCALAR_FEATURES + CODEBERT_COLS

    # ast.parse reports invalid source as SyntaxError; before Python 3.12 a
    # source string containing NUL bytes raised ValueError instead.  Both mean
    # "this source cannot be parsed" and must produce the zero fallback.
    _PARSE_ERRORS = (SyntaxError, ValueError)
    _TOKENIZE_ERRORS = (tokenize.TokenError, IndentationError)

    # Token types that only describe layout, never code or comments.
    _LAYOUT_TOKENS = frozenset({
        tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
        tokenize.DEDENT, tokenize.ENCODING, tokenize.ENDMARKER,
    })
    # token_count additionally ignores comments.
    _TOKEN_COUNT_SKIP = _LAYOUT_TOKENS | {tokenize.COMMENT}

    # Names that identifier_count never counts; built once instead of on
    # every visited node.
    _RESERVED_NAMES = frozenset(keyword.kwlist) | frozenset(dir(builtins))

    def __call__(self, code: str) -> Dict[str, float]:
        """Return ``{feature_name: value, ...}`` for one chunk of source code."""
        # Static features, in SCALAR_FEATURES order; every listed name must
        # match a method of this class.
        ft = {name: getattr(self, name)(code) for name in self.SCALAR_FEATURES}

        # Embedding features produced by CodeBERT
        emb = self.codebert_embedding(code)          # (768,)
        ft.update(zip(self.CODEBERT_COLS, emb))

        return ft

    # -------- shared helpers --------
    @classmethod
    def _parse(cls, s: str):
        """Return the AST of *s*, or None when the source cannot be parsed."""
        try:
            return ast.parse(s)
        except cls._PARSE_ERRORS:
            return None

    @classmethod
    def _tokenize(cls, s: str):
        """Return the token list of *s*, or None when tokenizing fails."""
        try:
            return list(tokenize.generate_tokens(StringIO(s).readline))
        except cls._TOKENIZE_ERRORS:
            return None

    # -------- individual static features --------
    def avg_identifier_length(self, s: str) -> float:
        """Mean length of the distinct names in *s*.

        Names are the ids of ``Name`` nodes plus the names of ``def`` and
        ``class`` statements (``async def`` names are not included).
        Returns 0.0 when there are no names or the source does not parse.
        """
        tree = self._parse(s)
        if tree is None:
            return 0.0
        names = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Name):
                names.add(node.id)
            elif isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                names.add(node.name)
        return np.mean([len(n) for n in names]) if names else 0.0

    def average_function_length(self, s: str) -> float:
        """Mean line span of the ``def`` functions in *s*.

        A function's span runs from its ``def`` line to the largest ``lineno``
        of any node inside it (start lines, not end lines).  Returns 0.0 when
        there are no functions or the source does not parse.
        """
        tree = self._parse(s)
        if tree is None:
            return 0.0
        lens = []
        for fn in ast.walk(tree):
            if not isinstance(fn, ast.FunctionDef):
                continue
            last = max(
                (sub.lineno for sub in ast.walk(fn) if hasattr(sub, 'lineno')),
                default=fn.lineno,
            )
            # Never let a decorator line above the def shrink the span.
            last = max(last, fn.lineno)
            lens.append(last - fn.lineno + 1)
        return np.mean(lens) if lens else 0.0

    def token_count(self, s: str) -> int:
        """Number of tokens in *s*, ignoring comments, layout tokens and
        whitespace-only error tokens.  Returns 0 when tokenizing fails."""
        toks = self._tokenize(s)
        if toks is None:
            return 0
        return sum(
            1 for t in toks
            if t.type not in self._TOKEN_COUNT_SKIP
            and not (t.type == tokenize.ERRORTOKEN and t.string.isspace())
        )

    def function_count(self, s: str) -> int:
        """Number of ``def`` statements in *s* (``async def`` is not counted).
        Returns 0 when the source does not parse."""
        tree = self._parse(s)
        if tree is None:
            return 0
        return sum(isinstance(n, ast.FunctionDef) for n in ast.walk(tree))

    def blank_ratio(self, s: str) -> float:
        """Fraction of lines in *s* that are empty or whitespace-only."""
        lines = s.splitlines()
        if not lines:
            return 0.0
        return sum(not l.strip() for l in lines) / len(lines)

    def identifier_count(self, s: str) -> int:
        """Number of distinct identifiers in *s* that are neither keywords nor
        builtin names.  Returns 0 when the source does not parse.

        NOTE(review): traversal intentionally mirrors the original visitor:
        a class whose name is a builtin is not descended into, and the object
        of an attribute access is only visited when the attribute name itself
        is not a builtin (``b`` in ``b.format`` is never seen).  This looks
        unintended, but it determines the values fed to the already-trained
        models loaded later in this notebook -- confirm against the training
        pipeline before changing it.
        """
        tree = self._parse(s)
        if tree is None:
            return 0
        reserved = self._RESERVED_NAMES
        ids = set()

        class _Collector(ast.NodeVisitor):
            def visit_Name(self, n):
                if n.id not in reserved:
                    ids.add(n.id)

            def visit_FunctionDef(self, n):
                if n.name not in reserved:
                    ids.add(n.name)
                # vararg / kwarg are None when the signature has no *args / **kwargs.
                params = n.args.args + [n.args.vararg, n.args.kwarg, *n.args.kwonlyargs]
                for a in params:
                    if a is not None and a.arg not in reserved:
                        ids.add(a.arg)
                self.generic_visit(n)

            visit_AsyncFunctionDef = visit_FunctionDef

            def visit_ClassDef(self, n):
                if n.name not in reserved:
                    ids.add(n.name)
                    self.generic_visit(n)

            def visit_Attribute(self, n):
                if n.attr not in reserved:
                    ids.add(n.attr)
                    self.visit(n.value)

        _Collector().visit(tree)
        return len(ids)

    def total_lines(self, s: str) -> int:
        """Number of lines in *s* as split by ``str.splitlines``."""
        return len(s.splitlines())

    def comment_ratio(self, s: str) -> float:
        """Comment tokens divided by all non-layout tokens (comments included
        in the denominator).  Returns 0.0 when tokenizing fails or there are
        no such tokens."""
        toks = self._tokenize(s)
        if toks is None:
            return 0.0
        comments = sum(t.type == tokenize.COMMENT for t in toks)
        code = sum(t.type not in self._LAYOUT_TOKENS for t in toks)
        return comments / code if code else 0.0

    def max_control_depth(self, s: str) -> int:
        """Deepest loop nesting in *s* as measured by ``LoopVisitor``.
        Returns 0 when the source does not parse."""
        tree = self._parse(s)
        if tree is None:
            return 0
        v = LoopVisitor()
        v.visit(tree)
        return v.max_depth

    # -------- CodeBERT embedding --------
    def codebert_embedding(self, code: str) -> np.ndarray:
        """Return the CodeBERT hidden state of the first token of *code*.

        The input is truncated to 512 tokens and the model runs without
        gradient tracking; the result is moved to the CPU as a NumPy array.
        """
        with torch.no_grad():
            encoded = TOKENIZER(code, return_tensors='pt',
                                max_length=512, truncation=True)
            inputs = {k: v.to(DEVICE) for k, v in encoded.items()}
            # [0, 0] -> first (only) sequence in the batch, first token position.
            out = CODEBERT(**inputs).last_hidden_state[0, 0]
        return out.cpu().numpy()

def extract_features_from_file(path: Path) -> pd.DataFrame:
    """Build a one-row feature frame for the source file at *path*.

    The file is read as UTF-8 with undecodable bytes dropped, run through
    ``FeatureExtractor``, and extended with ``code_size`` (the on-disk size
    in bytes), which is also printed.  Columns follow the insertion order of
    the feature dict, so ``code_size`` is the last column.
    """
    source = path.read_text(encoding="utf-8", errors="ignore")
    row = FeatureExtractor()(source)

    size_bytes = path.stat().st_size
    row['code_size'] = size_bytes
    print(size_bytes)

    # A single-row DataFrame lets callers select or reorder columns by name.
    return pd.DataFrame([row])

if __name__ == "__main__":
    # Feature column names: ten scalar features followed by the 768 CodeBERT
    # dimensions.  NOTE(review): nothing below reads `features`; kept because
    # other cells may rely on it.
    features = [
        "avg_identifier_length",
        "average_function_length",
        "token_count",
        "function_count",
        "blank_ratio",
        "identifier_count",
        "total_lines",
        "code_size",
        "max_control_depth",
        "comment_ratio",
        *(f'codebert_{i}' for i in range(768)),
    ]

    # Extract the features of the sample file and persist them as CSV.
    file_path = Path("sample.py")
    features_df = extract_features_from_file(file_path)
    features_df.to_csv("code_features.csv", index=False)
    print("[INFO] Saved -> code_features.csv")


2078
[INFO] Saved -> code_features.csv


# 2. RandomForest

In [15]:
from joblib import load, dump
import pandas as pd
import numpy as np

In [16]:
# Load the saved multi-class random-forest bundle and pull out the model.
bundle = load("./random_forest/python_multilabel_classifier.joblib")
rf_clf  = bundle["model"]                 # calibrated pipeline (per the original note)
feat_train = rf_clf.feature_names_in_     # column names and order used at training time

In [17]:
# Build the inference DataFrame: extract features from the sample file, then
# select/reorder the columns to the names recorded in feat_train.
features_df = extract_features_from_file(Path("sample.py"))
X_pred = features_df.reindex(columns=feat_train)

2078


In [18]:
# Predict with the multi-class random forest and report its two most
# probable labels.
# Unique labels noted at training time: ['Human' 'deepseek' 'gemini' 'gpt' 'grok3' 'mistral']
proba = rf_clf.predict_proba(X_pred)    # (n_samples, n_classes)
classes = rf_clf.classes_

# Column indices ordered from most to least probable, truncated to two.
top2_idx = np.argsort(proba[0])[::-1][:2]
top2 = [(classes[col], proba[0, col]) for col in top2_idx]

# Integer class label -> display name.
label_mapper = dict(enumerate(
    ("human", "deepseek", "gemini", "gpt", "grok3", "mistral")
))

print("Top 2 Predictions:")
for label, score in top2:
    print(f"{label_mapper[label]}: {score:.4f}")

Top 2 Predictions:
gpt: 0.6792
human: 0.2671


In [19]:
# Binary random-forest classifier: print the class probabilities for the sample.
# NOTE(review): which column is "human" vs "AI" is not visible here -- confirm
# against the model's classes_.
rf_bin_clf = load("./random_forest/python_binary_classifier.joblib")
y_pred = rf_bin_clf.predict_proba(X_pred)
print(y_pred)

[[0.55463735 0.44536265]]


# 3. XGBOOST

In [20]:
# Binary XGBoost classifier: print the class probabilities for the sample.
xgb_bin_clf = load("./xgboost/python_xgb_binary.joblib")
xgboost_result = xgb_bin_clf.predict_proba(X_pred)
print(xgboost_result)

[[0.05146074 0.94853926]]


In [21]:
# Multi-class XGBoost model: report its two most probable labels.
xgb_clf = load("./xgboost/python_xgb_top2.joblib")
xgboost_result = xgb_clf.predict_proba(X_pred)
classes = xgb_clf.classes_

# Column indices ordered from most to least probable, truncated to two.
top2_idx = np.argsort(xgboost_result[0])[::-1][:2]
top2 = [(classes[col], xgboost_result[0, col]) for col in top2_idx]

# Integer class label -> display name.
label_mapper = dict(enumerate(
    ("human", "deepseek", "gemini", "gpt", "grok3", "mistral")
))

print("Top 2 Predictions:")
for label, score in top2:
    print(f"{label_mapper[label]}: {score:.4f}")

Top 2 Predictions:
gpt: 0.4261
deepseek: 0.3864


# 4. SVM

In [None]:
# Multi-class SVM: print the per-class probabilities for the sample.
svm_clf = load("./svm/svm_python_multi_version2.joblib")
y_proba = svm_clf.predict_proba(X_pred)
print(y_proba)

[[0.33408198 0.14951039 0.01805526 0.43811323 0.04167872 0.01856042]]


In [23]:
# Binary SVM: print the class probabilities for the sample.
svm_bin_clf = load("./svm/svm_python_bin.joblib")
y_pred = svm_bin_clf.predict_proba(X_pred)
print(y_pred)

[[0.1398867 0.8601133]]




# 5. Custom Ensemble (Voting)

In [24]:
from mlxtend.classifier import EnsembleVoteClassifier

# Soft-voting ensemble over the three pre-trained multi-class models.
eclf = EnsembleVoteClassifier(
    clfs=[rf_clf, xgb_clf, svm_clf],
    voting='soft',
    fit_base_estimators=False,
)

# The base models are already trained (fit_base_estimators=False); fit() is
# called with a dummy label list covering the six classes
# ['Human' 'deepseek' 'gemini' 'gpt' 'grok3' 'mistral'] encoded as 0..5.
y_pred = list(range(6))
eclf.fit(X_pred, y_pred)

# Averaged class probabilities for the sample.
predictions = eclf.predict_proba(X_pred)
classes = eclf.classes_

# Integer class label -> display name.
label_mapper = dict(enumerate(
    ("human", "deepseek", "gemini", "gpt", "grok3", "mistral")
))

# Probabilities of the single sample.
probs = predictions[0]

# Class indices ordered from most to least probable (all classes are printed).
sorted_probs = np.argsort(probs)[::-1]

for idx in sorted_probs:
    label = label_mapper.get(idx, f"Class {idx}")
    score = probs[idx]
    print(f"{label}: {score:.4f}")

gpt: 0.5145
human: 0.2459
deepseek: 0.1902
grok3: 0.0257
gemini: 0.0152
mistral: 0.0085




In [25]:
# Test the saved multi-class ensemble model.
# NOTE(review): the file loaded here is written by a dump() call in a later
# cell, so on a fresh top-to-bottom run this cell presumably needs that cell
# to have been executed first -- confirm the intended order.
ensemble_model = load("ensemble_python_multi.joblib")

# Predict class probabilities for the sample.
ensemble_predictions = ensemble_model.predict_proba(X_pred)

print("Ensemble Predictions:")
# Column position is used directly as the key into label_mapper.
for idx, prob in enumerate(ensemble_predictions[0]):
    label = label_mapper.get(idx, f"Class {idx}")
    print(f"{label}: {prob:.4f}")

Ensemble Predictions:
human: 0.2459
deepseek: 0.1902
gemini: 0.0152
gpt: 0.5145
grok3: 0.0257
mistral: 0.0085


In [27]:
# Soft-voting ensemble over the three pre-trained binary classifiers.
eclf_bin = EnsembleVoteClassifier(clfs=[rf_bin_clf, xgb_bin_clf, svm_bin_clf], voting='soft', fit_base_estimators=False)

# The base models are already trained (fit_base_estimators=False); fit() is
# only given a label list so the ensemble knows its classes.  Use the labels
# of a binary base model rather than the six multi-class labels, otherwise
# eclf_bin.classes_ reports six classes while predict_proba returns two columns.
y_pred = list(rf_bin_clf.classes_)
eclf_bin.fit(X_pred, y_pred)

# Averaged class probabilities for the sample.
predictions = eclf_bin.predict_proba(X_pred)
classes = eclf_bin.classes_

# Multi-class label -> display name.  Not used by the binary ensemble; kept so
# the notebook-level name stays defined when cells are run out of order.
label_mapper = {
    0: "human",
    1: "deepseek",
    2: "gemini",
    3: "gpt",
    4: "grok3",
    5: "mistral"
}

# Probabilities of the single sample.
bin_probs = predictions[0]
print(bin_probs)

[0.2486616 0.7513384]




In [28]:
# Export both ensemble models to joblib files in the working directory.
dump(eclf, "ensemble_python_multi.joblib")
dump(eclf_bin, "ensemble_python_binary.joblib")

['ensemble_python_binary.joblib']

In [30]:
# Reload the saved binary ensemble and print its class probabilities for the
# sample, to check the exported file is usable.
test = load("ensemble_python_binary.joblib")
result = test.predict_proba(X_pred)
print(result)

[[0.2486616 0.7513384]]


