In [2]:
import numpy as np
import tensorflow as tf


class KMeans:
    def __init__(self, k):
        self.k = k

    def fit(self, X, max_iter=100):
        centroids = X[np.random.choice(X.shape[0], self.k, replace=False)]
        for i in range(max_iter):
            distances = self.euclidean_distance(X, centroids)
            labels = np.argmin(distances, axis=1)
            for j in range(self.k):
                centroids[j] = X[labels == j].mean(axis=0)
        self.centroids = centroids
        self.labels = labels

    def euclidean_distance(self, X, Y):
        return tf.sqrt(tf.reduce_sum(tf.square(X - Y[:, tf.newaxis]), axis=2))


In [16]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Embedding, Bidirectional, LSTM, Dense


class BILSTMNER(Model):
    def __init__(self, vocab_size, num_tags, embedding_dim, lstm_units):
        super(BILSTMNER, self).__init__()
        # self.embedding = Embedding(vocab_size, embedding_dim, mask_zero=True)
        self.bi_lstm = Bidirectional(LSTM(lstm_units, return_sequences=True))
        self.dense = Dense(num_tags, activation="softmax")

    def call(self, inputs):
        # x = self.embedding(inputs)
        inputs = self.bi_lstm(inputs)
        return self.dense(inputs)

In [17]:
def entropy(labels):
    unique, counts = np.unique(labels, return_counts=True)
    probs = counts / len(labels)
    return -np.sum(probs * np.log(probs))

In [18]:
## 构建数据集
from gensim.models.keyedvectors import KeyedVectors
embedding = KeyedVectors.load_word2vec_format("./sgns.sikuquanshu.bigram", binary=False,unicode_errors='ignore')
label2id = {'O':7, 'B-LOC':1, 'I-LOC':2, 'B-PER':3, 'I-PER':4, 'B-ORG':5, 'I-ORG':6, '<PAD>':0}

In [19]:
def active_learning(X_train, y_train, pool_size, cluster_size, n_clusters, max_iterations):
    # Initialize the BILSTM model
    model = BILSTMNER(len(embedding.vocab), len(label2id), 128, 64)
    optimizer = tf.optimizers.Adam()

    # Initialize the pool and the active learning loop
    pool_idx = np.arange(len(X_train))
    for iteration in range(max_iterations):
        print("Iteration:", iteration + 1)

        # Train the model on the labeled data
        model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"])
        model.fit(X_train, y_train, batch_size=32, epochs=10)

        # Use K-Means to cluster the unlabeled data
        k_means = KMeans(n_clusters)
        k_means.fit(X_train[pool_idx])
        centroids = k_means.centroids

        # Compute the distances and uncertainties
        distances = k_means.euclidean_distance(X_train[pool_idx], centroids)
        uncertainties = np.apply_along_axis(entropy, 1, distances)

        # Select the most representative and uncertain samples
        most_representative_idx = np.argmin(distances, axis=0)
        most_uncertain_idx = np.argsort(-uncertainties)[:cluster_size]

        # Add the most representative and uncertain samples to the labeled data
        labeled_idx = np.concatenate([most_representative_idx, most_uncertain_idx])
        X_labeled = X_train[pool_idx[labeled_idx]]
        y_labeled = y_train[pool_idx[labeled_idx]]

        # Remove the labeled samples from the pool
        pool_idx = np.delete(pool_idx, labeled_idx)

        # Test the model on the test data
        # model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"])
        # loss, accuracy = model.evaluate(X_test, y_test)
        # print("Test loss:", loss)
        # print("Test accuracy:", accuracy)

        if len(pool_idx) < pool_size:
            break


In [21]:
data = [['明月几时有,把酒问青天', ['O','O','O','O','O','O','O','O','O','O', 'O']],
['至今思项羽,不肯过江东',  ['O','O','O','B-PER','I-PER','O','O','O','O','B-LOC', 'I-LOC']], 
['黄河远上白云间,一片孤城万仞山', ['B-LOC','I-LOC','O','O','O','O','O','O','O','O', 'O','O','O','O','O']],
['羌笛何须怨杨柳,春风不度玉门关', ['O','O','O','O','O','O','O','O','O','O', 'O','O','B-LOC','B-LOC','I-LOC']]
]
label2id = {'O':7, 'B-LOC':1, 'I-LOC':2, 'B-PER':3, 'I-PER':4, 'B-ORG':5, 'I-ORG':6, '<PAD>':0}
embedding['<PAD>'] = np.zeros(300)
def load_data():
    x_data = []
    y_data = []
    max_length = 300
    for item in data:
        text, entitys = item
        text = list(text)
        text += ['<PAD>' for i in range(max_length - len(text))]
        token = [embedding[word] for word in text]
        entitys = [label2id[label] for label in entitys]
        entitys+=[0 for i in range(max_length-len(entitys))]
        x_data.append(token)
        y_data.append(entitys)
    return np.array(x_data), np.array(y_data)
x, y = load_data()
active_learning(x, y, 2, 1, 4, 3)

Iteration: 1
Epoch 1/10


UnboundLocalError: in user code:

    e:\Anaconda\Anaconda\envs\ten-gpu\lib\site-packages\tensorflow\python\keras\engine\training.py:850 train_function  *
        return step_function(self, iterator)
    C:\Users\我是你~1\AppData\Local\Temp/ipykernel_1216/3865900303.py:14 call  *
        x = self.bi_lstm(x)

    UnboundLocalError: local variable 'x' referenced before assignment
