In [None]:
!pip install torch-summary
!pip install line_profiler

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from tqdm.notebook import tqdm

In [None]:
# Fixed number of characters every review is truncated / padded to.
char_len = 1014
# Mini-batch size shared by the train and test data loaders.
batch_size = 64

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Character vocabulary: 26 lowercase letters, 10 digits, 33 punctuation
# characters and the newline, 70 symbols in total.
# The previous literal contained typographic look-alikes (curly quotes, modifier
# circumflex, small tilde) produced by copying from a PDF, so the plain ASCII
# characters ' " ^ ~ ` were never matched; it also relied on the invalid escape
# sequence "\|". Both are fixed here while keeping the length at 70.
# NOTE: '-' intentionally appears twice (as in the published alphabet); the dict
# below keeps the last occurrence, so the first '-' slot is simply never used.
alphabet = (
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789"
    "-,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}"
    "\n"
)

# Maps each known character to its row in the one-hot table.
char_map = {char: idx for idx, char in enumerate(alphabet)}


def generate_embedding_matrix(size=len(alphabet)):
    """Build the one-hot lookup table used as a frozen embedding.

    Args:
        size: Number of known characters (width of each one-hot vector).

    Returns:
        A float tensor of shape ``(size + 1, size)``. Rows ``0 .. size - 1``
        form the identity matrix (one-hot vectors); the extra last row is all
        zeros and is the encoding of index ``size`` (padding / unknown).
    """
    one_hot_rows = torch.eye(size)
    zero_row = torch.zeros(1, size)
    return torch.cat([one_hot_rows, zero_row], dim=0)

In [None]:
class CharCNN(nn.Module):
    """Character-level CNN classifier: six 1-D conv layers and three linear layers.

    A batch of character indices is turned into one-hot vectors by a frozen
    embedding table, passed through the convolutional stack, flattened and
    classified by the fully connected head.

    Args:
        input_dim: Number of characters per sample (sequence length).
        enc_dim: Width of the one-hot encoding, i.e. the number of known
            characters. Index ``enc_dim`` selects the extra all-zero row of
            the embedding table (padding / unknown character).
        output_dim: Number of output classes.
        net_type: ``"small"`` selects 256 conv channels and 1024 hidden units;
            any other value selects 1024 conv channels and 2048 hidden units.

    Raises:
        ValueError: If ``input_dim`` is too short for the conv/pool stack.
    """

    def __init__(self, input_dim, enc_dim, output_dim, net_type="small"):
        super().__init__()

        self.input_dim = input_dim
        self.enc_dim = enc_dim

        if net_type == "small":
            self.latent_size = 1024
            self.cnn_features = 256
        else:
            self.latent_size = 2048
            self.cnn_features = 1024

        # Frozen lookup table: index -> one-hot vector (last row is all zeros).
        self.embed = nn.Embedding.from_pretrained(generate_embedding_matrix(enc_dim), freeze=True)
        self.cnn, w_size = self.generate_cnn([
            # Kernel Size, Pooling Kernel Size (0 = no pooling)
            [7, 3], # L1
            [7, 3], # L2
            [3, 0], # L3
            [3, 0], # L4
            [3, 0], # L5
            [3, 3]  # L6
        ])
        # A ReLU separates the linear layers; without it the three layers
        # would be equivalent to a single affine map.
        self.fc = nn.Sequential(
            nn.Linear(w_size * self.cnn_features, self.latent_size),
            nn.ReLU(),
            nn.Linear(self.latent_size, self.latent_size),
            nn.ReLU(),
            nn.Linear(self.latent_size, output_dim)
        )

    def generate_cnn(self, layer_params):
        """Build the convolutional stack and compute its output length.

        Args:
            layer_params: Iterable of ``(kernel_size, pool_size)`` pairs, one
                per conv layer; ``pool_size == 0`` means no max-pooling after
                that layer.

        Returns:
            ``(cnn, feature_w)`` where ``cnn`` is an ``nn.Sequential`` of
            Conv1d -> ReLU [-> MaxPool1d] groups and ``feature_w`` is the
            sequence length left after the last layer for an input of length
            ``self.input_dim``.

        Raises:
            ValueError: If the sequence length drops below 1 inside the stack.
        """
        cnn_layers = []
        feature_w = self.input_dim
        in_channels = self.enc_dim
        for idx, (kernel_size, pool_size) in enumerate(layer_params):
            # Unpadded, stride-1 convolution shortens the sequence by k - 1;
            # max-pooling with stride == kernel floors the division.
            feature_w = feature_w - kernel_size + 1
            if pool_size != 0:
                feature_w = feature_w // pool_size
            if feature_w < 1:
                raise ValueError(
                    "input_dim=%d is too short: no features left after conv layer %d"
                    % (self.input_dim, idx + 1)
                )

            cnn_layers.append(nn.Conv1d(in_channels, self.cnn_features, kernel_size))
            cnn_layers.append(nn.ReLU())
            if pool_size != 0:
                cnn_layers.append(nn.MaxPool1d(pool_size))
            in_channels = self.cnn_features
        return nn.Sequential(*cnn_layers), feature_w

    def forward(self, x):
        """Classify a batch of encoded texts.

        Args:
            x: Integer tensor of shape ``(batch, input_dim)`` holding character
                indices in ``[0, enc_dim]``.

        Returns:
            Tensor of shape ``(batch, output_dim)`` with raw, unnormalised
            class scores (logits). No sigmoid/softmax is applied because
            ``nn.CrossEntropyLoss`` performs the log-softmax itself; apply
            ``softmax`` on the result when probabilities are needed.
        """
        # The embedding handles the whole batch at once: (B, L) -> (B, L, E).
        # Conv1d wants channels first, hence (B, E, L).
        encoded_x = self.embed(x).permute(0, 2, 1)
        cnn_out = self.cnn(encoded_x)
        fc_in = torch.flatten(cnn_out, start_dim=1)
        return self.fc(fc_in)

In [None]:
class YelpPolarity(Dataset):
    """Review-polarity CSV exposed as ``(character indices, label)`` pairs.

    Column 0 of the CSV is used as the class label and column 1 as the review
    text. Each text is lower-cased (when the alphabet has no upper-case
    letters), cut to ``char_len`` characters and mapped through ``char_map``;
    unknown characters and padding both use the index ``len(alphabet)``.

    Args:
        csv_path: Path of the CSV file.
        header: Forwarded to ``pandas.read_csv``. Defaults to ``None`` (no
            header row) so that the first review is not consumed as column
            names. NOTE(review): this assumes the yelp_review_polarity_csv
            files are headerless -- confirm; pass ``header="infer"`` or ``0``
            for a file that does have a header line.
    """

    def __init__(self, csv_path, header=None):
        self.data_frame = pd.read_csv(csv_path, header=header)

    def __len__(self):
        """Number of reviews in the file."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return ``(X, y)`` for row ``idx``.

        ``X`` is a LongTensor of shape ``(char_len,)`` with character indices,
        ``y`` a FloatTensor of shape ``(1,)`` holding the label exactly as it
        is stored in the CSV.
        """
        data_instance = self.data_frame.iloc[idx]
        # Explicit positional access: plain `series[0]` only works through a
        # deprecated label->position fallback.
        label = data_instance.iloc[0]
        inp_string = data_instance.iloc[1]

        # pandas parses empty fields as NaN (a float); treat that as empty text.
        inp_string = "" if pd.isna(inp_string) else str(inp_string)

        # Lower-case BEFORE truncating: str.lower() can lengthen a string
        # (e.g. U+0130), which previously could yield more than char_len ids.
        if "A" not in alphabet:
            inp_string = inp_string.lower()
        inp_string = inp_string[:char_len]

        pad_idx = len(alphabet)
        X_lis = [char_map.get(char, pad_idx) for char in inp_string]
        X_lis.extend([pad_idx] * (char_len - len(X_lis)))

        X = torch.tensor(X_lis, dtype=torch.long)
        y = torch.tensor([float(label)], dtype=torch.float32)
        return X, y

In [None]:
# Training split: batches are reshuffled on every pass over the data.
train_dset = YelpPolarity('../input/yelp-review-polarity/yelp_review_polarity_csv/train.csv')
train_loader = DataLoader(
    train_dset,
    batch_size=batch_size,
    num_workers=2,
    shuffle=True,
)

# Test split: read in file order, no shuffling.
test_dset = YelpPolarity('../input/yelp-review-polarity/yelp_review_polarity_csv/test.csv')
test_loader = DataLoader(
    test_dset,
    batch_size=batch_size,
    num_workers=2,
    shuffle=False,
)

In [None]:
# The encoding width is derived from the alphabet instead of a hard-coded 70:
# the dataset uses len(alphabet) as its padding/unknown index, and the model's
# all-zero embedding row sits at index enc_dim, so the two must always agree.
net = CharCNN(char_len, len(alphabet), 2).to(device)
# Dummy batch of two all-zero index sequences, only used to trace layer shapes.
summary(net, torch.zeros((2, char_len), dtype=torch.long))

# CrossEntropyLoss expects unnormalised class scores (logits) and int64 class ids.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Number of mini-batches between two progress reports. The running loss is
# averaged over exactly this many batches (it used to be divided by 2000 while
# being reported every 100 batches, which under-reported the loss 20x).
log_every = 100

for epoch in tqdm(range(2)):
    running_loss = 0.0
    for i, data in enumerate(tqdm(train_loader, leave=False), 0):
        indices, labels = data

        indices = indices.to(device)
        # The dataset yields float labels of shape (batch, 1). They are cast to
        # int64 and shifted by -1 (assumes the CSV labels are 1-based class ids
        # -- TODO confirm); flatten() below gives the (batch,) shape the loss needs.
        labels = (labels.long() - 1).to(device)

        optimizer.zero_grad()
        outputs = net(indices)

        loss = criterion(outputs, labels.flatten())
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % log_every == log_every - 1:
            # Columns: epoch, batch number, mean loss of the window, last batch loss.
            print('[%d, %5d] loss: %.3f \t %.3f' % (epoch + 1, i + 1, running_loss / log_every, loss.item()))
            running_loss = 0.0

print('Finished Training')