In [1]:
import os
import tarfile
import urllib.request

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import (
    Bidirectional,
    Concatenate,
    Dense,
    Dropout,
    Embedding,
    GlobalAveragePooling1D,
    Input,
    LSTM,
)
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

In [2]:
# Download and extract IMDb dataset
def download_and_extract_imdb_dataset():
    """Download the ``aclImdb_v1`` archive and unpack it under ``data/imdb_reviews``.

    The function is safe to re-run (e.g. on "Restart & Run All"):

    * If a previous call finished extracting, nothing is downloaded or unpacked.
    * The archive is downloaded to a ``.part`` file and renamed only once the
      transfer completed, so an interrupted download is never mistaken for a
      usable archive on the next run.

    Raises:
        urllib.error.URLError: if the archive has to be downloaded and the
            request fails.
        tarfile.TarError: if the archive cannot be read.
    """
    url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    data_dir = "data/imdb_reviews"
    os.makedirs(data_dir, exist_ok=True)

    file_path = os.path.join(data_dir, "aclImdb_v1.tar.gz")
    # Written only after extractall() returned, so a half-finished extraction
    # is redone instead of being silently accepted.
    done_marker = os.path.join(data_dir, ".extracted")

    if os.path.exists(done_marker):
        print("Dataset ready.")
        return

    if not os.path.exists(file_path):
        print("Downloading IMDb dataset...")
        partial_path = file_path + ".part"
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, file_path)
        finally:
            # Only present here if the download or rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    print("Extracting IMDb dataset...")
    with tarfile.open(file_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Refuse members that would escape data_dir (absolute paths, "..", links).
            tar.extractall(path=data_dir, filter="data")
        else:
            # Older Python without extraction filters.
            tar.extractall(path=data_dir)

    with open(done_marker, "w", encoding="utf-8"):
        pass
    print("Dataset ready.")

# Runs when the cell executes: makes sure the dataset is on disk under
# data/imdb_reviews before the loading cell reads data/imdb_reviews/aclImdb.
download_and_extract_imdb_dataset()

In [3]:
# Load data
def load_data(data_dir="data/imdb_reviews/aclImdb", num_words=10000, max_length=500):
    """Read the extracted IMDb reviews and turn them into padded index sequences.

    Args:
        data_dir: Directory containing ``train/`` and ``test/``, each with
            ``pos/`` and ``neg/`` sub-directories of ``.txt`` reviews.
        num_words: Vocabulary cap passed to ``Tokenizer(num_words=...)``.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        ``(X_train, X_test, y_train, y_test, word_index, max_length)`` where
        the ``X_*`` arrays come from ``pad_sequences``, the ``y_*`` arrays hold
        1 for ``pos`` and 0 for ``neg``, and ``word_index`` is the tokenizer's
        word -> index mapping fitted on the training texts only.

    Notes:
        Samples are returned grouped by class (all ``pos`` first, then all
        ``neg``); callers that slice the arrays must shuffle first.
        NOTE(review): ``tokenizer.word_index`` is documented by Keras as the
        full vocabulary, not capped by ``num_words`` - confirm before sizing
        layers from ``len(word_index)``.

    Raises:
        FileNotFoundError: if an expected class directory is missing.
    """
    def load_texts_and_labels(dir_name):
        # Collect (text, label) pairs for one split ("train" or "test").
        texts, labels = [], []
        for label_type in ["pos", "neg"]:
            dir_path = os.path.join(data_dir, dir_name, label_type)
            # os.listdir() returns entries in arbitrary order; sort so the
            # sample order is the same on every machine and every run.
            for fname in sorted(os.listdir(dir_path)):
                if fname.endswith(".txt"):
                    with open(os.path.join(dir_path, fname), encoding="utf-8") as f:
                        texts.append(f.read())
                    labels.append(1 if label_type == "pos" else 0)
        return texts, labels

    train_texts, train_labels = load_texts_and_labels("train")
    test_texts, test_labels = load_texts_and_labels("test")

    # Fit on training text only so no test vocabulary leaks into the model.
    tokenizer = Tokenizer(num_words=num_words)
    tokenizer.fit_on_texts(train_texts)
    X_train = tokenizer.texts_to_sequences(train_texts)
    X_test = tokenizer.texts_to_sequences(test_texts)

    X_train = pad_sequences(X_train, maxlen=max_length)
    X_test = pad_sequences(X_test, maxlen=max_length)

    y_train = np.array(train_labels)
    y_test = np.array(test_labels)

    word_index = tokenizer.word_index

    return X_train, X_test, y_train, y_test, word_index, max_length

# Module-level data used by the later cells: padded train/test sequences,
# their 0/1 labels, the tokenizer's word -> index map and the sequence length.
X_train, X_test, y_train, y_test, word_index, max_length = load_data()

In [4]:
# Create knowledge graph
def create_knowledge_graph():
    """Return an undirected graph linking eight sentiment words to a polarity node.

    Each edge connects one word to either ``"positive"`` or ``"negative"``.
    """
    word_to_polarity = [
        ("good", "positive"),
        ("excellent", "positive"),
        ("bad", "negative"),
        ("terrible", "negative"),
        ("great", "positive"),
        ("poor", "negative"),
        ("fantastic", "positive"),
        ("horrible", "negative"),
    ]
    sentiment_graph = nx.Graph()
    # Edges are inserted in list order, one per (word, polarity) pair.
    sentiment_graph.add_edges_from(word_to_polarity)
    return sentiment_graph

# Module-level graph instance; passed to build_kan_model in the training cell.
knowledge_graph = create_knowledge_graph()

In [5]:
# Get related nodes
def get_related_nodes(graph, word):
    """Return the neighbours of ``word`` in ``graph`` as a list.

    Args:
        graph: Object supporting ``word in graph`` and ``graph.neighbors(word)``.
        word: Node to look up.

    Returns:
        A list of neighbouring nodes, or ``[]`` when ``word`` is not in the graph.
    """
    if word in graph:
        return list(graph.neighbors(word))
    return []

# Get knowledge embedding
def get_knowledge_embedding(word_index, graph, embedding_dim, seed=None):
    """Build an embedding matrix in which graph neighbours shape a word's vector.

    Every graph node receives exactly one random vector, created the first time
    it is needed. A word that has neighbours in ``graph`` is embedded as the
    mean of its neighbours' node vectors, so words attached to the same nodes
    end up with the same embedding. Words without neighbours get an independent
    random vector.

    Args:
        word_index: Mapping word -> row index. Rows are written at exactly
            these indices; row 0 is never written and stays all zeros unless
            ``word_index`` maps a word to it.
        graph: Graph accepted by ``get_related_nodes``.
        embedding_dim: Length of each embedding vector.
        seed: Optional seed for a private random generator. When ``None`` the
            global NumPy random state is used, so ``np.random.seed`` is honoured.

    Returns:
        ``numpy.ndarray`` of shape ``(len(word_index) + 1, embedding_dim)`` with
        values drawn uniformly from ``[0, 1)``.
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
    node_vectors = {}

    def node_vector(node):
        # One stable vector per graph node, shared by every word that links to it.
        if node not in node_vectors:
            node_vectors[node] = rng.rand(embedding_dim)
        return node_vectors[node]

    for word, i in word_index.items():
        related_nodes = get_related_nodes(graph, word)
        if related_nodes:
            embedding_matrix[i] = np.mean([node_vector(node) for node in related_nodes], axis=0)
        else:
            embedding_matrix[i] = rng.rand(embedding_dim)
    return embedding_matrix

In [6]:
# Build KAN model
def build_kan_model(max_length, word_index, knowledge_graph, embedding_dim=50, lstm_units=64, dropout_rate=0.5):
    """Build and compile the two-branch binary classifier.

    Text branch: trainable embedding -> two stacked bidirectional LSTMs with
    dropout, ending in one vector per review.
    Knowledge branch: frozen embedding initialised from
    ``get_knowledge_embedding``, averaged over the sequence axis into one
    vector per review.
    The two vectors are concatenated and fed to a dense head with a sigmoid
    output.

    Args:
        max_length: Length of the padded input sequences.
        word_index: Mapping word -> index; both embeddings have
            ``len(word_index) + 1`` rows.
        knowledge_graph: Graph passed to ``get_knowledge_embedding``.
        embedding_dim: Width of both embedding tables.
        lstm_units: Units per direction in each LSTM layer.
        dropout_rate: Rate used by every ``Dropout`` layer.

    Returns:
        A compiled ``Model`` (Adam, binary cross-entropy, accuracy metric).
    """
    vocab_size = len(word_index) + 1

    text_input = Input(shape=(max_length,), name='text_input')

    # Text branch.
    embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_length)(text_input)
    lstm_layer = Bidirectional(LSTM(lstm_units, return_sequences=True))(embedding_layer)
    lstm_layer = Dropout(dropout_rate)(lstm_layer)
    # No return_sequences here: output is one vector per review.
    lstm_layer = Bidirectional(LSTM(lstm_units))(lstm_layer)
    lstm_layer = Dropout(dropout_rate)(lstm_layer)

    # Knowledge branch.
    knowledge_embedding_matrix = get_knowledge_embedding(word_index, knowledge_graph, embedding_dim)
    knowledge_embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim, weights=[knowledge_embedding_matrix], input_length=max_length, trainable=False)(text_input)
    # The embedding output is per-token (batch, max_length, embedding_dim) while
    # the LSTM output is per-review (batch, 2 * lstm_units). Concatenate needs
    # matching ranks, so collapse the time axis first.
    knowledge_vector = GlobalAveragePooling1D()(knowledge_embedding_layer)

    concatenated = Concatenate()([lstm_layer, knowledge_vector])
    dense_layer = Dense(64, activation='relu')(concatenated)
    dense_layer = Dropout(dropout_rate)(dense_layer)
    output_layer = Dense(1, activation='sigmoid')(dense_layer)

    model = Model(inputs=text_input, outputs=output_layer)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [7]:
# Build and train model
# Validate on a slice of the training data so the test set stays unseen until
# the final evaluation cell. The split is shuffled and stratified because
# load_data returns the samples grouped by class ("pos" first, then "neg").
X_fit, X_val, y_fit, y_val = train_test_split(
    X_train, y_train, test_size=0.2, stratify=y_train, random_state=42
)
model = build_kan_model(max_length, word_index, knowledge_graph)
history = model.fit(X_fit, y_fit, epochs=10, batch_size=32, validation_data=(X_val, y_val))

In [8]:
# Save model
# The target path is relative to the kernel's working directory and points one
# level above it; the ".h5" suffix presumably selects Keras' HDF5 format - confirm
# for the installed Keras version.
model.save('../models/kan_movie_review_model.h5')

In [9]:
# Evaluate model
# The model is compiled with one loss and metrics=['accuracy'], so evaluate()
# yields two values that unpack as (loss, accuracy).
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Model accuracy: {accuracy*100:.2f}%")