In [67]:
import torch
torch.__version__
torch.cuda.is_available()

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [68]:
def read_new_email(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    
    parts = text.split('"')  
    
    subject = parts[1].strip()
    body = parts[3].strip()
    label = parts[5].strip()
    
    new_email = {'Subject':subject, 'Body':body, 'Label':label}
    return new_email

In [69]:
confirmation_emails = []

for i in range(1, 93):
    file_path = f"Data/confirmation/conf{i}.txt"
    new_email = read_new_email(file_path)
    confirmation_emails.append(new_email)

print(len(confirmation_emails))

92


In [70]:
rejection_emails = []

for i in range(1, 105):
    file_path = f"Data/rejections/rejection{i}.txt"
    new_email = read_new_email(file_path)
    rejection_emails.append(new_email)

print(len(rejection_emails))

104


In [71]:
from sklearn.model_selection import train_test_split

all_emails = []


for i in range(0, len(confirmation_emails)):
    if isinstance(confirmation_emails[i], dict):
        all_emails.append(confirmation_emails[i])

for i in range(0, len(rejection_emails)):
    if isinstance(rejection_emails[i], dict):
        all_emails.append(rejection_emails[i])

messages = []
labels = []


for i in range(0, len(all_emails)):
    text = all_emails[i]['Subject'] + " " + all_emails[i]['Body']
    label = all_emails[i]['Label']

    messages.append(text)
    labels.append(label)


X_train, X_test, y_train, y_test = train_test_split(messages,
                                                    labels, 
                                                    test_size=0.2, 
                                                    train_size=0.8, 
                                                    random_state=42)

In [72]:
label_to_index = {"rejection": 0,
                  "confirmation": 1}

def tensor_format(arr):
    for i in range(0, len(arr)):
        if(arr[i] == 'rejection'):
            arr[i] = 0
        else:
            arr[i] = 1


tensor_format(y_train)    

tensor_format(y_test)


In [73]:
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

tokenizer = get_tokenizer("basic_english")

def token_iterator():
    for text in X_train:
        yield tokenizer(text)

# Build the vocabulary from the iterator
vocab = build_vocab_from_iterator(token_iterator(), specials = ["<unk>"])
vocab.set_default_index(vocab["<unk>"])



In [97]:
max_length = max(len(tokenizer(text)) for text in X_train)

# def encode_text(text):
#     tokens = tokenizer(text)
#     return [vocab[token] for token in tokens]

def encode_text(text, max_length):
    tokens = tokenizer(text)
    encoded_tokens = [vocab[token] for token in tokens]
    # Pad or truncate sequences to max_length
    if len(encoded_tokens) < max_length:
        encoded_tokens += [vocab["<PAD>"]] * (max_length - len(encoded_tokens))
    else:
        encoded_tokens = encoded_tokens[:max_length]
    return encoded_tokens

y_train_tensors = torch.tensor(y_train, dtype=torch.float)
y_test_tensors = torch.tensor(y_test, dtype=torch.float)

X_train_tensors = torch.stack([torch.tensor(encode_text(text, max_length), dtype=torch.float) for text in X_train])
X_test_tensors = torch.stack([torch.tensor(encode_text(text, max_length), dtype=torch.float) for text in X_test])

X_train_tensors.size()



torch.Size([156, 338])

In [159]:
#Model
from torch import nn

class EmailModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(in_features=338, out_features=32) 
        self.layer2 = nn.Linear(in_features=32, out_features=32)
        self.output_layer = nn.Linear(in_features=32, out_features=2)
   
    def forward(self, x):
        return self.output_layer(self.layer2(self.layer1(x)))

model_1 = EmailModel().to(device)
model_1

EmailModel(
  (layer1): Linear(in_features=338, out_features=32, bias=True)
  (layer2): Linear(in_features=32, out_features=32, bias=True)
  (output_layer): Linear(in_features=32, out_features=2, bias=True)
)

In [160]:
#Loss Function and Optimizer

from torch import nn

loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model_1.parameters(), lr=0.1)

In [168]:
#Accuracy Function

def acc_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    accuracy = (correct/len(y_pred)) * 100
    return accuracy

In [169]:
#Training and Testing Loop
epochs = 5

X_test_tensors = X_test_tensors.to(device)
X_train_tensors = X_train_tensors.to(device)
y_train_tensors = y_train_tensors.to(device)
y_test_tensors = y_test_tensors.to(device)

for epoch in range(epochs):
    model_1.train()
    train_logits = model_1(X_train_tensors).squeeze()



    train_preds = torch.round(torch.sigmoid(train_logits))
    train_loss = loss_fn(train_logits, y_train_tensors)
    train_acc = acc_fn(y_train_tensors, train_preds) 
    
    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()

    model_1.eval()
    with torch.inference_mode():
        test_logits = model_1(X_test_tensors).squeeze()
        test_pred = torch.round(torch.sigmoid(test_logits))


        test_loss = loss_fn(test_pred, y_test)
        test_acc = acc_fn(y_test_tensors, test_pred) 

    print(f"Epoch: {epoch} | Loss: {train_loss: .4f}, Acc: {train_acc:.2f}%) | Test Loss: {test_loss: .4f}, Test Acc: {test_acc:.2f}%")


# ok

ValueError: Target size (torch.Size([156])) must be the same as input size (torch.Size([156, 2]))

In [122]:
preds = model_1(X_train_tensors)

preds[0]

tensor([ 92.0247, -51.2207, -51.8405, -25.6365, -30.8861,  58.2191,  51.7870,
          7.4082,  38.6237, -16.0134,  31.4880, -20.0420, -71.9764, -15.3566,
        -21.3447, -28.9662, -94.8672, -46.4131,  19.3436, -33.0505,  52.8601,
        122.6866], device='cuda:0', grad_fn=<SelectBackward0>)