In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler

import torch
import torchtext
from torch import nn
from torch.nn import functional as F
from torch.utils import data

# Yoon Kim Convolutional Neural Network

In [None]:
from sklearn.preprocessing import LabelEncoder
# Label encoder used below to turn the string `category` column into integer class ids.
encoder = LabelEncoder()

In [None]:
# Load the corpus from a CSV in the working directory; later cells read its
# `text` and `category` columns.
data_df = pd.read_csv(r"bbc-text.csv")
data_df.head()

In [None]:
# Add an integer-encoded copy of `category` as `category_` (the training target).
category_ids = encoder.fit_transform(data_df["category"])
data_df = data_df.assign(category_=category_ids)
data_df.head()

## Data Preparation

In [None]:
from allennlp.modules.elmo import Elmo, batch_to_ids
import spacy
# from torchtext import data
import re

In [None]:
# Targets: the integer class ids produced by the label encoder.
y_data = data_df["category_"].values
# Inputs: the raw document strings (tokenized in a later cell).
x_data = data_df["text"].values

In [None]:
# Only the spaCy tokenizer is needed, so the parser, tagger and NER components are disabled.
# NOTE(review): the 'en' shortcut name is not available in spaCy v3+ (use e.g.
# 'en_core_web_sm' there) — confirm against the installed spaCy version.
nlp = spacy.load('en',disable=['parser', 'tagger', 'ner'])


def tweet_clean(text):
    """Normalize a raw document string before tokenization.

    Steps, in order:
      1. Replace http/https links with a space.
      2. Replace every run of non-alphanumeric characters with a single space.
      3. Strip surrounding whitespace and keep at most the first 500 characters.

    Links must be removed *before* the alphanumeric filter: that filter deletes
    the ``://`` the link pattern relies on, so running it first would leave the
    URL fragments in the text.

    Args:
        text: Raw input string.

    Returns:
        The cleaned string, at most 500 characters long.
    """
    text = re.sub(r'https?:/\/\S+', ' ', text)  # remove links
    text = re.sub(r'[^A-Za-z0-9]+', ' ', text)  # remove non alphanumeric characters
    return text.strip()[:500]

def tokenizer(s):
    """Clean ``s`` with ``tweet_clean`` and return its lower-cased spaCy tokens as a list of strings."""
    doc = nlp(tweet_clean(s))
    lowered = []
    for token in doc:
        lowered.append(token.text.lower())
    return lowered

# Tokenize every document; x_data becomes a list of token lists.
x_data = list(map(tokenizer, x_data))

In [None]:
# Convert the tokenized documents into the id tensor AllenNLP's ELMo consumes and
# keep it as a NumPy array.
# NOTE(review): presumably shaped (n_docs, max_tokens, chars_per_token) — confirm.
x_data = batch_to_ids(x_data).numpy()

In [None]:
options_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_options.json"
weight_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_weights.hdf5"
# Two output representations are requested; they are summed below.
elmo = Elmo(options_file, weight_file, 2, dropout=0)
elmo.eval()

# Embed the character ids held in `x_data` and store the embeddings back into
# `x_data`, which is what the train/test split and the model consume.
# The embeddings are used as fixed features, so no autograd graph is needed
# (and `.numpy()` requires a tensor that does not track gradients).
# Documents are processed in small chunks to keep peak memory bounded.
ELMO_BATCH_SIZE = 32
char_ids = torch.from_numpy(x_data)
elmo_chunks = []
with torch.no_grad():
    for start in range(0, len(char_ids), ELMO_BATCH_SIZE):
        layers = elmo(char_ids[start:start + ELMO_BATCH_SIZE])["elmo_representations"]
        elmo_chunks.append(layers[0] + layers[1])
x_data = torch.cat(elmo_chunks).numpy()

In [None]:
# Hold out 20% of the samples for evaluation; the fixed random_state keeps the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.2, random_state=42)

In [None]:
class DatasetDocumentClassification(data.Dataset):
    """Map-style PyTorch dataset pairing each input sample with its label.

    Both containers are stored as given and indexed with the same position;
    the dataset length is taken from ``x_data``.
    """

    def __init__(self, x_data, y_data):
        """Keep references to the inputs (``x_data``) and their labels (``y_data``)."""
        self.x_data, self.y_data = x_data, y_data

    def __len__(self):
        """Return the number of samples, i.e. the length of ``x_data``."""
        n_samples = len(self.x_data)
        return n_samples

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at ``index``."""
        sample = self.x_data[index]
        label = self.y_data[index]
        return sample, label
    
    
# DataLoader settings shared by the training and test loaders.
params = dict(batch_size=1, shuffle=True, num_workers=8)

# Datasets
training_set = DatasetDocumentClassification(x_train, y_train)
test_set = DatasetDocumentClassification(x_test, y_test)

# Generators
train_generator = data.DataLoader(training_set, **params)
test_generator = data.DataLoader(test_set, **params)

## Model

In [None]:
'''
spatial dropout
https://discuss.pytorch.org/t/spatial-dropout-in-pytorch/21400/2
'''

# In PyTorch the tensor layout is [batch_size, channel, h_dim, w_dim]
class YoonKimModel(nn.Module):
    """Yoon Kim-style CNN text classifier over pre-computed token embeddings.

    One convolutional branch is built per entry of ``context_window``. Each
    branch slides a ``(window, emb_dim)`` kernel along the token axis, applies
    batch normalisation and ReLU, and average-pools the remaining positions to
    a single value per channel. The branch outputs are concatenated, passed
    through dropout and projected to ``y_dim`` logits.

    Args:
        x_channel: Number of input channels of each convolution. ``forward``
            adds exactly one channel axis, so this is expected to be 1.
        b1_channel: Number of output channels (filters) per branch.
        y_dim: Number of output classes.
        emb_dim: Size of each token embedding (kernel width).
        stride_conv: Convolution stride.
        prob_dropout: Dropout probability applied before the linear layer.
        context_window: Iterable of kernel heights (tokens per filter).
    """

    def __init__(self, x_channel, b1_channel, y_dim, emb_dim, stride_conv, prob_dropout, context_window):
        super(YoonKimModel, self).__init__()
        # TODO: spatial dropout — nn.Dropout2d() is a starting point for a SpatialDropout1d.
        # nn.ModuleList (rather than a plain list) registers the branches as
        # submodules, so their weights show up in self.parameters() and calls
        # like .train()/.eval()/.to() reach them.
        self.context_layers = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(x_channel, b1_channel, stride=stride_conv,
                          kernel_size=(l_filter_sizes, emb_dim)),
                nn.BatchNorm2d(b1_channel),
                nn.ReLU(),
                nn.Flatten(start_dim=2, end_dim=3),
                nn.AdaptiveAvgPool1d(output_size=1),
            )
            for l_filter_sizes in context_window
        ])
        self.dropout = nn.Dropout(p=prob_dropout)
        self.linear = nn.Linear(in_features=b1_channel*len(context_window), out_features=y_dim)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor expected as ``(batch, tokens, emb_dim)``.

        Returns:
            Tensor of shape ``(batch, y_dim)`` with unnormalised class scores.
        """
        # Insert the channel axis after the batch axis:
        # (batch, tokens, emb) -> (batch, 1, tokens, emb). Works for any batch size.
        x = x.unsqueeze(1)
        context_tensors = [c_layer(x) for c_layer in self.context_layers]

        # Each branch yields (batch, b1_channel, 1); concatenate along channels.
        x = torch.cat(context_tensors, dim=1).squeeze(-1)
        x = self.dropout(x)
        return self.linear(x)

    def train_(self, train_generator, epochs, lr=0.01):
        """Train the model with Adam and cross-entropy loss.

        Args:
            train_generator: Iterable yielding ``(x_data, y_data)`` batches.
            epochs: Number of passes over ``train_generator``.
            lr: Adam learning rate.

        The loss of the last batch is printed every 10th epoch (starting with
        the first one).
        """
        self.optim = torch.optim.Adam(self.parameters(), lr=lr)
        # Make sure dropout/batch-norm are in training mode, e.g. after test().
        self.train()

        for epoch in range(epochs):
            loss = None
            for x_data, y_data in train_generator:
                y_pred = self(x_data)
                # CrossEntropyLoss needs int64 class indices.
                loss = self.loss(y_pred, y_data.long())
                self.optim.zero_grad()
                loss.backward()
                self.optim.step()

            # `loss` stays None when the generator produced no batches.
            if epoch % 10 == 0 and loss is not None:
                print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, epochs, loss.item()))

    def test(self, test_generator):
        """Evaluate classification accuracy and print it.

        Args:
            test_generator: Iterable yielding ``(x_data, y_data)`` batches and
                exposing a ``dataset`` attribute with a length (as
                ``torch.utils.data.DataLoader`` does).

        Returns:
            Accuracy in percent over ``len(test_generator.dataset)`` samples.
        """
        self.eval()
        acc = 0
        with torch.no_grad():
            for x_data, y_data in test_generator:
                labels_pred = self(x_data).argmax(dim=1)
                acc += (labels_pred == y_data).sum().item()

        n_samples = len(test_generator.dataset)
        accuracy = 100 * acc / n_samples
        print('Test Accuracy of the model on the {} test documents: {} %'.format(n_samples, accuracy))
        return accuracy



In [None]:
# Model hyper-parameters, forwarded as keyword arguments to YoonKimModel.
params_model = dict(
    x_channel=1,
    b1_channel=16,
    emb_dim=1024,
    y_dim=5,
    stride_conv=1,
    prob_dropout=0.2,
    context_window=[2, 3, 5, 7],
)

# Training settings, forwarded as keyword arguments to YoonKimModel.train_.
params_train = dict(
    train_generator=train_generator,
    epochs=20,
    lr=0.001,
)

yoonkimcnn = YoonKimModel(**params_model)
yoonkimcnn.train_(**params_train)