In [1]:
import os, zipfile, numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import balanced_accuracy_score

In [2]:
def load_train_zip(zip_path: str):
    """Read the labelled training samples out of a zip archive.

    A member is kept only when its base name ends in ``.0`` or ``.1``; that
    trailing digit is the sample's label. Directory entries and ``.labels``
    members are ignored.

    Args:
        zip_path: Path of the zip archive to read.

    Returns:
        A tuple ``(names, contents, labels)``: the kept member names, their raw
        bytes, and an integer NumPy array of labels, all in archive order.
    """
    picked = []  # (member name, raw bytes, label) per accepted member
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            if member.endswith(("/", ".labels")):
                continue
            stem = os.path.basename(member)
            if not stem.endswith((".0", ".1")):
                continue
            # The accepted suffixes are ".0"/".1", so the last character is the label.
            picked.append((member, archive.read(member), int(stem[-1])))

    member_names = [entry[0] for entry in picked]
    payloads = [entry[1] for entry in picked]
    label_array = np.asarray([entry[2] for entry in picked], dtype=int)
    return member_names, payloads, label_array

def load_test_zip(zip_path: str):
    """Read every sample from an unlabelled zip archive.

    Directory entries and ``.labels`` members are skipped; every other member
    is returned regardless of its extension.

    Args:
        zip_path: Path of the zip archive to read.

    Returns:
        A tuple ``(names, contents)`` with the kept member names and their raw
        bytes, both in archive order.
    """
    with zipfile.ZipFile(zip_path) as archive:
        members = [
            member
            for member in archive.namelist()
            if not member.endswith(("/", ".labels"))
        ]
        payloads = [archive.read(member) for member in members]
    return members, payloads

# Load the labelled training archive at notebook level: member names, raw file
# contents, and the integer label array returned by load_train_zip().
# NOTE(review): main() loads "pdf-train.zip" again on its own, and these three
# globals do not appear to be read by the visible cells — confirm before removing.
names, pdf_bytes, labels = load_train_zip("pdf-train.zip")

In [3]:
from minimal_pdfid import PDFiD

def _attr_float(root, name):
    """Return attribute ``name`` of ``root`` as a float.

    An empty value or the placeholder ``'N/A'`` yields ``0.0``; any other value
    is passed to ``float()``.
    """
    raw = root.getAttribute(name)
    if raw == '' or raw == 'N/A':
        return 0.0
    return float(raw)

def _attr_int(root, name):
    """Return attribute ``name`` of ``root`` as an int.

    An empty value yields ``0``. The ``'N/A'`` placeholder is treated the same
    way, matching ``_attr_float``, so a count attribute carrying it does not
    abort feature extraction with a ``ValueError``. Any other value is passed
    to ``int()``.

    Args:
        root: Object exposing ``getAttribute(name)`` (the PDFiD document element
            in this notebook).
        name: Attribute name to read.

    Returns:
        The parsed integer, or ``0`` for an empty / ``'N/A'`` value.

    Raises:
        ValueError: If the value is neither a placeholder nor a valid integer.
    """
    v = root.getAttribute(name)
    if v in ('', 'N/A'):
        return 0
    return int(v)

def extract_features(path_or_bytes):
    """Run PDFiD on one document and return its feature dict.

    The document is analysed with ``PDFiD(..., allNames=True, extraData=True,
    force=True)``. Root-level attributes (entropies, byte counts, EOF stats) and
    per-keyword counts are read from the resulting DOM, and a few ratios are
    derived from them. Only the entries that are not commented out in ``feats``
    are returned; at the moment that is just ``"js_count"``.

    Keyword counts are indexed once up front. Child nodes of ``<Keywords>``
    that are not elements (e.g. whitespace text nodes) are skipped, so they no
    longer make every keyword count silently fall back to 0.

    Args:
        path_or_bytes: Input handed straight to ``PDFiD``; this notebook passes
            the raw bytes read from the zip archives.

    Returns:
        dict mapping feature name to its numeric value.
    """
    xml = PDFiD(path_or_bytes, allNames=True, extraData=True, force=True)
    root = xml.documentElement

    total_entropy = _attr_float(root, "TotalEntropy")
    stream_entropy = _attr_float(root, "StreamEntropy")
    non_stream_entropy = _attr_float(root, "NonStreamEntropy")
    total_count = _attr_int(root, "TotalCount")
    stream_count = _attr_int(root, "StreamCount")
    non_stream_count = _attr_int(root, "NonStreamCount")
    count_eof = _attr_int(root, "CountEOF")
    chars_after_last_eof = _attr_int(root, "CountCharsAfterLastEOF")

    # Build a name -> count index from the first <Keywords> element once,
    # instead of rescanning its children for every lookup.
    keyword_counts = {}
    keyword_sections = root.getElementsByTagName("Keywords")
    if keyword_sections:
        for node in keyword_sections[0].childNodes:
            # Text/comment nodes carry no attributes; only element children
            # can be keyword entries.
            if not hasattr(node, "getAttribute"):
                continue
            keyword_name = node.getAttribute("Name")
            if keyword_name in keyword_counts:
                continue  # the first entry for a name wins
            try:
                keyword_counts[keyword_name] = int(node.getAttribute("Count") or 0)
            except ValueError:
                # A non-numeric Count is treated as "keyword not present".
                keyword_counts[keyword_name] = 0

    def kw(name):
        """Count recorded for keyword ``name``; 0 when it is absent."""
        return keyword_counts.get(name, 0)

    obj = kw("obj")
    encrypt = kw("/Encrypt")
    js = kw("/JS")
    javascript = kw("/JavaScript")
    aa = kw("/AA")
    openaction = kw("/OpenAction")
    launch = kw("/Launch")
    richmedia = kw("/RichMedia")
    embeddedfile = kw("/EmbeddedFile")
    acroform = kw("/AcroForm")
    jbig2 = kw("/JBIG2Decode")
    xfa = kw("/XFA")
    colors_gt_2_24 = kw("/Colors > 2^24")
    # Derived ratios; denominators are clamped so empty documents do not divide by zero.
    js_per_obj = round((js + javascript) / max(obj, 1), 4)
    stream_entropy_ratio = round(stream_entropy / max(total_entropy, 1e-9), 4) if total_entropy > 0 else 0.0
    stream_bytes_ratio = round(stream_count / max(total_count, 1), 4)

    # Feature toggles: uncomment an entry to add it to the model input. The
    # values above are still computed so any entry can be re-enabled directly.
    feats = {
        # "count_eof": count_eof,
        # "chars_after_last_eof": chars_after_last_eof,
        # "total_entropy": round(total_entropy, 4),
        # "stream_entropy": round(stream_entropy, 4),
        # "non_stream_entropy": round(non_stream_entropy, 4),
        # "total_bytes": total_count,
        # "stream_bytes": stream_count,
        # "non_stream_bytes": non_stream_count,
        # "stream_bytes_ratio": stream_bytes_ratio,
        # "stream_entropy_ratio": stream_entropy_ratio,
        "js_count": js,
        # "javascript_count": javascript,
        # "aa_count": aa,
        # "openaction_count": openaction,
        # "launch_count": launch,
        # "richmedia_count": richmedia,
        # "embeddedfile_count": embeddedfile,
        # "acroform_count": acroform,
        # "jbig2decode_count": jbig2,
        # "xfa_count": xfa,
        # "colors_gt_2_24_count": colors_gt_2_24,
        # "encrypt_count": encrypt,
        # "js_per_obj": js_per_obj,
        # "total_entropy_rounded": round(total_entropy, 2),
        # "stream_entropy_rounded": round(stream_entropy, 2),
    }

    return feats

In [4]:
def features_to_matrix(pdf_bytes):
    """Turn a sequence of documents into a numeric feature matrix.

    Each item is passed to ``extract_features``; the resulting dicts become the
    rows of a DataFrame, with missing values filled with ``0.0``.

    Args:
        pdf_bytes: Iterable of inputs accepted by ``extract_features``.

    Returns:
        A tuple ``(X, cols, df)``: the float NumPy matrix, the list of column
        names in matrix order, and the underlying DataFrame.
    """
    records = list(map(extract_features, pdf_bytes))
    frame = pd.DataFrame(records).fillna(0.0)
    return frame.to_numpy(dtype=float), frame.columns.tolist(), frame

In [5]:
def main(train_zip="pdf-train.zip", test_zip="pdf-test.zip", output_path="output.csv"):
    """Train the SVM classifier and write predictions for the test archive.

    Steps:
      1. Load the labelled training archive and extract its feature matrix.
      2. Grid-search a ``StandardScaler`` + ``SVC`` pipeline with 5-fold CV,
         scored by balanced accuracy; ``refit=True`` refits the best candidate
         on the full training set.
      3. Load the test archive, extract its features, and predict.
      4. Write one ``<member name>;<predicted label>`` line per test sample.

    Args:
        train_zip: Path of the labelled training archive.
        test_zip: Path of the unlabelled test archive.
        output_path: Path of the prediction file to write (overwritten).

    Raises:
        ValueError: If the test feature columns do not match the training
            feature columns, which would misalign the model inputs.
    """
    print("[+] Lade Trainingsdaten…")
    # Member names are not needed for training; only contents and labels are.
    _, train_files, y = load_train_zip(train_zip)

    print("[+] Extrahiere Features (Train)…")
    X_train, feat_names, _ = features_to_matrix(train_files)
    print(f"[+] X_train: {X_train.shape}, Features: {feat_names}")

    print("[+] GridSearchCV (CV, refit auf ALLEN Trainingsdaten)…")
    # NOTE(review): gamma is a kernel coefficient for rbf/poly/sigmoid only, so
    # the linear candidates are evaluated once per gamma value. The grid is left
    # as-is because reordering candidates could change tie-breaking.
    grid = GridSearchCV(
        Pipeline([
            ("scaler", StandardScaler()),
            ("svm", SVC(probability=False, random_state=42))
        ]),
        {
            "svm__kernel": ["rbf", "linear"],
            "svm__C": [1, 10, 100],
            "svm__gamma": ["scale", "auto"],
        },
        scoring="balanced_accuracy",
        cv=5,
        n_jobs=-1,
        refit=True,
        verbose=1,
    )
    grid.fit(X_train, y)
    pipe = grid.best_estimator_
    print("[+] Beste Parameter:", grid.best_params_)

    print("[+] Lade Testdaten…")
    test_files, test_pdf = load_test_zip(test_zip)

    print("[+] Extrahiere Features (Test)…")
    X_test, test_feat_names, _ = features_to_matrix(test_pdf)
    # The pipeline was fitted on a positional matrix, so the test columns must
    # be the same features in the same order.
    if test_feat_names != feat_names:
        raise ValueError(
            f"Test feature columns {test_feat_names} do not match "
            f"training feature columns {feat_names}"
        )

    print("[+] Erzeuge Vorhersagen…")
    predictions = pipe.predict(X_test)

    print(f"[+] Schreibe {output_path} …")
    with open(output_path, "w", encoding="utf-8") as f:
        for path, pred in zip(test_files, predictions):
            f.write(f"{path};{int(pred)}\n")

    print(f"[+] Fertig: {output_path}")


# Entry point: run the whole train/predict pipeline when this code is executed
# as the main module. The captured output that follows this cell shows it also
# ran when the notebook cell was executed.
if __name__ == "__main__":
    main()

[+] Lade Trainingsdaten…


[+] Extrahiere Features (Train)…


[+] X_train: (6144, 1), Features: ['js_count']
[+] GridSearchCV (CV, refit auf ALLEN Trainingsdaten)…
Fitting 5 folds for each of 12 candidates, totalling 60 fits


[+] Beste Parameter: {'svm__C': 1, 'svm__gamma': 'scale', 'svm__kernel': 'linear'}
[+] Lade Testdaten…


[+] Extrahiere Features (Test)…


[+] Erzeuge Vorhersagen…
[+] Schreibe output.csv …
[+] Fertig: output.csv
