In [25]:
# Scratch check of how Conv1d followed by MaxPool1d changes the length axis.
# The imports live in this cell so that it also runs on a fresh kernel, where
# it comes before the cell that imports torch.
import torch
import torch.nn as nn

m = nn.Conv1d(16, 255, 3, stride=1)
# Laid out as (batch, channels, length); named so the builtin `input` stays usable.
sample_input = torch.randn(1, 16, 50)
output = m(sample_input)
# kernel_size=3 without padding shortens the length axis: 50 -> 48
print(output.shape)

mp = nn.MaxPool1d(2)
# a pooling window of 2 halves the length axis: 48 -> 24
output = mp(output)
output.shape


torch.Size([1, 255, 48])


torch.Size([1, 255, 24])

In [1]:
import json
import torch
import torch.nn as nn


class CharacterLevelCNN(nn.Module):
    """Character-level 1D CNN classifier: six conv blocks, then three linear layers.

    Args:
        args: configuration object. This class reads ``dropout_input``,
            ``number_of_characters``, ``extra_characters`` and ``max_length``.
        number_of_classes: output size of the last linear layer.

    ``forward`` transposes axes 1 and 2 before the first convolution, so the
    input is expected as (batch, max_length, n_characters). It returns a
    (batch, number_of_classes) tensor of raw scores (no softmax is applied).
    """

    def __init__(self, args, number_of_classes):
        super().__init__()

        in_channels = args.number_of_characters + len(args.extra_characters)

        # Dropout applied to the raw 3-D input. nn.Dropout2d only handles 3-D
        # inputs through a deprecated fallback that performs channel-wise 1-D
        # dropout and warns that this will change; nn.Dropout1d provides that
        # same behaviour directly. Fall back to Dropout2d on PyTorch versions
        # that do not ship Dropout1d.
        dropout_cls = getattr(nn, "Dropout1d", nn.Dropout2d)
        self.dropout_input = dropout_cls(args.dropout_input)

        # define conv layers

        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, 256, kernel_size=7, padding=0),
            nn.ReLU(),
            nn.MaxPool1d(3),
        )

        self.conv2 = nn.Sequential(
            nn.Conv1d(256, 256, kernel_size=7, padding=0), nn.ReLU(), nn.MaxPool1d(3)
        )

        self.conv3 = nn.Sequential(
            nn.Conv1d(256, 256, kernel_size=3, padding=0), nn.ReLU()
        )

        self.conv4 = nn.Sequential(
            nn.Conv1d(256, 256, kernel_size=3, padding=0), nn.ReLU()
        )

        self.conv5 = nn.Sequential(
            nn.Conv1d(256, 256, kernel_size=3, padding=0), nn.ReLU()
        )

        self.conv6 = nn.Sequential(
            nn.Conv1d(256, 256, kernel_size=3, padding=0), nn.ReLU(), nn.MaxPool1d(3)
        )

        # compute the output shape after forwarding an input to the conv layers;
        # the flattened size does not depend on the batch size, so one sample is enough

        input_shape = (1, args.max_length, in_channels)
        self.output_dimension = self._get_conv_output(input_shape)

        # define linear layers

        self.fc1 = nn.Sequential(
            nn.Linear(self.output_dimension, 1024), nn.ReLU(), nn.Dropout(0.5)
        )

        self.fc2 = nn.Sequential(nn.Linear(1024, 1024), nn.ReLU(), nn.Dropout(0.5))

        self.fc3 = nn.Linear(1024, number_of_classes)

        # initialize weights

        self._create_weights()

    # utility private functions

    def _create_weights(self, mean=0.0, std=0.05):
        """Re-draw Conv1d and Linear weights from N(mean, std); biases are left as constructed."""
        for module in self.modules():
            if isinstance(module, (nn.Conv1d, nn.Linear)):
                module.weight.data.normal_(mean, std)

    def _forward_conv(self, x):
        """Run the six conv blocks on (batch, length, channels) input and flatten per sample."""
        x = x.transpose(1, 2)  # Conv1d wants channels on axis 1
        for block in (
            self.conv1,
            self.conv2,
            self.conv3,
            self.conv4,
            self.conv5,
            self.conv6,
        ):
            x = block(x)
        return x.view(x.size(0), -1)

    def _get_conv_output(self, shape):
        """Return the flattened per-sample feature size the conv stack yields for ``shape``."""
        # Only the resulting size matters, so use zeros and skip autograd bookkeeping.
        with torch.no_grad():
            flat = self._forward_conv(torch.zeros(shape))
        return flat.size(1)

    # forward

    def forward(self, x):
        """Map a (batch, max_length, n_characters) tensor to (batch, number_of_classes) scores."""
        x = self.dropout_input(x)
        x = self._forward_conv(x)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

In [2]:
import math
import json
import re
import numpy as np
from sklearn import metrics

# text-preprocessing


def lower(text):
    """Return ``text`` converted to lowercase."""
    lowered = text.lower()
    return lowered


def remove_hashtags(text):
    """Delete every ``#`` followed by one or more letters, digits or underscores."""
    hashtag_pattern = re.compile(r"#[A-Za-z0-9_]+")
    return hashtag_pattern.sub("", text)


def remove_user_mentions(text):
    """Delete every ``@`` followed by one or more letters, digits or underscores."""
    mention_pattern = re.compile(r"@[A-Za-z0-9_]+")
    return mention_pattern.sub("", text)


def remove_urls(text):
    """Delete every ``http://`` or ``https://`` URL from ``text``.

    A URL is matched from its scheme up to the next whitespace character, so
    URLs in the middle of a sentence are removed and the surrounding words are
    kept. (The previous pattern was anchored at the start of a line and erased
    the remainder of that line.)
    """
    clean_text = re.sub(r"https?://\S+", "", text)
    return clean_text


# Registry mapping a preprocessing step name to the function implementing it.
# (The identifier keeps its existing spelling because other code refers to it.)
preprocessing_setps = dict(
    remove_hashtags=remove_hashtags,
    remove_urls=remove_urls,
    remove_user_mentions=remove_user_mentions,
    lower=lower,
)


def process_text(steps, text):
    """Apply the named preprocessing steps to ``text`` in the given order.

    ``steps`` is an iterable of keys of ``preprocessing_setps``; ``None`` means
    "no preprocessing" and returns ``text`` unchanged.
    """
    if steps is None:
        return text

    processed = text
    for step_name in steps:
        transform = preprocessing_setps[step_name]
        processed = transform(processed)
    return processed


# metrics // model evaluations


def get_evaluation(y_true, y_prob, list_metrics):
    """Score predictions with the metrics named in ``list_metrics``.

    The predicted class is the argmax of ``y_prob`` over its last axis.
    Recognised names are ``"accuracy"`` and ``"f1"`` (weighted average);
    the result maps each requested name to its score.
    """
    predicted = np.argmax(y_prob, -1)

    # Evaluated lazily so that only the requested metrics are computed.
    scorers = (
        ("accuracy", lambda: metrics.accuracy_score(y_true, predicted)),
        ("f1", lambda: metrics.f1_score(y_true, predicted, average="weighted")),
    )
    return {name: compute() for name, compute in scorers if name in list_metrics}


class AverageMeter(object):
    """Tracks the latest value together with a running weighted sum, count and mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the latest value, the mean, the sum and the count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the running mean."""
        weighted = val * n
        self.val = val
        self.count += n
        self.sum += weighted
        self.avg = self.sum / self.count


def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy, in percent, for each k in ``topk``.

    Args:
        output: (batch, n_classes) tensor of scores.
        target: (batch,) tensor of class indices.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one scalar tensor per entry of ``topk``: the percentage of
        samples whose target is among the k highest-scoring classes.
    """
    maxk = max(topk)
    batch_size = target.size(0)

    # indices of the maxk best classes per sample, transposed to (maxk, batch)
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # `correct` inherits the transposed (non-contiguous) layout of `pred`,
        # so `.view(-1)` raises for k > 1; `.reshape(-1)` copies when needed.
        correct_k = correct[:k].reshape(-1).float().sum(0)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res


# preprocess input for prediction


def preprocess_input(args):
    """One-hot encode ``args.text`` for prediction.

    The text first goes through the preprocessing steps named in ``args.steps``
    (``None`` means no preprocessing, matching ``process_text``). It is then
    read back to front; characters that are not in
    ``args.alphabet + args.extra_characters`` are skipped, and each remaining
    character becomes a one-hot row.

    Returns:
        A float32 array of shape
        ``(args.max_length, args.number_of_characters + len(args.extra_characters))``:
        at most ``max_length`` one-hot rows followed by all-zero padding rows.
    """
    raw_text = args.text
    steps = args.steps
    if steps is not None:
        for step in steps:
            raw_text = preprocessing_setps[step](raw_text)

    number_of_characters = args.number_of_characters + len(args.extra_characters)
    vocabulary = list(args.alphabet) + list(args.extra_characters)
    max_length = args.max_length

    # Constant-time lookup instead of list.index per character; setdefault keeps
    # the first position of a repeated character, exactly as list.index would.
    char_to_index = {}
    for position, char in enumerate(vocabulary):
        char_to_index.setdefault(char, position)

    indices = [
        char_to_index[char] for char in raw_text[::-1] if char in char_to_index
    ][:max_length]

    # Start from all-zero padding and switch on one cell per kept character;
    # this covers the truncate, pad and empty cases in one path.
    processed_output = np.zeros((max_length, number_of_characters), dtype=np.float32)
    if indices:
        processed_output[np.arange(len(indices)), indices] = 1.0
    return processed_output


# cyclic learning rate scheduling


def cyclical_lr(stepsize, min_lr=1.7e-3, max_lr=1e-2):
    """Build a triangular cyclical learning-rate function of the iteration index.

    The returned ``lr_lambda(it)`` climbs linearly from ``min_lr`` to ``max_lr``
    over ``stepsize`` iterations, falls back over the next ``stepsize``, and
    then repeats.
    """
    lr_span = max_lr - min_lr

    def _cycle_scale(cycle_index):
        # Constant amplitude for every cycle; adapt this for a non-triangular CLR.
        return 1.0

    def _triangle(iteration, half_period):
        # Position within the current cycle, folded into the range [0, 1].
        cycle_index = math.floor(1 + iteration / (2 * half_period))
        distance = abs(iteration / half_period - 2 * cycle_index + 1)
        return max(0, (1 - distance)) * _cycle_scale(cycle_index)

    def lr_lambda(it):
        return min_lr + lr_span * _triangle(it, stepsize)

    return lr_lambda

In [3]:
import json
import numpy as np
from collections import Counter

from torch.utils.data import Dataset
import pandas as pd
from tqdm import tqdm

import torch


def get_sample_weights(labels):
    """Return one weight per label, equal to 1 / (number of occurrences of that label)."""
    inverse_frequency = {
        label: 1 / occurrences for label, occurrences in Counter(labels).items()
    }
    return np.array([inverse_frequency[label] for label in labels])


def load_data(args):
    """Read the CSV at ``args.data_path`` in chunks and return texts and labels.

    Per chunk: rows are shuffled, rows whose text is null or at most one
    character long are dropped, and the text goes through
    ``process_text(args.steps, ...)``.

    Afterwards, depending on ``args``:
      * ``group_labels``: labels 1-5 are merged into 3 classes, or into 2
        classes with rows labelled ``label_ignored`` removed when
        ``ignore_center`` is set.
      * ``balance``: each class keeps at most
        ``int(args.ratio * size_of_smallest_class)`` rows.

    Returns:
        ``(texts, labels, number_of_classes, sample_weights)`` where
        ``sample_weights`` holds one inverse-class-frequency weight per row.
    """
    # chunk your dataframes in small portions
    chunks = pd.read_csv(
        args.data_path,
        usecols=[args.text_column, args.label_column],
        chunksize=args.chunksize,
        encoding=args.encoding,
        nrows=args.max_rows,
        sep=args.sep,
    )
    texts = []
    labels = []
    for df_chunk in tqdm(chunks):
        aux_df = df_chunk.copy()
        aux_df = aux_df.sample(frac=1)
        aux_df = aux_df[~aux_df[args.text_column].isnull()]
        aux_df = aux_df[(aux_df[args.text_column].map(len) > 1)]
        # process_text is defined in this notebook; no `utils` module is
        # imported here, so the former `utils.process_text` was a NameError.
        aux_df["processed_text"] = aux_df[args.text_column].map(
            lambda text: process_text(args.steps, text)
        )
        texts += aux_df["processed_text"].tolist()
        labels += aux_df[args.label_column].tolist()

    if bool(args.group_labels):

        if bool(args.ignore_center):

            label_ignored = args.label_ignored

            clean_data = [
                (text, label)
                for (text, label) in zip(texts, labels)
                if label not in [label_ignored]
            ]

            texts = [text for (text, label) in clean_data]

            # two classes: {1, 2} -> 0 and {4, 5} -> 1
            two_class_map = {1: 0, 2: 0, 4: 1, 5: 1}
            labels = [two_class_map[label] for (text, label) in clean_data]

        else:
            # three classes: {1, 2} -> 0, 3 -> 1, {4, 5} -> 2
            three_class_map = {1: 0, 2: 0, 3: 1, 4: 2, 5: 2}
            labels = [three_class_map[label] for label in labels]

    if bool(args.balance):

        counter = Counter(labels)
        keys = list(counter.keys())
        values = list(counter.values())
        count_minority = np.min(values)
        # cap applied to every class, relative to the smallest one
        per_class_limit = int(args.ratio * count_minority)

        balanced_labels = []
        balanced_texts = []

        for key in keys:
            # filter once per class and keep text/label pairs aligned
            kept = [
                (text, label) for text, label in zip(texts, labels) if label == key
            ][:per_class_limit]
            balanced_texts += [text for text, label in kept]
            balanced_labels += [label for text, label in kept]

        texts = balanced_texts
        labels = balanced_labels

    number_of_classes = len(set(labels))

    print(
        f"data loaded successfully with {len(texts)} rows and {number_of_classes} labels"
    )
    print("Distribution of the classes", Counter(labels))

    sample_weights = get_sample_weights(labels)

    return texts, labels, number_of_classes, sample_weights


class MyDataset(Dataset):
    """Dataset yielding ``(one_hot_text, label)`` pairs for character-level models.

    Each text is read back to front; characters missing from
    ``args.alphabet + args.extra_characters`` are skipped, and the result is
    truncated or zero-padded to ``args.max_length`` rows.
    """

    def __init__(self, texts, labels, args):
        self.texts = texts
        self.labels = labels
        self.length = len(self.texts)

        self.vocabulary = args.alphabet + args.extra_characters
        extra_count = len(args.extra_characters)
        self.number_of_characters = args.number_of_characters + extra_count
        self.max_length = args.max_length
        self.preprocessing_steps = args.steps
        # row i of the identity matrix is the one-hot vector of vocabulary[i]
        self.identity_mat = np.identity(self.number_of_characters)

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """Return the encoded text at ``index`` as a float tensor, with its label."""
        reversed_chars = list(self.texts[index])[::-1]
        one_hot_rows = [
            self.identity_mat[self.vocabulary.index(char)]
            for char in reversed_chars
            if char in self.vocabulary
        ]
        encoded = np.array(one_hot_rows, dtype=np.float32)
        kept = len(encoded)

        if kept == 0:
            # nothing encodable: emit an all-zero matrix
            data = np.zeros(
                (self.max_length, self.number_of_characters), dtype=np.float32
            )
        elif kept >= self.max_length:
            data = encoded[: self.max_length]
        else:
            padding = np.zeros(
                (self.max_length - kept, self.number_of_characters),
                dtype=np.float32,
            )
            data = np.concatenate((encoded, padding))

        return torch.Tensor(data), self.labels[index]

In [4]:
import torch
from torch.autograd import Variable
import torch.nn.functional as F
import torch.nn as nn


class FocalLoss(nn.Module):
    """Focal loss: ``-alpha_t * (1 - p_t) ** gamma * log(p_t)``.

    Args:
        gamma: exponent of the ``(1 - p_t)`` modulating factor.
        alpha: optional class weighting. A number ``a`` becomes the two-class
            weights ``[a, 1 - a]``; a list or tuple is used as one weight per
            class; a tensor is used as given; ``None`` disables weighting.
        size_average: return the mean over elements when true, else the sum.
    """

    def __init__(self, gamma=0, alpha=None, size_average=True):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        if isinstance(alpha, (float, int)):
            self.alpha = torch.Tensor([alpha, 1 - alpha])
        elif isinstance(alpha, (list, tuple)):
            # tuples used to be stored as-is and then failed in forward()
            self.alpha = torch.Tensor(list(alpha))
        self.size_average = size_average

    def forward(self, input, target):
        """Return the focal loss of logits ``input`` against class indices ``target``.

        ``input`` is (N, C) or (N, C, ...); extra trailing axes are folded into
        the batch axis. ``target`` is flattened to match.
        """
        if input.dim() > 2:
            # N,C,H,W => N,C,H*W
            input = input.view(input.size(0), input.size(1), -1)
            input = input.transpose(1, 2)  # N,C,H*W => N,H*W,C
            input = input.contiguous().view(-1, input.size(2))  # N,H*W,C => N*H*W,C
        target = target.view(-1, 1)

        # log-probability assigned to the target class of each element
        logpt = F.log_softmax(input, dim=1).gather(1, target).view(-1)
        # Detached on purpose, as before: the modulating factor scales the loss
        # but does not receive gradient itself.
        pt = logpt.detach().exp()

        if self.alpha is not None:
            if self.alpha.type() != input.type():
                # match dtype/device of the logits once and cache the result
                self.alpha = self.alpha.type_as(input)
            at = self.alpha.gather(0, target.view(-1))
            logpt = logpt * at

        loss = -1 * (1 - pt) ** self.gamma * logpt
        if self.size_average:
            return loss.mean()
        else:
            return loss.sum()