# CZ4042 Assignment 2 : Part B : Text Classification

In [19]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model, layers
import csv
import re
import pylab
import en_core_web_lg
from tqdm import tqdm
from collections import Counter
from tensorflow.keras.utils import to_categorical

In [2]:
# set seed
SEED = 10
tf.random.set_seed(SEED)
np.random.seed(SEED)

In [3]:
# parameters
MAX_DOC_LENGTH = 100
CHAR_LIMIT = 256
VOCAB_SIZE = 10000
NUM_CATEGORY = 15
NLP = en_core_web_lg.load()

N_FILTERS = 10
FILTER_SHAPE1 = [20, 256]
POOLING_WINDOW = 4
POOLING_STRIDE = 2
MAX_LABEL = 15

batch_size = 128
one_hot_size = 256
no_epochs = 250
lr = 0.01

## Question 1 : CNN Classifier

In [30]:
def load_data_from_file():
    """
    Load strings from file.
    """
    
    with open('./train_medium.csv', encoding='utf-8') as filex:
        reader = csv.reader(filex)
        x_train = [row[1] for row in reader]
        y_train = [int(row[0]) for row in reader]

    with open('./test_medium.csv', encoding='utf-8') as filex:
        reader = csv.reader(filex)
        x_test = [row[1] for row in reader]
        y_test = [int(row[0]) for row in reader]

    return x_train, y_train, x_test, y_test

def build_char_dict(x_train, x_test):
    """
    Build character mapping to indices.
    """
    count = Counter()
    for d in x_train + x_test:
        d = d.lower() 
        count.update(d)
    
    most_common = count.most_common(CHAR_LIMIT)
    most_common = [x[0] for x in most_common]
    char_dict = {ch : i for i, ch in enumerate(most_common)}
    
    return char_dict

def preprocess_char(data, char_dict):
    """
    Process string for character CNN.
    """
    cleaned = []
    for text in tqdm(data):        
        text = text.strip().lower() # lower case
        text = re.sub("\s+", " ", text) # compress white space
        text = [ch for ch in text if ch in char_dict] # remove char not in char_dict
        
        text = text[:MAX_DOC_LENGTH] # cut down to max char length
        text = text + [' '] * max(0, MAX_DOC_LENGTH - len(text))  # pad with spaces
        text = [char_dict[ch] for ch in text] # convert to id
        
        cleaned.append(text)
        
    return np.array(cleaned)

def get_data_ch():
    """
    Get data in character format.
    """
    x_train, y_train, x_test, y_test = load_data_from_file()
    
    char_dict = build_char_dict(x_train, x_test)
    
    x_train = preprocess_char(x_train, char_dict)
    y_train = np.array(y_train)
    
    x_test = preprocess_char(x_test, char_dict)
    y_test = np.array(y_test)
    
    return x_train, y_train, x_test, y_test

In [50]:
# model builder for Character CNN Classifier
def build_char_CNN():
    # fixed parameter
    learning_rate = 0.01
    batch_size = 128
    
    # model
    model = tf.keras.Sequential()
    
    model.add(layers.Input(shape=(MAX_DOC_LENGTH, CHAR_LIMIT)))
    
    model.add(layers.Conv2D(filters=10, kernel_size=(20,256), activation='relu', padding='valid', input_shape=(None,CHAR_LIMIT)))
    model.add(layers.MaxPooling2D(pool_size=4, strides=2,padding="same"))
    
    model.add(layers.Conv2D(filters=10, kernel_size=(20,1), activation='relu', padding='valid'))
    model.add(layers.MaxPooling2D(pool_size=4, strides=2,padding="same"))
    model.add(layers.Flatten())
    
    model.add(layers.Dense(units=NUM_CATEGORY, activation=None))
    
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    return model

In [51]:
def train_char_CNN():
    # fixed hyperparameters for training
    epochs = 250
    batch_size = 128
    
    x_train, y_train, x_test, y_test = get_data_ch()
    print('convert x_train')
    x_train = to_categorical(x_train, num_classes=CHAR_LIMIT)
    print('convert x_test')
    x_test = to_categorical(x_test, num_classes=CHAR_LIMIT)
    
    # model training
    model = build_char_CNN()
    print(model.summary())
    history = model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_test, y_test))
    
    # save model
    with open('char_cnn.pickle', 'wb') as f:
        pickle.dump(history.history, f)
    
    return model, history.history

In [52]:
_, history = train_char_CNN()

100%|██████████| 5600/5600 [00:00<00:00, 18940.17it/s]
100%|██████████| 700/700 [00:00<00:00, 11870.02it/s]


convert x_train
convert x_test


ValueError: Input 0 of layer conv2d_6 is incompatible with the layer: expected ndim=4, found ndim=3. Full shape received: [None, 100, 256]

In [None]:
plot_graphs(history)

## Question 2 : Word CNN

In [11]:
def build_word_dict(x_train, x_test):
    contents = x_train + x_test
    words = []
    for content in contents:
        for word in content:
            words.append(word)

    most_common = collections.Counter(words).most_common(VOCAB_SIZE)
    
    word_dict = dict()
    word_dict["<pad>"] = 0
    word_dict["<unk>"] = 1
    word_dict["<eos>"] = 2
    for word, _ in word_counter:
        word_dict[word] = len(word_dict)
        
    return word_dict

def preprocess(contents, word_dict, document_max_len):
    x = list(map(lambda d: word_tokenize(clean_str(d)), contents))
    x = list(map(lambda d: list(map(lambda w: word_dict.get(w, word_dict["<unk>"]), d)), x))
    x = list(map(lambda d: d + [word_dict["<eos>"]], x))
    x = list(map(lambda d: d[:document_max_len], x))
    x = list(map(lambda d: d + (document_max_len - len(d)) * [word_dict["<pad>"]], x))
    return x



def preprocess_tokens(strings, nlp):
    """
    Process string into tokens.
    """
    data = []
    tqdm.write("tokenizing and cleaning strings")
    for text in tqdm(strings):        
        # clean the string
        text = re.sub(r"[^A-Za-z0-9(),!?\'\`\"]", " ", text) # only keep these characters
        text = re.sub("\s+", " ", text) # compress white space
        text = text.strip().lower() # lower case
        
        # tokenize 
        tokens = nlp(text)
        
        # remove punctuations if any were missed
        tokens = [t for t in tokens if not t.is_punct]
        
        # remove stop words
        tokens = [t for t in tokens if not t.is_stop]
        
        # lemmatize
        tokens = [t.lemma_ for t in tokens]
        
        data.append(tokens)
    
    return data

In [115]:
# model builder for part A
def build_model(filter1, filter2, momentum, optimizer, dropout):    
    # model
    model = tf.keras.Sequential()
    
    model.add(layers.Input(shape=(100,256)))