# CharCNN

Initial exploration and implementation of a character-level convolutional neural network (CharCNN) that classifies messages as raw plaintext or enciphered text.

In [2]:
import pandas as pd
import numpy as np
import csv

# Load training data 

In [18]:
def txt_to_list(txt_file, delimiter=chr(0x06)):
    """Read a delimited text file into one flat list of messages.

    Every field of every row is treated as a separate message.

    Args:
        txt_file: path of the file to read.
        delimiter: single-character field separator (defaults to 0x06).

    Returns:
        List of message strings, in file order.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation requires for files passed to csv.reader.
    with open(txt_file, newline='') as f:
        msg_reader = csv.reader(f, delimiter=delimiter)

        messages = []
        for row in msg_reader:
            messages.extend(row)

    print(f'Loaded {len(messages)} messages')

    return messages


# Every call site in this notebook refers to the loader as ``load_txt``;
# keep both names bound to the same function.
load_txt = txt_to_list

# Field separator used inside the message dumps.
delimiter = chr(0x06)

# Input files: the raw corpus and two enciphered variants of it
# (presumably Caesar and columnar ciphers, judging by the file names).
raw_txt = '../data/raw_messages/raw_sat_plaintext_1.txt'
ceasar_txt = '../data/en_messages/en_plaintext_1_ceasar.txt'
columnar_txt = '../data/en_messages/en_plaintext_1_columnar.txt'

# Raw messages.
messages = load_txt(raw_txt, delimiter)

# Enciphered messages: both files are pooled into a single list.
messages_en = load_txt(ceasar_txt) + load_txt(columnar_txt)

Loaded 464736 messages
Loaded 467713 messages
Loaded 464743 messages


# Get alphabet 

In [3]:
# The alphabet is every distinct character of the raw corpus, sorted.
alphabet = ''
with open(raw_txt, 'r') as fd:
    alphabet = ''.join(sorted(set(fd.read())))

num_characters = len(alphabet)

print(f'Alphabet:\n{alphabet}')

Alphabet:
	
 !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|~


# Get character counts

In [None]:
# Total occurrences of each alphabet character over all raw messages.
counter = {c: sum(m.count(c) for m in messages) for c in alphabet}

# Characters from most to least frequent; the sort is stable, so equally
# frequent characters keep their alphabet order.
counter_sorted = sorted(counter, key=counter.get, reverse=True)

# Frequency-ranked token ids (starting at 1) and the inverse lookup.
token_to_char = dict(enumerate(counter_sorted, start=1))
char_to_token = {c: i for i, c in token_to_char.items()}

# Get alphabet dict

In [4]:
# Token id = position of the character in the sorted alphabet (0-based).
# This rebinds char_to_token, so these ids are the ones used from here on.
char_to_token = dict(zip(alphabet, range(len(alphabet))))

# One-hot encoding

In [6]:
def one_hot(messages, char_to_token, max_seq_length):
    """One-hot encode messages at character level.

    Each message is truncated to its first ``max_seq_length`` characters and
    then reversed, so the last kept character is written at position 0.
    Positions beyond the end of a message stay all-zero.

    Args:
        messages: sequence of strings to encode.
        char_to_token: mapping from character to row index.
        max_seq_length: number of positions per encoded message.

    Returns:
        float32 array of shape
        ``(len(messages), len(char_to_token), max_seq_length)``.

    Raises:
        IndexError: if a token id in ``char_to_token`` does not fit into
            ``len(char_to_token)`` rows.
    """
    num_char = len(char_to_token)
    messages_one_hot = np.zeros((len(messages), num_char, max_seq_length), dtype=np.float32)

    for i, m in enumerate(messages):
        for j, c in enumerate(reversed(m[:max_seq_length])):
            token = char_to_token.get(c)
            # Unknown characters are deliberately encoded as all zeros; any
            # other failure (e.g. a bad token id) is allowed to surface.
            if token is not None:
                messages_one_hot[i, token, j] = 1.

    return messages_one_hot

In [7]:
# Fixed number of character positions per encoded message.
max_seq_length = 140
# Each call allocates a dense float32 array of shape
# (number of messages, len(char_to_token), max_seq_length).
raw_one_hot = one_hot(messages, char_to_token, max_seq_length)
en_one_hot = one_hot(messages_en, char_to_token, max_seq_length)

# The message strings are not used again below; drop the references.
del messages, messages_en

# Notebook display of both array shapes.
raw_one_hot.shape, en_one_hot.shape

((464736, 105, 140), (932456, 105, 140))

# Create labelled train data

In [None]:
# Number of observations drawn from each of the two encoded arrays.
num_samples = 25000

def sample_data(x, num_samples, seed=42):
    """Subsample an array x to get num_samples observations.

    Rows are drawn without replacement along the first axis, in a random
    order that is reproducible for a given ``seed``.

    Args:
        x: array to sample from.
        num_samples: number of rows to return; if ``x`` has fewer rows,
            all of them are returned (shuffled).
        seed: seed of the random permutation.

    Returns:
        Array holding the selected rows of ``x``.
    """
    # A private RandomState yields the same permutation as seeding the global
    # generator, without resetting the RNG state for the rest of the program.
    rng = np.random.RandomState(seed)
    random_permuted_indices = rng.permutation(len(x))

    # Index only the rows that are kept, so the full permuted copy of x is
    # never materialised.
    return x[random_permuted_indices[:num_samples]]

# Draw the same number of observations from the enciphered and raw arrays.
enc_train = sample_data(en_one_hot, num_samples)
raw_train = sample_data(raw_one_hot, num_samples)

In [None]:
# Label convention: 1 = raw message, 0 = enciphered message.
# The label counts are taken from the sampled arrays themselves so that labels
# and features stay aligned even when a source has fewer than num_samples rows.
labels = np.hstack([np.ones(len(raw_train), dtype=np.float32),
                    np.zeros(len(enc_train), dtype=np.float32)])
features = np.vstack([raw_train, enc_train])

# Create data loader

In [None]:
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import TensorDataset, DataLoader
# Use 32-bit floats as the default floating-point dtype.
torch.set_default_dtype(torch.float32)
# NOTE(review): torch.set_default_tensor_type is deprecated in recent PyTorch
# releases and looks redundant with the call above — confirm and consider
# removing it.
torch.set_default_tensor_type(torch.FloatTensor)

def split_data(x, y, test_size=0.25, seed=42):
    """Split features and labels into train and test parts.

    Thin wrapper around ``train_test_split`` with a fixed random state.

    Returns:
        Tuple ``(x_train, x_test, y_train, y_test)``.
    """
    return tuple(train_test_split(x, y, test_size=test_size, random_state=seed))


def create_tensor_datasets(x, y, batch_size, test_size=0.25, seed=42):
    """Build shuffled train and test data loaders from NumPy arrays.

    Args:
        x: feature array.
        y: label array.
        batch_size: batch size used by both loaders.
        test_size: share of the data held out for the test loader.
        seed: random state of the train/test split.

    Returns:
        Tuple ``(train_loader, test_loader)``.
    """
    features_train, features_test, targets_train, targets_test = split_data(
        x, y, test_size=test_size, seed=seed)

    def _as_loader(features_part, targets_part):
        # Wrap one split as a tensor dataset served in shuffled batches.
        dataset = TensorDataset(torch.from_numpy(features_part),
                                torch.from_numpy(targets_part))
        return DataLoader(dataset, shuffle=True, batch_size=batch_size)

    train_loader = _as_loader(features_train, targets_train)
    test_loader = _as_loader(features_test, targets_test)

    return train_loader, test_loader

In [None]:
# Define batch size
batch_size = 128

# Split the labelled data and wrap both parts in shuffled data loaders.
train_loader, test_loader = create_tensor_datasets(features, labels, batch_size)

In [None]:
from torch import nn

class CharCNN(nn.Module):
    """A character-level CNN for text classification.

    This architecture is inspired by Zhang et al., "Character-level
    Convolutional Networks for Text Classification": a stack of 1-D
    convolutions (some followed by max-pooling) feeding three
    fully-connected layers.

    Args:
        alphabet_size: number of input channels (one per character).
        max_seq_length: length of the input sequences.
        num_classes: number of output logits.
        num_conv_filters: output channels of every convolution.
        num_fc_filters: width of the two hidden fully-connected layers.
        conv_kernel_sizes: kernel size of each convolution, in order.
        pool_kernel_sizes: max-pool kernel size applied after each
            convolution, or ``None`` for no pooling after that layer.
            Must have the same length as ``conv_kernel_sizes``.

    Raises:
        ValueError: if the kernel-size sequences are empty or differ in
            length, or if ``max_seq_length`` is too short for the stack.
    """

    def __init__(self,
                 alphabet_size,
                 max_seq_length,
                 num_classes,
                 num_conv_filters=256,
                 num_fc_filters=1024,
                 conv_kernel_sizes=(7, 7, 3, 3, 3, 3),
                 pool_kernel_sizes=(3, 3, None, None, None, 3)):

        super().__init__()

        # Copy into lists so the attributes never alias caller-owned objects.
        conv_kernel_sizes = list(conv_kernel_sizes)
        pool_kernel_sizes = list(pool_kernel_sizes)
        if not conv_kernel_sizes:
            raise ValueError('conv_kernel_sizes must contain at least one layer')
        if len(conv_kernel_sizes) != len(pool_kernel_sizes):
            raise ValueError('conv_kernel_sizes and pool_kernel_sizes must have the same length')

        self.alphabet_size = alphabet_size
        self.max_seq_length = max_seq_length
        self.num_classes = num_classes

        self.num_conv_filters = num_conv_filters
        self.conv_kernel_sizes = conv_kernel_sizes
        self.pool_kernel_sizes = pool_kernel_sizes
        self.num_conv_layers = len(conv_kernel_sizes)

        # Calculate flattened output size of the last conv. layer
        self.conv_seq_length = self._calculate_conv_seq_length()

        # Define convolutional layers from the configured kernel sizes, so the
        # stack always matches the size computed above. Blocks are registered
        # as conv1, conv2, ... to keep attribute names and state-dict keys.
        in_channels = self.alphabet_size
        for index, (conv_kernel, pool_kernel) in enumerate(
                zip(conv_kernel_sizes, pool_kernel_sizes), start=1):
            layers = [nn.Conv1d(in_channels, num_conv_filters,
                                kernel_size=conv_kernel, padding=0),
                      nn.ReLU()]
            if pool_kernel is not None:
                layers.append(nn.MaxPool1d(pool_kernel))
            self.add_module(f'conv{index}', nn.Sequential(*layers))
            in_channels = num_conv_filters

        # Define fully-connected output layers
        self.fc1 = nn.Sequential(nn.Linear(self.conv_seq_length, num_fc_filters),
                                 nn.ReLU(),
                                 nn.Dropout(0.5))

        self.fc2 = nn.Sequential(nn.Linear(num_fc_filters, num_fc_filters),
                                 nn.ReLU(),
                                 nn.Dropout(0.5))

        self.fc_out = nn.Linear(num_fc_filters, self.num_classes)

        self._initialise_weights()

    def forward(self, x):
        """Return class logits for a batch.

        Args:
            x: tensor of shape ``(batch, alphabet_size, max_seq_length)``.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        # Convolutional layers
        for index in range(1, self.num_conv_layers + 1):
            x = getattr(self, f'conv{index}')(x)

        # Flatten channels and positions into one feature vector per sample
        x = x.view(x.size(0), -1)

        # Fully-connected layers
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc_out(x)

        return x

    def _calculate_conv_seq_length(self):
        """Return the flattened feature count produced by the conv. stack."""
        conv_seq_length = self.max_seq_length

        for fc, fp in zip(self.conv_kernel_sizes, self.pool_kernel_sizes):
            # Unpadded, stride-1 convolution
            conv_seq_length = (conv_seq_length - fc) + 1

            if fp is not None:
                # Max-pooling with stride equal to its kernel size
                conv_seq_length = (conv_seq_length - fp) // fp + 1

            if conv_seq_length < 1:
                raise ValueError(
                    f'max_seq_length={self.max_seq_length} is too short for the '
                    'configured convolution/pooling kernel sizes')

        return conv_seq_length * self.num_conv_filters

    def _initialise_weights(self, mean=0.0, std=0.05):
        """Draw all conv. and linear weights from a normal distribution.

        Biases keep their default initialisation.
        """
        for module in self.modules():
            if isinstance(module, (nn.Conv1d, nn.Linear)):
                module.weight.data.normal_(mean, std)

In [None]:
# One input channel per alphabet character, two output classes.
cnn = CharCNN(len(alphabet), max_seq_length, 2)
print(cnn)

In [None]:
# Test: run a forward pass on a random batch shaped like the real input
shape = (batch_size, len(alphabet), max_seq_length)
x = torch.rand(shape)

y = cnn.forward(x)

# ... and on the first ten encoded messages (result displayed by the notebook)
x = torch.from_numpy(features[:10])
cnn.forward(x)

# Train procedure

In [None]:
from tqdm import tqdm

def train(model, train_loader, optimiser, criterion, num_epochs, print_every=500):
    """Train ``model`` in place for ``num_epochs`` passes over ``train_loader``.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: sized iterable yielding ``(inputs, labels)`` batches.
        optimiser: optimiser whose ``zero_grad``/``step`` drive the updates.
        criterion: loss, called as ``criterion(logits, labels.long())``.
        num_epochs: number of passes over ``train_loader``.
        print_every: the batch loss is printed every ``print_every`` batches.
    """
    model.train()

    # Batches must live on the same device as the model. Tensor.cuda()/to()
    # return a new tensor instead of moving in place, so the result has to be
    # re-bound to take effect.
    device = next(model.parameters()).device
    num_batches = len(train_loader)

    for epoch in range(num_epochs):

        progress_bar = tqdm(enumerate(train_loader), total=num_batches)

        for batch_num, (inputs, labels) in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimiser.zero_grad()
            logits = model(inputs)

            # The labels are cast to integer class indices for the loss
            loss = criterion(logits, labels.long())
            loss.backward()

            optimiser.step()

            if (batch_num % print_every) == 0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], '
                      f'Batch[{batch_num}/{num_batches}], '
                      f'Loss: {loss.item():.4f}')

# Run training

In [None]:
# Move the model to the GPU when one is available.
if torch.cuda.is_available():
    cnn.cuda()
    
# Cross-entropy over the output logits.
criterion = nn.CrossEntropyLoss()

# Adam optimiser over all model parameters.
lr = 0.01
optimiser = torch.optim.Adam(cnn.parameters(), lr=lr)

num_epochs = 2
train(cnn, train_loader, optimiser, criterion, num_epochs)

# Evaluation procedure

In [None]:
def get_accuracy(logits, labels):
    """Return the share of predictions that equal ``labels``.

    The predicted class of each row of ``logits`` comes from ``get_labels``;
    the result is a scalar tensor.
    """
    hits = get_labels(logits).eq(labels)

    return hits.sum().float() / hits.nelement()

def get_labels(logits):
    """Return the index of the most probable class for each row of ``logits``.

    Logits are turned into probabilities with a softmax over dimension 1
    before the arg-max is taken along that same dimension.
    """
    return nn.functional.softmax(logits, dim=1).argmax(dim=1)
    
# Test: accuracy on the first 100 labelled observations
# NOTE(review): x and true are created on the CPU; if cnn was moved to the GPU
# earlier they would need to be on the same device — confirm.
cnn.eval()
x = torch.from_numpy(features[:100])
logits = cnn.forward(x)
true = torch.from_numpy(labels[:100])

a = get_accuracy(logits, true)
print(a)

In [None]:
def evaluate(model, test_loader, criterion):
    """Evaluate ``model`` batch by batch on ``test_loader``.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        test_loader: sized iterable yielding ``(inputs, labels)`` batches.
        criterion: loss, called as ``criterion(logits, labels.long())``.

    Returns:
        Dict with one entry per batch under each key: ``'accuracy'`` and
        ``'avg_loss'`` hold scalars, ``'label'`` and ``'predicted'`` hold
        1-D arrays of the true and predicted classes.
    """
    model.eval()

    # Batches are moved to wherever the model lives, so evaluation works for
    # both CPU and GPU models regardless of whether CUDA is available.
    device = next(model.parameters()).device

    validation = {'accuracy': [],
                  'avg_loss': [],
                  'label': [],
                  'predicted': []}

    progress_bar = tqdm(enumerate(test_loader), total=len(test_loader))

    for batch_num, (inputs, labels) in progress_bar:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # No gradients are needed for any of the evaluation quantities
        with torch.no_grad():
            logits = model(inputs)
            predicted = get_labels(logits)
            acc = get_accuracy(logits, labels)
            loss = criterion(logits, labels.long())
            avg_loss = torch.mean(loss)

        validation['label'].append(labels.cpu().numpy().flatten())
        validation['predicted'].append(predicted.cpu().numpy().flatten())
        validation['accuracy'].extend(acc.cpu().numpy().flatten())
        validation['avg_loss'].extend(avg_loss.cpu().numpy().flatten())

    return validation

In [None]:
# Per-batch metrics on the held-out test split.
val_test = evaluate(cnn, test_loader, criterion)

In [None]:
# Per-batch metrics on the training split, for comparison with the test split.
val_train = evaluate(cnn, train_loader, criterion)

In [None]:
# One row per evaluated batch, tagged with the split it came from.
val_tr = pd.DataFrame(val_train).assign(set='Train')
val_te = pd.DataFrame(val_test).assign(set='Test')

# Stack both splits into a single frame.
val = pd.concat([val_tr, val_te])

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')

# Distribution of per-batch accuracy for each split.
fig, ax = plt.subplots(figsize=(8, 6))
sns.boxplot(x='set', y='accuracy', data=val, ax=ax)

ax.set_title('CharCNN')
ax.set_ylabel('Accuracy')
ax.set_xlabel('');

# Predict

In [None]:
# Class index -> human-readable name.
label_map = {1: 'ACARS', 0: 'NON-ACARS'}


def predict(messages, model, char_to_token, max_seq_length):
    """Classify ``messages`` and print the result for the first five.

    Args:
        messages: sequence of message strings.
        model: trained classifier; it is switched to evaluation mode.
        char_to_token: character-to-index mapping used for one-hot encoding.
        max_seq_length: number of positions per encoded message.

    Returns:
        None. The classifications are only printed.
    """
    # Tokenise messages and place them on the same device as the model
    inputs = one_hot(messages, char_to_token, max_seq_length)
    inputs = torch.from_numpy(inputs).to(next(model.parameters()).device)

    # Get predictions; inference needs no gradient bookkeeping
    model.eval()
    with torch.no_grad():
        logits = model(inputs)
    predicted = get_labels(logits).cpu().numpy()

    print('Example messages:\n')
    for m, label in zip(messages[:5], predicted):
        print(f'Message: {m}     Classified as: {label_map[int(label)]}\n')

    return

In [None]:
# Classify the last 100 messages of an enciphered file that was not loaded for
# training (presumably a Vigenère cipher, judging by the file name).
f = '../data/en_messages/en_plaintext_1_vigenere.txt'
messages_test = load_txt(f)

predict(messages_test[-100:], cnn, char_to_token, max_seq_length)

In [None]:
# Classify the last 100 messages of a second raw file that was not loaded for
# training.
f = '../data/raw_messages/raw_sat_plaintext_2.txt'
messages_test = load_txt(f)

predict(messages_test[-100:], cnn, char_to_token, max_seq_length)