# **Notebook C**: Patent Classification with CNN
----



# C.1. Load Packages
---

In [40]:
from google.colab import drive
drive.mount('/content/drive')

import os, re, zipfile, urllib.request, warnings, traceback
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm as tqdm_notebook

# Keras / TF
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

# Metrics / CV
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score, confusion_matrix

# Text
import nltk
from textblob import TextBlob

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [41]:
warnings.filterwarnings("ignore")
nltk.download("punkt", quiet=True)  # 解决 TextBlob 的分词依赖

# 为避免 TF XLA 的随机警告
tf.get_logger().setLevel('ERROR')

In [42]:
# =========================
# 1) 路径与数据（对齐 Notebook D）
# =========================
os.chdir("/content/drive/MyDrive/USPTO_data")

# 输入
TRAIN_CSV = "./Training_Data/4K Patents - AI 20p.csv"

# 输出
os.makedirs("./Output/Model Performance", exist_ok=True)
os.makedirs("./Output/Classification Output", exist_ok=True)
PERF_CSV = "./Output/Model Performance/CNN Model Classification Performance.csv"
PRED_CSV = "./Output/Classification Output/CNN Classification Results.csv"

# 读取
TrainingData = pd.read_csv(TRAIN_CSV)
IDs           = np.array(TrainingData["app number"].values.tolist())
Abstract_Text = TrainingData["abstract"].astype(str).values.tolist()
Classes       = TrainingData["actual"].astype(int).values.tolist()

print("工作目录：", os.getcwd())
print("样本数：", len(TrainingData))


工作目录： /content/drive/MyDrive/USPTO_data
样本数： 4000


# C.2. Load Training Data ##
----------------

We are going to use the data on the Google drive. This is in a csv file, and so we are going to load the data as a dataframe, and then convert the main data (Patent Ids, Indicator for AI / Non-AI, Patent Abstract) from a Pandas DataFrame to a list (which is more easily used in later sections).

In [43]:
# =========================
# 2) 文本清洗 & Tokenizer
# =========================
def string_cleaner(text: str):
    """
    用 TextBlob 分词 + 词干化；如果失败，降级为简单的切词。
    """
    try:
        blob = TextBlob(text.lower())
        tokens = [w.stem() for w in blob.words]  # 词干
        return tokens
    except Exception:
        # 兜底：只保留字母与数字，空白切分
        text = re.sub(r"[^a-z0-9\s]+", " ", text.lower())
        return [t for t in text.split() if t]

# 转清洗后的文本
Abstracts = [" ".join(string_cleaner(x)) for x in Abstract_Text]

# 与你原 notebook 一致：只保留前 n_words 的高频词
n_words = 2000
tokenizer = Tokenizer(num_words=n_words,
                      lower=True,
                      filters='!"#$%&()*+,-./:;<=>?@[\\]^`{|}~\t\n',
                      char_level=False)
tokenizer.fit_on_texts(Abstracts)

# vocab_size 与 tokenizer.num_words 对齐（避免 index 越界）
vocab_size = len(tokenizer.word_index) + 1
if getattr(tokenizer, "num_words", None):
    vocab_size = min(vocab_size, tokenizer.num_words)


Once we have the list of words that occur in our corpus of abstracts (i.e. word index), then we can try to map those words to embedding vectors. Below we define the functions that will go through each of the words in our word index and extract the coresponding embedding vector and save it to an embedding matrix that will be used as a layer in a subsequent convolutional neural network (CNN) model.

In [44]:
# =========================
# 3) 嵌入下载与矩阵构建（支持 GloVe 6B 自动下载）
# =========================
EMB_DIR = "./Embeddings"
os.makedirs(EMB_DIR, exist_ok=True)

def _collect_glove_dims(classifiers):
    dims = set()
    for _, path, dim in classifiers:
        if path == "AUTO_GLOVE" or ("glove.6B" in str(path).lower()):
            dims.add(int(dim))
    return sorted(dims)

def ensure_glove_6B_files(dims):
    """
    如需的维度不存在，则自动下载 glove.6B.zip 并解压相应的 txt
    """
    zip_url  = "http://nlp.stanford.edu/data/glove.6B.zip"
    zip_path = os.path.join(EMB_DIR, "glove.6B.zip")
    need_any = any(not os.path.exists(os.path.join(EMB_DIR, f"glove.6B.{d}d.txt")) for d in dims)

    if need_any and not os.path.exists(zip_path):
        print("下载 glove.6B.zip ...")
        urllib.request.urlretrieve(zip_url, zip_path)

    if os.path.exists(zip_path):
        with zipfile.ZipFile(zip_path, "r") as zf:
            for d in dims:
                target = os.path.join(EMB_DIR, f"glove.6B.{d}d.txt")
                if not os.path.exists(target):
                    print(f"解压 glove.6B.{d}d.txt ...")
                    zf.extract(f"glove.6B.{d}d.txt", EMB_DIR)

def create_embedding_matrix(filepath, word_index, embedding_dim, vocab_size=None):
    """
    将词表映射到预训练词向量矩阵；OOV/越界记为0向量
    """
    if vocab_size is None:
        vocab_size = len(word_index) + 1
    mat = np.zeros((vocab_size, embedding_dim), dtype=np.float32)

    with open(filepath, encoding="utf-8") as f:
        for line in f:
            parts = line.rstrip().split()
            if len(parts) < embedding_dim + 1:
                continue
            word = parts[0]
            if word in word_index:
                idx = word_index[word]
                if idx < vocab_size:
                    vec = np.asarray(parts[1:1+embedding_dim], dtype=np.float32)
                    mat[idx] = vec
    return mat

def create_empty_matrix(vocab_size, embedding_dim):
    return np.ones((vocab_size, embedding_dim), dtype=np.float32)


# =========================
# 4) 模型结构（CNN + LSTM）
# =========================
def build_cnn_lstm(vocab_size, embedding_dim, embedding_matrix, maxlen):
    model = Sequential()
    if embedding_matrix is None:
        model.add(layers.Embedding(input_dim=vocab_size,
                                   output_dim=embedding_dim,
                                   input_length=maxlen,
                                   trainable=True))
    else:
        model.add(layers.Embedding(input_dim=vocab_size,
                                   output_dim=embedding_dim,
                                   input_length=maxlen,
                                   weights=[embedding_matrix],
                                   trainable=True))
    model.add(layers.Dropout(0.2))
    model.add(layers.Conv1D(filters=64, kernel_size=2, activation='relu'))
    model.add(layers.MaxPooling1D(pool_size=4))
    # 下面这层 Dense 会逐时间步投影到 512 维，保持时间维，兼容 LSTM
    model.add(layers.Dense(512, activation='sigmoid'))
    model.add(layers.LSTM(100))
    model.add(layers.Dense(1, activation='sigmoid'))

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=[tf.keras.metrics.Recall(name="recall")]
    )
    return model


# =========================
# 5) 待测“嵌入配置”（全部可用）
# =========================
CLASSIFIERS = [
    ['No Embeddings',               'NONE',       50],
    ['GloVe 6B (50d)',              'AUTO_GLOVE', 50],
    ['GloVe 6B (100d)',             'AUTO_GLOVE', 100],
    ['GloVe 6B (200d)',             'AUTO_GLOVE', 200],
    ['GloVe 6B (300d)',             'AUTO_GLOVE', 300],
]

# 预拉取需要的 GloVe 维度（一次性）
_glove_dims = _collect_glove_dims(CLASSIFIERS)
if _glove_dims:
    ensure_glove_6B_files(_glove_dims)


# =========================
# 6) 训练循环（K 折 + 早停）与结果写出
# =========================
# 超参
maxlen = 200
batch_size = 50
epochs = 20
NUM_OF_SPLITS = 5

RESULTS = []
Classified_Values = []

print(f"准备运行 {len(CLASSIFIERS)} 个嵌入模型...")

for idx, (name, path, embedding_dim) in enumerate(tqdm_notebook(CLASSIFIERS, desc="Loop Through Embeddings"), 1):
    print(f"\n=== [{idx}/{len(CLASSIFIERS)}] 开始：{name} ===")

    try:
        # 1) 解析/确保嵌入文件
        real_path = path
        if path == "AUTO_GLOVE":
            real_path = os.path.join(EMB_DIR, f"glove.6B.{int(embedding_dim)}d.txt")
        elif isinstance(path, str) and ("glove.6B" in path.lower()) and (not os.path.isabs(path)):
            real_path = os.path.join(EMB_DIR, os.path.basename(path))

        if real_path != "NONE" and not os.path.exists(real_path):
            if "glove.6B" in str(real_path).lower():
                ensure_glove_6B_files([int(embedding_dim)])
        if real_path != "NONE" and not os.path.exists(real_path):
            print(f"[跳过] 嵌入文件未找到：{real_path}")
            continue

        # 2) 构造嵌入矩阵
        if real_path == "NONE":
            embedding_matrix = None
        else:
            embedding_matrix = create_embedding_matrix(
                real_path, tokenizer.word_index, embedding_dim, vocab_size=vocab_size
            )

        # 3) 每个模型独立的累计容器
        y_actual, y_predicted, y_prob, id_s = [], [], [], []

        # 4) K 折
        kf = StratifiedKFold(n_splits=NUM_OF_SPLITS, shuffle=True, random_state=1)
        for fold, (train_idx, test_idx) in enumerate(
            tqdm_notebook(kf.split(Abstracts, Classes), desc="Cross-Validating", leave=False, total=NUM_OF_SPLITS), 1
        ):
            Test_IDs = IDs[test_idx].tolist()

            X_train_txt = [Abstracts[i] for i in train_idx]
            X_test_txt  = [Abstracts[i] for i in test_idx]
            y_train     = [Classes[i] for i in train_idx]
            y_test      = [Classes[i] for i in test_idx]

            # 序列化 + padding
            X_train = pad_sequences(tokenizer.texts_to_sequences(X_train_txt), padding='post', maxlen=maxlen)
            X_test  = pad_sequences(tokenizer.texts_to_sequences(X_test_txt),  padding='post', maxlen=maxlen)

            # 5) 新建 & 训练（早停看 val_recall）
            model = build_cnn_lstm(vocab_size, embedding_dim, embedding_matrix, maxlen=maxlen)
            es = tf.keras.callbacks.EarlyStopping(
                monitor="val_recall", mode="max",
                patience=2, restore_best_weights=True, verbose=0
            )
            model.fit(
                X_train, np.array(y_train),
                batch_size=batch_size, epochs=epochs, verbose=0,
                validation_data=(X_test, np.array(y_test)),
                callbacks=[es]
            )

            # 6) 预测（概率+0/1）
            prob = model.predict(X_test, batch_size=batch_size, verbose=0).ravel()
            pred = (prob >= 0.5).astype(int)

            # 7) 累加一次
            y_actual.extend(y_test)
            y_predicted.extend(pred.tolist())
            y_prob.extend(prob.tolist())
            id_s.extend(Test_IDs)

            # 清理
            tf.keras.backend.clear_session()

        # 8) 自检长度（应与全体样本一致）
        n_total = len(Classes)
        if not (len(y_actual) == len(y_predicted) == len(y_prob) == len(id_s) == n_total):
            raise RuntimeError(
                f"长度不一致：actual={len(y_actual)}, pred={len(y_predicted)}, prob={len(y_prob)}, ids={len(id_s)}, total={n_total}"
            )

        # 9) 计算指标（AUC 用概率）
        Accuracy  = accuracy_score(y_actual, y_predicted)
        ROC       = roc_auc_score(y_actual, y_prob)
        Precision = precision_score(y_actual, y_predicted, zero_division=0)
        Recall    = recall_score(y_actual, y_predicted, zero_division=0)
        F1        = f1_score(y_actual, y_predicted, zero_division=0)
        tn, fp, fn, tp = confusion_matrix(y_actual, y_predicted).ravel()
        Share = float(np.round(np.mean(y_predicted), 3))

        # 记录
        RESULTS.append([
            name, Share, int(tp), int(fn), int(fp), int(tn),
            round(Accuracy,3), round(ROC,3),
            round(Precision,3), round(Recall,3), round(F1,3)
        ])
        Classified_Values.append(list(zip(len(id_s)*[name], id_s, y_actual, y_predicted)))

        print(f"=== 完成：{name} | Acc={Accuracy:.3f} | AUC={ROC:.3f} | F1={F1:.3f} ===")

    except Exception as e:
        print(f"[失败] {name} 出错：{e}")
        traceback.print_exc()
        continue

下载 glove.6B.zip ...
解压 glove.6B.200d.txt ...
解压 glove.6B.300d.txt ...
准备运行 5 个嵌入模型...


Loop Through Embeddings:   0%|          | 0/5 [00:00<?, ?it/s]


=== [1/5] 开始：No Embeddings ===


Cross-Validating:   0%|          | 0/5 [00:00<?, ?it/s]

=== 完成：No Embeddings | Acc=0.799 | AUC=0.523 | F1=0.000 ===

=== [2/5] 开始：GloVe 6B (50d) ===


Cross-Validating:   0%|          | 0/5 [00:00<?, ?it/s]

=== 完成：GloVe 6B (50d) | Acc=0.799 | AUC=0.526 | F1=0.000 ===

=== [3/5] 开始：GloVe 6B (100d) ===


Cross-Validating:   0%|          | 0/5 [00:00<?, ?it/s]

=== 完成：GloVe 6B (100d) | Acc=0.799 | AUC=0.523 | F1=0.000 ===

=== [4/5] 开始：GloVe 6B (200d) ===


Cross-Validating:   0%|          | 0/5 [00:00<?, ?it/s]

=== 完成：GloVe 6B (200d) | Acc=0.799 | AUC=0.525 | F1=0.000 ===

=== [5/5] 开始：GloVe 6B (300d) ===


Cross-Validating:   0%|          | 0/5 [00:00<?, ?it/s]

=== 完成：GloVe 6B (300d) | Acc=0.799 | AUC=0.515 | F1=0.000 ===


In [45]:
# =========================
# 7) 结果写出（对齐 Notebook D 风格）
# =========================
RESULTS_TABLE = pd.DataFrame(
    RESULTS,
    columns=["Name","Share","True-Positives","False-Negatives","False-Positives","True-Negatives",
             "Accuracy","AUC","Precision","Recall","F1"]
)
RESULTS_TABLE["Type"] = "CNN"
RESULTS_TABLE = RESULTS_TABLE[["Name","Type","Share","True-Positives","False-Negatives",
                               "False-Positives","True-Negatives","Accuracy","AUC","Precision","Recall","F1"]]

RESULTS_TABLE.sort_values("Accuracy", ascending=False).to_csv(PERF_CSV, index=False, encoding="utf-8-sig")
print("已保存：", os.path.abspath(PERF_CSV))

# 逐样本横表：第一列 id，第二列 Actual，后面每个模型一列
Final = None
for bundle in Classified_Values:
    Temp = pd.DataFrame(bundle, columns=["Model","id","Actual","Predicted"])
    if Final is None:
        name0 = Temp.head(1)["Model"].iloc[0]
        Final = Temp[["id","Actual","Predicted"]].rename(columns={"Predicted": name0})
    else:
        name0 = Temp.head(1)["Model"].iloc[0]
        Final = Final.merge(Temp[["id","Predicted"]].rename(columns={"Predicted": name0}), on="id", how="outer")

Final.to_csv(PRED_CSV, index=False, encoding="utf-8-sig")
print("已保存：", os.path.abspath(PRED_CSV))

已保存： /content/drive/MyDrive/USPTO_data/Output/Model Performance/CNN Model Classification Performance.csv
已保存： /content/drive/MyDrive/USPTO_data/Output/Classification Output/CNN Classification Results.csv
